diff --git a/changelog.md b/changelog.md index def07e8..fa83702 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-03-16 - 4.6.0 - feat(remoteingress-core) +add adaptive per-stream flow control based on active stream counts + +- Track active stream counts on edge and hub connections to size per-stream flow control windows dynamically. +- Cap WINDOW_UPDATE increments and read sizes to the adaptive window so bandwidth is shared more evenly across concurrent streams. +- Apply the adaptive logic to both upload and download paths on edge and hub stream handlers. + ## 2026-03-16 - 4.5.12 - fix(remoteingress-core) improve tunnel liveness handling and enable TCP keepalive for accepted client sockets diff --git a/rust/crates/remoteingress-core/src/edge.rs b/rust/crates/remoteingress-core/src/edge.rs index e25e16e..157bf98 100644 --- a/rust/crates/remoteingress-core/src/edge.rs +++ b/rust/crates/remoteingress-core/src/edge.rs @@ -620,6 +620,7 @@ fn apply_port_config( tunnel_data_tx, client_writers, client_token, + Arc::clone(&active_streams), ) .await; active_streams.fetch_sub(1, Ordering::Relaxed); @@ -651,6 +652,7 @@ async fn handle_client_connection( tunnel_data_tx: mpsc::Sender>, client_writers: Arc>>, client_token: CancellationToken, + active_streams: Arc, ) { let client_ip = client_addr.ip().to_string(); let client_port = client_addr.port(); @@ -684,6 +686,7 @@ async fn handle_client_connection( // After writing to client TCP, send WINDOW_UPDATE to hub so it can send more let hub_to_client_token = client_token.clone(); let wu_tx = tunnel_ctrl_tx.clone(); + let active_streams_h2c = Arc::clone(&active_streams); let mut hub_to_client = tokio::spawn(async move { let mut consumed_since_update: u32 = 0; loop { @@ -695,12 +698,20 @@ async fn handle_client_connection( if client_write.write_all(&data).await.is_err() { break; } - // Track consumption for flow control + // Track consumption for adaptive flow control. + // The increment is capped to the adaptive window so the sender's + // effective window shrinks to match current demand (fewer streams + // = larger window, more streams = smaller window per stream). consumed_since_update += len; - if consumed_since_update >= WINDOW_UPDATE_THRESHOLD { - let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE, consumed_since_update); + let adaptive_window = remoteingress_protocol::compute_window_for_stream_count( + active_streams_h2c.load(Ordering::Relaxed), + ); + let threshold = adaptive_window / 2; + if consumed_since_update >= threshold { + let increment = consumed_since_update.min(adaptive_window); + let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE, increment); if wu_tx.try_send(frame).is_ok() { - consumed_since_update = 0; + consumed_since_update -= increment; } // If try_send fails, keep accumulating — retry on next threshold } @@ -746,7 +757,11 @@ async fn handle_client_connection( log::warn!("Stream {} upload: window still 0 after stall timeout, closing", stream_id); break; } - let max_read = w.min(buf.len()); + // Adaptive: cap read to current per-stream target window + let adaptive_cap = remoteingress_protocol::compute_window_for_stream_count( + active_streams.load(Ordering::Relaxed), + ) as usize; + let max_read = w.min(buf.len()).min(adaptive_cap); tokio::select! { read_result = client_read.read(&mut buf[..max_read]) => { diff --git a/rust/crates/remoteingress-core/src/hub.rs b/rust/crates/remoteingress-core/src/hub.rs index 56e280f..205af89 100644 --- a/rust/crates/remoteingress-core/src/hub.rs +++ b/rust/crates/remoteingress-core/src/hub.rs @@ -373,6 +373,9 @@ async fn handle_edge_connection( ); } + // Per-edge active stream counter for adaptive flow control + let edge_stream_count = Arc::new(AtomicU32::new(0)); + // QoS dual-channel tunnel writer: control frames (PING/PONG/WINDOW_UPDATE/CLOSE) // have priority over data frames (DATA_BACK). This prevents PING starvation under load. let (ctrl_tx, mut ctrl_rx) = mpsc::channel::>(64); @@ -509,8 +512,10 @@ async fn handle_edge_connection( } // Spawn task: connect to SmartProxy, send PROXY header, pipe data + let stream_counter = Arc::clone(&edge_stream_count); tokio::spawn(async move { let _permit = permit; // hold semaphore permit until stream completes + stream_counter.fetch_add(1, Ordering::Relaxed); let result = async { // A2: Connect to SmartProxy with timeout @@ -533,6 +538,7 @@ async fn handle_edge_connection( // After writing to upstream, send WINDOW_UPDATE_BACK to edge let writer_token = stream_token.clone(); let wub_tx = writer_tx.clone(); + let stream_counter_w = Arc::clone(&stream_counter); let writer_for_edge_data = tokio::spawn(async move { let mut consumed_since_update: u32 = 0; loop { @@ -558,12 +564,18 @@ async fn handle_edge_connection( break; } } - // Track consumption for flow control + // Track consumption for adaptive flow control. + // Increment capped to adaptive window to limit per-stream in-flight data. consumed_since_update += len; - if consumed_since_update >= WINDOW_UPDATE_THRESHOLD { - let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE_BACK, consumed_since_update); + let adaptive_window = remoteingress_protocol::compute_window_for_stream_count( + stream_counter_w.load(Ordering::Relaxed), + ); + let threshold = adaptive_window / 2; + if consumed_since_update >= threshold { + let increment = consumed_since_update.min(adaptive_window); + let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE_BACK, increment); if wub_tx.try_send(frame).is_ok() { - consumed_since_update = 0; + consumed_since_update -= increment; } // If try_send fails, keep accumulating — retry on next threshold } @@ -610,7 +622,11 @@ async fn handle_edge_connection( log::warn!("Stream {} download: window still 0 after stall timeout, closing", stream_id); break; } - let max_read = w.min(buf.len()); + // Adaptive: cap read to current per-stream target window + let adaptive_cap = remoteingress_protocol::compute_window_for_stream_count( + stream_counter.load(Ordering::Relaxed), + ) as usize; + let max_read = w.min(buf.len()).min(adaptive_cap); tokio::select! { read_result = up_read.read(&mut buf[..max_read]) => { @@ -665,6 +681,7 @@ async fn handle_edge_connection( stream_id, }); } + stream_counter.fetch_sub(1, Ordering::Relaxed); }); } FRAME_DATA => { diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index d68b387..bb827b8 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@serve.zone/remoteingress', - version: '4.5.12', + version: '4.6.0', description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.' }