import * as plugins from '../plugins.js';
import type { IRemoteIngressStatus } from '../../ts_interfaces/data/remoteingress.js';
import type { RemoteIngressManager } from './classes.remoteingress-manager.js';

/**
 * Options accepted by {@link TunnelManager}.
 */
export interface ITunnelManagerConfig {
  /** Passed to the hub as `tunnelPort`; `start()` falls back to 8443 when unset. */
  tunnelPort?: number;
  /** Passed to the hub as `targetHost`; `start()` falls back to '127.0.0.1' when unset. */
  targetHost?: string;
  /** Optional TLS material, forwarded to the hub unchanged. */
  tls?: {
    certPem?: string;
    keyPem?: string;
  };
  /** Optional performance settings; only forwarded to the hub when provided. */
  performance?: import('../../ts_interfaces/data/remoteingress.js').IRemoteIngressPerformanceConfig;
}

/**
 * Manages the RemoteIngressHub instance and tracks connected edge statuses.
 *
 * Statuses are maintained from hub events (`edgeConnected`, `edgeDisconnected`,
 * `streamOpened`, `streamClosed`) and periodically overwritten with the status
 * reported by the hub itself (see `reconcile`).
 */
export class TunnelManager {
  private hub: InstanceType<typeof plugins.remoteingress.RemoteIngressHub>;
  private manager: RemoteIngressManager;
  private config: ITunnelManagerConfig;
  /** Runtime status per edge, keyed by edgeId. */
  private edgeStatuses: Map<string, IRemoteIngressStatus> = new Map();
  /** Handle of the periodic reconcile timer; null while not running. */
  private reconcileInterval: ReturnType<typeof setInterval> | null = null;
  /** Tail of the serialized `syncAllowedEdges` calls. */
  private syncChain: Promise<void> = Promise.resolve();
  /** Whether the hub event listeners are currently registered. */
  private listenersAttached = false;

  /**
   * @param manager source of the allowed-edges list pushed to the hub
   * @param config optional hub settings; defaults are applied in `start()`
   */
  constructor(manager: RemoteIngressManager, config: ITunnelManagerConfig = {}) {
    this.manager = manager;
    this.config = config;
    this.hub = new plugins.remoteingress.RemoteIngressHub();
    this.attachHubListeners();
  }

  /**
   * Register the hub event listeners that keep `edgeStatuses` up to date.
   * Idempotent: does nothing when the listeners are already registered.
   */
  private attachHubListeners(): void {
    if (this.listenersAttached) {
      return;
    }
    this.listenersAttached = true;

    // Listen for edge connect/disconnect events
    this.hub.on('edgeConnected', (data: { edgeId: string; peerAddr: string }) => {
      this.edgeStatuses.set(data.edgeId, {
        edgeId: data.edgeId,
        connected: true,
        publicIp: data.peerAddr || null,
        activeTunnels: 0,
        lastHeartbeat: Date.now(),
        connectedAt: Date.now(),
      });
    });

    this.hub.on('edgeDisconnected', (data: { edgeId: string }) => {
      this.edgeStatuses.delete(data.edgeId);
    });

    this.hub.on('streamOpened', (data: { edgeId: string; streamId: number }) => {
      const existing = this.edgeStatuses.get(data.edgeId);
      if (existing) {
        existing.activeTunnels++;
        existing.lastHeartbeat = Date.now();
      }
    });

    this.hub.on('streamClosed', (data: { edgeId: string; streamId: number }) => {
      const existing = this.edgeStatuses.get(data.edgeId);
      // Never let the event-derived counter drop below zero
      if (existing && existing.activeTunnels > 0) {
        existing.activeTunnels--;
      }
    });
  }

  /**
   * Start the tunnel hub and load allowed edges.
   */
  public async start(): Promise<void> {
    // stop() removes every hub listener, so a restart has to register them again
    this.attachHubListeners();

    await this.hub.start({
      tunnelPort: this.config.tunnelPort ?? 8443,
      targetHost: this.config.targetHost ?? '127.0.0.1',
      tls: this.config.tls,
      ...(this.config.performance ? { performance: this.config.performance } : {}),
    } as any);

    // Send allowed edges to the hub
    await this.syncAllowedEdges();

    // A repeated start() must not leak the previous timer
    if (this.reconcileInterval) {
      clearInterval(this.reconcileInterval);
    }

    // Periodically reconcile with authoritative Rust hub status
    this.reconcileInterval = setInterval(() => {
      // Best effort: a failed reconcile is ignored and retried on the next tick
      this.reconcile().catch(() => {});
    }, 15_000);
  }

