diff --git a/changelog.md b/changelog.md index 9f456f6..f10eb97 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,14 @@ # Changelog +## 2026-03-03 - 4.4.0 - feat(remoteingress) +add heartbeat PING/PONG and liveness timeouts; implement fast-reconnect/backoff reset and JS crash-recovery auto-restart + +- protocol: add FRAME_PING and FRAME_PONG and unit tests for ping/pong frames +- edge (Rust): reset backoff after successful connection, respond to PING with PONG, track liveness via deadline and reconnect on timeout, use Duration/Instant helpers +- hub (Rust): send periodic PING to edges, handle PONGs, enforce liveness timeout and disconnect inactive edges, use tokio interval and time utilities +- ts: RemoteIngressEdge and RemoteIngressHub: add crash-recovery auto-restart with exponential backoff and max attempts, save/restore config and allowed edges, register/remove exit handlers, ensure stop() marks stopping and cleans up listeners +- minor API/typing: introduce TAllowedEdge alias and persist allowed edges for restart recovery + ## 2026-02-26 - 4.3.0 - feat(hub) add optional TLS certificate/key support to hub start config and bridge diff --git a/rust/crates/remoteingress-core/src/edge.rs b/rust/crates/remoteingress-core/src/edge.rs index 21e0fc9..d19734e 100644 --- a/rust/crates/remoteingress-core/src/edge.rs +++ b/rust/crates/remoteingress-core/src/edge.rs @@ -1,10 +1,12 @@ use std::collections::HashMap; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; +use std::time::Duration; use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{mpsc, Mutex, RwLock}; use tokio::task::JoinHandle; +use tokio::time::{Instant, sleep_until}; use tokio_rustls::TlsConnector; use tokio_util::sync::CancellationToken; use serde::{Deserialize, Serialize}; @@ -202,6 +204,13 @@ async fn edge_main_loop( // Cancel connection token to kill all orphaned tasks from this cycle connection_token.cancel(); + // Reset backoff after a successful connection for fast reconnect + let was_connected = *connected.read().await; + if was_connected { + backoff_ms = 1000; + log::info!("Was connected; resetting backoff to {}ms for fast reconnect", backoff_ms); + } + *connected.write().await = false; let _ = event_tx.try_send(EdgeEvent::TunnelDisconnected); active_streams.store(0, Ordering::Relaxed); @@ -214,7 +223,7 @@ async fn edge_main_loop( EdgeLoopResult::Reconnect => { log::info!("Reconnecting in {}ms...", backoff_ms); tokio::select! { - _ = tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)) => {} + _ = tokio::time::sleep(Duration::from_millis(backoff_ms)) => {} _ = cancel_token.cancelled() => break, _ = shutdown_rx.recv() => break, } @@ -336,7 +345,7 @@ async fn connect_to_hub_and_run( _ = stun_token.cancelled() => break, } tokio::select! { - _ = tokio::time::sleep(std::time::Duration::from_secs(stun_interval)) => {} + _ = tokio::time::sleep(Duration::from_secs(stun_interval)) => {} _ = stun_token.cancelled() => break, } } @@ -380,6 +389,11 @@ async fn connect_to_hub_and_run( connection_token, ); + // Heartbeat: liveness timeout detects silent hub failures + let liveness_timeout_dur = Duration::from_secs(45); + let mut last_activity = Instant::now(); + let mut liveness_deadline = Box::pin(sleep_until(last_activity + liveness_timeout_dur)); + // Read frames from hub let mut frame_reader = FrameReader::new(buf_reader); let result = loop { @@ -387,6 +401,10 @@ async fn connect_to_hub_and_run( frame_result = frame_reader.next_frame() => { match frame_result { Ok(Some(frame)) => { + // Reset liveness on any received frame + last_activity = Instant::now(); + liveness_deadline.as_mut().reset(last_activity + liveness_timeout_dur); + match frame.frame_type { FRAME_DATA_BACK => { // A1: Non-blocking send to prevent head-of-line blocking @@ -420,6 +438,14 @@ async fn connect_to_hub_and_run( ); } } + FRAME_PING => { + let pong_frame = encode_frame(0, FRAME_PONG, &[]); + if tunnel_writer_tx.try_send(pong_frame).is_err() { + log::warn!("Failed to send PONG, writer channel full/closed"); + break EdgeLoopResult::Reconnect; + } + log::trace!("Received PING from hub, sent PONG"); + } _ => { log::warn!("Unexpected frame type {} from hub", frame.frame_type); } @@ -435,6 +461,11 @@ async fn connect_to_hub_and_run( } } } + _ = &mut liveness_deadline => { + log::warn!("Hub liveness timeout (no frames for {}s), reconnecting", + liveness_timeout_dur.as_secs()); + break EdgeLoopResult::Reconnect; + } _ = connection_token.cancelled() => { log::info!("Connection cancelled"); break EdgeLoopResult::Shutdown; diff --git a/rust/crates/remoteingress-core/src/hub.rs b/rust/crates/remoteingress-core/src/hub.rs index cdbba5b..42211ac 100644 --- a/rust/crates/remoteingress-core/src/hub.rs +++ b/rust/crates/remoteingress-core/src/hub.rs @@ -1,8 +1,10 @@ use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{mpsc, Mutex, RwLock, Semaphore}; +use tokio::time::{interval, sleep_until, Instant}; use tokio_rustls::TlsAcceptor; use tokio_util::sync::CancellationToken; use serde::{Deserialize, Serialize}; @@ -407,6 +409,14 @@ async fn handle_edge_connection( // A4: Semaphore to limit concurrent streams per edge let stream_semaphore = Arc::new(Semaphore::new(MAX_STREAMS_PER_EDGE)); + // Heartbeat: periodic PING and liveness timeout + let ping_interval_dur = Duration::from_secs(15); + let liveness_timeout_dur = Duration::from_secs(45); + let mut ping_ticker = interval(ping_interval_dur); + ping_ticker.tick().await; // consume the immediate first tick + let mut last_activity = Instant::now(); + let mut liveness_deadline = Box::pin(sleep_until(last_activity + liveness_timeout_dur)); + // Frame reading loop let mut frame_reader = FrameReader::new(buf_reader); @@ -415,6 +425,10 @@ async fn handle_edge_connection( frame_result = frame_reader.next_frame() => { match frame_result { Ok(Some(frame)) => { + // Reset liveness on any received frame + last_activity = Instant::now(); + liveness_deadline.as_mut().reset(last_activity + liveness_timeout_dur); + match frame.frame_type { FRAME_OPEN => { // A4: Check stream limit before processing @@ -462,7 +476,7 @@ async fn handle_edge_connection( let result = async { // A2: Connect to SmartProxy with timeout let mut upstream = tokio::time::timeout( - std::time::Duration::from_secs(10), + Duration::from_secs(10), TcpStream::connect((target.as_str(), dest_port)), ) .await @@ -571,6 +585,9 @@ async fn handle_edge_connection( }); } } + FRAME_PONG => { + log::debug!("Received PONG from edge {}", edge_id); + } _ => { log::warn!("Unexpected frame type {} from edge", frame.frame_type); } @@ -586,6 +603,19 @@ async fn handle_edge_connection( } } } + _ = ping_ticker.tick() => { + let ping_frame = encode_frame(0, FRAME_PING, &[]); + if frame_writer_tx.try_send(ping_frame).is_err() { + log::warn!("Failed to send PING to edge {}, writer channel full/closed", edge_id); + break; + } + log::trace!("Sent PING to edge {}", edge_id); + } + _ = &mut liveness_deadline => { + log::warn!("Edge {} liveness timeout (no frames for {}s), disconnecting", + edge_id, liveness_timeout_dur.as_secs()); + break; + } _ = edge_token.cancelled() => { log::info!("Edge {} cancelled by hub", edge_id); break; diff --git a/rust/crates/remoteingress-protocol/src/lib.rs b/rust/crates/remoteingress-protocol/src/lib.rs index 786517d..4db3e97 100644 --- a/rust/crates/remoteingress-protocol/src/lib.rs +++ b/rust/crates/remoteingress-protocol/src/lib.rs @@ -7,6 +7,8 @@ pub const FRAME_CLOSE: u8 = 0x03; pub const FRAME_DATA_BACK: u8 = 0x04; pub const FRAME_CLOSE_BACK: u8 = 0x05; pub const FRAME_CONFIG: u8 = 0x06; // Hub -> Edge: configuration update +pub const FRAME_PING: u8 = 0x07; // Hub -> Edge: heartbeat probe +pub const FRAME_PONG: u8 = 0x08; // Edge -> Hub: heartbeat response // Frame header size: 4 (stream_id) + 1 (type) + 4 (length) = 9 bytes pub const FRAME_HEADER_SIZE: usize = 9; @@ -261,6 +263,8 @@ mod tests { FRAME_DATA_BACK, FRAME_CLOSE_BACK, FRAME_CONFIG, + FRAME_PING, + FRAME_PONG, ]; let mut data = Vec::new(); @@ -293,4 +297,19 @@ mod tests { assert_eq!(frame.frame_type, FRAME_CLOSE); assert!(frame.payload.is_empty()); } + + #[test] + fn test_encode_frame_ping_pong() { + // PING: stream_id=0, empty payload (control frame) + let ping = encode_frame(0, FRAME_PING, &[]); + assert_eq!(ping[4], FRAME_PING); + assert_eq!(&ping[0..4], &0u32.to_be_bytes()); + assert_eq!(ping.len(), FRAME_HEADER_SIZE); + + // PONG: stream_id=0, empty payload (control frame) + let pong = encode_frame(0, FRAME_PONG, &[]); + assert_eq!(pong[4], FRAME_PONG); + assert_eq!(&pong[0..4], &0u32.to_be_bytes()); + assert_eq!(pong.len(), FRAME_HEADER_SIZE); + } } diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 62e6f58..95d599b 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@serve.zone/remoteingress', - version: '4.3.0', + version: '4.4.0', description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.' } diff --git a/ts/classes.remoteingressedge.ts b/ts/classes.remoteingressedge.ts index 9c4c0f6..77f792e 100644 --- a/ts/classes.remoteingressedge.ts +++ b/ts/classes.remoteingressedge.ts @@ -40,9 +40,16 @@ export interface IEdgeConfig { secret: string; } +const MAX_RESTART_ATTEMPTS = 10; +const MAX_RESTART_BACKOFF_MS = 30_000; + export class RemoteIngressEdge extends EventEmitter { private bridge: InstanceType>; private started = false; + private stopping = false; + private savedConfig: IEdgeConfig | null = null; + private restartBackoffMs = 1000; + private restartAttempts = 0; private statusInterval: ReturnType | undefined; constructor() { @@ -109,11 +116,17 @@ export class RemoteIngressEdge extends EventEmitter { edgeConfig = config; } + this.savedConfig = edgeConfig; + this.stopping = false; + const spawned = await this.bridge.spawn(); if (!spawned) { throw new Error('Failed to spawn remoteingress-bin'); } + // Register crash recovery handler + this.bridge.on('exit', this.handleCrashRecovery); + await this.bridge.sendCommand('startEdge', { hubHost: edgeConfig.hubHost, hubPort: edgeConfig.hubPort ?? 8443, @@ -122,6 +135,8 @@ export class RemoteIngressEdge extends EventEmitter { }); this.started = true; + this.restartAttempts = 0; + this.restartBackoffMs = 1000; // Start periodic status logging this.statusInterval = setInterval(async () => { @@ -142,6 +157,7 @@ export class RemoteIngressEdge extends EventEmitter { * Stop the edge and kill the Rust process. */ public async stop(): Promise { + this.stopping = true; if (this.statusInterval) { clearInterval(this.statusInterval); this.statusInterval = undefined; @@ -152,6 +168,7 @@ export class RemoteIngressEdge extends EventEmitter { } catch { // Process may already be dead } + this.bridge.removeListener('exit', this.handleCrashRecovery); this.bridge.kill(); this.started = false; } @@ -170,4 +187,55 @@ export class RemoteIngressEdge extends EventEmitter { public get running(): boolean { return this.bridge.running; } + + /** + * Handle unexpected Rust binary crash — auto-restart with backoff. + */ + private handleCrashRecovery = async (code: number | null, signal: string | null) => { + if (this.stopping || !this.started || !this.savedConfig) { + return; + } + + console.error( + `[RemoteIngressEdge] Rust binary crashed (code=${code}, signal=${signal}), ` + + `attempt ${this.restartAttempts + 1}/${MAX_RESTART_ATTEMPTS}` + ); + + this.started = false; + + if (this.restartAttempts >= MAX_RESTART_ATTEMPTS) { + console.error('[RemoteIngressEdge] Max restart attempts reached, giving up'); + this.emit('crashRecoveryFailed'); + return; + } + + await new Promise(resolve => setTimeout(resolve, this.restartBackoffMs)); + this.restartBackoffMs = Math.min(this.restartBackoffMs * 2, MAX_RESTART_BACKOFF_MS); + this.restartAttempts++; + + try { + const spawned = await this.bridge.spawn(); + if (!spawned) { + console.error('[RemoteIngressEdge] Failed to respawn binary'); + return; + } + + this.bridge.on('exit', this.handleCrashRecovery); + + await this.bridge.sendCommand('startEdge', { + hubHost: this.savedConfig.hubHost, + hubPort: this.savedConfig.hubPort ?? 8443, + edgeId: this.savedConfig.edgeId, + secret: this.savedConfig.secret, + }); + + this.started = true; + this.restartAttempts = 0; + this.restartBackoffMs = 1000; + console.log('[RemoteIngressEdge] Successfully recovered from crash'); + this.emit('crashRecovered'); + } catch (err) { + console.error(`[RemoteIngressEdge] Crash recovery failed: ${err}`); + } + }; } diff --git a/ts/classes.remoteingresshub.ts b/ts/classes.remoteingresshub.ts index b1486f8..d1a79af 100644 --- a/ts/classes.remoteingresshub.ts +++ b/ts/classes.remoteingresshub.ts @@ -50,9 +50,19 @@ export interface IHubConfig { }; } +type TAllowedEdge = { id: string; secret: string; listenPorts?: number[]; stunIntervalSecs?: number }; + +const MAX_RESTART_ATTEMPTS = 10; +const MAX_RESTART_BACKOFF_MS = 30_000; + export class RemoteIngressHub extends EventEmitter { private bridge: InstanceType>; private started = false; + private stopping = false; + private savedConfig: IHubConfig | null = null; + private savedEdges: TAllowedEdge[] = []; + private restartBackoffMs = 1000; + private restartAttempts = 0; constructor() { super(); @@ -98,11 +108,17 @@ export class RemoteIngressHub extends EventEmitter { * Start the hub — spawns the Rust binary and starts the tunnel server. */ public async start(config: IHubConfig = {}): Promise { + this.savedConfig = config; + this.stopping = false; + const spawned = await this.bridge.spawn(); if (!spawned) { throw new Error('Failed to spawn remoteingress-bin'); } + // Register crash recovery handler + this.bridge.on('exit', this.handleCrashRecovery); + await this.bridge.sendCommand('startHub', { tunnelPort: config.tunnelPort ?? 8443, targetHost: config.targetHost ?? '127.0.0.1', @@ -112,18 +128,22 @@ export class RemoteIngressHub extends EventEmitter { }); this.started = true; + this.restartAttempts = 0; + this.restartBackoffMs = 1000; } /** * Stop the hub and kill the Rust process. */ public async stop(): Promise { + this.stopping = true; if (this.started) { try { await this.bridge.sendCommand('stopHub', {} as Record); } catch { // Process may already be dead } + this.bridge.removeListener('exit', this.handleCrashRecovery); this.bridge.kill(); this.started = false; } @@ -132,7 +152,8 @@ export class RemoteIngressHub extends EventEmitter { /** * Update the list of allowed edges that can connect to this hub. */ - public async updateAllowedEdges(edges: Array<{ id: string; secret: string; listenPorts?: number[]; stunIntervalSecs?: number }>): Promise { + public async updateAllowedEdges(edges: TAllowedEdge[]): Promise { + this.savedEdges = edges; await this.bridge.sendCommand('updateAllowedEdges', { edges }); } @@ -149,4 +170,62 @@ export class RemoteIngressHub extends EventEmitter { public get running(): boolean { return this.bridge.running; } + + /** + * Handle unexpected Rust binary crash — auto-restart with backoff. + */ + private handleCrashRecovery = async (code: number | null, signal: string | null) => { + if (this.stopping || !this.started || !this.savedConfig) { + return; + } + + console.error( + `[RemoteIngressHub] Rust binary crashed (code=${code}, signal=${signal}), ` + + `attempt ${this.restartAttempts + 1}/${MAX_RESTART_ATTEMPTS}` + ); + + this.started = false; + + if (this.restartAttempts >= MAX_RESTART_ATTEMPTS) { + console.error('[RemoteIngressHub] Max restart attempts reached, giving up'); + this.emit('crashRecoveryFailed'); + return; + } + + await new Promise(resolve => setTimeout(resolve, this.restartBackoffMs)); + this.restartBackoffMs = Math.min(this.restartBackoffMs * 2, MAX_RESTART_BACKOFF_MS); + this.restartAttempts++; + + try { + const spawned = await this.bridge.spawn(); + if (!spawned) { + console.error('[RemoteIngressHub] Failed to respawn binary'); + return; + } + + this.bridge.on('exit', this.handleCrashRecovery); + + const config = this.savedConfig; + await this.bridge.sendCommand('startHub', { + tunnelPort: config.tunnelPort ?? 8443, + targetHost: config.targetHost ?? '127.0.0.1', + ...(config.tls?.certPem && config.tls?.keyPem + ? { tlsCertPem: config.tls.certPem, tlsKeyPem: config.tls.keyPem } + : {}), + }); + + // Restore allowed edges + if (this.savedEdges.length > 0) { + await this.bridge.sendCommand('updateAllowedEdges', { edges: this.savedEdges }); + } + + this.started = true; + this.restartAttempts = 0; + this.restartBackoffMs = 1000; + console.log('[RemoteIngressHub] Successfully recovered from crash'); + this.emit('crashRecovered'); + } catch (err) { + console.error(`[RemoteIngressHub] Crash recovery failed: ${err}`); + } + }; }