fix(remoteingress): Reconcile tunnel manager edge statuses with authoritative Rust hub periodically; update active tunnel counts and heartbeats, add missed edges, remove stale entries, and clear reconcile interval on stop
This commit is contained in:
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@serve.zone/dcrouter',
|
||||
version: '9.1.4',
|
||||
version: '9.1.5',
|
||||
description: 'A multifaceted routing service handling mail and SMS delivery functions.'
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@ export class TunnelManager {
|
||||
private manager: RemoteIngressManager;
|
||||
private config: ITunnelManagerConfig;
|
||||
private edgeStatuses: Map<string, IRemoteIngressStatus> = new Map();
|
||||
private reconcileInterval: ReturnType<typeof setInterval> | null = null;
|
||||
|
||||
constructor(manager: RemoteIngressManager, config: ITunnelManagerConfig = {}) {
|
||||
this.manager = manager;
|
||||
@@ -65,16 +66,62 @@ export class TunnelManager {
|
||||
|
||||
// Send allowed edges to the hub
|
||||
await this.syncAllowedEdges();
|
||||
|
||||
// Periodically reconcile with authoritative Rust hub status
|
||||
this.reconcileInterval = setInterval(() => {
|
||||
this.reconcile().catch(() => {});
|
||||
}, 15_000);
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the tunnel hub.
|
||||
*/
|
||||
public async stop(): Promise<void> {
|
||||
if (this.reconcileInterval) {
|
||||
clearInterval(this.reconcileInterval);
|
||||
this.reconcileInterval = null;
|
||||
}
|
||||
await this.hub.stop();
|
||||
this.edgeStatuses.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconcile TS-side edge statuses with the authoritative Rust hub status.
|
||||
* Overwrites event-derived activeTunnels with the real activeStreams count.
|
||||
*/
|
||||
private async reconcile(): Promise<void> {
|
||||
const hubStatus = await this.hub.getStatus();
|
||||
if (!hubStatus || !hubStatus.connectedEdges) return;
|
||||
|
||||
const rustEdgeIds = new Set<string>();
|
||||
|
||||
for (const rustEdge of hubStatus.connectedEdges) {
|
||||
rustEdgeIds.add(rustEdge.edgeId);
|
||||
const existing = this.edgeStatuses.get(rustEdge.edgeId);
|
||||
if (existing) {
|
||||
existing.activeTunnels = rustEdge.activeStreams;
|
||||
existing.lastHeartbeat = Date.now();
|
||||
} else {
|
||||
// Missed edgeConnected event — add entry
|
||||
this.edgeStatuses.set(rustEdge.edgeId, {
|
||||
edgeId: rustEdge.edgeId,
|
||||
connected: true,
|
||||
publicIp: null,
|
||||
activeTunnels: rustEdge.activeStreams,
|
||||
lastHeartbeat: Date.now(),
|
||||
connectedAt: rustEdge.connectedAt * 1000,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Remove entries for edges no longer connected in Rust (missed edgeDisconnected)
|
||||
for (const edgeId of this.edgeStatuses.keys()) {
|
||||
if (!rustEdgeIds.has(edgeId)) {
|
||||
this.edgeStatuses.delete(edgeId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sync allowed edges from the manager to the hub.
|
||||
* Call this after creating/deleting/updating edges.
|
||||
|
||||
Reference in New Issue
Block a user