From e8431c01748f465822461716b6e2c4c764a6373b Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Tue, 17 Mar 2026 11:47:33 +0000 Subject: [PATCH] fix(remoteingress-core): prevent stream stalls by guaranteeing flow-control updates and avoiding bounded per-stream channel overflows --- changelog.md | 7 ++++ rust/crates/remoteingress-core/src/edge.rs | 32 ++++++++++++------ rust/crates/remoteingress-core/src/hub.rs | 39 ++++++++++++---------- ts/00_commitinfo_data.ts | 2 +- 4 files changed, 51 insertions(+), 29 deletions(-) diff --git a/changelog.md b/changelog.md index 6cdbdcd..1dec4ad 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-03-17 - 4.8.4 - fix(remoteingress-core) +prevent stream stalls by guaranteeing flow-control updates and avoiding bounded per-stream channel overflows + +- Replace bounded per-stream data channels with unbounded channels on edge and hub, relying on existing WINDOW_UPDATE flow control to limit bytes in flight +- Use awaited sends for FRAME_WINDOW_UPDATE and FRAME_WINDOW_UPDATE_BACK so updates are not dropped and streams do not deadlock under backpressure +- Clean up stream state when channel receivers have already exited instead of closing active streams because a bounded queue filled + ## 2026-03-17 - 4.8.3 - fix(protocol,edge) optimize tunnel frame handling and zero-copy uploads in edge I/O diff --git a/rust/crates/remoteingress-core/src/edge.rs b/rust/crates/remoteingress-core/src/edge.rs index 4438ae4..5a8efd2 100644 --- a/rust/crates/remoteingress-core/src/edge.rs +++ b/rust/crates/remoteingress-core/src/edge.rs @@ -24,8 +24,9 @@ enum EdgeFrameAction { /// Per-stream state tracked in the edge's client_writers map. struct EdgeStreamState { - /// Channel to deliver FRAME_DATA_BACK payloads to the hub_to_client task. - back_tx: mpsc::Sender>, + /// Unbounded channel to deliver FRAME_DATA_BACK payloads to the hub_to_client task. + /// Unbounded because flow control (WINDOW_UPDATE) already limits bytes-in-flight. + back_tx: mpsc::UnboundedSender>, /// Send window for FRAME_DATA (upload direction). /// Decremented by the client reader, incremented by FRAME_WINDOW_UPDATE_BACK from hub. send_window: Arc, @@ -300,10 +301,13 @@ async fn handle_edge_frame( ) -> EdgeFrameAction { match frame.frame_type { FRAME_DATA_BACK => { + // Dispatch to per-stream unbounded channel. Flow control (WINDOW_UPDATE) + // limits bytes-in-flight, so the channel won't grow unbounded. send() only + // fails if the receiver is dropped (hub_to_client task already exited). let mut writers = client_writers.lock().await; if let Some(state) = writers.get(&frame.stream_id) { - if state.back_tx.try_send(frame.payload).is_err() { - log::warn!("Stream {} back-channel full, closing", frame.stream_id); + if state.back_tx.send(frame.payload).is_err() { + // Receiver dropped — hub_to_client task already exited, clean up writers.remove(&frame.stream_id); } } @@ -731,8 +735,10 @@ async fn handle_client_connection( return; } - // Set up channel for data coming back from hub (capacity 16 is sufficient with flow control) - let (back_tx, mut back_rx) = mpsc::channel::>(1024); + // Per-stream unbounded back-channel. Flow control (WINDOW_UPDATE) limits + // bytes-in-flight, so this won't grow unbounded. Unbounded avoids killing + // streams due to channel overflow — backpressure slows streams, never kills them. + let (back_tx, mut back_rx) = mpsc::unbounded_channel::>(); // Adaptive initial window: scale with current stream count to keep total in-flight // data within the 32MB budget. Prevents burst flooding when many streams open. let initial_window = remoteingress_protocol::compute_window_for_stream_count( @@ -779,10 +785,16 @@ async fn handle_client_connection( if consumed_since_update >= threshold { let increment = consumed_since_update.min(adaptive_window); let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE, increment); - if wu_tx.try_send(frame).is_ok() { - consumed_since_update -= increment; + // Use send().await for guaranteed delivery — dropping WINDOW_UPDATEs + // causes permanent flow stalls. Safe: runs in per-stream task, not main loop. + tokio::select! { + result = wu_tx.send(frame) => { + if result.is_ok() { + consumed_since_update -= increment; + } + } + _ = hub_to_client_token.cancelled() => break, } - // If try_send fails, keep accumulating — retry on next threshold } } None => break, @@ -794,7 +806,7 @@ async fn handle_client_connection( // Send final window update for any remaining consumed bytes if consumed_since_update > 0 { let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE, consumed_since_update); - let _ = wu_tx.try_send(frame); + let _ = wu_tx.send(frame).await; } let _ = client_write.shutdown().await; }); diff --git a/rust/crates/remoteingress-core/src/hub.rs b/rust/crates/remoteingress-core/src/hub.rs index 8986761..5983bf1 100644 --- a/rust/crates/remoteingress-core/src/hub.rs +++ b/rust/crates/remoteingress-core/src/hub.rs @@ -14,10 +14,6 @@ use remoteingress_protocol::*; type HubTlsStream = tokio_rustls::server::TlsStream; -/// Per-stream data channel capacity. With 4MB window and 32KB frames, -/// at most ~128 frames are in-flight. 256 provides comfortable headroom. -const PER_STREAM_DATA_CAPACITY: usize = 256; - /// Result of processing a frame. #[allow(dead_code)] enum FrameAction { @@ -27,8 +23,10 @@ enum FrameAction { /// Per-stream state tracked in the hub's stream map. struct HubStreamState { - /// Channel to deliver FRAME_DATA payloads to the upstream writer task. - data_tx: mpsc::Sender>, + /// Unbounded channel to deliver FRAME_DATA payloads to the upstream writer task. + /// Unbounded because flow control (WINDOW_UPDATE) already limits bytes-in-flight. + /// A bounded channel would kill streams instead of applying backpressure. + data_tx: mpsc::UnboundedSender>, /// Cancellation token for this stream. cancel_token: CancellationToken, /// Send window for FRAME_DATA_BACK (download direction). @@ -348,7 +346,7 @@ async fn handle_hub_frame( }); // Create channel for data from edge to this stream - let (stream_data_tx, mut stream_data_rx) = mpsc::channel::>(PER_STREAM_DATA_CAPACITY); + let (stream_data_tx, mut stream_data_rx) = mpsc::unbounded_channel::>(); // Adaptive initial window: scale with current stream count // to keep total in-flight data within the 32MB budget. let initial_window = compute_window_for_stream_count( @@ -426,10 +424,16 @@ async fn handle_hub_frame( if consumed_since_update >= threshold { let increment = consumed_since_update.min(adaptive_window); let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE_BACK, increment); - if wub_tx.try_send(frame).is_ok() { - consumed_since_update -= increment; + // Use send().await for guaranteed delivery — dropping WINDOW_UPDATEs + // causes permanent flow stalls. Safe: runs in per-stream task, not main loop. + tokio::select! { + result = wub_tx.send(frame) => { + if result.is_ok() { + consumed_since_update -= increment; + } + } + _ = writer_token.cancelled() => break, } - // If try_send fails, keep accumulating — retry on next threshold } } None => break, @@ -441,7 +445,7 @@ async fn handle_hub_frame( // Send final window update for remaining consumed bytes if consumed_since_update > 0 { let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE_BACK, consumed_since_update); - let _ = wub_tx.try_send(frame); + let _ = wub_tx.send(frame).await; } let _ = up_write.shutdown().await; }); @@ -534,14 +538,13 @@ async fn handle_hub_frame( }); } FRAME_DATA => { - // Non-blocking dispatch to per-stream channel. - // With flow control, the sender should rarely exceed the channel capacity. + // Dispatch to per-stream unbounded channel. Flow control (WINDOW_UPDATE) + // limits bytes-in-flight, so the channel won't grow unbounded. send() only + // fails if the receiver is dropped (stream handler already exited). if let Some(state) = streams.get(&frame.stream_id) { - if state.data_tx.try_send(frame.payload).is_err() { - log::warn!("Stream {} data channel full, closing stream", frame.stream_id); - if let Some(state) = streams.remove(&frame.stream_id) { - state.cancel_token.cancel(); - } + if state.data_tx.send(frame.payload).is_err() { + // Receiver dropped — stream handler already exited, clean up + streams.remove(&frame.stream_id); } } } diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 993b859..87c5505 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@serve.zone/remoteingress', - version: '4.8.3', + version: '4.8.4', description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.' }