diff --git a/changelog.md b/changelog.md index 5c1d81f..9218db0 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-03-18 - 4.8.19 - fix(remoteingress-protocol) +reduce per-stream flow control windows and increase control channel buffering + +- Lower the initial and maximum per-stream window from 16MB to 4MB and scale adaptive windows against a 200MB total budget with a 1MB minimum. +- Increase edge and hub control frame channel capacity from 256 to 512 to better handle prioritized control traffic. +- Update flow-control tests and comments to reflect the new window sizing and budget behavior. + ## 2026-03-17 - 4.8.18 - fix(rust-protocol) switch tunnel frame buffers from Vec to Bytes to reduce copying and memory overhead diff --git a/rust/crates/remoteingress-core/src/edge.rs b/rust/crates/remoteingress-core/src/edge.rs index 18fae9e..5bf1542 100644 --- a/rust/crates/remoteingress-core/src/edge.rs +++ b/rust/crates/remoteingress-core/src/edge.rs @@ -497,7 +497,7 @@ async fn connect_to_hub_and_run( // QoS dual-channel: ctrl frames have priority over data frames. // Stream handlers send through these channels → TunnelIo drains them. - let (tunnel_ctrl_tx, mut tunnel_ctrl_rx) = mpsc::channel::(256); + let (tunnel_ctrl_tx, mut tunnel_ctrl_rx) = mpsc::channel::(512); let (tunnel_data_tx, mut tunnel_data_rx) = mpsc::channel::(4096); let tunnel_writer_tx = tunnel_ctrl_tx.clone(); @@ -756,7 +756,7 @@ async fn handle_client_connection( // streams due to channel overflow — backpressure slows streams, never kills them. let (back_tx, mut back_rx) = mpsc::unbounded_channel::(); // Adaptive initial window: scale with current stream count to keep total in-flight - // data within the 32MB budget. Prevents burst flooding when many streams open. + // data within the 200MB budget. Prevents burst flooding when many streams open. let initial_window = remoteingress_protocol::compute_window_for_stream_count( active_streams.load(Ordering::Relaxed), ); diff --git a/rust/crates/remoteingress-core/src/hub.rs b/rust/crates/remoteingress-core/src/hub.rs index 1a50892..85ad108 100644 --- a/rust/crates/remoteingress-core/src/hub.rs +++ b/rust/crates/remoteingress-core/src/hub.rs @@ -349,7 +349,7 @@ async fn handle_hub_frame( // Create channel for data from edge to this stream let (stream_data_tx, mut stream_data_rx) = mpsc::unbounded_channel::(); // Adaptive initial window: scale with current stream count - // to keep total in-flight data within the 32MB budget. + // to keep total in-flight data within the 200MB budget. let initial_window = compute_window_for_stream_count( edge_stream_count.load(Ordering::Relaxed), ); @@ -708,7 +708,7 @@ async fn handle_edge_connection( // QoS dual-channel: ctrl frames have priority over data frames. // Stream handlers send through these channels -> TunnelIo drains them. - let (ctrl_tx, mut ctrl_rx) = mpsc::channel::(256); + let (ctrl_tx, mut ctrl_rx) = mpsc::channel::(512); let (data_tx, mut data_rx) = mpsc::channel::(4096); // Spawn task to forward config updates as FRAME_CONFIG frames diff --git a/rust/crates/remoteingress-protocol/src/lib.rs b/rust/crates/remoteingress-protocol/src/lib.rs index bf37e16..3468d9d 100644 --- a/rust/crates/remoteingress-protocol/src/lib.rs +++ b/rust/crates/remoteingress-protocol/src/lib.rs @@ -24,12 +24,12 @@ pub const FRAME_HEADER_SIZE: usize = 9; pub const MAX_PAYLOAD_SIZE: u32 = 16 * 1024 * 1024; // Per-stream flow control constants -/// Initial (and maximum) per-stream window size (16 MB). -pub const INITIAL_STREAM_WINDOW: u32 = 16 * 1024 * 1024; +/// Initial (and maximum) per-stream window size (4 MB). +pub const INITIAL_STREAM_WINDOW: u32 = 4 * 1024 * 1024; /// Send WINDOW_UPDATE after consuming this many bytes (half the initial window). pub const WINDOW_UPDATE_THRESHOLD: u32 = INITIAL_STREAM_WINDOW / 2; /// Maximum window size to prevent overflow. -pub const MAX_WINDOW_SIZE: u32 = 16 * 1024 * 1024; +pub const MAX_WINDOW_SIZE: u32 = 4 * 1024 * 1024; /// Encode a WINDOW_UPDATE frame for a specific stream. pub fn encode_window_update(stream_id: u32, frame_type: u8, increment: u32) -> Bytes { @@ -37,11 +37,11 @@ pub fn encode_window_update(stream_id: u32, frame_type: u8, increment: u32) -> B } /// Compute the target per-stream window size based on the number of active streams. -/// Total memory budget is ~800MB shared across all streams. Up to 50 streams get the -/// full 16MB window; above that the window scales down to a 4MB floor at 200+ streams. +/// Total memory budget is ~200MB shared across all streams. Up to 50 streams get the +/// full 4MB window; above that the window scales down to a 1MB floor at 200+ streams. pub fn compute_window_for_stream_count(active: u32) -> u32 { - let per_stream = (800 * 1024 * 1024u64) / (active.max(1) as u64); - per_stream.clamp(4 * 1024 * 1024, INITIAL_STREAM_WINDOW as u64) as u32 + let per_stream = (200 * 1024 * 1024u64) / (active.max(1) as u64); + per_stream.clamp(1 * 1024 * 1024, INITIAL_STREAM_WINDOW as u64) as u32 } /// Decode a WINDOW_UPDATE payload into a byte increment. Returns None if payload is malformed. @@ -684,7 +684,7 @@ mod tests { #[test] fn test_adaptive_window_zero_streams() { - // 0 streams treated as 1: 800MB/1 -> clamped to 16MB max + // 0 streams treated as 1: 200MB/1 -> clamped to 4MB max assert_eq!(compute_window_for_stream_count(0), INITIAL_STREAM_WINDOW); } @@ -695,40 +695,40 @@ mod tests { #[test] fn test_adaptive_window_50_streams_full() { - // 800MB/50 = 16MB = exactly INITIAL_STREAM_WINDOW + // 200MB/50 = 4MB = exactly INITIAL_STREAM_WINDOW assert_eq!(compute_window_for_stream_count(50), INITIAL_STREAM_WINDOW); } #[test] fn test_adaptive_window_51_streams_starts_scaling() { - // 800MB/51 < 16MB — first value below max + // 200MB/51 < 4MB — first value below max let w = compute_window_for_stream_count(51); assert!(w < INITIAL_STREAM_WINDOW); - assert_eq!(w, (800 * 1024 * 1024u64 / 51) as u32); + assert_eq!(w, (200 * 1024 * 1024u64 / 51) as u32); } #[test] fn test_adaptive_window_100_streams() { - // 800MB/100 = 8MB - assert_eq!(compute_window_for_stream_count(100), 8 * 1024 * 1024); + // 200MB/100 = 2MB + assert_eq!(compute_window_for_stream_count(100), 2 * 1024 * 1024); } #[test] fn test_adaptive_window_200_streams_at_floor() { - // 800MB/200 = 4MB = exactly the floor - assert_eq!(compute_window_for_stream_count(200), 4 * 1024 * 1024); + // 200MB/200 = 1MB = exactly the floor + assert_eq!(compute_window_for_stream_count(200), 1 * 1024 * 1024); } #[test] fn test_adaptive_window_500_streams_clamped() { - // 800MB/500 = 1.6MB -> clamped up to 4MB floor - assert_eq!(compute_window_for_stream_count(500), 4 * 1024 * 1024); + // 200MB/500 = 0.4MB -> clamped up to 1MB floor + assert_eq!(compute_window_for_stream_count(500), 1 * 1024 * 1024); } #[test] fn test_adaptive_window_max_u32() { - // Extreme: u32::MAX streams -> tiny value -> clamped to 4MB - assert_eq!(compute_window_for_stream_count(u32::MAX), 4 * 1024 * 1024); + // Extreme: u32::MAX streams -> tiny value -> clamped to 1MB + assert_eq!(compute_window_for_stream_count(u32::MAX), 1 * 1024 * 1024); } #[test] @@ -743,11 +743,11 @@ mod tests { #[test] fn test_adaptive_window_total_budget_bounded() { - // active x per_stream_window should never exceed 800MB (+ clamp overhead for high N) + // active x per_stream_window should never exceed 200MB (+ clamp overhead for high N) for n in [1, 10, 50, 100, 200] { let w = compute_window_for_stream_count(n); let total = w as u64 * n as u64; - assert!(total <= 800 * 1024 * 1024, "total {}MB exceeds budget at n={}", total / (1024*1024), n); + assert!(total <= 200 * 1024 * 1024, "total {}MB exceeds budget at n={}", total / (1024*1024), n); } } diff --git a/test/test.flowcontrol.node.ts b/test/test.flowcontrol.node.ts index 0125b4a..f33b1b2 100644 --- a/test/test.flowcontrol.node.ts +++ b/test/test.flowcontrol.node.ts @@ -324,7 +324,7 @@ tap.test('setup: start echo server and tunnel', async () => { expect(tunnel.hub.running).toBeTrue(); }); -tap.test('single stream: 32MB transfer exceeding initial 4MB window', async () => { +tap.test('single stream: 32MB transfer exceeding initial 4MB window (multiple refills)', async () => { const size = 32 * 1024 * 1024; const data = crypto.randomBytes(size); const expectedHash = sha256(data); @@ -392,7 +392,7 @@ tap.test('asymmetric transfer: 4KB request -> 4MB response', async () => { } }); -tap.test('100 streams x 1MB each (100MB total exceeding 32MB budget)', async () => { +tap.test('100 streams x 1MB each (100MB total exceeding 200MB budget)', async () => { const streamCount = 100; const payloadSize = 1 * 1024 * 1024; @@ -446,7 +446,7 @@ tap.test('active stream counter tracks concurrent connections', async () => { }); tap.test('50 streams x 2MB each (forces multiple window refills per stream)', async () => { - // At 50 concurrent streams: adaptive window = 32MB/50 = 655KB per stream + // At 50 concurrent streams: adaptive window = 200MB/50 = 4MB per stream // Each stream sends 2MB → needs ~3 WINDOW_UPDATE refill cycles per stream const streamCount = 50; const payloadSize = 2 * 1024 * 1024; diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index c1a5df6..710d576 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@serve.zone/remoteingress', - version: '4.8.18', + version: '4.8.19', description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.' }