Files
dcrouter/ts/remoteingress/classes.tunnel-manager.ts

193 lines
5.6 KiB
TypeScript

import * as plugins from '../plugins.js';
import type { IRemoteIngressStatus } from '../../ts_interfaces/data/remoteingress.js';
import type { RemoteIngressManager } from './classes.remoteingress-manager.js';
/**
 * Optional settings for a TunnelManager. Every field may be omitted.
 */
export interface ITunnelManagerConfig {
  /** Port forwarded to the hub as `tunnelPort`; `start()` falls back to 8443 when unset. */
  tunnelPort?: number;
  /** Host forwarded to the hub as `targetHost`; `start()` falls back to '127.0.0.1' when unset. */
  targetHost?: string;
  /** TLS material (PEM strings) forwarded unchanged to the hub as `tls`. */
  tls?: { certPem?: string; keyPem?: string };
}
/**
 * Manages the RemoteIngressHub instance and tracks connected edge statuses.
 *
 * Status is kept in an in-memory map keyed by edge id. It is updated from hub
 * events (`edgeConnected`, `edgeDisconnected`, `streamOpened`, `streamClosed`)
 * and periodically overwritten with the result of `hub.getStatus()`.
 *
 * The manager can be restarted: `stop()` strips all hub listeners, and
 * `start()` re-attaches them before starting the hub again.
 */
export class TunnelManager {
  /** Delay between two reconcile passes against `hub.getStatus()`. */
  private static readonly RECONCILE_INTERVAL_MS = 15_000;

  private hub: InstanceType<typeof plugins.remoteingress.RemoteIngressHub>;
  private manager: RemoteIngressManager;
  private config: ITunnelManagerConfig;
  private edgeStatuses: Map<string, IRemoteIngressStatus> = new Map();
  private reconcileInterval: ReturnType<typeof setInterval> | null = null;
  /** True while this manager's event handlers are registered on the hub. */
  private listenersAttached = false;

  /**
   * @param manager source of the allowed-edge list pushed to the hub
   * @param config optional hub settings; see ITunnelManagerConfig
   */
  constructor(manager: RemoteIngressManager, config: ITunnelManagerConfig = {}) {
    this.manager = manager;
    this.config = config;
    this.hub = new plugins.remoteingress.RemoteIngressHub();
    this.attachHubListeners();
  }

  /**
   * Register the edge/stream event handlers on the hub.
   * Idempotent: does nothing when the handlers are already registered, so it
   * is safe to call from both the constructor and start().
   */
  private attachHubListeners(): void {
    if (this.listenersAttached) {
      return;
    }
    this.listenersAttached = true;

    // Listen for edge connect/disconnect events
    this.hub.on('edgeConnected', (data: { edgeId: string; peerAddr: string }) => {
      this.edgeStatuses.set(data.edgeId, {
        edgeId: data.edgeId,
        connected: true,
        publicIp: data.peerAddr || null,
        activeTunnels: 0,
        lastHeartbeat: Date.now(),
        connectedAt: Date.now(),
      });
    });
    this.hub.on('edgeDisconnected', (data: { edgeId: string }) => {
      this.edgeStatuses.delete(data.edgeId);
    });
    this.hub.on('streamOpened', (data: { edgeId: string; streamId: number }) => {
      const existing = this.edgeStatuses.get(data.edgeId);
      if (existing) {
        existing.activeTunnels++;
        existing.lastHeartbeat = Date.now();
      }
    });
    this.hub.on('streamClosed', (data: { edgeId: string; streamId: number }) => {
      const existing = this.edgeStatuses.get(data.edgeId);
      // Never let the counter go negative if a close arrives without a matching open.
      if (existing && existing.activeTunnels > 0) {
        existing.activeTunnels--;
      }
    });
  }

  /**
   * Start the tunnel hub and load allowed edges.
   *
   * Safe to call again after stop(): the hub listeners removed by stop() are
   * re-attached first. Calling it twice without stop() does not leak a second
   * reconcile timer.
   */
  public async start(): Promise<void> {
    // stop() removes every listener from the hub; restore ours before the hub
    // can emit anything.
    this.attachHubListeners();

    await this.hub.start({
      tunnelPort: this.config.tunnelPort ?? 8443,
      targetHost: this.config.targetHost ?? '127.0.0.1',
      tls: this.config.tls,
    });
    // Send allowed edges to the hub
    await this.syncAllowedEdges();

    // Drop a timer left over from a previous start() so only one ever runs.
    if (this.reconcileInterval) {
      clearInterval(this.reconcileInterval);
    }
    // Periodically reconcile with authoritative Rust hub status
    this.reconcileInterval = setInterval(() => {
      // Best effort: a failed pass is ignored and retried on the next tick.
      this.reconcile().catch(() => {});
    }, TunnelManager.RECONCILE_INTERVAL_MS);
  }

