From 6ac4b37532a199bde1447de304f95483d62fc4a0 Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Sun, 15 Mar 2026 18:16:10 +0000 Subject: [PATCH] fix(remoteingress-core): improve stream flow control retries and increase channel buffer capacity --- changelog.md | 6 ++++++ rust/crates/remoteingress-core/src/edge.rs | 11 ++++++----- rust/crates/remoteingress-core/src/hub.rs | 11 ++++++----- ts/00_commitinfo_data.ts | 2 +- 4 files changed, 19 insertions(+), 11 deletions(-) diff --git a/changelog.md b/changelog.md index cdf91b1..280b9f1 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,11 @@ # Changelog +## 2026-03-15 - 4.5.2 - fix(remoteingress-core) +improve stream flow control retries and increase channel buffer capacity + +- increase per-stream mpsc channel capacity from 128 to 256 on both edge and hub paths +- only reset accumulated window update bytes after a successful try_send to avoid dropping flow-control credits when the update channel is busy + ## 2026-03-15 - 4.5.1 - fix(protocol) increase per-stream flow control window and channel buffers to improve high-RTT throughput diff --git a/rust/crates/remoteingress-core/src/edge.rs b/rust/crates/remoteingress-core/src/edge.rs index 5d666bc..6a9d089 100644 --- a/rust/crates/remoteingress-core/src/edge.rs +++ b/rust/crates/remoteingress-core/src/edge.rs @@ -625,7 +625,7 @@ async fn handle_client_connection( } // Set up channel for data coming back from hub (capacity 16 is sufficient with flow control) - let (back_tx, mut back_rx) = mpsc::channel::>(128); + let (back_tx, mut back_rx) = mpsc::channel::>(256); let send_window = Arc::new(AtomicU32::new(INITIAL_STREAM_WINDOW)); let window_notify = Arc::new(Notify::new()); { @@ -657,10 +657,11 @@ async fn handle_client_connection( // Track consumption for flow control consumed_since_update += len; if consumed_since_update >= WINDOW_UPDATE_THRESHOLD { - let increment = consumed_since_update; - consumed_since_update = 0; - let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE, increment); - let _ = wu_tx.try_send(frame); + let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE, consumed_since_update); + if wu_tx.try_send(frame).is_ok() { + consumed_since_update = 0; + } + // If try_send fails, keep accumulating — retry on next threshold } } None => break, diff --git a/rust/crates/remoteingress-core/src/hub.rs b/rust/crates/remoteingress-core/src/hub.rs index b99e9a6..bbe389e 100644 --- a/rust/crates/remoteingress-core/src/hub.rs +++ b/rust/crates/remoteingress-core/src/hub.rs @@ -477,7 +477,7 @@ async fn handle_edge_connection( }); // Create channel for data from edge to this stream (capacity 16 is sufficient with flow control) - let (data_tx, mut data_rx) = mpsc::channel::>(128); + let (data_tx, mut data_rx) = mpsc::channel::>(256); let send_window = Arc::new(AtomicU32::new(INITIAL_STREAM_WINDOW)); let window_notify = Arc::new(Notify::new()); { @@ -528,10 +528,11 @@ async fn handle_edge_connection( // Track consumption for flow control consumed_since_update += len; if consumed_since_update >= WINDOW_UPDATE_THRESHOLD { - let increment = consumed_since_update; - consumed_since_update = 0; - let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE_BACK, increment); - let _ = wub_tx.try_send(frame); + let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE_BACK, consumed_since_update); + if wub_tx.try_send(frame).is_ok() { + consumed_since_update = 0; + } + // If try_send fails, keep accumulating — retry on next threshold } } None => break, diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 19b40d0..14ef5d9 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@serve.zone/remoteingress', - version: '4.5.1', + version: '4.5.2', description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.' }