From 1afd0e53472e1df134b97acd204eca9052ceb13e Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Tue, 17 Mar 2026 00:15:10 +0000 Subject: [PATCH] fix(remoteingress-core): improve tunnel failure detection and reconnect handling --- changelog.md | 7 ++++ rust/crates/remoteingress-core/src/edge.rs | 40 +++++++++++++++++++--- rust/crates/remoteingress-core/src/hub.rs | 25 +++++++++++--- ts/00_commitinfo_data.ts | 2 +- 4 files changed, 64 insertions(+), 10 deletions(-) diff --git a/changelog.md b/changelog.md index a64deae..1749f0b 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-03-17 - 4.7.1 - fix(remoteingress-core) +improve tunnel failure detection and reconnect handling + +- Enable TCP keepalive on edge and hub connections to detect silent network failures sooner +- Trigger immediate reconnect or disconnect when tunnel writer tasks fail instead of waiting for liveness timeouts +- Prevent active stream counter underflow during concurrent connection cleanup + ## 2026-03-16 - 4.7.0 - feat(edge,protocol,test) add configurable edge bind address and expand flow-control test coverage diff --git a/rust/crates/remoteingress-core/src/edge.rs b/rust/crates/remoteingress-core/src/edge.rs index 81f47ef..418dc09 100644 --- a/rust/crates/remoteingress-core/src/edge.rs +++ b/rust/crates/remoteingress-core/src/edge.rs @@ -284,6 +284,13 @@ async fn connect_to_hub_and_run( Ok(s) => { // Disable Nagle's algorithm for low-latency control frames (PING/PONG, WINDOW_UPDATE) let _ = s.set_nodelay(true); + // TCP keepalive detects silent network failures (NAT timeout, path change) + // faster than the 45s application-level liveness timeout. + let ka = socket2::TcpKeepalive::new() + .with_time(Duration::from_secs(30)); + #[cfg(target_os = "linux")] + let ka = ka.with_interval(Duration::from_secs(10)); + let _ = socket2::SockRef::from(&s).set_tcp_keepalive(&ka); s } Err(e) => { @@ -388,18 +395,22 @@ async fn connect_to_hub_and_run( // Legacy alias — control channel for PONG, CLOSE, WINDOW_UPDATE, OPEN let tunnel_writer_tx = tunnel_ctrl_tx.clone(); let tw_token = connection_token.clone(); + // Oneshot to signal the reader loop when the writer dies from a write error. + // This avoids the 45s liveness timeout delay when the tunnel is already dead. + let (writer_dead_tx, mut writer_dead_rx) = tokio::sync::oneshot::channel::<()>(); let tunnel_writer_handle = tokio::spawn(async move { // BufWriter coalesces small writes (frame headers, control frames) into fewer // TLS records and syscalls. Flushed after each frame to avoid holding data. let mut writer = tokio::io::BufWriter::with_capacity(65536, write_half); + let mut write_error = false; loop { tokio::select! { biased; // control frames always take priority over data ctrl = tunnel_ctrl_rx.recv() => { match ctrl { Some(frame_data) => { - if writer.write_all(&frame_data).await.is_err() { break; } - if writer.flush().await.is_err() { break; } + if writer.write_all(&frame_data).await.is_err() { write_error = true; break; } + if writer.flush().await.is_err() { write_error = true; break; } } None => break, } @@ -407,8 +418,8 @@ async fn connect_to_hub_and_run( data = tunnel_data_rx.recv() => { match data { Some(frame_data) => { - if writer.write_all(&frame_data).await.is_err() { break; } - if writer.flush().await.is_err() { break; } + if writer.write_all(&frame_data).await.is_err() { write_error = true; break; } + if writer.flush().await.is_err() { write_error = true; break; } } None => break, } @@ -416,6 +427,10 @@ async fn connect_to_hub_and_run( _ = tw_token.cancelled() => break, } } + if write_error { + log::error!("Tunnel writer failed, signalling reader for fast reconnect"); + let _ = writer_dead_tx.send(()); + } }); // Start TCP listeners for initial ports (hot-reloadable) @@ -532,6 +547,10 @@ async fn connect_to_hub_and_run( liveness_timeout_dur.as_secs()); break EdgeLoopResult::Reconnect; } + _ = &mut writer_dead_rx => { + log::error!("Tunnel writer died, reconnecting immediately"); + break EdgeLoopResult::Reconnect; + } _ = connection_token.cancelled() => { log::info!("Connection cancelled"); break EdgeLoopResult::Shutdown; @@ -636,7 +655,18 @@ fn apply_port_config( Arc::clone(&active_streams), ) .await; - active_streams.fetch_sub(1, Ordering::Relaxed); + // Saturating decrement: prevent underflow when + // edge_main_loop's store(0) races with task cleanup. + loop { + let current = active_streams.load(Ordering::Relaxed); + if current == 0 { break; } + if active_streams.compare_exchange_weak( + current, current - 1, + Ordering::Relaxed, Ordering::Relaxed, + ).is_ok() { + break; + } + } }); } Err(e) => { diff --git a/rust/crates/remoteingress-core/src/hub.rs b/rust/crates/remoteingress-core/src/hub.rs index 10c9acd..8dc76da 100644 --- a/rust/crates/remoteingress-core/src/hub.rs +++ b/rust/crates/remoteingress-core/src/hub.rs @@ -300,6 +300,13 @@ async fn handle_edge_connection( ) -> Result<(), Box> { // Disable Nagle's algorithm for low-latency control frames (PING/PONG, WINDOW_UPDATE) stream.set_nodelay(true)?; + // TCP keepalive detects silent network failures (NAT timeout, path change) + // faster than the 45s application-level liveness timeout. + let ka = socket2::TcpKeepalive::new() + .with_time(Duration::from_secs(30)); + #[cfg(target_os = "linux")] + let ka = ka.with_interval(Duration::from_secs(10)); + let _ = socket2::SockRef::from(&stream).set_tcp_keepalive(&ka); let tls_stream = acceptor.accept(stream).await?; let (read_half, mut write_half) = tokio::io::split(tls_stream); let mut buf_reader = BufReader::new(read_half); @@ -383,18 +390,20 @@ async fn handle_edge_connection( // Legacy alias for code that sends both control and data (will be migrated) let frame_writer_tx = ctrl_tx.clone(); let writer_token = edge_token.clone(); + let (writer_dead_tx, mut writer_dead_rx) = tokio::sync::oneshot::channel::<()>(); let writer_handle = tokio::spawn(async move { // BufWriter coalesces small writes (frame headers, control frames) into fewer // TLS records and syscalls. Flushed after each frame to avoid holding data. let mut writer = tokio::io::BufWriter::with_capacity(65536, write_half); + let mut write_error = false; loop { tokio::select! { biased; // control frames always take priority over data ctrl = ctrl_rx.recv() => { match ctrl { Some(frame_data) => { - if writer.write_all(&frame_data).await.is_err() { break; } - if writer.flush().await.is_err() { break; } + if writer.write_all(&frame_data).await.is_err() { write_error = true; break; } + if writer.flush().await.is_err() { write_error = true; break; } } None => break, } @@ -402,8 +411,8 @@ async fn handle_edge_connection( data = data_rx.recv() => { match data { Some(frame_data) => { - if writer.write_all(&frame_data).await.is_err() { break; } - if writer.flush().await.is_err() { break; } + if writer.write_all(&frame_data).await.is_err() { write_error = true; break; } + if writer.flush().await.is_err() { write_error = true; break; } } None => break, } @@ -411,6 +420,10 @@ async fn handle_edge_connection( _ = writer_token.cancelled() => break, } } + if write_error { + log::error!("Tunnel writer to edge failed, signalling reader for fast cleanup"); + let _ = writer_dead_tx.send(()); + } }); // Spawn task to forward config updates as FRAME_CONFIG frames @@ -754,6 +767,10 @@ async fn handle_edge_connection( edge_id, liveness_timeout_dur.as_secs()); break; } + _ = &mut writer_dead_rx => { + log::error!("Tunnel writer to edge {} died, disconnecting immediately", edge_id); + break; + } _ = edge_token.cancelled() => { log::info!("Edge {} cancelled by hub", edge_id); break; diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index e0d8984..18d1c5c 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@serve.zone/remoteingress', - version: '4.7.0', + version: '4.7.1', description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.' }