From 761551596b9f8840fd366685c706086826378afe Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Sun, 15 Mar 2026 17:33:59 +0000 Subject: [PATCH] feat(remoteingress-core): add per-stream flow control for edge and hub tunnel data transfer --- changelog.md | 7 ++ rust/crates/remoteingress-core/src/edge.rs | 101 +++++++++++++---- rust/crates/remoteingress-core/src/hub.rs | 106 ++++++++++++++---- rust/crates/remoteingress-protocol/src/lib.rs | 23 ++++ ts/00_commitinfo_data.ts | 2 +- 5 files changed, 198 insertions(+), 41 deletions(-) diff --git a/changelog.md b/changelog.md index 1629cdd..7143bfc 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-03-15 - 4.5.0 - feat(remoteingress-core) +add per-stream flow control for edge and hub tunnel data transfer + +- introduce WINDOW_UPDATE frame types and protocol helpers for per-stream flow control +- track per-stream send windows on both edge and hub to limit reads based on available capacity +- send window updates after downstream writes to reduce channel pressure during large transfers + ## 2026-03-15 - 4.4.1 - fix(remoteingress-core) prevent stream data loss by applying backpressure and closing saturated channels diff --git a/rust/crates/remoteingress-core/src/edge.rs b/rust/crates/remoteingress-core/src/edge.rs index bc5131d..f44a09d 100644 --- a/rust/crates/remoteingress-core/src/edge.rs +++ b/rust/crates/remoteingress-core/src/edge.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::net::{TcpListener, TcpStream}; -use tokio::sync::{mpsc, Mutex, RwLock}; +use tokio::sync::{mpsc, Mutex, Notify, RwLock}; use tokio::task::JoinHandle; use tokio::time::{Instant, sleep_until}; use tokio_rustls::TlsConnector; @@ -13,6 +13,17 @@ use serde::{Deserialize, Serialize}; use remoteingress_protocol::*; +/// Per-stream state tracked in the edge's client_writers map. +struct EdgeStreamState { + /// Channel to deliver FRAME_DATA_BACK payloads to the hub_to_client task. + back_tx: mpsc::Sender>, + /// Send window for FRAME_DATA (upload direction). + /// Decremented by the client reader, incremented by FRAME_WINDOW_UPDATE_BACK from hub. + send_window: Arc, + /// Notifier to wake the client reader when the window opens. + window_notify: Arc, +} + /// Edge configuration (hub-host + credentials only; ports come from hub). #[derive(Debug, Clone, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] @@ -351,8 +362,8 @@ async fn connect_to_hub_and_run( } }); - // Client socket map: stream_id -> sender for writing data back to client - let client_writers: Arc>>>> = + // Client socket map: stream_id -> per-stream state (back channel + flow control) + let client_writers: Arc>> = Arc::new(Mutex::new(HashMap::new())); // A5: Channel-based tunnel writer replaces Arc> @@ -407,17 +418,31 @@ async fn connect_to_hub_and_run( match frame.frame_type { FRAME_DATA_BACK => { - // Non-blocking send to prevent head-of-line blocking in the main dispatch loop. - // If the per-stream channel is full, close the stream rather than silently - // dropping data (which would corrupt the TCP stream). + // Non-blocking dispatch to per-stream channel. + // With flow control, the sender should rarely exceed the channel capacity. let mut writers = client_writers.lock().await; - if let Some(tx) = writers.get(&frame.stream_id) { - if tx.try_send(frame.payload).is_err() { - log::warn!("Stream {} back-channel full, closing stream to prevent data corruption", frame.stream_id); + if let Some(state) = writers.get(&frame.stream_id) { + if state.back_tx.try_send(frame.payload).is_err() { + log::warn!("Stream {} back-channel full, closing stream", frame.stream_id); writers.remove(&frame.stream_id); } } } + FRAME_WINDOW_UPDATE_BACK => { + // Hub consumed data — increase our send window for this stream (upload direction) + if let Some(increment) = decode_window_update(&frame.payload) { + if increment > 0 { + let writers = client_writers.lock().await; + if let Some(state) = writers.get(&frame.stream_id) { + let prev = state.send_window.fetch_add(increment, Ordering::Release); + if prev + increment > MAX_WINDOW_SIZE { + state.send_window.store(MAX_WINDOW_SIZE, Ordering::Release); + } + state.window_notify.notify_one(); + } + } + } + } FRAME_CLOSE_BACK => { let mut writers = client_writers.lock().await; writers.remove(&frame.stream_id); @@ -495,7 +520,7 @@ fn apply_port_config( new_ports: &[u16], port_listeners: &mut HashMap>, tunnel_writer_tx: &mpsc::Sender>, - client_writers: &Arc>>>>, + client_writers: &Arc>>, active_streams: &Arc, next_stream_id: &Arc, edge_id: &str, @@ -583,7 +608,7 @@ async fn handle_client_connection( dest_port: u16, edge_id: &str, tunnel_writer_tx: mpsc::Sender>, - client_writers: Arc>>>>, + client_writers: Arc>>, client_token: CancellationToken, ) { let client_ip = client_addr.ip().to_string(); @@ -599,26 +624,44 @@ async fn handle_client_connection( return; } - // Set up channel for data coming back from hub - let (back_tx, mut back_rx) = mpsc::channel::>(256); + // Set up channel for data coming back from hub (capacity 16 is sufficient with flow control) + let (back_tx, mut back_rx) = mpsc::channel::>(16); + let send_window = Arc::new(AtomicU32::new(INITIAL_STREAM_WINDOW)); + let window_notify = Arc::new(Notify::new()); { let mut writers = client_writers.lock().await; - writers.insert(stream_id, back_tx); + writers.insert(stream_id, EdgeStreamState { + back_tx, + send_window: Arc::clone(&send_window), + window_notify: Arc::clone(&window_notify), + }); } let (mut client_read, mut client_write) = client_stream.into_split(); - // Task: hub -> client + // Task: hub -> client (download direction) + // After writing to client TCP, send WINDOW_UPDATE to hub so it can send more let hub_to_client_token = client_token.clone(); + let wu_tx = tunnel_writer_tx.clone(); let hub_to_client = tokio::spawn(async move { + let mut consumed_since_update: u32 = 0; loop { tokio::select! { data = back_rx.recv() => { match data { Some(data) => { + let len = data.len() as u32; if client_write.write_all(&data).await.is_err() { break; } + // Track consumption for flow control + consumed_since_update += len; + if consumed_since_update >= WINDOW_UPDATE_THRESHOLD { + let increment = consumed_since_update; + consumed_since_update = 0; + let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE, increment); + let _ = wu_tx.try_send(frame); + } } None => break, } @@ -626,21 +669,39 @@ async fn handle_client_connection( _ = hub_to_client_token.cancelled() => break, } } + // Send final window update for any remaining consumed bytes + if consumed_since_update > 0 { + let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE, consumed_since_update); + let _ = wu_tx.try_send(frame); + } let _ = client_write.shutdown().await; }); - // Task: client -> hub (via writer channel) + // Task: client -> hub (upload direction) with per-stream flow control let mut buf = vec![0u8; 32768]; loop { + // Wait for send window to have capacity + loop { + let w = send_window.load(Ordering::Acquire); + if w > 0 { break; } + tokio::select! { + _ = window_notify.notified() => continue, + _ = client_token.cancelled() => break, + } + } + if client_token.is_cancelled() { break; } + + // Limit read size to available window + let w = send_window.load(Ordering::Acquire) as usize; + let max_read = w.min(buf.len()); + tokio::select! { - read_result = client_read.read(&mut buf) => { + read_result = client_read.read(&mut buf[..max_read]) => { match read_result { Ok(0) => break, Ok(n) => { + send_window.fetch_sub(n as u32, Ordering::Release); let data_frame = encode_frame(stream_id, FRAME_DATA, &buf[..n]); - // Use send().await for backpressure — this is a per-stream task so - // blocking only stalls this stream, not others. Prevents data loss - // for large transfers (e.g. 352MB Docker layers). if tunnel_writer_tx.send(data_frame).await.is_err() { log::warn!("Stream {} tunnel writer closed, closing", stream_id); break; diff --git a/rust/crates/remoteingress-core/src/hub.rs b/rust/crates/remoteingress-core/src/hub.rs index 4720e12..c1b386b 100644 --- a/rust/crates/remoteingress-core/src/hub.rs +++ b/rust/crates/remoteingress-core/src/hub.rs @@ -1,9 +1,10 @@ use std::collections::HashMap; use std::sync::Arc; +use std::sync::atomic::{AtomicU32, Ordering}; use std::time::Duration; use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::net::{TcpListener, TcpStream}; -use tokio::sync::{mpsc, Mutex, RwLock, Semaphore}; +use tokio::sync::{mpsc, Mutex, Notify, RwLock, Semaphore}; use tokio::time::{interval, sleep_until, Instant}; use tokio_rustls::TlsAcceptor; use tokio_util::sync::CancellationToken; @@ -11,6 +12,19 @@ use serde::{Deserialize, Serialize}; use remoteingress_protocol::*; +/// Per-stream state tracked in the hub's stream map. +struct HubStreamState { + /// Channel to deliver FRAME_DATA payloads to the upstream writer task. + data_tx: mpsc::Sender>, + /// Cancellation token for this stream. + cancel_token: CancellationToken, + /// Send window for FRAME_DATA_BACK (download direction). + /// Decremented by the upstream reader, incremented by FRAME_WINDOW_UPDATE from edge. + send_window: Arc, + /// Notifier to wake the upstream reader when the window opens. + window_notify: Arc, +} + /// Hub configuration. #[derive(Debug, Clone, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] @@ -109,7 +123,7 @@ pub struct TunnelHub { struct ConnectedEdgeInfo { connected_at: u64, peer_addr: String, - active_streams: Arc>, CancellationToken)>>>, + active_streams: Arc>>, config_tx: mpsc::Sender, #[allow(dead_code)] // kept alive for Drop — cancels child tokens when edge is removed cancel_token: CancellationToken, @@ -333,7 +347,7 @@ async fn handle_edge_connection( write_half.write_all(handshake_json.as_bytes()).await?; // Track this edge - let streams: Arc>, CancellationToken)>>> = + let streams: Arc>> = Arc::new(Mutex::new(HashMap::new())); let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) @@ -462,11 +476,18 @@ async fn handle_edge_connection( stream_id, }); - // Create channel for data from edge to this stream - let (data_tx, mut data_rx) = mpsc::channel::>(256); + // Create channel for data from edge to this stream (capacity 16 is sufficient with flow control) + let (data_tx, mut data_rx) = mpsc::channel::>(16); + let send_window = Arc::new(AtomicU32::new(INITIAL_STREAM_WINDOW)); + let window_notify = Arc::new(Notify::new()); { let mut s = streams.lock().await; - s.insert(stream_id, (data_tx, stream_token.clone())); + s.insert(stream_id, HubStreamState { + data_tx, + cancel_token: stream_token.clone(), + send_window: Arc::clone(&send_window), + window_notify: Arc::clone(&window_notify), + }); } // Spawn task: connect to SmartProxy, send PROXY header, pipe data @@ -490,16 +511,28 @@ async fn handle_edge_connection( upstream.into_split(); // Forward data from edge (via channel) to SmartProxy + // After writing to upstream, send WINDOW_UPDATE_BACK to edge let writer_token = stream_token.clone(); + let wub_tx = writer_tx.clone(); let writer_for_edge_data = tokio::spawn(async move { + let mut consumed_since_update: u32 = 0; loop { tokio::select! { data = data_rx.recv() => { match data { Some(data) => { + let len = data.len() as u32; if up_write.write_all(&data).await.is_err() { break; } + // Track consumption for flow control + consumed_since_update += len; + if consumed_since_update >= WINDOW_UPDATE_THRESHOLD { + let increment = consumed_since_update; + consumed_since_update = 0; + let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE_BACK, increment); + let _ = wub_tx.try_send(frame); + } } None => break, } @@ -507,22 +540,41 @@ async fn handle_edge_connection( _ = writer_token.cancelled() => break, } } + // Send final window update for remaining consumed bytes + if consumed_since_update > 0 { + let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE_BACK, consumed_since_update); + let _ = wub_tx.try_send(frame); + } let _ = up_write.shutdown().await; }); // Forward data from SmartProxy back to edge via writer channel + // with per-stream flow control (check send_window before reading) let mut buf = vec![0u8; 32768]; loop { + // Wait for send window to have capacity + loop { + let w = send_window.load(Ordering::Acquire); + if w > 0 { break; } + tokio::select! { + _ = window_notify.notified() => continue, + _ = stream_token.cancelled() => break, + } + } + if stream_token.is_cancelled() { break; } + + // Limit read size to available window + let w = send_window.load(Ordering::Acquire) as usize; + let max_read = w.min(buf.len()); + tokio::select! { - read_result = up_read.read(&mut buf) => { + read_result = up_read.read(&mut buf[..max_read]) => { match read_result { Ok(0) => break, Ok(n) => { + send_window.fetch_sub(n as u32, Ordering::Release); let frame = encode_frame(stream_id, FRAME_DATA_BACK, &buf[..n]); - // Use send().await for backpressure — this is a per-stream task so - // blocking only stalls this stream, not others. Prevents data loss - // for large transfers (e.g. 352MB Docker layers). if writer_tx.send(frame).await.is_err() { log::warn!("Stream {} writer channel closed, closing", stream_id); break; @@ -569,23 +621,37 @@ async fn handle_edge_connection( }); } FRAME_DATA => { - // Non-blocking send to prevent head-of-line blocking in the main dispatch loop. - // If the per-stream channel is full, close the stream rather than silently - // dropping data (which would corrupt the TCP stream). + // Non-blocking dispatch to per-stream channel. + // With flow control, the sender should rarely exceed the channel capacity. let mut s = streams.lock().await; - if let Some((tx, _)) = s.get(&frame.stream_id) { - if tx.try_send(frame.payload).is_err() { - log::warn!("Stream {} data channel full, closing stream to prevent data corruption", frame.stream_id); - if let Some((_, token)) = s.remove(&frame.stream_id) { - token.cancel(); + if let Some(state) = s.get(&frame.stream_id) { + if state.data_tx.try_send(frame.payload).is_err() { + log::warn!("Stream {} data channel full, closing stream", frame.stream_id); + if let Some(state) = s.remove(&frame.stream_id) { + state.cancel_token.cancel(); + } + } + } + } + FRAME_WINDOW_UPDATE => { + // Edge consumed data — increase our send window for this stream + if let Some(increment) = decode_window_update(&frame.payload) { + if increment > 0 { + let s = streams.lock().await; + if let Some(state) = s.get(&frame.stream_id) { + let prev = state.send_window.fetch_add(increment, Ordering::Release); + if prev + increment > MAX_WINDOW_SIZE { + state.send_window.store(MAX_WINDOW_SIZE, Ordering::Release); + } + state.window_notify.notify_one(); } } } } FRAME_CLOSE => { let mut s = streams.lock().await; - if let Some((_, token)) = s.remove(&frame.stream_id) { - token.cancel(); + if let Some(state) = s.remove(&frame.stream_id) { + state.cancel_token.cancel(); let _ = event_tx.try_send(HubEvent::StreamClosed { edge_id: edge_id.clone(), stream_id: frame.stream_id, diff --git a/rust/crates/remoteingress-protocol/src/lib.rs b/rust/crates/remoteingress-protocol/src/lib.rs index 4db3e97..937a8d8 100644 --- a/rust/crates/remoteingress-protocol/src/lib.rs +++ b/rust/crates/remoteingress-protocol/src/lib.rs @@ -9,6 +9,8 @@ pub const FRAME_CLOSE_BACK: u8 = 0x05; pub const FRAME_CONFIG: u8 = 0x06; // Hub -> Edge: configuration update pub const FRAME_PING: u8 = 0x07; // Hub -> Edge: heartbeat probe pub const FRAME_PONG: u8 = 0x08; // Edge -> Hub: heartbeat response +pub const FRAME_WINDOW_UPDATE: u8 = 0x09; // Edge -> Hub: per-stream flow control +pub const FRAME_WINDOW_UPDATE_BACK: u8 = 0x0A; // Hub -> Edge: per-stream flow control // Frame header size: 4 (stream_id) + 1 (type) + 4 (length) = 9 bytes pub const FRAME_HEADER_SIZE: usize = 9; @@ -16,6 +18,27 @@ pub const FRAME_HEADER_SIZE: usize = 9; // Maximum payload size (16 MB) pub const MAX_PAYLOAD_SIZE: u32 = 16 * 1024 * 1024; +// Per-stream flow control constants +/// Initial per-stream window size (256 KB). With 32KB frames, this allows ~8 frames in flight. +pub const INITIAL_STREAM_WINDOW: u32 = 256 * 1024; +/// Send WINDOW_UPDATE after consuming this many bytes (half the initial window). +pub const WINDOW_UPDATE_THRESHOLD: u32 = INITIAL_STREAM_WINDOW / 2; +/// Maximum window size to prevent overflow. +pub const MAX_WINDOW_SIZE: u32 = 16 * 1024 * 1024; + +/// Encode a WINDOW_UPDATE frame for a specific stream. +pub fn encode_window_update(stream_id: u32, frame_type: u8, increment: u32) -> Vec { + encode_frame(stream_id, frame_type, &increment.to_be_bytes()) +} + +/// Decode a WINDOW_UPDATE payload into a byte increment. Returns None if payload is malformed. +pub fn decode_window_update(payload: &[u8]) -> Option { + if payload.len() != 4 { + return None; + } + Some(u32::from_be_bytes([payload[0], payload[1], payload[2], payload[3]])) +} + /// A single multiplexed frame. #[derive(Debug, Clone)] pub struct Frame { diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 4ea0a19..e86edae 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@serve.zone/remoteingress', - version: '4.4.1', + version: '4.5.0', description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.' }