diff --git a/changelog.md b/changelog.md index 82f2cde..5c1d81f 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-03-17 - 4.8.18 - fix(rust-protocol) +switch tunnel frame buffers from Vec to Bytes to reduce copying and memory overhead + +- Add the bytes crate to core and protocol crates +- Update frame encoding, reader payloads, channel queues, and stream backchannels to use Bytes +- Adjust edge and hub data/control paths to send framed payloads as Bytes + ## 2026-03-17 - 4.8.17 - fix(protocol) increase per-stream flow control windows and remove adaptive read caps diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 900f3a7..9f7dfcb 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -551,6 +551,7 @@ dependencies = [ name = "remoteingress-core" version = "2.0.0" dependencies = [ + "bytes", "log", "rcgen", "remoteingress-protocol", @@ -568,6 +569,7 @@ dependencies = [ name = "remoteingress-protocol" version = "2.0.0" dependencies = [ + "bytes", "log", "tokio", "tokio-util", diff --git a/rust/crates/remoteingress-core/Cargo.toml b/rust/crates/remoteingress-core/Cargo.toml index 11ec5cf..5c92934 100644 --- a/rust/crates/remoteingress-core/Cargo.toml +++ b/rust/crates/remoteingress-core/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" remoteingress-protocol = { path = "../remoteingress-protocol" } tokio = { version = "1", features = ["full"] } tokio-rustls = "0.26" +bytes = "1" rustls = { version = "0.23", default-features = false, features = ["ring", "logging", "std", "tls12"] } rcgen = "0.13" serde = { version = "1", features = ["derive"] } diff --git a/rust/crates/remoteingress-core/src/edge.rs b/rust/crates/remoteingress-core/src/edge.rs index 3b6d3dd..18fae9e 100644 --- a/rust/crates/remoteingress-core/src/edge.rs +++ b/rust/crates/remoteingress-core/src/edge.rs @@ -11,6 +11,7 @@ use tokio_rustls::TlsConnector; use tokio_util::sync::CancellationToken; use serde::{Deserialize, Serialize}; +use bytes::Bytes; use remoteingress_protocol::*; type EdgeTlsStream = tokio_rustls::client::TlsStream; @@ -26,7 +27,7 @@ enum EdgeFrameAction { struct EdgeStreamState { /// Unbounded channel to deliver FRAME_DATA_BACK payloads to the hub_to_client task. /// Unbounded because flow control (WINDOW_UPDATE) already limits bytes-in-flight. - back_tx: mpsc::UnboundedSender>, + back_tx: mpsc::UnboundedSender, /// Send window for FRAME_DATA (upload direction). /// Decremented by the client reader, incremented by FRAME_WINDOW_UPDATE_BACK from hub. send_window: Arc, @@ -290,8 +291,8 @@ async fn handle_edge_frame( client_writers: &Arc>>, listen_ports: &Arc>>, event_tx: &mpsc::Sender, - tunnel_writer_tx: &mpsc::Sender>, - tunnel_data_tx: &mpsc::Sender>, + tunnel_writer_tx: &mpsc::Sender, + tunnel_data_tx: &mpsc::Sender, port_listeners: &mut HashMap>, active_streams: &Arc, next_stream_id: &Arc, @@ -496,8 +497,8 @@ async fn connect_to_hub_and_run( // QoS dual-channel: ctrl frames have priority over data frames. // Stream handlers send through these channels → TunnelIo drains them. - let (tunnel_ctrl_tx, mut tunnel_ctrl_rx) = mpsc::channel::>(256); - let (tunnel_data_tx, mut tunnel_data_rx) = mpsc::channel::>(4096); + let (tunnel_ctrl_tx, mut tunnel_ctrl_rx) = mpsc::channel::(256); + let (tunnel_data_tx, mut tunnel_data_rx) = mpsc::channel::(4096); let tunnel_writer_tx = tunnel_ctrl_tx.clone(); // Start TCP listeners for initial ports @@ -612,8 +613,8 @@ async fn connect_to_hub_and_run( fn apply_port_config( new_ports: &[u16], port_listeners: &mut HashMap>, - tunnel_ctrl_tx: &mpsc::Sender>, - tunnel_data_tx: &mpsc::Sender>, + tunnel_ctrl_tx: &mpsc::Sender, + tunnel_data_tx: &mpsc::Sender, client_writers: &Arc>>, active_streams: &Arc, next_stream_id: &Arc, @@ -727,8 +728,8 @@ async fn handle_client_connection( stream_id: u32, dest_port: u16, edge_id: &str, - tunnel_ctrl_tx: mpsc::Sender>, - tunnel_data_tx: mpsc::Sender>, + tunnel_ctrl_tx: mpsc::Sender, + tunnel_data_tx: mpsc::Sender, client_writers: Arc>>, client_token: CancellationToken, active_streams: Arc, @@ -753,7 +754,7 @@ async fn handle_client_connection( // Per-stream unbounded back-channel. Flow control (WINDOW_UPDATE) limits // bytes-in-flight, so this won't grow unbounded. Unbounded avoids killing // streams due to channel overflow — backpressure slows streams, never kills them. - let (back_tx, mut back_rx) = mpsc::unbounded_channel::>(); + let (back_tx, mut back_rx) = mpsc::unbounded_channel::(); // Adaptive initial window: scale with current stream count to keep total in-flight // data within the 32MB budget. Prevents burst flooding when many streams open. let initial_window = remoteingress_protocol::compute_window_for_stream_count( @@ -871,7 +872,7 @@ async fn handle_client_connection( Ok(n) => { send_window.fetch_sub(n as u32, Ordering::Release); encode_frame_header(&mut buf, stream_id, FRAME_DATA, n); - let data_frame = buf[..FRAME_HEADER_SIZE + n].to_vec(); + let data_frame = Bytes::copy_from_slice(&buf[..FRAME_HEADER_SIZE + n]); let sent = tokio::select! { result = tunnel_data_tx.send(data_frame) => result.is_ok(), _ = client_token.cancelled() => false, diff --git a/rust/crates/remoteingress-core/src/hub.rs b/rust/crates/remoteingress-core/src/hub.rs index c3eef0c..1a50892 100644 --- a/rust/crates/remoteingress-core/src/hub.rs +++ b/rust/crates/remoteingress-core/src/hub.rs @@ -10,6 +10,7 @@ use tokio_rustls::TlsAcceptor; use tokio_util::sync::CancellationToken; use serde::{Deserialize, Serialize}; +use bytes::Bytes; use remoteingress_protocol::*; type HubTlsStream = tokio_rustls::server::TlsStream; @@ -26,7 +27,7 @@ struct HubStreamState { /// Unbounded channel to deliver FRAME_DATA payloads to the upstream writer task. /// Unbounded because flow control (WINDOW_UPDATE) already limits bytes-in-flight. /// A bounded channel would kill streams instead of applying backpressure. - data_tx: mpsc::UnboundedSender>, + data_tx: mpsc::UnboundedSender, /// Cancellation token for this stream. cancel_token: CancellationToken, /// Send window for FRAME_DATA_BACK (download direction). @@ -307,8 +308,8 @@ async fn handle_hub_frame( edge_stream_count: &Arc, edge_id: &str, event_tx: &mpsc::Sender, - ctrl_tx: &mpsc::Sender>, - data_tx: &mpsc::Sender>, + ctrl_tx: &mpsc::Sender, + data_tx: &mpsc::Sender, target_host: &str, edge_token: &CancellationToken, cleanup_tx: &mpsc::Sender, @@ -346,7 +347,7 @@ async fn handle_hub_frame( }); // Create channel for data from edge to this stream - let (stream_data_tx, mut stream_data_rx) = mpsc::unbounded_channel::>(); + let (stream_data_tx, mut stream_data_rx) = mpsc::unbounded_channel::(); // Adaptive initial window: scale with current stream count // to keep total in-flight data within the 32MB budget. let initial_window = compute_window_for_stream_count( @@ -496,7 +497,7 @@ async fn handle_hub_frame( Ok(n) => { send_window.fetch_sub(n as u32, Ordering::Release); encode_frame_header(&mut buf, stream_id, FRAME_DATA_BACK, n); - let frame = buf[..FRAME_HEADER_SIZE + n].to_vec(); + let frame = Bytes::copy_from_slice(&buf[..FRAME_HEADER_SIZE + n]); let sent = tokio::select! { result = data_writer_tx.send(frame) => result.is_ok(), _ = stream_token.cancelled() => false, @@ -707,8 +708,8 @@ async fn handle_edge_connection( // QoS dual-channel: ctrl frames have priority over data frames. // Stream handlers send through these channels -> TunnelIo drains them. - let (ctrl_tx, mut ctrl_rx) = mpsc::channel::>(256); - let (data_tx, mut data_rx) = mpsc::channel::>(4096); + let (ctrl_tx, mut ctrl_rx) = mpsc::channel::(256); + let (data_tx, mut data_rx) = mpsc::channel::(4096); // Spawn task to forward config updates as FRAME_CONFIG frames let config_writer_tx = ctrl_tx.clone(); diff --git a/rust/crates/remoteingress-protocol/Cargo.toml b/rust/crates/remoteingress-protocol/Cargo.toml index b72b423..9b7f17d 100644 --- a/rust/crates/remoteingress-protocol/Cargo.toml +++ b/rust/crates/remoteingress-protocol/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] tokio = { version = "1", features = ["io-util", "sync", "time"] } tokio-util = "0.7" +bytes = "1" log = "0.4" [dev-dependencies] diff --git a/rust/crates/remoteingress-protocol/src/lib.rs b/rust/crates/remoteingress-protocol/src/lib.rs index 523372b..bf37e16 100644 --- a/rust/crates/remoteingress-protocol/src/lib.rs +++ b/rust/crates/remoteingress-protocol/src/lib.rs @@ -2,6 +2,7 @@ use std::collections::VecDeque; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; +use bytes::{Bytes, BytesMut, BufMut}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, ReadBuf}; // Frame type constants @@ -31,7 +32,7 @@ pub const WINDOW_UPDATE_THRESHOLD: u32 = INITIAL_STREAM_WINDOW / 2; pub const MAX_WINDOW_SIZE: u32 = 16 * 1024 * 1024; /// Encode a WINDOW_UPDATE frame for a specific stream. -pub fn encode_window_update(stream_id: u32, frame_type: u8, increment: u32) -> Vec { +pub fn encode_window_update(stream_id: u32, frame_type: u8, increment: u32) -> Bytes { encode_frame(stream_id, frame_type, &increment.to_be_bytes()) } @@ -56,18 +57,18 @@ pub fn decode_window_update(payload: &[u8]) -> Option { pub struct Frame { pub stream_id: u32, pub frame_type: u8, - pub payload: Vec, + pub payload: Bytes, } /// Encode a frame into bytes: [stream_id:4][type:1][length:4][payload] -pub fn encode_frame(stream_id: u32, frame_type: u8, payload: &[u8]) -> Vec { +pub fn encode_frame(stream_id: u32, frame_type: u8, payload: &[u8]) -> Bytes { let len = payload.len() as u32; - let mut buf = Vec::with_capacity(FRAME_HEADER_SIZE + payload.len()); - buf.extend_from_slice(&stream_id.to_be_bytes()); - buf.push(frame_type); - buf.extend_from_slice(&len.to_be_bytes()); - buf.extend_from_slice(payload); - buf + let mut buf = BytesMut::with_capacity(FRAME_HEADER_SIZE + payload.len()); + buf.put_slice(&stream_id.to_be_bytes()); + buf.put_u8(frame_type); + buf.put_slice(&len.to_be_bytes()); + buf.put_slice(payload); + buf.freeze() } /// Write a frame header into `buf[0..FRAME_HEADER_SIZE]`. @@ -142,7 +143,7 @@ impl FrameReader { )); } - let mut payload = vec![0u8; length as usize]; + let mut payload = BytesMut::zeroed(length as usize); if length > 0 { self.reader.read_exact(&mut payload).await?; } @@ -150,7 +151,7 @@ impl FrameReader { Ok(Some(Frame { stream_id, frame_type, - payload, + payload: payload.freeze(), })) } @@ -184,8 +185,8 @@ pub enum TunnelEvent { /// Write state extracted into a sub-struct so the borrow checker can see /// disjoint field access between `self.write` and `self.stream`. struct WriteState { - ctrl_queue: VecDeque>, // PONG, WINDOW_UPDATE, CLOSE, OPEN — always first - data_queue: VecDeque>, // DATA, DATA_BACK — only when ctrl is empty + ctrl_queue: VecDeque, // PONG, WINDOW_UPDATE, CLOSE, OPEN — always first + data_queue: VecDeque, // DATA, DATA_BACK — only when ctrl is empty offset: usize, // progress within current frame being written flush_needed: bool, } @@ -234,12 +235,12 @@ impl TunnelIo { } /// Queue a high-priority control frame (PONG, WINDOW_UPDATE, CLOSE, OPEN). - pub fn queue_ctrl(&mut self, frame: Vec) { + pub fn queue_ctrl(&mut self, frame: Bytes) { self.write.ctrl_queue.push_back(frame); } /// Queue a lower-priority data frame (DATA, DATA_BACK). - pub fn queue_data(&mut self, frame: Vec) { + pub fn queue_data(&mut self, frame: Bytes) { self.write.data_queue.push_back(frame); } @@ -285,7 +286,9 @@ impl TunnelIo { return None; } - let payload = self.read_buf[base + FRAME_HEADER_SIZE..base + total_frame_size].to_vec(); + let payload = Bytes::copy_from_slice( + &self.read_buf[base + FRAME_HEADER_SIZE..base + total_frame_size], + ); self.parse_pos += total_frame_size; // Compact when parse_pos > half the data to reclaim memory @@ -300,12 +303,12 @@ impl TunnelIo { /// Poll-based I/O step. Returns Ready on events, Pending when idle. /// - /// Order: write(ctrl→data) → flush → read → channels → timers + /// Order: write(ctrl->data) -> flush -> read -> channels -> timers pub fn poll_step( &mut self, cx: &mut Context<'_>, - ctrl_rx: &mut tokio::sync::mpsc::Receiver>, - data_rx: &mut tokio::sync::mpsc::Receiver>, + ctrl_rx: &mut tokio::sync::mpsc::Receiver, + data_rx: &mut tokio::sync::mpsc::Receiver, liveness_deadline: &mut Pin>, cancel_token: &tokio_util::sync::CancellationToken, ) -> Poll { @@ -407,7 +410,7 @@ impl TunnelIo { // Ctrl frames must never be delayed — always drain fully. // Data frames are gated: keep data in the bounded channel for proper // backpressure when TLS writes are slow. Without this gate, the internal - // data_queue (unbounded VecDeque) grows to hundreds of MB under throttle → OOM. + // data_queue (unbounded VecDeque) grows to hundreds of MB under throttle -> OOM. let mut got_new = false; loop { match ctrl_rx.poll_recv(cx) { @@ -469,14 +472,14 @@ mod tests { let mut buf = vec![0u8; FRAME_HEADER_SIZE + payload.len()]; buf[FRAME_HEADER_SIZE..].copy_from_slice(payload); encode_frame_header(&mut buf, 42, FRAME_DATA, payload.len()); - assert_eq!(buf, encode_frame(42, FRAME_DATA, payload)); + assert_eq!(buf, &encode_frame(42, FRAME_DATA, payload)[..]); } #[test] fn test_encode_frame_header_empty_payload() { let mut buf = vec![0u8; FRAME_HEADER_SIZE]; encode_frame_header(&mut buf, 99, FRAME_CLOSE, 0); - assert_eq!(buf, encode_frame(99, FRAME_CLOSE, &[])); + assert_eq!(buf, &encode_frame(99, FRAME_CLOSE, &[])[..]); } #[test] @@ -644,7 +647,7 @@ mod tests { let frame = reader.next_frame().await.unwrap().unwrap(); assert_eq!(frame.stream_id, i as u32); assert_eq!(frame.frame_type, ft); - assert_eq!(frame.payload, format!("payload_{}", i).as_bytes()); + assert_eq!(&frame.payload[..], format!("payload_{}", i).as_bytes()); } assert!(reader.next_frame().await.unwrap().is_none()); @@ -653,7 +656,7 @@ mod tests { #[tokio::test] async fn test_frame_reader_zero_length_payload() { let data = encode_frame(42, FRAME_CLOSE, &[]); - let cursor = std::io::Cursor::new(data); + let cursor = std::io::Cursor::new(data.to_vec()); let mut reader = FrameReader::new(cursor); let frame = reader.next_frame().await.unwrap().unwrap(); @@ -681,7 +684,7 @@ mod tests { #[test] fn test_adaptive_window_zero_streams() { - // 0 streams treated as 1: 800MB/1 → clamped to 16MB max + // 0 streams treated as 1: 800MB/1 -> clamped to 16MB max assert_eq!(compute_window_for_stream_count(0), INITIAL_STREAM_WINDOW); } @@ -718,13 +721,13 @@ mod tests { #[test] fn test_adaptive_window_500_streams_clamped() { - // 800MB/500 = 1.6MB → clamped up to 4MB floor + // 800MB/500 = 1.6MB -> clamped up to 4MB floor assert_eq!(compute_window_for_stream_count(500), 4 * 1024 * 1024); } #[test] fn test_adaptive_window_max_u32() { - // Extreme: u32::MAX streams → tiny value → clamped to 4MB + // Extreme: u32::MAX streams -> tiny value -> clamped to 4MB assert_eq!(compute_window_for_stream_count(u32::MAX), 4 * 1024 * 1024); } @@ -740,7 +743,7 @@ mod tests { #[test] fn test_adaptive_window_total_budget_bounded() { - // active × per_stream_window should never exceed 800MB (+ clamp overhead for high N) + // active x per_stream_window should never exceed 800MB (+ clamp overhead for high N) for n in [1, 10, 50, 100, 200] { let w = compute_window_for_stream_count(n); let total = w as u64 * n as u64; diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 2a492a2..c1a5df6 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@serve.zone/remoteingress', - version: '4.8.17', + version: '4.8.18', description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.' }