172 lines
5.1 KiB
TypeScript
172 lines
5.1 KiB
TypeScript
import * as plugins from '../plugins.js';
|
|
import type { IRemoteIngressStatus } from '../../ts_interfaces/data/remoteingress.js';
|
|
import type { RemoteIngressManager } from './classes.remoteingress-manager.js';
|
|
|
|
export interface ITunnelManagerConfig {
|
|
tunnelPort?: number;
|
|
targetHost?: string;
|
|
}
|
|
|
|
/**
|
|
* Manages the RemoteIngressHub instance and tracks connected edge statuses.
|
|
*/
|
|
export class TunnelManager {
|
|
private hub: InstanceType<typeof plugins.remoteingress.RemoteIngressHub>;
|
|
private manager: RemoteIngressManager;
|
|
private config: ITunnelManagerConfig;
|
|
private edgeStatuses: Map<string, IRemoteIngressStatus> = new Map();
|
|
private reconcileInterval: ReturnType<typeof setInterval> | null = null;
|
|
|
|
constructor(manager: RemoteIngressManager, config: ITunnelManagerConfig = {}) {
|
|
this.manager = manager;
|
|
this.config = config;
|
|
this.hub = new plugins.remoteingress.RemoteIngressHub();
|
|
|
|
// Listen for edge connect/disconnect events
|
|
this.hub.on('edgeConnected', (data: { edgeId: string }) => {
|
|
const existing = this.edgeStatuses.get(data.edgeId);
|
|
this.edgeStatuses.set(data.edgeId, {
|
|
edgeId: data.edgeId,
|
|
connected: true,
|
|
publicIp: existing?.publicIp ?? null,
|
|
activeTunnels: 0,
|
|
lastHeartbeat: Date.now(),
|
|
connectedAt: Date.now(),
|
|
});
|
|
});
|
|
|
|
this.hub.on('edgeDisconnected', (data: { edgeId: string }) => {
|
|
this.edgeStatuses.delete(data.edgeId);
|
|
});
|
|
|
|
this.hub.on('streamOpened', (data: { edgeId: string; streamId: number }) => {
|
|
const existing = this.edgeStatuses.get(data.edgeId);
|
|
if (existing) {
|
|
existing.activeTunnels++;
|
|
existing.lastHeartbeat = Date.now();
|
|
}
|
|
});
|
|
|
|
this.hub.on('streamClosed', (data: { edgeId: string; streamId: number }) => {
|
|
const existing = this.edgeStatuses.get(data.edgeId);
|
|
if (existing && existing.activeTunnels > 0) {
|
|
existing.activeTunnels--;
|
|
}
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Start the tunnel hub and load allowed edges.
|
|
*/
|
|
public async start(): Promise<void> {
|
|
await this.hub.start({
|
|
tunnelPort: this.config.tunnelPort ?? 8443,
|
|
targetHost: this.config.targetHost ?? '127.0.0.1',
|
|
});
|
|
|
|
// Send allowed edges to the hub
|
|
await this.syncAllowedEdges();
|
|
|
|
// Periodically reconcile with authoritative Rust hub status
|
|
this.reconcileInterval = setInterval(() => {
|
|
this.reconcile().catch(() => {});
|
|
}, 15_000);
|
|
}
|
|
|
|
/**
|
|
* Stop the tunnel hub.
|
|
*/
|
|
public async stop(): Promise<void> {
|
|
if (this.reconcileInterval) {
|
|
clearInterval(this.reconcileInterval);
|
|
this.reconcileInterval = null;
|
|
}
|
|
// Remove event listeners before stopping to prevent leaks
|
|
this.hub.removeAllListeners();
|
|
await this.hub.stop();
|
|
this.edgeStatuses.clear();
|
|
}
|
|
|
|
/**
|
|
* Reconcile TS-side edge statuses with the authoritative Rust hub status.
|
|
* Overwrites event-derived activeTunnels with the real activeStreams count.
|
|
*/
|
|
private async reconcile(): Promise<void> {
|
|
const hubStatus = await this.hub.getStatus();
|
|
if (!hubStatus || !hubStatus.connectedEdges) return;
|
|
|
|
const rustEdgeIds = new Set<string>();
|
|
|
|
for (const rustEdge of hubStatus.connectedEdges) {
|
|
rustEdgeIds.add(rustEdge.edgeId);
|
|
const existing = this.edgeStatuses.get(rustEdge.edgeId);
|
|
if (existing) {
|
|
existing.activeTunnels = rustEdge.activeStreams;
|
|
existing.lastHeartbeat = Date.now();
|
|
} else {
|
|
// Missed edgeConnected event — add entry
|
|
this.edgeStatuses.set(rustEdge.edgeId, {
|
|
edgeId: rustEdge.edgeId,
|
|
connected: true,
|
|
publicIp: null,
|
|
activeTunnels: rustEdge.activeStreams,
|
|
lastHeartbeat: Date.now(),
|
|
connectedAt: rustEdge.connectedAt * 1000,
|
|
});
|
|
}
|
|
}
|
|
|
|
// Remove entries for edges no longer connected in Rust (missed edgeDisconnected)
|
|
for (const edgeId of this.edgeStatuses.keys()) {
|
|
if (!rustEdgeIds.has(edgeId)) {
|
|
this.edgeStatuses.delete(edgeId);
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Sync allowed edges from the manager to the hub.
|
|
* Call this after creating/deleting/updating edges.
|
|
*/
|
|
public async syncAllowedEdges(): Promise<void> {
|
|
const edges = this.manager.getAllowedEdges();
|
|
await this.hub.updateAllowedEdges(edges);
|
|
}
|
|
|
|
/**
|
|
* Get runtime statuses for all known edges.
|
|
*/
|
|
public getEdgeStatuses(): IRemoteIngressStatus[] {
|
|
return Array.from(this.edgeStatuses.values());
|
|
}
|
|
|
|
/**
|
|
* Get status for a specific edge.
|
|
*/
|
|
public getEdgeStatus(edgeId: string): IRemoteIngressStatus | undefined {
|
|
return this.edgeStatuses.get(edgeId);
|
|
}
|
|
|
|
/**
|
|
* Get the count of connected edges.
|
|
*/
|
|
public getConnectedCount(): number {
|
|
let count = 0;
|
|
for (const status of this.edgeStatuses.values()) {
|
|
if (status.connected) count++;
|
|
}
|
|
return count;
|
|
}
|
|
|
|
/**
|
|
* Get the total number of active tunnels across all edges.
|
|
*/
|
|
public getTotalActiveTunnels(): number {
|
|
let total = 0;
|
|
for (const status of this.edgeStatuses.values()) {
|
|
total += status.activeTunnels;
|
|
}
|
|
return total;
|
|
}
|
|
}
|