  /**
   * Stop the tunnel hub.
   *
   * Tracked edge statuses are cleared even when stopping the hub fails;
   * the error from `hub.stop()` is still propagated to the caller.
   */
  public async stop(): Promise<void> {
    if (this.reconcileInterval) {
      clearInterval(this.reconcileInterval);
      this.reconcileInterval = null;
    }
    // Remove event listeners before stopping to prevent leaks
    this.hub.removeAllListeners();
    this.listenersAttached = false;
    try {
      await this.hub.stop();
    } finally {
      this.edgeStatuses.clear();
    }
  }

  /**
   * Reconcile TS-side edge statuses with the authoritative Rust hub status.
   * Overwrites event-derived activeTunnels with the real activeStreams count.
   */
  private async reconcile(): Promise<void> {
    const hubStatus = await this.hub.getStatus();
    if (!hubStatus || !hubStatus.connectedEdges) return;

    const rustEdgeIds = new Set<string>();

    for (const rustEdge of hubStatus.connectedEdges) {
      rustEdgeIds.add(rustEdge.edgeId);
      const existing = this.edgeStatuses.get(rustEdge.edgeId);
      if (existing) {
        existing.activeTunnels = rustEdge.activeStreams;
        existing.lastHeartbeat = Date.now();
        this.applyRustStatus(existing, rustEdge);
        // Update peer address if available from Rust hub
        if (rustEdge.peerAddr) {
          existing.publicIp = rustEdge.peerAddr;
        }
      } else {
        // Missed edgeConnected event — add entry
        const status: IRemoteIngressStatus = {
          edgeId: rustEdge.edgeId,
          connected: true,
          publicIp: rustEdge.peerAddr || null,
          activeTunnels: rustEdge.activeStreams,
          lastHeartbeat: Date.now(),
          // NOTE(review): presumably the hub reports seconds and this converts to ms — confirm
          connectedAt: rustEdge.connectedAt * 1000,
        };
        this.applyRustStatus(status, rustEdge);
        this.edgeStatuses.set(rustEdge.edgeId, status);
      }
    }

    // Remove entries for edges no longer connected in Rust (missed edgeDisconnected)
    for (const edgeId of this.edgeStatuses.keys()) {
      if (!rustEdgeIds.has(edgeId)) {
        this.edgeStatuses.delete(edgeId);
      }
    }
  }

  /**
   * Sync allowed edges from the manager to the hub.
   * Call this after creating/deleting/updating edges.
   *
   * Calls are serialized through `syncChain`: each run starts only after the
   * previous one has settled, and a failure of an earlier run does not prevent
   * later ones. The returned promise rejects if this call's own update fails.
   */
  public async syncAllowedEdges(): Promise<void> {
    const run = this.syncChain.catch(() => {}).then(async () => {
      const edges = this.manager.getAllowedEdges();
      await this.hub.updateAllowedEdges(edges as any);
    });
    this.syncChain = run;
    await run;
  }

  /**
   * Copy the hub-reported detail fields of one edge onto a status object.
   *
   * @param status status entry to update in place
   * @param rustEdge edge record taken from the hub status
   */
  private applyRustStatus(status: IRemoteIngressStatus, rustEdge: any): void {
    status.transportMode = rustEdge.transportMode;
    status.fallbackUsed = rustEdge.fallbackUsed;
    status.performance = rustEdge.performance;
    status.flowControl = rustEdge.flowControl;
    status.queues = rustEdge.queues;
    status.traffic = rustEdge.traffic;
    status.udp = rustEdge.udp;
  }

  /**
   * Get runtime statuses for all known edges.
   */
  public getEdgeStatuses(): IRemoteIngressStatus[] {
    return Array.from(this.edgeStatuses.values());
  }

  /**
   * Get status for a specific edge.
   */
  public getEdgeStatus(edgeId: string): IRemoteIngressStatus | undefined {
    return this.edgeStatuses.get(edgeId);
  }

  /**
   * Get the count of connected edges.
   */
  public getConnectedCount(): number {
    let count = 0;
    for (const status of this.edgeStatuses.values()) {
      if (status.connected) count++;
    }
    return count;
  }

  /**
   * Get the public IPs of all connected edges.
   */
  public getConnectedEdgeIps(): string[] {
    const ips: string[] = [];
    for (const status of this.edgeStatuses.values()) {
      if (status.connected && status.publicIp) {
        ips.push(status.publicIp);
      }
    }
    return ips;
  }

  /**
   * Get the total number of active tunnels across all edges.
   */
  public getTotalActiveTunnels(): number {
    let total = 0;
    for (const status of this.edgeStatuses.values()) {
      total += status.activeTunnels;
    }
    return total;
  }
}