From 124df129ecaa35bee61beb02fac1249e6eca8d75 Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Tue, 17 Mar 2026 22:46:55 +0000 Subject: [PATCH] fix(protocol): increase per-stream flow control windows and remove adaptive read caps --- changelog.md | 7 ++ rust/crates/remoteingress-core/src/edge.rs | 6 +- rust/crates/remoteingress-core/src/hub.rs | 6 +- rust/crates/remoteingress-protocol/src/lib.rs | 91 ++++++------------- ts/00_commitinfo_data.ts | 2 +- 5 files changed, 38 insertions(+), 74 deletions(-) diff --git a/changelog.md b/changelog.md index 9688da2..82f2cde 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-03-17 - 4.8.17 - fix(protocol) +increase per-stream flow control windows and remove adaptive read caps + +- Raise the initial per-stream window from 4MB to 16MB and expand the adaptive window budget to 800MB with a 4MB floor +- Stop limiting edge and hub reads by the adaptive per-stream target window, keeping reads capped only by the current window and 32KB chunk size +- Update protocol tests to match the new adaptive window scaling and budget boundaries + ## 2026-03-17 - 4.8.16 - fix(release) bump package version to 4.8.15 diff --git a/rust/crates/remoteingress-core/src/edge.rs b/rust/crates/remoteingress-core/src/edge.rs index 104bbd5..3b6d3dd 100644 --- a/rust/crates/remoteingress-core/src/edge.rs +++ b/rust/crates/remoteingress-core/src/edge.rs @@ -862,11 +862,7 @@ async fn handle_client_connection( log::warn!("Stream {} upload: window still 0 after stall timeout, closing", stream_id); break; } - // Adaptive: cap read to current per-stream target window - let adaptive_cap = remoteingress_protocol::compute_window_for_stream_count( - active_streams.load(Ordering::Relaxed), - ) as usize; - let max_read = w.min(32768).min(adaptive_cap); + let max_read = w.min(32768); tokio::select! { read_result = client_read.read(&mut buf[FRAME_HEADER_SIZE..FRAME_HEADER_SIZE + max_read]) => { diff --git a/rust/crates/remoteingress-core/src/hub.rs b/rust/crates/remoteingress-core/src/hub.rs index 9aa9803..c3eef0c 100644 --- a/rust/crates/remoteingress-core/src/hub.rs +++ b/rust/crates/remoteingress-core/src/hub.rs @@ -487,11 +487,7 @@ async fn handle_hub_frame( log::warn!("Stream {} download: window still 0 after stall timeout, closing", stream_id); break; } - // Adaptive: cap read to current per-stream target window - let adaptive_cap = remoteingress_protocol::compute_window_for_stream_count( - stream_counter.load(Ordering::Relaxed), - ) as usize; - let max_read = w.min(32768).min(adaptive_cap); + let max_read = w.min(32768); tokio::select! { read_result = up_read.read(&mut buf[FRAME_HEADER_SIZE..FRAME_HEADER_SIZE + max_read]) => { diff --git a/rust/crates/remoteingress-protocol/src/lib.rs b/rust/crates/remoteingress-protocol/src/lib.rs index 627e1c7..523372b 100644 --- a/rust/crates/remoteingress-protocol/src/lib.rs +++ b/rust/crates/remoteingress-protocol/src/lib.rs @@ -23,9 +23,8 @@ pub const FRAME_HEADER_SIZE: usize = 9; pub const MAX_PAYLOAD_SIZE: u32 = 16 * 1024 * 1024; // Per-stream flow control constants -/// Initial per-stream window size (4 MB). Sized for full throughput at high RTT: -/// at 100ms RTT, this sustains ~40 MB/s per stream. -pub const INITIAL_STREAM_WINDOW: u32 = 4 * 1024 * 1024; +/// Initial (and maximum) per-stream window size (16 MB). +pub const INITIAL_STREAM_WINDOW: u32 = 16 * 1024 * 1024; /// Send WINDOW_UPDATE after consuming this many bytes (half the initial window). pub const WINDOW_UPDATE_THRESHOLD: u32 = INITIAL_STREAM_WINDOW / 2; /// Maximum window size to prevent overflow. @@ -37,12 +36,11 @@ pub fn encode_window_update(stream_id: u32, frame_type: u8, increment: u32) -> V } /// Compute the target per-stream window size based on the number of active streams. -/// Total memory budget is ~32MB shared across all streams. As more streams are active, -/// each gets a smaller window. This adapts to current demand — few streams get high -/// throughput, many streams save memory and reduce control frame pressure. +/// Total memory budget is ~800MB shared across all streams. Up to 50 streams get the +/// full 16MB window; above that the window scales down to a 4MB floor at 200+ streams. pub fn compute_window_for_stream_count(active: u32) -> u32 { - let per_stream = (32 * 1024 * 1024u64) / (active.max(1) as u64); - per_stream.clamp(64 * 1024, INITIAL_STREAM_WINDOW as u64) as u32 + let per_stream = (800 * 1024 * 1024u64) / (active.max(1) as u64); + per_stream.clamp(4 * 1024 * 1024, INITIAL_STREAM_WINDOW as u64) as u32 } /// Decode a WINDOW_UPDATE payload into a byte increment. Returns None if payload is malformed. @@ -683,90 +681,57 @@ mod tests { #[test] fn test_adaptive_window_zero_streams() { - // 0 streams treated as 1: 32MB/1 = 32MB → clamped to 4MB max + // 0 streams treated as 1: 800MB/1 → clamped to 16MB max assert_eq!(compute_window_for_stream_count(0), INITIAL_STREAM_WINDOW); } #[test] fn test_adaptive_window_one_stream() { - // 32MB/1 = 32MB → clamped to 4MB max assert_eq!(compute_window_for_stream_count(1), INITIAL_STREAM_WINDOW); } #[test] - fn test_adaptive_window_at_max_boundary() { - // 32MB/8 = 4MB = exactly INITIAL_STREAM_WINDOW - assert_eq!(compute_window_for_stream_count(8), INITIAL_STREAM_WINDOW); + fn test_adaptive_window_50_streams_full() { + // 800MB/50 = 16MB = exactly INITIAL_STREAM_WINDOW + assert_eq!(compute_window_for_stream_count(50), INITIAL_STREAM_WINDOW); } #[test] - fn test_adaptive_window_just_below_max() { - // 32MB/9 = 3,728,270 — first value below INITIAL_STREAM_WINDOW - let w = compute_window_for_stream_count(9); + fn test_adaptive_window_51_streams_starts_scaling() { + // 800MB/51 < 16MB — first value below max + let w = compute_window_for_stream_count(51); assert!(w < INITIAL_STREAM_WINDOW); - assert_eq!(w, (32 * 1024 * 1024u64 / 9) as u32); - } - - #[test] - fn test_adaptive_window_16_streams() { - // 32MB/16 = 2MB - assert_eq!(compute_window_for_stream_count(16), 2 * 1024 * 1024); + assert_eq!(w, (800 * 1024 * 1024u64 / 51) as u32); } #[test] fn test_adaptive_window_100_streams() { - // 32MB/100 = 335,544 bytes (~327KB) - let w = compute_window_for_stream_count(100); - assert_eq!(w, (32 * 1024 * 1024u64 / 100) as u32); - assert!(w > 64 * 1024); // above floor - assert!(w < INITIAL_STREAM_WINDOW as u32); // below ceiling + // 800MB/100 = 8MB + assert_eq!(compute_window_for_stream_count(100), 8 * 1024 * 1024); } #[test] - fn test_adaptive_window_200_streams() { - // 32MB/200 = 167,772 bytes (~163KB), above 64KB floor - let w = compute_window_for_stream_count(200); - assert_eq!(w, (32 * 1024 * 1024u64 / 200) as u32); - assert!(w > 64 * 1024); + fn test_adaptive_window_200_streams_at_floor() { + // 800MB/200 = 4MB = exactly the floor + assert_eq!(compute_window_for_stream_count(200), 4 * 1024 * 1024); } #[test] - fn test_adaptive_window_500_streams() { - // 32MB/500 = 67,108 bytes (~65.5KB), just above 64KB floor - let w = compute_window_for_stream_count(500); - assert_eq!(w, (32 * 1024 * 1024u64 / 500) as u32); - assert!(w > 64 * 1024); - } - - #[test] - fn test_adaptive_window_at_min_boundary() { - // 32MB/512 = 65,536 = exactly 64KB floor - assert_eq!(compute_window_for_stream_count(512), 64 * 1024); - } - - #[test] - fn test_adaptive_window_below_min_clamped() { - // 32MB/513 = 65,408 → clamped up to 64KB - assert_eq!(compute_window_for_stream_count(513), 64 * 1024); - } - - #[test] - fn test_adaptive_window_1000_streams() { - // 32MB/1000 = 33,554 → clamped to 64KB - assert_eq!(compute_window_for_stream_count(1000), 64 * 1024); + fn test_adaptive_window_500_streams_clamped() { + // 800MB/500 = 1.6MB → clamped up to 4MB floor + assert_eq!(compute_window_for_stream_count(500), 4 * 1024 * 1024); } #[test] fn test_adaptive_window_max_u32() { - // Extreme: u32::MAX streams → tiny value → clamped to 64KB - assert_eq!(compute_window_for_stream_count(u32::MAX), 64 * 1024); + // Extreme: u32::MAX streams → tiny value → clamped to 4MB + assert_eq!(compute_window_for_stream_count(u32::MAX), 4 * 1024 * 1024); } #[test] fn test_adaptive_window_monotonically_decreasing() { - // Window should decrease (or stay same) as stream count increases let mut prev = compute_window_for_stream_count(1); - for n in [2, 5, 10, 50, 100, 200, 500, 512, 1000] { + for n in [2, 10, 50, 51, 100, 200, 500, 1000] { let w = compute_window_for_stream_count(n); assert!(w <= prev, "window increased from {} to {} at n={}", prev, w, n); prev = w; @@ -775,11 +740,11 @@ mod tests { #[test] fn test_adaptive_window_total_budget_bounded() { - // active × per_stream_window should never exceed 32MB (+ clamp overhead for high N) - for n in [1, 10, 50, 100, 200, 500] { + // active × per_stream_window should never exceed 800MB (+ clamp overhead for high N) + for n in [1, 10, 50, 100, 200] { let w = compute_window_for_stream_count(n); let total = w as u64 * n as u64; - assert!(total <= 32 * 1024 * 1024, "total {}MB exceeds budget at n={}", total / (1024*1024), n); + assert!(total <= 800 * 1024 * 1024, "total {}MB exceeds budget at n={}", total / (1024*1024), n); } } diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 5fa3c5c..2a492a2 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@serve.zone/remoteingress', - version: '4.8.16', + version: '4.8.17', description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.' }