From ac993dd5a3d0840e707c20716ca151bd987b76a4 Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Fri, 27 Mar 2026 11:34:31 +0000 Subject: [PATCH] fix(core): harden UDP session handling, QUIC control message validation, and bridge process cleanup --- changelog.md | 7 +++ rust/crates/remoteingress-core/src/edge.rs | 10 +++- rust/crates/remoteingress-core/src/hub.rs | 7 +++ .../remoteingress-core/src/transport/quic.rs | 11 ++++ .../remoteingress-core/src/udp_session.rs | 59 +++++++++++++++---- ts/00_commitinfo_data.ts | 2 +- ts/classes.remoteingressedge.ts | 30 +++++++--- ts/classes.remoteingresshub.ts | 28 ++++++--- 8 files changed, 124 insertions(+), 30 deletions(-) diff --git a/changelog.md b/changelog.md index 74fb564..06916bf 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-03-27 - 4.15.3 - fix(core) +harden UDP session handling, QUIC control message validation, and bridge process cleanup + +- cap UDP session creation and drop excess datagrams with warnings to prevent unbounded session growth +- periodically prune closed datagram sessions on the hub and reject oversized QUIC control messages to avoid resource exhaustion +- clean up spawned edge and hub bridge processes on startup failure, remove listeners on stop, and avoid restarting after shutdown during backoff + ## 2026-03-26 - 4.15.2 - fix(readme) adjust tunnel diagram alignment in the README diff --git a/rust/crates/remoteingress-core/src/edge.rs b/rust/crates/remoteingress-core/src/edge.rs index c9c03fe..372fe0a 100644 --- a/rust/crates/remoteingress-core/src/edge.rs +++ b/rust/crates/remoteingress-core/src/edge.rs @@ -954,7 +954,10 @@ fn apply_udp_port_config( } else { // New session — allocate stream_id and send UDP_OPEN let sid = next_stream_id.fetch_add(1, Ordering::Relaxed); - sessions.insert(key, sid); + if sessions.insert(key, sid).is_none() { + log::warn!("UDP session limit reached, dropping datagram from {}", client_addr); + continue; + } let client_ip = client_addr.ip().to_string(); let client_port = client_addr.port(); @@ -1681,7 +1684,10 @@ fn apply_udp_port_config_quic( } else { // New session — send PROXY v2 header via control-style datagram let sid = next_stream_id.fetch_add(1, Ordering::Relaxed); - sessions.insert(key, sid); + if sessions.insert(key, sid).is_none() { + log::warn!("QUIC UDP session limit reached, dropping datagram from {}", client_addr); + continue; + } let client_ip = client_addr.ip().to_string(); let client_port = client_addr.port(); diff --git a/rust/crates/remoteingress-core/src/hub.rs b/rust/crates/remoteingress-core/src/hub.rs index 6f2913b..19e8d67 100644 --- a/rust/crates/remoteingress-core/src/hub.rs +++ b/rust/crates/remoteingress-core/src/hub.rs @@ -1374,8 +1374,15 @@ async fn handle_edge_connection_quic( let dgram_edge_id = edge_id.clone(); let dgram_token = edge_token.clone(); let dgram_handle = tokio::spawn(async move { + let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30)); + cleanup_interval.tick().await; // consume initial tick loop { tokio::select! { + // Periodic sweep: prune sessions whose task has exited (receiver dropped) + _ = cleanup_interval.tick() => { + let mut s = dgram_sessions.lock().await; + s.retain(|_id, tx| !tx.is_closed()); + } datagram = dgram_conn.read_datagram() => { match datagram { Ok(data) => { diff --git a/rust/crates/remoteingress-core/src/transport/quic.rs b/rust/crates/remoteingress-core/src/transport/quic.rs index 902ff7c..77d7898 100644 --- a/rust/crates/remoteingress-core/src/transport/quic.rs +++ b/rust/crates/remoteingress-core/src/transport/quic.rs @@ -76,6 +76,11 @@ pub async fn write_ctrl_message( Ok(()) } +/// Maximum size for a QUIC control message payload (64 KB). +/// Control messages (CONFIG, PING, PONG) are small; this guards against +/// a malicious peer sending a crafted length field to trigger OOM. +const MAX_CTRL_MESSAGE_SIZE: usize = 65536; + /// Read a control message from a QUIC recv stream. /// Returns (msg_type, payload). Returns None on EOF. pub async fn read_ctrl_message( @@ -93,6 +98,12 @@ pub async fn read_ctrl_message( } let msg_type = header[0]; let len = u32::from_be_bytes([header[1], header[2], header[3], header[4]]) as usize; + if len > MAX_CTRL_MESSAGE_SIZE { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("control message too large: {} bytes (max {})", len, MAX_CTRL_MESSAGE_SIZE), + )); + } let mut payload = vec![0u8; len]; if len > 0 { recv.read_exact(&mut payload).await.map_err(|e| { diff --git a/rust/crates/remoteingress-core/src/udp_session.rs b/rust/crates/remoteingress-core/src/udp_session.rs index 5946b6f..c8a106b 100644 --- a/rust/crates/remoteingress-core/src/udp_session.rs +++ b/rust/crates/remoteingress-core/src/udp_session.rs @@ -17,7 +17,7 @@ pub struct UdpSession { pub last_activity: Instant, } -/// Manages UDP sessions with idle timeout expiry. +/// Manages UDP sessions with idle timeout expiry and a maximum session count. pub struct UdpSessionManager { /// Forward map: session key → session data. sessions: HashMap, @@ -25,14 +25,21 @@ pub struct UdpSessionManager { by_stream_id: HashMap, /// Idle timeout duration. idle_timeout: std::time::Duration, + /// Maximum number of concurrent sessions (prevents unbounded growth from floods). + max_sessions: usize, } impl UdpSessionManager { pub fn new(idle_timeout: std::time::Duration) -> Self { + Self::with_max_sessions(idle_timeout, 65536) + } + + pub fn with_max_sessions(idle_timeout: std::time::Duration, max_sessions: usize) -> Self { Self { sessions: HashMap::new(), by_stream_id: HashMap::new(), idle_timeout, + max_sessions, } } @@ -57,8 +64,12 @@ impl UdpSessionManager { Some(session) } - /// Insert a new session. Returns a mutable reference to it. - pub fn insert(&mut self, key: UdpSessionKey, stream_id: u32) -> &mut UdpSession { + /// Insert a new session. Returns `None` if the session limit has been reached. + pub fn insert(&mut self, key: UdpSessionKey, stream_id: u32) -> Option<&mut UdpSession> { + // Allow re-insertion of existing keys (update), but reject truly new sessions at capacity + if !self.sessions.contains_key(&key) && self.sessions.len() >= self.max_sessions { + return None; + } let session = UdpSession { stream_id, client_addr: key.client_addr, @@ -66,7 +77,7 @@ impl UdpSessionManager { last_activity: Instant::now(), }; self.by_stream_id.insert(stream_id, key); - self.sessions.entry(key).or_insert(session) + Some(self.sessions.entry(key).or_insert(session)) } /// Remove a session by stream_id. @@ -118,7 +129,7 @@ mod tests { fn test_insert_and_lookup() { let mut mgr = UdpSessionManager::new(Duration::from_secs(60)); let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 }; - mgr.insert(key, 1); + assert!(mgr.insert(key, 1).is_some()); assert_eq!(mgr.len(), 1); assert!(mgr.get_mut(&key).is_some()); @@ -129,7 +140,7 @@ mod tests { fn test_client_addr_for_stream() { let mut mgr = UdpSessionManager::new(Duration::from_secs(60)); let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 }; - mgr.insert(key, 42); + assert!(mgr.insert(key, 42).is_some()); assert_eq!(mgr.client_addr_for_stream(42), Some(addr(5000))); assert_eq!(mgr.client_addr_for_stream(99), None); @@ -139,7 +150,7 @@ mod tests { fn test_remove_by_stream_id() { let mut mgr = UdpSessionManager::new(Duration::from_secs(60)); let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 }; - mgr.insert(key, 1); + assert!(mgr.insert(key, 1).is_some()); let removed = mgr.remove_by_stream_id(1); assert!(removed.is_some()); @@ -159,8 +170,8 @@ mod tests { let mut mgr = UdpSessionManager::new(Duration::from_millis(50)); let key1 = UdpSessionKey { client_addr: addr(5000), dest_port: 53 }; let key2 = UdpSessionKey { client_addr: addr(5001), dest_port: 53 }; - mgr.insert(key1, 1); - mgr.insert(key2, 2); + assert!(mgr.insert(key1, 1).is_some()); + assert!(mgr.insert(key2, 2).is_some()); // Nothing expired yet assert!(mgr.expire_idle().is_empty()); @@ -178,7 +189,7 @@ mod tests { async fn test_activity_prevents_expiry() { let mut mgr = UdpSessionManager::new(Duration::from_millis(100)); let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 }; - mgr.insert(key, 1); + assert!(mgr.insert(key, 1).is_some()); // Touch session at 50ms (before 100ms timeout) tokio::time::sleep(Duration::from_millis(50)).await; @@ -200,11 +211,35 @@ mod tests { let mut mgr = UdpSessionManager::new(Duration::from_secs(60)); let key1 = UdpSessionKey { client_addr: addr(5000), dest_port: 53 }; let key2 = UdpSessionKey { client_addr: addr(5000), dest_port: 443 }; - mgr.insert(key1, 1); - mgr.insert(key2, 2); + assert!(mgr.insert(key1, 1).is_some()); + assert!(mgr.insert(key2, 2).is_some()); assert_eq!(mgr.len(), 2); assert_eq!(mgr.get_mut(&key1).unwrap().stream_id, 1); assert_eq!(mgr.get_mut(&key2).unwrap().stream_id, 2); } + + #[test] + fn test_max_sessions_limit() { + let mut mgr = UdpSessionManager::with_max_sessions(Duration::from_secs(60), 2); + let key1 = UdpSessionKey { client_addr: addr(5000), dest_port: 53 }; + let key2 = UdpSessionKey { client_addr: addr(5001), dest_port: 53 }; + let key3 = UdpSessionKey { client_addr: addr(5002), dest_port: 53 }; + + assert!(mgr.insert(key1, 1).is_some()); + assert!(mgr.insert(key2, 2).is_some()); + // Third insert should be rejected (at capacity) + assert!(mgr.insert(key3, 3).is_none()); + assert_eq!(mgr.len(), 2); + + // Re-inserting an existing key should succeed (update, not new) + assert!(mgr.insert(key1, 1).is_some()); + assert_eq!(mgr.len(), 2); + + // After removing one, a new insert should succeed + mgr.remove_by_stream_id(1); + assert_eq!(mgr.len(), 1); + assert!(mgr.insert(key3, 3).is_some()); + assert_eq!(mgr.len(), 2); + } } diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 23b010f..efd458d 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@serve.zone/remoteingress', - version: '4.15.2', + version: '4.15.3', description: 'Edge ingress tunnel for DcRouter - tunnels TCP and UDP traffic from the network edge to SmartProxy over TLS or QUIC, preserving client IP via PROXY protocol.' } diff --git a/ts/classes.remoteingressedge.ts b/ts/classes.remoteingressedge.ts index e944eb4..8a1c7c4 100644 --- a/ts/classes.remoteingressedge.ts +++ b/ts/classes.remoteingressedge.ts @@ -222,14 +222,21 @@ export class RemoteIngressEdge extends EventEmitter { this.bridge.removeListener('exit', this.handleCrashRecovery); this.bridge.on('exit', this.handleCrashRecovery); - await this.bridge.sendCommand('startEdge', { - hubHost: edgeConfig.hubHost, - hubPort: edgeConfig.hubPort ?? 8443, - edgeId: edgeConfig.edgeId, - secret: edgeConfig.secret, - ...(edgeConfig.bindAddress ? { bindAddress: edgeConfig.bindAddress } : {}), - ...(edgeConfig.transportMode ? { transportMode: edgeConfig.transportMode } : {}), - }); + try { + await this.bridge.sendCommand('startEdge', { + hubHost: edgeConfig.hubHost, + hubPort: edgeConfig.hubPort ?? 8443, + edgeId: edgeConfig.edgeId, + secret: edgeConfig.secret, + ...(edgeConfig.bindAddress ? { bindAddress: edgeConfig.bindAddress } : {}), + ...(edgeConfig.transportMode ? { transportMode: edgeConfig.transportMode } : {}), + }); + } catch (err) { + // Clean up the spawned process to avoid orphaning it + this.bridge.removeListener('exit', this.handleCrashRecovery); + this.bridge.kill(); + throw err; + } this.started = true; this.restartAttempts = 0; @@ -282,6 +289,9 @@ export class RemoteIngressEdge extends EventEmitter { this.started = false; } this.savedConfig = null; + // Remove all listeners to prevent memory buildup + this.bridge.removeAllListeners(); + this.removeAllListeners(); } /** @@ -326,6 +336,10 @@ export class RemoteIngressEdge extends EventEmitter { } await new Promise(resolve => setTimeout(resolve, this.restartBackoffMs)); + // Re-check after backoff — stop() may have been called during the wait + if (this.stopping || !this.savedConfig) { + return; + } this.restartBackoffMs = Math.min(this.restartBackoffMs * 2, MAX_RESTART_BACKOFF_MS); this.restartAttempts++; diff --git a/ts/classes.remoteingresshub.ts b/ts/classes.remoteingresshub.ts index a79a456..df71efa 100644 --- a/ts/classes.remoteingresshub.ts +++ b/ts/classes.remoteingresshub.ts @@ -156,13 +156,20 @@ export class RemoteIngressHub extends EventEmitter { this.bridge.removeListener('exit', this.handleCrashRecovery); this.bridge.on('exit', this.handleCrashRecovery); - await this.bridge.sendCommand('startHub', { - tunnelPort: config.tunnelPort ?? 8443, - targetHost: config.targetHost ?? '127.0.0.1', - ...(config.tls?.certPem && config.tls?.keyPem - ? { tlsCertPem: config.tls.certPem, tlsKeyPem: config.tls.keyPem } - : {}), - }); + try { + await this.bridge.sendCommand('startHub', { + tunnelPort: config.tunnelPort ?? 8443, + targetHost: config.targetHost ?? '127.0.0.1', + ...(config.tls?.certPem && config.tls?.keyPem + ? { tlsCertPem: config.tls.certPem, tlsKeyPem: config.tls.keyPem } + : {}), + }); + } catch (err) { + // Clean up the spawned process to avoid orphaning it + this.bridge.removeListener('exit', this.handleCrashRecovery); + this.bridge.kill(); + throw err; + } this.started = true; this.restartAttempts = 0; @@ -186,6 +193,9 @@ export class RemoteIngressHub extends EventEmitter { } this.savedConfig = null; this.savedEdges = []; + // Remove all listeners to prevent memory buildup + this.bridge.removeAllListeners(); + this.removeAllListeners(); } /** @@ -232,6 +242,10 @@ export class RemoteIngressHub extends EventEmitter { } await new Promise(resolve => setTimeout(resolve, this.restartBackoffMs)); + // Re-check after backoff — stop() may have been called during the wait + if (this.stopping || !this.savedConfig) { + return; + } this.restartBackoffMs = Math.min(this.restartBackoffMs * 2, MAX_RESTART_BACKOFF_MS); this.restartAttempts++;