From 4e9041c6a760fec5ca7e72a7efc603b386362869 Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Sun, 15 Mar 2026 17:01:27 +0000 Subject: [PATCH] fix(remoteingress-core): prevent stream data loss by applying backpressure and closing saturated channels --- changelog.md | 6 ++++++ rust/crates/remoteingress-core/src/edge.rs | 17 +++++++++++------ rust/crates/remoteingress-core/src/hub.rs | 19 +++++++++++++------ ts/00_commitinfo_data.ts | 2 +- 4 files changed, 31 insertions(+), 13 deletions(-) diff --git a/changelog.md b/changelog.md index f10eb97..1629cdd 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,11 @@ # Changelog +## 2026-03-15 - 4.4.1 - fix(remoteingress-core) +prevent stream data loss by applying backpressure and closing saturated channels + +- replace non-blocking frame writes with awaited sends in per-stream tasks so large transfers respect backpressure instead of dropping data +- close and remove streams when back-channel or data channels fill up to avoid TCP stream corruption from silently dropped frames + ## 2026-03-03 - 4.4.0 - feat(remoteingress) add heartbeat PING/PONG and liveness timeouts; implement fast-reconnect/backoff reset and JS crash-recovery auto-restart diff --git a/rust/crates/remoteingress-core/src/edge.rs b/rust/crates/remoteingress-core/src/edge.rs index d19734e..bc5131d 100644 --- a/rust/crates/remoteingress-core/src/edge.rs +++ b/rust/crates/remoteingress-core/src/edge.rs @@ -407,11 +407,14 @@ async fn connect_to_hub_and_run( match frame.frame_type { FRAME_DATA_BACK => { - // A1: Non-blocking send to prevent head-of-line blocking - let writers = client_writers.lock().await; + // Non-blocking send to prevent head-of-line blocking in the main dispatch loop. + // If the per-stream channel is full, close the stream rather than silently + // dropping data (which would corrupt the TCP stream). + let mut writers = client_writers.lock().await; if let Some(tx) = writers.get(&frame.stream_id) { if tx.try_send(frame.payload).is_err() { - log::warn!("Stream {} back-channel full, dropping frame", frame.stream_id); + log::warn!("Stream {} back-channel full, closing stream to prevent data corruption", frame.stream_id); + writers.remove(&frame.stream_id); } } } @@ -635,9 +638,11 @@ async fn handle_client_connection( Ok(0) => break, Ok(n) => { let data_frame = encode_frame(stream_id, FRAME_DATA, &buf[..n]); - // A5: Use try_send to avoid blocking if writer channel is full - if tunnel_writer_tx.try_send(data_frame).is_err() { - log::warn!("Stream {} tunnel writer full, closing", stream_id); + // Use send().await for backpressure — this is a per-stream task so + // blocking only stalls this stream, not others. Prevents data loss + // for large transfers (e.g. 352MB Docker layers). + if tunnel_writer_tx.send(data_frame).await.is_err() { + log::warn!("Stream {} tunnel writer closed, closing", stream_id); break; } } diff --git a/rust/crates/remoteingress-core/src/hub.rs b/rust/crates/remoteingress-core/src/hub.rs index 42211ac..4720e12 100644 --- a/rust/crates/remoteingress-core/src/hub.rs +++ b/rust/crates/remoteingress-core/src/hub.rs @@ -520,9 +520,11 @@ async fn handle_edge_connection( Ok(n) => { let frame = encode_frame(stream_id, FRAME_DATA_BACK, &buf[..n]); - // A5: Use try_send to avoid blocking if writer channel is full - if writer_tx.try_send(frame).is_err() { - log::warn!("Stream {} writer channel full, closing", stream_id); + // Use send().await for backpressure — this is a per-stream task so + // blocking only stalls this stream, not others. Prevents data loss + // for large transfers (e.g. 352MB Docker layers). + if writer_tx.send(frame).await.is_err() { + log::warn!("Stream {} writer channel closed, closing", stream_id); break; } } @@ -567,11 +569,16 @@ async fn handle_edge_connection( }); } FRAME_DATA => { - // A1: Non-blocking send to prevent head-of-line blocking - let s = streams.lock().await; + // Non-blocking send to prevent head-of-line blocking in the main dispatch loop. + // If the per-stream channel is full, close the stream rather than silently + // dropping data (which would corrupt the TCP stream). + let mut s = streams.lock().await; if let Some((tx, _)) = s.get(&frame.stream_id) { if tx.try_send(frame.payload).is_err() { - log::warn!("Stream {} data channel full, dropping frame", frame.stream_id); + log::warn!("Stream {} data channel full, closing stream to prevent data corruption", frame.stream_id); + if let Some((_, token)) = s.remove(&frame.stream_id) { + token.cancel(); + } } } } diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 95d599b..4ea0a19 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@serve.zone/remoteingress', - version: '4.4.0', + version: '4.4.1', description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.' }