diff --git a/changelog.md b/changelog.md index 2d46e31..61f6bc0 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-03-17 - 4.8.14 - fix(rust-core,protocol) +eliminate edge stream registration races and reduce frame buffering copies + +- replace Vec tunnel/frame buffers with bytes::Bytes and BytesMut for lower-copy frame parsing and queueing +- move edge stream ownership into the main I/O loop with explicit register and cleanup channels to ensure streams are registered before OPEN processing +- add proactive send window clamping so active streams converge immediately to adaptive flow-control targets + ## 2026-03-17 - 4.8.13 - fix(remoteingress-protocol) require a flush after each written frame to bound TLS buffer growth diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 900f3a7..9f7dfcb 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -551,6 +551,7 @@ dependencies = [ name = "remoteingress-core" version = "2.0.0" dependencies = [ + "bytes", "log", "rcgen", "remoteingress-protocol", @@ -568,6 +569,7 @@ dependencies = [ name = "remoteingress-protocol" version = "2.0.0" dependencies = [ + "bytes", "log", "tokio", "tokio-util", diff --git a/rust/crates/remoteingress-core/Cargo.toml b/rust/crates/remoteingress-core/Cargo.toml index 11ec5cf..5c92934 100644 --- a/rust/crates/remoteingress-core/Cargo.toml +++ b/rust/crates/remoteingress-core/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" remoteingress-protocol = { path = "../remoteingress-protocol" } tokio = { version = "1", features = ["full"] } tokio-rustls = "0.26" +bytes = "1" rustls = { version = "0.23", default-features = false, features = ["ring", "logging", "std", "tls12"] } rcgen = "0.13" serde = { version = "1", features = ["derive"] } diff --git a/rust/crates/remoteingress-core/src/edge.rs b/rust/crates/remoteingress-core/src/edge.rs index 104bbd5..53675c5 100644 --- a/rust/crates/remoteingress-core/src/edge.rs +++ b/rust/crates/remoteingress-core/src/edge.rs @@ -9,6 +9,7 @@ use tokio::task::JoinHandle; use tokio::time::{Instant, sleep_until}; use tokio_rustls::TlsConnector; use tokio_util::sync::CancellationToken; +use bytes::Bytes; use serde::{Deserialize, Serialize}; use remoteingress_protocol::*; @@ -22,11 +23,11 @@ enum EdgeFrameAction { Disconnect(String), } -/// Per-stream state tracked in the edge's client_writers map. +/// Per-stream state tracked in the edge's stream map. struct EdgeStreamState { /// Unbounded channel to deliver FRAME_DATA_BACK payloads to the hub_to_client task. /// Unbounded because flow control (WINDOW_UPDATE) already limits bytes-in-flight. - back_tx: mpsc::UnboundedSender>, + back_tx: mpsc::UnboundedSender, /// Send window for FRAME_DATA (upload direction). /// Decremented by the client reader, incremented by FRAME_WINDOW_UPDATE_BACK from hub. send_window: Arc, @@ -34,6 +35,12 @@ struct EdgeStreamState { window_notify: Arc, } +/// Registration message sent from per-stream tasks to the main I/O loop. +struct StreamRegistration { + stream_id: u32, + state: EdgeStreamState, +} + /// Edge configuration (hub-host + credentials only; ports come from hub). #[derive(Debug, Clone, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] @@ -284,39 +291,29 @@ enum EdgeLoopResult { /// Process a single frame received from the hub side of the tunnel. /// Handles FRAME_DATA_BACK, FRAME_WINDOW_UPDATE_BACK, FRAME_CLOSE_BACK, FRAME_CONFIG, FRAME_PING. -async fn handle_edge_frame( +/// No mutex — edge_streams is owned by the main I/O loop (same pattern as hub.rs). +fn handle_edge_frame( frame: Frame, tunnel_io: &mut remoteingress_protocol::TunnelIo, - client_writers: &Arc>>, - listen_ports: &Arc>>, - event_tx: &mpsc::Sender, - tunnel_writer_tx: &mpsc::Sender>, - tunnel_data_tx: &mpsc::Sender>, - port_listeners: &mut HashMap>, - active_streams: &Arc, - next_stream_id: &Arc, - edge_id: &str, - connection_token: &CancellationToken, - bind_address: &str, + edge_streams: &mut HashMap, + listen_ports_update: &mut Option>, ) -> EdgeFrameAction { match frame.frame_type { FRAME_DATA_BACK => { // Dispatch to per-stream unbounded channel. Flow control (WINDOW_UPDATE) // limits bytes-in-flight, so the channel won't grow unbounded. send() only // fails if the receiver is dropped (hub_to_client task already exited). - let mut writers = client_writers.lock().await; - if let Some(state) = writers.get(&frame.stream_id) { + if let Some(state) = edge_streams.get(&frame.stream_id) { if state.back_tx.send(frame.payload).is_err() { // Receiver dropped — hub_to_client task already exited, clean up - writers.remove(&frame.stream_id); + edge_streams.remove(&frame.stream_id); } } } FRAME_WINDOW_UPDATE_BACK => { if let Some(increment) = decode_window_update(&frame.payload) { if increment > 0 { - let writers = client_writers.lock().await; - if let Some(state) = writers.get(&frame.stream_id) { + if let Some(state) = edge_streams.get(&frame.stream_id) { let prev = state.send_window.fetch_add(increment, Ordering::Release); if prev + increment > MAX_WINDOW_SIZE { state.send_window.store(MAX_WINDOW_SIZE, Ordering::Release); @@ -327,28 +324,12 @@ async fn handle_edge_frame( } } FRAME_CLOSE_BACK => { - let mut writers = client_writers.lock().await; - writers.remove(&frame.stream_id); + edge_streams.remove(&frame.stream_id); } FRAME_CONFIG => { if let Ok(update) = serde_json::from_slice::(&frame.payload) { log::info!("Config update from hub: ports {:?}", update.listen_ports); - *listen_ports.write().await = update.listen_ports.clone(); - let _ = event_tx.try_send(EdgeEvent::PortsUpdated { - listen_ports: update.listen_ports.clone(), - }); - apply_port_config( - &update.listen_ports, - port_listeners, - tunnel_writer_tx, - tunnel_data_tx, - client_writers, - active_streams, - next_stream_id, - edge_id, - connection_token, - bind_address, - ); + *listen_ports_update = Some(update.listen_ports); } } FRAME_PING => { @@ -490,14 +471,17 @@ async fn connect_to_hub_and_run( } }); - // Client socket map: stream_id -> per-stream state (back channel + flow control) - let client_writers: Arc>> = - Arc::new(Mutex::new(HashMap::new())); + // Stream map owned by the main I/O loop — no mutex, matching hub.rs pattern. + let mut edge_streams: HashMap = HashMap::new(); + // Channel for per-stream tasks to register their stream state with the main loop. + let (register_tx, mut register_rx) = mpsc::channel::(256); + // Channel for per-stream tasks to deregister when done. + let (cleanup_tx, mut cleanup_rx) = mpsc::channel::(256); // QoS dual-channel: ctrl frames have priority over data frames. // Stream handlers send through these channels → TunnelIo drains them. - let (tunnel_ctrl_tx, mut tunnel_ctrl_rx) = mpsc::channel::>(256); - let (tunnel_data_tx, mut tunnel_data_rx) = mpsc::channel::>(4096); + let (tunnel_ctrl_tx, mut tunnel_ctrl_rx) = mpsc::channel::(256); + let (tunnel_data_tx, mut tunnel_data_rx) = mpsc::channel::(4096); let tunnel_writer_tx = tunnel_ctrl_tx.clone(); // Start TCP listeners for initial ports @@ -508,7 +492,8 @@ async fn connect_to_hub_and_run( &mut port_listeners, &tunnel_writer_tx, &tunnel_data_tx, - &client_writers, + ®ister_tx, + &cleanup_tx, active_streams, next_stream_id, &config.edge_id, @@ -525,7 +510,18 @@ async fn connect_to_hub_and_run( let mut liveness_deadline = Box::pin(sleep_until(last_activity + liveness_timeout_dur)); let result = 'io_loop: loop { + // Drain stream registrations from per-stream tasks (before poll_step so + // registrations are processed before OPEN frames are sent to the hub). + while let Ok(reg) = register_rx.try_recv() { + edge_streams.insert(reg.stream_id, reg.state); + } + // Drain stream cleanups from per-stream tasks + while let Ok(stream_id) = cleanup_rx.try_recv() { + edge_streams.remove(&stream_id); + } + // Drain any buffered frames + let mut listen_ports_update = None; loop { let frame = match tunnel_io.try_parse_frame() { Some(Ok(f)) => f, @@ -538,28 +534,55 @@ async fn connect_to_hub_and_run( last_activity = Instant::now(); liveness_deadline.as_mut().reset(last_activity + liveness_timeout_dur); if let EdgeFrameAction::Disconnect(reason) = handle_edge_frame( - frame, &mut tunnel_io, &client_writers, listen_ports, event_tx, - &tunnel_writer_tx, &tunnel_data_tx, &mut port_listeners, - active_streams, next_stream_id, &config.edge_id, connection_token, bind_address, - ).await { + frame, &mut tunnel_io, &mut edge_streams, &mut listen_ports_update, + ) { break 'io_loop EdgeLoopResult::Reconnect(reason); } } + // Apply port config update if handle_edge_frame signalled one + if let Some(new_ports) = listen_ports_update.take() { + *listen_ports.write().await = new_ports.clone(); + let _ = event_tx.try_send(EdgeEvent::PortsUpdated { + listen_ports: new_ports.clone(), + }); + apply_port_config( + &new_ports, + &mut port_listeners, + &tunnel_writer_tx, + &tunnel_data_tx, + ®ister_tx, + &cleanup_tx, + active_streams, + next_stream_id, + &config.edge_id, + connection_token, + bind_address, + ); + } + // Poll I/O: write(ctrl→data), flush, read, channels, timers let event = std::future::poll_fn(|cx| { tunnel_io.poll_step(cx, &mut tunnel_ctrl_rx, &mut tunnel_data_rx, &mut liveness_deadline, connection_token) }).await; + // Drain registrations/cleanups before processing the event — registrations + // may have arrived while poll_step was running (multiple poll cycles inside .await). + while let Ok(reg) = register_rx.try_recv() { + edge_streams.insert(reg.stream_id, reg.state); + } + while let Ok(stream_id) = cleanup_rx.try_recv() { + edge_streams.remove(&stream_id); + } + + let mut listen_ports_update = None; match event { remoteingress_protocol::TunnelEvent::Frame(frame) => { last_activity = Instant::now(); liveness_deadline.as_mut().reset(last_activity + liveness_timeout_dur); if let EdgeFrameAction::Disconnect(reason) = handle_edge_frame( - frame, &mut tunnel_io, &client_writers, listen_ports, event_tx, - &tunnel_writer_tx, &tunnel_data_tx, &mut port_listeners, - active_streams, next_stream_id, &config.edge_id, connection_token, bind_address, - ).await { + frame, &mut tunnel_io, &mut edge_streams, &mut listen_ports_update, + ) { break EdgeLoopResult::Reconnect(reason); } } @@ -586,6 +609,27 @@ async fn connect_to_hub_and_run( break EdgeLoopResult::Shutdown; } } + + // Apply port config update if handle_edge_frame signalled one + if let Some(new_ports) = listen_ports_update.take() { + *listen_ports.write().await = new_ports.clone(); + let _ = event_tx.try_send(EdgeEvent::PortsUpdated { + listen_ports: new_ports.clone(), + }); + apply_port_config( + &new_ports, + &mut port_listeners, + &tunnel_writer_tx, + &tunnel_data_tx, + ®ister_tx, + &cleanup_tx, + active_streams, + next_stream_id, + &config.edge_id, + connection_token, + bind_address, + ); + } }; // Cancel stream tokens FIRST so stream handlers exit immediately. @@ -612,9 +656,10 @@ async fn connect_to_hub_and_run( fn apply_port_config( new_ports: &[u16], port_listeners: &mut HashMap>, - tunnel_ctrl_tx: &mpsc::Sender>, - tunnel_data_tx: &mpsc::Sender>, - client_writers: &Arc>>, + tunnel_ctrl_tx: &mpsc::Sender, + tunnel_data_tx: &mpsc::Sender, + register_tx: &mpsc::Sender, + cleanup_tx: &mpsc::Sender, active_streams: &Arc, next_stream_id: &Arc, edge_id: &str, @@ -636,7 +681,8 @@ fn apply_port_config( for &port in new_set.difference(&old_set) { let tunnel_ctrl_tx = tunnel_ctrl_tx.clone(); let tunnel_data_tx = tunnel_data_tx.clone(); - let client_writers = client_writers.clone(); + let register_tx = register_tx.clone(); + let cleanup_tx = cleanup_tx.clone(); let active_streams = active_streams.clone(); let next_stream_id = next_stream_id.clone(); let edge_id = edge_id.to_string(); @@ -670,7 +716,8 @@ fn apply_port_config( let stream_id = next_stream_id.fetch_add(1, Ordering::Relaxed); let tunnel_ctrl_tx = tunnel_ctrl_tx.clone(); let tunnel_data_tx = tunnel_data_tx.clone(); - let client_writers = client_writers.clone(); + let register_tx = register_tx.clone(); + let cleanup_tx = cleanup_tx.clone(); let active_streams = active_streams.clone(); let edge_id = edge_id.clone(); let client_token = port_token.child_token(); @@ -686,7 +733,8 @@ fn apply_port_config( &edge_id, tunnel_ctrl_tx, tunnel_data_tx, - client_writers, + register_tx, + cleanup_tx, client_token, Arc::clone(&active_streams), ) @@ -727,9 +775,10 @@ async fn handle_client_connection( stream_id: u32, dest_port: u16, edge_id: &str, - tunnel_ctrl_tx: mpsc::Sender>, - tunnel_data_tx: mpsc::Sender>, - client_writers: Arc>>, + tunnel_ctrl_tx: mpsc::Sender, + tunnel_data_tx: mpsc::Sender, + register_tx: mpsc::Sender, + cleanup_tx: mpsc::Sender, client_token: CancellationToken, active_streams: Arc, ) { @@ -739,6 +788,36 @@ async fn handle_client_connection( // Determine edge IP (use 0.0.0.0 as placeholder — hub doesn't use it for routing) let edge_ip = "0.0.0.0"; + // Per-stream unbounded back-channel. Flow control (WINDOW_UPDATE) limits + // bytes-in-flight, so this won't grow unbounded. Unbounded avoids killing + // streams due to channel overflow — backpressure slows streams, never kills them. + let (back_tx, mut back_rx) = mpsc::unbounded_channel::(); + // Adaptive initial window: scale with current stream count to keep total in-flight + // data within the 32MB budget. Prevents burst flooding when many streams open. + let initial_window = remoteingress_protocol::compute_window_for_stream_count( + active_streams.load(Ordering::Relaxed), + ); + let send_window = Arc::new(AtomicU32::new(initial_window)); + let window_notify = Arc::new(Notify::new()); + + // Register with the main I/O loop BEFORE sending OPEN. The main loop drains + // register_rx before poll_step drains ctrl_rx, guaranteeing the stream is + // registered before the OPEN frame reaches the hub and DATA_BACK arrives. + let reg_ok = tokio::select! { + result = register_tx.send(StreamRegistration { + stream_id, + state: EdgeStreamState { + back_tx, + send_window: Arc::clone(&send_window), + window_notify: Arc::clone(&window_notify), + }, + }) => result.is_ok(), + _ = client_token.cancelled() => false, + }; + if !reg_ok { + return; + } + // Send OPEN frame with PROXY v1 header via control channel let proxy_header = build_proxy_v1_header(&client_ip, edge_ip, client_port, dest_port); let open_frame = encode_frame(stream_id, FRAME_OPEN, proxy_header.as_bytes()); @@ -747,29 +826,10 @@ async fn handle_client_connection( _ = client_token.cancelled() => false, }; if !send_ok { + let _ = cleanup_tx.try_send(stream_id); return; } - // Per-stream unbounded back-channel. Flow control (WINDOW_UPDATE) limits - // bytes-in-flight, so this won't grow unbounded. Unbounded avoids killing - // streams due to channel overflow — backpressure slows streams, never kills them. - let (back_tx, mut back_rx) = mpsc::unbounded_channel::>(); - // Adaptive initial window: scale with current stream count to keep total in-flight - // data within the 32MB budget. Prevents burst flooding when many streams open. - let initial_window = remoteingress_protocol::compute_window_for_stream_count( - active_streams.load(Ordering::Relaxed), - ); - let send_window = Arc::new(AtomicU32::new(initial_window)); - let window_notify = Arc::new(Notify::new()); - { - let mut writers = client_writers.lock().await; - writers.insert(stream_id, EdgeStreamState { - back_tx, - send_window: Arc::clone(&send_window), - window_notify: Arc::clone(&window_notify), - }); - } - let (mut client_read, mut client_write) = client_stream.into_split(); // Task: hub -> client (download direction) @@ -853,20 +913,17 @@ async fn handle_client_connection( } if client_token.is_cancelled() { break; } - // Limit read size to available window. - // IMPORTANT: if window is 0 (stall timeout fired), we must NOT - // read into an empty buffer — read(&mut buf[..0]) returns Ok(0) - // which would be falsely interpreted as EOF. - let w = send_window.load(Ordering::Acquire) as usize; + // Proactive QoS: clamp send_window to current adaptive target so existing + // streams converge immediately when concurrency increases (no drain cycle). + let adaptive_target = remoteingress_protocol::compute_window_for_stream_count( + active_streams.load(Ordering::Relaxed), + ); + let w = remoteingress_protocol::clamp_send_window(&send_window, adaptive_target) as usize; if w == 0 { log::warn!("Stream {} upload: window still 0 after stall timeout, closing", stream_id); break; } - // Adaptive: cap read to current per-stream target window - let adaptive_cap = remoteingress_protocol::compute_window_for_stream_count( - active_streams.load(Ordering::Relaxed), - ) as usize; - let max_read = w.min(32768).min(adaptive_cap); + let max_read = w.min(32768); tokio::select! { read_result = client_read.read(&mut buf[FRAME_HEADER_SIZE..FRAME_HEADER_SIZE + max_read]) => { @@ -875,7 +932,7 @@ async fn handle_client_connection( Ok(n) => { send_window.fetch_sub(n as u32, Ordering::Release); encode_frame_header(&mut buf, stream_id, FRAME_DATA, n); - let data_frame = buf[..FRAME_HEADER_SIZE + n].to_vec(); + let data_frame = Bytes::copy_from_slice(&buf[..FRAME_HEADER_SIZE + n]); let sent = tokio::select! { result = tunnel_data_tx.send(data_frame) => result.is_ok(), _ = client_token.cancelled() => false, @@ -910,11 +967,8 @@ async fn handle_client_connection( } } - // Clean up - { - let mut writers = client_writers.lock().await; - writers.remove(&stream_id); - } + // Clean up — notify main loop to remove stream state + let _ = cleanup_tx.try_send(stream_id); hub_to_client.abort(); // No-op if already finished; safety net if timeout fired let _ = edge_id; // used for logging context } diff --git a/rust/crates/remoteingress-core/src/hub.rs b/rust/crates/remoteingress-core/src/hub.rs index 9aa9803..ecd0a67 100644 --- a/rust/crates/remoteingress-core/src/hub.rs +++ b/rust/crates/remoteingress-core/src/hub.rs @@ -10,6 +10,7 @@ use tokio_rustls::TlsAcceptor; use tokio_util::sync::CancellationToken; use serde::{Deserialize, Serialize}; +use bytes::Bytes; use remoteingress_protocol::*; type HubTlsStream = tokio_rustls::server::TlsStream; @@ -26,7 +27,7 @@ struct HubStreamState { /// Unbounded channel to deliver FRAME_DATA payloads to the upstream writer task. /// Unbounded because flow control (WINDOW_UPDATE) already limits bytes-in-flight. /// A bounded channel would kill streams instead of applying backpressure. - data_tx: mpsc::UnboundedSender>, + data_tx: mpsc::UnboundedSender, /// Cancellation token for this stream. cancel_token: CancellationToken, /// Send window for FRAME_DATA_BACK (download direction). @@ -307,8 +308,8 @@ async fn handle_hub_frame( edge_stream_count: &Arc, edge_id: &str, event_tx: &mpsc::Sender, - ctrl_tx: &mpsc::Sender>, - data_tx: &mpsc::Sender>, + ctrl_tx: &mpsc::Sender, + data_tx: &mpsc::Sender, target_host: &str, edge_token: &CancellationToken, cleanup_tx: &mpsc::Sender, @@ -346,7 +347,7 @@ async fn handle_hub_frame( }); // Create channel for data from edge to this stream - let (stream_data_tx, mut stream_data_rx) = mpsc::unbounded_channel::>(); + let (stream_data_tx, mut stream_data_rx) = mpsc::unbounded_channel::(); // Adaptive initial window: scale with current stream count // to keep total in-flight data within the 32MB budget. let initial_window = compute_window_for_stream_count( @@ -478,20 +479,17 @@ async fn handle_hub_frame( } if stream_token.is_cancelled() { break; } - // Limit read size to available window. - // IMPORTANT: if window is 0 (stall timeout fired), we must NOT - // read into an empty buffer — read(&mut buf[..0]) returns Ok(0) - // which would be falsely interpreted as EOF. - let w = send_window.load(Ordering::Acquire) as usize; + // Proactive QoS: clamp send_window to current adaptive target so existing + // streams converge immediately when concurrency increases (no drain cycle). + let adaptive_target = remoteingress_protocol::compute_window_for_stream_count( + stream_counter.load(Ordering::Relaxed), + ); + let w = remoteingress_protocol::clamp_send_window(&send_window, adaptive_target) as usize; if w == 0 { log::warn!("Stream {} download: window still 0 after stall timeout, closing", stream_id); break; } - // Adaptive: cap read to current per-stream target window - let adaptive_cap = remoteingress_protocol::compute_window_for_stream_count( - stream_counter.load(Ordering::Relaxed), - ) as usize; - let max_read = w.min(32768).min(adaptive_cap); + let max_read = w.min(32768); tokio::select! { read_result = up_read.read(&mut buf[FRAME_HEADER_SIZE..FRAME_HEADER_SIZE + max_read]) => { @@ -500,7 +498,7 @@ async fn handle_hub_frame( Ok(n) => { send_window.fetch_sub(n as u32, Ordering::Release); encode_frame_header(&mut buf, stream_id, FRAME_DATA_BACK, n); - let frame = buf[..FRAME_HEADER_SIZE + n].to_vec(); + let frame = Bytes::copy_from_slice(&buf[..FRAME_HEADER_SIZE + n]); let sent = tokio::select! { result = data_writer_tx.send(frame) => result.is_ok(), _ = stream_token.cancelled() => false, @@ -711,8 +709,8 @@ async fn handle_edge_connection( // QoS dual-channel: ctrl frames have priority over data frames. // Stream handlers send through these channels -> TunnelIo drains them. - let (ctrl_tx, mut ctrl_rx) = mpsc::channel::>(256); - let (data_tx, mut data_rx) = mpsc::channel::>(4096); + let (ctrl_tx, mut ctrl_rx) = mpsc::channel::(256); + let (data_tx, mut data_rx) = mpsc::channel::(4096); // Spawn task to forward config updates as FRAME_CONFIG frames let config_writer_tx = ctrl_tx.clone(); diff --git a/rust/crates/remoteingress-protocol/Cargo.toml b/rust/crates/remoteingress-protocol/Cargo.toml index b72b423..9b7f17d 100644 --- a/rust/crates/remoteingress-protocol/Cargo.toml +++ b/rust/crates/remoteingress-protocol/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] tokio = { version = "1", features = ["io-util", "sync", "time"] } tokio-util = "0.7" +bytes = "1" log = "0.4" [dev-dependencies] diff --git a/rust/crates/remoteingress-protocol/src/lib.rs b/rust/crates/remoteingress-protocol/src/lib.rs index 627e1c7..7cee773 100644 --- a/rust/crates/remoteingress-protocol/src/lib.rs +++ b/rust/crates/remoteingress-protocol/src/lib.rs @@ -2,6 +2,7 @@ use std::collections::VecDeque; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; +use bytes::{Bytes, BytesMut}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, ReadBuf}; // Frame type constants @@ -32,7 +33,7 @@ pub const WINDOW_UPDATE_THRESHOLD: u32 = INITIAL_STREAM_WINDOW / 2; pub const MAX_WINDOW_SIZE: u32 = 16 * 1024 * 1024; /// Encode a WINDOW_UPDATE frame for a specific stream. -pub fn encode_window_update(stream_id: u32, frame_type: u8, increment: u32) -> Vec { +pub fn encode_window_update(stream_id: u32, frame_type: u8, increment: u32) -> Bytes { encode_frame(stream_id, frame_type, &increment.to_be_bytes()) } @@ -45,6 +46,30 @@ pub fn compute_window_for_stream_count(active: u32) -> u32 { per_stream.clamp(64 * 1024, INITIAL_STREAM_WINDOW as u64) as u32 } +/// Proactively clamp a send_window AtomicU32 down to at most `target`. +/// CAS loop so concurrent WINDOW_UPDATE additions are not lost. +/// Returns the value after clamping. +#[inline] +pub fn clamp_send_window( + send_window: &std::sync::atomic::AtomicU32, + target: u32, +) -> u32 { + loop { + let current = send_window.load(std::sync::atomic::Ordering::Acquire); + if current <= target { + return current; + } + match send_window.compare_exchange_weak( + current, target, + std::sync::atomic::Ordering::AcqRel, + std::sync::atomic::Ordering::Relaxed, + ) { + Ok(_) => return target, + Err(_) => continue, + } + } +} + /// Decode a WINDOW_UPDATE payload into a byte increment. Returns None if payload is malformed. pub fn decode_window_update(payload: &[u8]) -> Option { if payload.len() != 4 { @@ -58,18 +83,18 @@ pub fn decode_window_update(payload: &[u8]) -> Option { pub struct Frame { pub stream_id: u32, pub frame_type: u8, - pub payload: Vec, + pub payload: Bytes, } /// Encode a frame into bytes: [stream_id:4][type:1][length:4][payload] -pub fn encode_frame(stream_id: u32, frame_type: u8, payload: &[u8]) -> Vec { +pub fn encode_frame(stream_id: u32, frame_type: u8, payload: &[u8]) -> Bytes { let len = payload.len() as u32; let mut buf = Vec::with_capacity(FRAME_HEADER_SIZE + payload.len()); buf.extend_from_slice(&stream_id.to_be_bytes()); buf.push(frame_type); buf.extend_from_slice(&len.to_be_bytes()); buf.extend_from_slice(payload); - buf + Bytes::from(buf) } /// Write a frame header into `buf[0..FRAME_HEADER_SIZE]`. @@ -152,7 +177,7 @@ impl FrameReader { Ok(Some(Frame { stream_id, frame_type, - payload, + payload: Bytes::from(payload), })) } @@ -186,9 +211,9 @@ pub enum TunnelEvent { /// Write state extracted into a sub-struct so the borrow checker can see /// disjoint field access between `self.write` and `self.stream`. struct WriteState { - ctrl_queue: VecDeque>, // PONG, WINDOW_UPDATE, CLOSE, OPEN — always first - data_queue: VecDeque>, // DATA, DATA_BACK — only when ctrl is empty - offset: usize, // progress within current frame being written + ctrl_queue: VecDeque, // PONG, WINDOW_UPDATE, CLOSE, OPEN — always first + data_queue: VecDeque, // DATA, DATA_BACK — only when ctrl is empty + offset: usize, // progress within current frame being written flush_needed: bool, } @@ -206,26 +231,21 @@ impl WriteState { /// WINDOW_UPDATE starvation that causes flow control deadlocks. pub struct TunnelIo { stream: S, - // Read state: accumulate bytes, parse frames incrementally - read_buf: Vec, - read_pos: usize, - parse_pos: usize, + // Read state: BytesMut accumulates bytes; split_to extracts frames zero-copy. + read_buf: BytesMut, // Write state: extracted sub-struct for safe disjoint borrows write: WriteState, } impl TunnelIo { pub fn new(stream: S, initial_data: Vec) -> Self { - let read_pos = initial_data.len(); - let mut read_buf = initial_data; + let mut read_buf = BytesMut::from(&initial_data[..]); if read_buf.capacity() < 65536 { read_buf.reserve(65536 - read_buf.len()); } Self { stream, read_buf, - read_pos, - parse_pos: 0, write: WriteState { ctrl_queue: VecDeque::new(), data_queue: VecDeque::new(), @@ -236,41 +256,39 @@ impl TunnelIo { } /// Queue a high-priority control frame (PONG, WINDOW_UPDATE, CLOSE, OPEN). - pub fn queue_ctrl(&mut self, frame: Vec) { + pub fn queue_ctrl(&mut self, frame: Bytes) { self.write.ctrl_queue.push_back(frame); } /// Queue a lower-priority data frame (DATA, DATA_BACK). - pub fn queue_data(&mut self, frame: Vec) { + pub fn queue_data(&mut self, frame: Bytes) { self.write.data_queue.push_back(frame); } /// Try to parse a complete frame from the read buffer. - /// Uses a parse_pos cursor to avoid drain() on every frame. + /// Zero-copy: uses BytesMut::split_to to extract frames without allocating. pub fn try_parse_frame(&mut self) -> Option> { - let available = self.read_pos - self.parse_pos; - if available < FRAME_HEADER_SIZE { + if self.read_buf.len() < FRAME_HEADER_SIZE { return None; } - let base = self.parse_pos; let stream_id = u32::from_be_bytes([ - self.read_buf[base], self.read_buf[base + 1], - self.read_buf[base + 2], self.read_buf[base + 3], + self.read_buf[0], self.read_buf[1], + self.read_buf[2], self.read_buf[3], ]); - let frame_type = self.read_buf[base + 4]; + let frame_type = self.read_buf[4]; let length = u32::from_be_bytes([ - self.read_buf[base + 5], self.read_buf[base + 6], - self.read_buf[base + 7], self.read_buf[base + 8], + self.read_buf[5], self.read_buf[6], + self.read_buf[7], self.read_buf[8], ]); if length > MAX_PAYLOAD_SIZE { let header = [ - self.read_buf[base], self.read_buf[base + 1], - self.read_buf[base + 2], self.read_buf[base + 3], - self.read_buf[base + 4], self.read_buf[base + 5], - self.read_buf[base + 6], self.read_buf[base + 7], - self.read_buf[base + 8], + self.read_buf[0], self.read_buf[1], + self.read_buf[2], self.read_buf[3], + self.read_buf[4], self.read_buf[5], + self.read_buf[6], self.read_buf[7], + self.read_buf[8], ]; log::error!( "CORRUPT FRAME HEADER: raw={:02x?} stream_id={} type=0x{:02x} length={}", @@ -283,19 +301,15 @@ impl TunnelIo { } let total_frame_size = FRAME_HEADER_SIZE + length as usize; - if available < total_frame_size { + if self.read_buf.len() < total_frame_size { return None; } - let payload = self.read_buf[base + FRAME_HEADER_SIZE..base + total_frame_size].to_vec(); - self.parse_pos += total_frame_size; - - // Compact when parse_pos > half the data to reclaim memory - if self.parse_pos > self.read_pos / 2 && self.parse_pos > 0 { - self.read_buf.drain(..self.parse_pos); - self.read_pos -= self.parse_pos; - self.parse_pos = 0; - } + // Zero-copy extraction: split the frame off the read buffer (O(1) pointer adjustment). + // split_to removes the first total_frame_size bytes from read_buf. + let mut frame_data = self.read_buf.split_to(total_frame_size); + // Split off header, keep only payload. freeze() converts BytesMut → Bytes (O(1)). + let payload = frame_data.split_off(FRAME_HEADER_SIZE).freeze(); Some(Ok(Frame { stream_id, frame_type, payload })) } @@ -306,8 +320,8 @@ impl TunnelIo { pub fn poll_step( &mut self, cx: &mut Context<'_>, - ctrl_rx: &mut tokio::sync::mpsc::Receiver>, - data_rx: &mut tokio::sync::mpsc::Receiver>, + ctrl_rx: &mut tokio::sync::mpsc::Receiver, + data_rx: &mut tokio::sync::mpsc::Receiver, liveness_deadline: &mut Pin>, cancel_token: &tokio_util::sync::CancellationToken, ) -> Poll { @@ -371,23 +385,18 @@ impl TunnelIo { // the waker without re-registering it, causing the task to sleep until a // timer or channel wakes it (potentially 15+ seconds of lost reads). loop { - // Compact if needed to make room for reads - if self.parse_pos > 0 && self.read_buf.len() - self.read_pos < 32768 { - self.read_buf.drain(..self.parse_pos); - self.read_pos -= self.parse_pos; - self.parse_pos = 0; - } - if self.read_buf.len() < self.read_pos + 32768 { - self.read_buf.resize(self.read_pos + 32768, 0); - } - let mut rbuf = ReadBuf::new(&mut self.read_buf[self.read_pos..]); + // Ensure at least 32KB of writable space + let len_before = self.read_buf.len(); + self.read_buf.resize(len_before + 32768, 0); + let mut rbuf = ReadBuf::new(&mut self.read_buf[len_before..]); match Pin::new(&mut self.stream).poll_read(cx, &mut rbuf) { Poll::Ready(Ok(())) => { let n = rbuf.filled().len(); + // Trim back to actual data length + self.read_buf.truncate(len_before + n); if n == 0 { return Poll::Ready(TunnelEvent::Eof); } - self.read_pos += n; if let Some(result) = self.try_parse_frame() { return match result { Ok(frame) => Poll::Ready(TunnelEvent::Frame(frame)), @@ -398,10 +407,14 @@ impl TunnelIo { // waker is re-registered when it finally returns Pending. } Poll::Ready(Err(e)) => { + self.read_buf.truncate(len_before); log::error!("TunnelIo: poll_read error: {}", e); return Poll::Ready(TunnelEvent::ReadError(e)); } - Poll::Pending => break, + Poll::Pending => { + self.read_buf.truncate(len_before); + break; + } } } @@ -471,14 +484,14 @@ mod tests { let mut buf = vec![0u8; FRAME_HEADER_SIZE + payload.len()]; buf[FRAME_HEADER_SIZE..].copy_from_slice(payload); encode_frame_header(&mut buf, 42, FRAME_DATA, payload.len()); - assert_eq!(buf, encode_frame(42, FRAME_DATA, payload)); + assert_eq!(buf[..], encode_frame(42, FRAME_DATA, payload)[..]); } #[test] fn test_encode_frame_header_empty_payload() { let mut buf = vec![0u8; FRAME_HEADER_SIZE]; encode_frame_header(&mut buf, 99, FRAME_CLOSE, 0); - assert_eq!(buf, encode_frame(99, FRAME_CLOSE, &[])); + assert_eq!(buf[..], encode_frame(99, FRAME_CLOSE, &[])[..]); } #[test] @@ -646,7 +659,7 @@ mod tests { let frame = reader.next_frame().await.unwrap().unwrap(); assert_eq!(frame.stream_id, i as u32); assert_eq!(frame.frame_type, ft); - assert_eq!(frame.payload, format!("payload_{}", i).as_bytes()); + assert_eq!(&frame.payload[..], format!("payload_{}", i).as_bytes()); } assert!(reader.next_frame().await.unwrap().is_none()); @@ -655,7 +668,7 @@ mod tests { #[tokio::test] async fn test_frame_reader_zero_length_payload() { let data = encode_frame(42, FRAME_CLOSE, &[]); - let cursor = std::io::Cursor::new(data); + let cursor = std::io::Cursor::new(data.to_vec()); let mut reader = FrameReader::new(cursor); let frame = reader.next_frame().await.unwrap().unwrap(); @@ -783,6 +796,39 @@ mod tests { } } + // --- clamp_send_window tests --- + + #[test] + fn test_clamp_send_window_reduces_above_target() { + let w = std::sync::atomic::AtomicU32::new(4 * 1024 * 1024); // 4 MB + let result = clamp_send_window(&w, 512 * 1024); // target 512 KB + assert_eq!(result, 512 * 1024); + assert_eq!(w.load(std::sync::atomic::Ordering::Relaxed), 512 * 1024); + } + + #[test] + fn test_clamp_send_window_noop_below_target() { + let w = std::sync::atomic::AtomicU32::new(256 * 1024); // 256 KB + let result = clamp_send_window(&w, 512 * 1024); // target 512 KB + assert_eq!(result, 256 * 1024); + assert_eq!(w.load(std::sync::atomic::Ordering::Relaxed), 256 * 1024); + } + + #[test] + fn test_clamp_send_window_noop_at_target() { + let w = std::sync::atomic::AtomicU32::new(512 * 1024); + let result = clamp_send_window(&w, 512 * 1024); + assert_eq!(result, 512 * 1024); + assert_eq!(w.load(std::sync::atomic::Ordering::Relaxed), 512 * 1024); + } + + #[test] + fn test_clamp_send_window_zero_value() { + let w = std::sync::atomic::AtomicU32::new(0); + let result = clamp_send_window(&w, 64 * 1024); + assert_eq!(result, 0); + } + // --- encode/decode window_update roundtrip --- #[test] diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index f4a73a9..f691b4c 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@serve.zone/remoteingress', - version: '4.8.13', + version: '4.8.14', description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.' }