From b851bc799418106c2c9ccd58308fda8025a8650f Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Tue, 17 Mar 2026 00:39:57 +0000 Subject: [PATCH] fix(remoteingress-core): add tunnel write timeouts and scale initial stream windows by active stream count --- changelog.md | 6 ++++++ rust/crates/remoteingress-core/src/edge.rs | 24 ++++++++++++++++------ rust/crates/remoteingress-core/src/hub.rs | 24 ++++++++++++++++------ ts/00_commitinfo_data.ts | 2 +- 4 files changed, 43 insertions(+), 13 deletions(-) diff --git a/changelog.md b/changelog.md index 1749f0b..47cc104 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,11 @@ # Changelog +## 2026-03-17 - 4.7.2 - fix(remoteingress-core) +add tunnel write timeouts and scale initial stream windows by active stream count + +- Wrap tunnel frame writes and flushes in a 30-second timeout on both edge and hub to detect stalled writers and trigger faster reconnect or cleanup. +- Compute each stream's initial send window from the current active stream count instead of using a fixed window to keep total in-flight data within the 32MB budget. + ## 2026-03-17 - 4.7.1 - fix(remoteingress-core) improve tunnel failure detection and reconnect handling diff --git a/rust/crates/remoteingress-core/src/edge.rs b/rust/crates/remoteingress-core/src/edge.rs index 418dc09..5a78e8e 100644 --- a/rust/crates/remoteingress-core/src/edge.rs +++ b/rust/crates/remoteingress-core/src/edge.rs @@ -403,14 +403,18 @@ async fn connect_to_hub_and_run( // TLS records and syscalls. Flushed after each frame to avoid holding data. let mut writer = tokio::io::BufWriter::with_capacity(65536, write_half); let mut write_error = false; + let write_timeout = Duration::from_secs(30); loop { tokio::select! 
{ biased; // control frames always take priority over data ctrl = tunnel_ctrl_rx.recv() => { match ctrl { Some(frame_data) => { - if writer.write_all(&frame_data).await.is_err() { write_error = true; break; } - if writer.flush().await.is_err() { write_error = true; break; } + let ok = tokio::time::timeout(write_timeout, async { + writer.write_all(&frame_data).await?; + writer.flush().await + }).await; + if !matches!(ok, Ok(Ok(()))) { write_error = true; break; } } None => break, } @@ -418,8 +422,11 @@ async fn connect_to_hub_and_run( data = tunnel_data_rx.recv() => { match data { Some(frame_data) => { - if writer.write_all(&frame_data).await.is_err() { write_error = true; break; } - if writer.flush().await.is_err() { write_error = true; break; } + let ok = tokio::time::timeout(write_timeout, async { + writer.write_all(&frame_data).await?; + writer.flush().await + }).await; + if !matches!(ok, Ok(Ok(()))) { write_error = true; break; } } None => break, } @@ -428,7 +435,7 @@ async fn connect_to_hub_and_run( } } if write_error { - log::error!("Tunnel writer failed, signalling reader for fast reconnect"); + log::error!("Tunnel writer failed or stalled, signalling reader for fast reconnect"); let _ = writer_dead_tx.send(()); } }); @@ -712,7 +719,12 @@ async fn handle_client_connection( // Set up channel for data coming back from hub (capacity 16 is sufficient with flow control) let (back_tx, mut back_rx) = mpsc::channel::<Vec<u8>>(256); - let send_window = Arc::new(AtomicU32::new(INITIAL_STREAM_WINDOW)); + // Adaptive initial window: scale with current stream count to keep total in-flight + // data within the 32MB budget. Prevents burst flooding when many streams open. 
+ let initial_window = remoteingress_protocol::compute_window_for_stream_count( + active_streams.load(Ordering::Relaxed), + ); + let send_window = Arc::new(AtomicU32::new(initial_window)); let window_notify = Arc::new(Notify::new()); { let mut writers = client_writers.lock().await; diff --git a/rust/crates/remoteingress-core/src/hub.rs b/rust/crates/remoteingress-core/src/hub.rs index 8dc76da..75da97a 100644 --- a/rust/crates/remoteingress-core/src/hub.rs +++ b/rust/crates/remoteingress-core/src/hub.rs @@ -396,14 +396,18 @@ async fn handle_edge_connection( // TLS records and syscalls. Flushed after each frame to avoid holding data. let mut writer = tokio::io::BufWriter::with_capacity(65536, write_half); let mut write_error = false; + let write_timeout = Duration::from_secs(30); loop { tokio::select! { biased; // control frames always take priority over data ctrl = ctrl_rx.recv() => { match ctrl { Some(frame_data) => { - if writer.write_all(&frame_data).await.is_err() { write_error = true; break; } - if writer.flush().await.is_err() { write_error = true; break; } + let ok = tokio::time::timeout(write_timeout, async { + writer.write_all(&frame_data).await?; + writer.flush().await + }).await; + if !matches!(ok, Ok(Ok(()))) { write_error = true; break; } } None => break, } @@ -411,8 +415,11 @@ async fn handle_edge_connection( data = data_rx.recv() => { match data { Some(frame_data) => { - if writer.write_all(&frame_data).await.is_err() { write_error = true; break; } - if writer.flush().await.is_err() { write_error = true; break; } + let ok = tokio::time::timeout(write_timeout, async { + writer.write_all(&frame_data).await?; + writer.flush().await + }).await; + if !matches!(ok, Ok(Ok(()))) { write_error = true; break; } } None => break, } @@ -421,7 +428,7 @@ async fn handle_edge_connection( } } if write_error { - log::error!("Tunnel writer to edge failed, signalling reader for fast cleanup"); + log::error!("Tunnel writer to edge failed or stalled, signalling reader for 
fast cleanup"); let _ = writer_dead_tx.send(()); } }); @@ -512,7 +519,12 @@ async fn handle_edge_connection( // Create channel for data from edge to this stream (capacity 16 is sufficient with flow control) let (data_tx, mut data_rx) = mpsc::channel::<Vec<u8>>(256); - let send_window = Arc::new(AtomicU32::new(INITIAL_STREAM_WINDOW)); + // Adaptive initial window: scale with current stream count + // to keep total in-flight data within the 32MB budget. + let initial_window = compute_window_for_stream_count( + edge_stream_count.load(Ordering::Relaxed), + ); + let send_window = Arc::new(AtomicU32::new(initial_window)); let window_notify = Arc::new(Notify::new()); { let mut s = streams.lock().await; diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 18d1c5c..61048b9 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@serve.zone/remoteingress', - version: '4.7.1', + version: '4.7.2', description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.' }