  /**
   * Stop the tunnel hub, cancel the reconcile timer and forget all statuses.
   */
  public async stop(): Promise<void> {
    if (this.reconcileInterval) {
      clearInterval(this.reconcileInterval);
      this.reconcileInterval = null;
    }
    // Remove event listeners before stopping to prevent leaks
    this.hub.removeAllListeners();
    this.listenersAttached = false;
    await this.hub.stop();
    this.edgeStatuses.clear();
  }

  /**
   * Reconcile TS-side edge statuses with the authoritative Rust hub status.
   * Overwrites event-derived activeTunnels with the real activeStreams count,
   * adds edges whose connect event was missed and removes edges the hub no
   * longer reports.
   */
  private async reconcile(): Promise<void> {
    const hubStatus = await this.hub.getStatus();
    if (!hubStatus || !hubStatus.connectedEdges) return;

    const rustEdgeIds = new Set<string>();
    for (const rustEdge of hubStatus.connectedEdges) {
      rustEdgeIds.add(rustEdge.edgeId);
      const existing = this.edgeStatuses.get(rustEdge.edgeId);
      if (existing) {
        existing.activeTunnels = rustEdge.activeStreams;
        existing.lastHeartbeat = Date.now();
        // Update peer address if available from Rust hub
        if (rustEdge.peerAddr) {
          existing.publicIp = rustEdge.peerAddr;
        }
      } else {
        // Missed edgeConnected event — add entry
        this.edgeStatuses.set(rustEdge.edgeId, {
          edgeId: rustEdge.edgeId,
          connected: true,
          publicIp: rustEdge.peerAddr || null,
          activeTunnels: rustEdge.activeStreams,
          lastHeartbeat: Date.now(),
          // NOTE(review): presumably the hub reports epoch seconds, hence the
          // conversion to milliseconds — confirm against the hub's status type.
          connectedAt: rustEdge.connectedAt * 1000,
        });
      }
    }

    // Remove entries for edges no longer connected in Rust (missed edgeDisconnected).
    // Deleting from a Map while iterating its keys is well-defined in JavaScript.
    for (const edgeId of this.edgeStatuses.keys()) {
      if (!rustEdgeIds.has(edgeId)) {
        this.edgeStatuses.delete(edgeId);
      }
    }
  }

  /**
   * Sync allowed edges from the manager to the hub.
   * Call this after creating/deleting/updating edges.
   */
  public async syncAllowedEdges(): Promise<void> {
    const edges = this.manager.getAllowedEdges();
    await this.hub.updateAllowedEdges(edges);
  }

  /**
   * Get runtime statuses for all known edges.
   * @returns a new array holding the live status objects (not copies)
   */
  public getEdgeStatuses(): IRemoteIngressStatus[] {
    return Array.from(this.edgeStatuses.values());
  }

  /**
   * Get status for a specific edge.
   * @returns the live status object, or undefined when the edge is not tracked
   */
  public getEdgeStatus(edgeId: string): IRemoteIngressStatus | undefined {
    return this.edgeStatuses.get(edgeId);
  }

  /**
   * Get the count of connected edges.
   */
  public getConnectedCount(): number {
    let count = 0;
    for (const status of this.edgeStatuses.values()) {
      if (status.connected) count++;
    }
    return count;
  }

  /**
   * Get the public IPs of all connected edges.
   * Edges without a known public IP are skipped.
   */
  public getConnectedEdgeIps(): string[] {
    const ips: string[] = [];
    for (const status of this.edgeStatuses.values()) {
      if (status.connected && status.publicIp) {
        ips.push(status.publicIp);
      }
    }
    return ips;
  }

  /**
   * Get the total number of active tunnels across all edges.
   */
  public getTotalActiveTunnels(): number {
    let total = 0;
    for (const status of this.edgeStatuses.values()) {
      total += status.activeTunnels;
    }
    return total;
  }
}