From 6cbe8bee5e3b6cf646818af0d2e9dc842399913e Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Wed, 18 Mar 2026 00:13:14 +0000 Subject: [PATCH] feat(protocol): add sustained-stream tunnel scheduling to isolate high-throughput traffic --- changelog.md | 7 + rust/crates/remoteingress-core/src/edge.rs | 36 ++++- rust/crates/remoteingress-core/src/hub.rs | 36 ++++- rust/crates/remoteingress-protocol/src/lib.rs | 132 +++++++++++++++--- ts/00_commitinfo_data.ts | 2 +- 5 files changed, 181 insertions(+), 32 deletions(-) diff --git a/changelog.md b/changelog.md index 9218db0..d9bf8e4 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-03-18 - 4.9.0 - feat(protocol) +add sustained-stream tunnel scheduling to isolate high-throughput traffic + +- Introduce a third low-priority sustained queue in TunnelIo with a forced drain budget to prevent long-lived high-bandwidth streams from starving control and normal data frames. +- Classify upload and download streams as sustained after exceeding the throughput threshold for the minimum duration, and route their DATA and CLOSE frames through the sustained channel. +- Wire the new sustained channel through edge and hub stream handling so sustained traffic is scheduled consistently on both sides of the tunnel. + ## 2026-03-18 - 4.8.19 - fix(remoteingress-protocol) reduce per-stream flow control windows and increase control channel buffering diff --git a/rust/crates/remoteingress-core/src/edge.rs b/rust/crates/remoteingress-core/src/edge.rs index 5bf1542..a498de6 100644 --- a/rust/crates/remoteingress-core/src/edge.rs +++ b/rust/crates/remoteingress-core/src/edge.rs @@ -293,6 +293,7 @@ async fn handle_edge_frame( event_tx: &mpsc::Sender, tunnel_writer_tx: &mpsc::Sender, tunnel_data_tx: &mpsc::Sender, + tunnel_sustained_tx: &mpsc::Sender, port_listeners: &mut HashMap>, active_streams: &Arc, next_stream_id: &Arc, @@ -343,6 +344,7 @@ async fn handle_edge_frame( port_listeners, tunnel_writer_tx, tunnel_data_tx, + tunnel_sustained_tx, client_writers, active_streams, next_stream_id, @@ -499,6 +501,7 @@ async fn connect_to_hub_and_run( // Stream handlers send through these channels → TunnelIo drains them. let (tunnel_ctrl_tx, mut tunnel_ctrl_rx) = mpsc::channel::(512); let (tunnel_data_tx, mut tunnel_data_rx) = mpsc::channel::(4096); + let (tunnel_sustained_tx, mut tunnel_sustained_rx) = mpsc::channel::(4096); let tunnel_writer_tx = tunnel_ctrl_tx.clone(); // Start TCP listeners for initial ports @@ -509,6 +512,7 @@ async fn connect_to_hub_and_run( &mut port_listeners, &tunnel_writer_tx, &tunnel_data_tx, + &tunnel_sustained_tx, &client_writers, active_streams, next_stream_id, @@ -540,7 +544,7 @@ async fn connect_to_hub_and_run( liveness_deadline.as_mut().reset(last_activity + liveness_timeout_dur); if let EdgeFrameAction::Disconnect(reason) = handle_edge_frame( frame, &mut tunnel_io, &client_writers, listen_ports, event_tx, - &tunnel_writer_tx, &tunnel_data_tx, &mut port_listeners, + &tunnel_writer_tx, &tunnel_data_tx, &tunnel_sustained_tx, &mut port_listeners, active_streams, next_stream_id, &config.edge_id, connection_token, bind_address, ).await { break 'io_loop EdgeLoopResult::Reconnect(reason); @@ -549,7 +553,7 @@ async fn connect_to_hub_and_run( // Poll I/O: write(ctrl→data), flush, read, channels, timers let event = std::future::poll_fn(|cx| { - tunnel_io.poll_step(cx, &mut tunnel_ctrl_rx, &mut tunnel_data_rx, &mut liveness_deadline, connection_token) + tunnel_io.poll_step(cx, &mut tunnel_ctrl_rx, &mut tunnel_data_rx, &mut tunnel_sustained_rx, &mut liveness_deadline, connection_token) }).await; match event { @@ -558,7 +562,7 @@ async fn connect_to_hub_and_run( liveness_deadline.as_mut().reset(last_activity + liveness_timeout_dur); if let EdgeFrameAction::Disconnect(reason) = handle_edge_frame( frame, &mut tunnel_io, &client_writers, listen_ports, event_tx, - &tunnel_writer_tx, &tunnel_data_tx, &mut port_listeners, + &tunnel_writer_tx, &tunnel_data_tx, &tunnel_sustained_tx, &mut port_listeners, active_streams, next_stream_id, &config.edge_id, connection_token, bind_address, ).await { break EdgeLoopResult::Reconnect(reason); @@ -615,6 +619,7 @@ fn apply_port_config( port_listeners: &mut HashMap>, tunnel_ctrl_tx: &mpsc::Sender, tunnel_data_tx: &mpsc::Sender, + tunnel_sustained_tx: &mpsc::Sender, client_writers: &Arc>>, active_streams: &Arc, next_stream_id: &Arc, @@ -637,6 +642,7 @@ fn apply_port_config( for &port in new_set.difference(&old_set) { let tunnel_ctrl_tx = tunnel_ctrl_tx.clone(); let tunnel_data_tx = tunnel_data_tx.clone(); + let tunnel_sustained_tx = tunnel_sustained_tx.clone(); let client_writers = client_writers.clone(); let active_streams = active_streams.clone(); let next_stream_id = next_stream_id.clone(); @@ -671,6 +677,7 @@ fn apply_port_config( let stream_id = next_stream_id.fetch_add(1, Ordering::Relaxed); let tunnel_ctrl_tx = tunnel_ctrl_tx.clone(); let tunnel_data_tx = tunnel_data_tx.clone(); + let tunnel_sustained_tx = tunnel_sustained_tx.clone(); let client_writers = client_writers.clone(); let active_streams = active_streams.clone(); let edge_id = edge_id.clone(); @@ -687,6 +694,7 @@ fn apply_port_config( &edge_id, tunnel_ctrl_tx, tunnel_data_tx, + tunnel_sustained_tx, client_writers, client_token, Arc::clone(&active_streams), @@ -730,6 +738,7 @@ async fn handle_client_connection( edge_id: &str, tunnel_ctrl_tx: mpsc::Sender, tunnel_data_tx: mpsc::Sender, + tunnel_sustained_tx: mpsc::Sender, client_writers: Arc>>, client_token: CancellationToken, active_streams: Arc, @@ -833,6 +842,9 @@ async fn handle_client_connection( // Task: client -> hub (upload direction) with per-stream flow control. // Zero-copy: read payload directly after the header, then prepend header. let mut buf = vec![0u8; FRAME_HEADER_SIZE + 32768]; + let mut stream_bytes_sent: u64 = 0; + let stream_start = tokio::time::Instant::now(); + let mut is_sustained = false; loop { // Wait for send window to have capacity (with stall timeout). // Safe pattern: register notified BEFORE checking the condition @@ -873,8 +885,21 @@ async fn handle_client_connection( send_window.fetch_sub(n as u32, Ordering::Release); encode_frame_header(&mut buf, stream_id, FRAME_DATA, n); let data_frame = Bytes::copy_from_slice(&buf[..FRAME_HEADER_SIZE + n]); + // Sustained classification: >2.5 MB/s for >10 seconds + stream_bytes_sent += n as u64; + if !is_sustained { + let elapsed = stream_start.elapsed().as_secs(); + if elapsed >= remoteingress_protocol::SUSTAINED_MIN_DURATION_SECS + && stream_bytes_sent / elapsed >= remoteingress_protocol::SUSTAINED_THRESHOLD_BPS + { + is_sustained = true; + log::debug!("Stream {} classified as sustained (upload, {} bytes in {}s)", + stream_id, stream_bytes_sent, elapsed); + } + } + let tx = if is_sustained { &tunnel_sustained_tx } else { &tunnel_data_tx }; let sent = tokio::select! { - result = tunnel_data_tx.send(data_frame) => result.is_ok(), + result = tx.send(data_frame) => result.is_ok(), _ = client_token.cancelled() => false, }; if !sent { break; } @@ -901,8 +926,9 @@ async fn handle_client_connection( // select! with cancellation guard prevents indefinite blocking if tunnel dies. if !client_token.is_cancelled() { let close_frame = encode_frame(stream_id, FRAME_CLOSE, &[]); + let tx = if is_sustained { &tunnel_sustained_tx } else { &tunnel_data_tx }; tokio::select! { - _ = tunnel_data_tx.send(close_frame) => {} + _ = tx.send(close_frame) => {} _ = client_token.cancelled() => {} } } diff --git a/rust/crates/remoteingress-core/src/hub.rs b/rust/crates/remoteingress-core/src/hub.rs index 85ad108..91b3c68 100644 --- a/rust/crates/remoteingress-core/src/hub.rs +++ b/rust/crates/remoteingress-core/src/hub.rs @@ -310,6 +310,7 @@ async fn handle_hub_frame( event_tx: &mpsc::Sender, ctrl_tx: &mpsc::Sender, data_tx: &mpsc::Sender, + sustained_tx: &mpsc::Sender, target_host: &str, edge_token: &CancellationToken, cleanup_tx: &mpsc::Sender, @@ -338,6 +339,7 @@ async fn handle_hub_frame( let cleanup = cleanup_tx.clone(); let writer_tx = ctrl_tx.clone(); // control: CLOSE_BACK, WINDOW_UPDATE_BACK let data_writer_tx = data_tx.clone(); // data: DATA_BACK + let sustained_writer_tx = sustained_tx.clone(); // sustained: DATA_BACK from elephant flows let target = target_host.to_string(); let stream_token = edge_token.child_token(); @@ -458,6 +460,9 @@ async fn handle_hub_frame( // with per-stream flow control (check send_window before reading). // Zero-copy: read payload directly after the header, then prepend header. let mut buf = vec![0u8; FRAME_HEADER_SIZE + 32768]; + let mut dl_bytes_sent: u64 = 0; + let dl_start = tokio::time::Instant::now(); + let mut is_sustained = false; loop { // Wait for send window to have capacity (with stall timeout). // Safe pattern: register notified BEFORE checking the condition @@ -498,8 +503,21 @@ async fn handle_hub_frame( send_window.fetch_sub(n as u32, Ordering::Release); encode_frame_header(&mut buf, stream_id, FRAME_DATA_BACK, n); let frame = Bytes::copy_from_slice(&buf[..FRAME_HEADER_SIZE + n]); + // Sustained classification: >2.5 MB/s for >10 seconds + dl_bytes_sent += n as u64; + if !is_sustained { + let elapsed = dl_start.elapsed().as_secs(); + if elapsed >= remoteingress_protocol::SUSTAINED_MIN_DURATION_SECS + && dl_bytes_sent / elapsed >= remoteingress_protocol::SUSTAINED_THRESHOLD_BPS + { + is_sustained = true; + log::debug!("Stream {} classified as sustained (download, {} bytes in {}s)", + stream_id, dl_bytes_sent, elapsed); + } + } + let tx = if is_sustained { &sustained_writer_tx } else { &data_writer_tx }; let sent = tokio::select! { - result = data_writer_tx.send(frame) => result.is_ok(), + result = tx.send(frame) => result.is_ok(), _ = stream_token.cancelled() => false, }; if !sent { break; } @@ -511,12 +529,13 @@ async fn handle_hub_frame( } } - // Send CLOSE_BACK via DATA channel (must arrive AFTER last DATA_BACK). + // Send CLOSE_BACK via same channel as DATA_BACK (must arrive AFTER last DATA_BACK). // select! with cancellation guard prevents indefinite blocking if tunnel dies. if !stream_token.is_cancelled() { let close_frame = encode_frame(stream_id, FRAME_CLOSE_BACK, &[]); + let tx = if is_sustained { &sustained_writer_tx } else { &data_writer_tx }; tokio::select! { - _ = data_writer_tx.send(close_frame) => {} + _ = tx.send(close_frame) => {} _ = stream_token.cancelled() => {} } } @@ -528,7 +547,9 @@ async fn handle_hub_frame( if let Err(e) = result { log::error!("Stream {} error: {}", stream_id, e); - // Send CLOSE_BACK via DATA channel on error (must arrive after any DATA_BACK). + // Send CLOSE_BACK on error (must arrive after any DATA_BACK). + // Error path: is_sustained not available here, use data channel (safe — + // if error occurs before classification, no sustained frames were sent). if !stream_token.is_cancelled() { let close_frame = encode_frame(stream_id, FRAME_CLOSE_BACK, &[]); tokio::select! { @@ -710,6 +731,7 @@ async fn handle_edge_connection( // Stream handlers send through these channels -> TunnelIo drains them. let (ctrl_tx, mut ctrl_rx) = mpsc::channel::(512); let (data_tx, mut data_rx) = mpsc::channel::(4096); + let (sustained_tx, mut sustained_rx) = mpsc::channel::(4096); // Spawn task to forward config updates as FRAME_CONFIG frames let config_writer_tx = ctrl_tx.clone(); @@ -783,7 +805,7 @@ async fn handle_edge_connection( liveness_deadline.as_mut().reset(last_activity + liveness_timeout_dur); if let FrameAction::Disconnect(reason) = handle_hub_frame( frame, &mut tunnel_io, &mut streams, &stream_semaphore, &edge_stream_count, - &edge_id, &event_tx, &ctrl_tx, &data_tx, &target_host, &edge_token, + &edge_id, &event_tx, &ctrl_tx, &data_tx, &sustained_tx, &target_host, &edge_token, &cleanup_tx, ).await { disconnect_reason = reason; @@ -797,7 +819,7 @@ async fn handle_edge_connection( if ping_ticker.poll_tick(cx).is_ready() { tunnel_io.queue_ctrl(encode_frame(0, FRAME_PING, &[])); } - tunnel_io.poll_step(cx, &mut ctrl_rx, &mut data_rx, &mut liveness_deadline, &edge_token) + tunnel_io.poll_step(cx, &mut ctrl_rx, &mut data_rx, &mut sustained_rx, &mut liveness_deadline, &edge_token) }).await; match event { @@ -806,7 +828,7 @@ async fn handle_edge_connection( liveness_deadline.as_mut().reset(last_activity + liveness_timeout_dur); if let FrameAction::Disconnect(reason) = handle_hub_frame( frame, &mut tunnel_io, &mut streams, &stream_semaphore, &edge_stream_count, - &edge_id, &event_tx, &ctrl_tx, &data_tx, &target_host, &edge_token, + &edge_id, &event_tx, &ctrl_tx, &data_tx, &sustained_tx, &target_host, &edge_token, &cleanup_tx, ).await { disconnect_reason = reason; diff --git a/rust/crates/remoteingress-protocol/src/lib.rs b/rust/crates/remoteingress-protocol/src/lib.rs index 3468d9d..dab2811 100644 --- a/rust/crates/remoteingress-protocol/src/lib.rs +++ b/rust/crates/remoteingress-protocol/src/lib.rs @@ -2,8 +2,10 @@ use std::collections::VecDeque; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; +use std::time::Duration; use bytes::{Bytes, BytesMut, BufMut}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, ReadBuf}; +use tokio::time::Instant; // Frame type constants pub const FRAME_OPEN: u8 = 0x01; @@ -31,6 +33,16 @@ pub const WINDOW_UPDATE_THRESHOLD: u32 = INITIAL_STREAM_WINDOW / 2; /// Maximum window size to prevent overflow. pub const MAX_WINDOW_SIZE: u32 = 4 * 1024 * 1024; +// Sustained stream classification constants +/// Throughput threshold for sustained classification (2.5 MB/s = 20 Mbit/s). +pub const SUSTAINED_THRESHOLD_BPS: u64 = 2_500_000; +/// Minimum duration before a stream can be classified as sustained. +pub const SUSTAINED_MIN_DURATION_SECS: u64 = 10; +/// Fixed window for sustained streams (1 MB — the floor). +pub const SUSTAINED_WINDOW: u32 = 1 * 1024 * 1024; +/// Maximum bytes written from sustained queue per forced drain (1 MB/s guarantee). +pub const SUSTAINED_FORCED_DRAIN_CAP: usize = 1_048_576; + /// Encode a WINDOW_UPDATE frame for a specific stream. pub fn encode_window_update(stream_id: u32, frame_type: u8, increment: u32) -> Bytes { encode_frame(stream_id, frame_type, &increment.to_be_bytes()) @@ -185,24 +197,30 @@ pub enum TunnelEvent { /// Write state extracted into a sub-struct so the borrow checker can see /// disjoint field access between `self.write` and `self.stream`. struct WriteState { - ctrl_queue: VecDeque, // PONG, WINDOW_UPDATE, CLOSE, OPEN — always first - data_queue: VecDeque, // DATA, DATA_BACK — only when ctrl is empty - offset: usize, // progress within current frame being written + ctrl_queue: VecDeque, // PONG, WINDOW_UPDATE, CLOSE, OPEN — always first + data_queue: VecDeque, // DATA, DATA_BACK — only when ctrl is empty + sustained_queue: VecDeque, // DATA, DATA_BACK from sustained streams — lowest priority + offset: usize, // progress within current frame being written flush_needed: bool, + // Sustained starvation prevention: guaranteed 1 MB/s drain + sustained_last_drain: Instant, + sustained_bytes_this_period: usize, } impl WriteState { fn has_work(&self) -> bool { - !self.ctrl_queue.is_empty() || !self.data_queue.is_empty() + !self.ctrl_queue.is_empty() || !self.data_queue.is_empty() || !self.sustained_queue.is_empty() } } /// Single-owner I/O engine for the tunnel TLS connection. /// /// Owns the TLS stream directly — no `tokio::io::split()`, no mutex. -/// Uses two priority write queues: ctrl frames (PONG, WINDOW_UPDATE, CLOSE, OPEN) -/// are ALWAYS written before data frames (DATA, DATA_BACK). This prevents -/// WINDOW_UPDATE starvation that causes flow control deadlocks. +/// Uses three priority write queues: +/// 1. ctrl (PONG, WINDOW_UPDATE, CLOSE, OPEN) — always first +/// 2. data (DATA, DATA_BACK from normal streams) — when ctrl empty +/// 3. sustained (DATA, DATA_BACK from sustained streams) — lowest priority, +/// drained freely when ctrl+data empty, or forced 1MB/s when they're not pub struct TunnelIo { stream: S, // Read state: accumulate bytes, parse frames incrementally @@ -228,8 +246,11 @@ impl TunnelIo { write: WriteState { ctrl_queue: VecDeque::new(), data_queue: VecDeque::new(), + sustained_queue: VecDeque::new(), offset: 0, flush_needed: false, + sustained_last_drain: Instant::now(), + sustained_bytes_this_period: 0, }, } } @@ -244,6 +265,11 @@ impl TunnelIo { self.write.data_queue.push_back(frame); } + /// Queue a lowest-priority sustained data frame. + pub fn queue_sustained(&mut self, frame: Bytes) { + self.write.sustained_queue.push_back(frame); + } + /// Try to parse a complete frame from the read buffer. /// Uses a parse_pos cursor to avoid drain() on every frame. pub fn try_parse_frame(&mut self) -> Option> { @@ -303,33 +329,42 @@ impl TunnelIo { /// Poll-based I/O step. Returns Ready on events, Pending when idle. /// - /// Order: write(ctrl->data) -> flush -> read -> channels -> timers + /// Order: write(ctrl->data->sustained) -> flush -> read -> channels -> timers pub fn poll_step( &mut self, cx: &mut Context<'_>, ctrl_rx: &mut tokio::sync::mpsc::Receiver, data_rx: &mut tokio::sync::mpsc::Receiver, + sustained_rx: &mut tokio::sync::mpsc::Receiver, liveness_deadline: &mut Pin>, cancel_token: &tokio_util::sync::CancellationToken, ) -> Poll { - // 1. WRITE: drain ctrl queue first, then data queue. + // 1. WRITE: 3-tier priority — ctrl first, then data, then sustained. + // Sustained drains freely when ctrl+data are empty. // Write one frame, set flush_needed, then flush must complete before // writing more. This prevents unbounded TLS session buffer growth. // Safe: `self.write` and `self.stream` are disjoint fields. let mut writes = 0; while self.write.has_work() && writes < 16 && !self.write.flush_needed { - let from_ctrl = !self.write.ctrl_queue.is_empty(); - let frame = if from_ctrl { - self.write.ctrl_queue.front().unwrap() + // Pick queue: ctrl > data > sustained + let queue_id = if !self.write.ctrl_queue.is_empty() { + 0 // ctrl + } else if !self.write.data_queue.is_empty() { + 1 // data } else { - self.write.data_queue.front().unwrap() + 2 // sustained + }; + let frame = match queue_id { + 0 => self.write.ctrl_queue.front().unwrap(), + 1 => self.write.data_queue.front().unwrap(), + _ => self.write.sustained_queue.front().unwrap(), }; let remaining = &frame[self.write.offset..]; match Pin::new(&mut self.stream).poll_write(cx, remaining) { Poll::Ready(Ok(0)) => { - log::error!("TunnelIo: poll_write returned 0 (write zero), ctrl_q={} data_q={}", - self.write.ctrl_queue.len(), self.write.data_queue.len()); + log::error!("TunnelIo: poll_write returned 0 (write zero), ctrl_q={} data_q={} sustained_q={}", + self.write.ctrl_queue.len(), self.write.data_queue.len(), self.write.sustained_queue.len()); return Poll::Ready(TunnelEvent::WriteError( std::io::Error::new(std::io::ErrorKind::WriteZero, "write zero"), )); @@ -338,21 +373,70 @@ impl TunnelIo { self.write.offset += n; self.write.flush_needed = true; if self.write.offset >= frame.len() { - if from_ctrl { self.write.ctrl_queue.pop_front(); } - else { self.write.data_queue.pop_front(); } + match queue_id { + 0 => { self.write.ctrl_queue.pop_front(); } + 1 => { self.write.data_queue.pop_front(); } + _ => { + self.write.sustained_queue.pop_front(); + self.write.sustained_last_drain = Instant::now(); + self.write.sustained_bytes_this_period = 0; + } + } self.write.offset = 0; writes += 1; } } Poll::Ready(Err(e)) => { - log::error!("TunnelIo: poll_write error: {} (ctrl_q={} data_q={})", - e, self.write.ctrl_queue.len(), self.write.data_queue.len()); + log::error!("TunnelIo: poll_write error: {} (ctrl_q={} data_q={} sustained_q={})", + e, self.write.ctrl_queue.len(), self.write.data_queue.len(), self.write.sustained_queue.len()); return Poll::Ready(TunnelEvent::WriteError(e)); } Poll::Pending => break, } } + // 1b. FORCED SUSTAINED DRAIN: when ctrl/data have work but sustained is waiting, + // guarantee at least 1 MB/s by draining up to SUSTAINED_FORCED_DRAIN_CAP + // once per second. + if !self.write.sustained_queue.is_empty() + && (!self.write.ctrl_queue.is_empty() || !self.write.data_queue.is_empty()) + && !self.write.flush_needed + { + let now = Instant::now(); + if now.duration_since(self.write.sustained_last_drain) >= Duration::from_secs(1) { + self.write.sustained_bytes_this_period = 0; + self.write.sustained_last_drain = now; + + while !self.write.sustained_queue.is_empty() + && self.write.sustained_bytes_this_period < SUSTAINED_FORCED_DRAIN_CAP + && !self.write.flush_needed + { + let frame = self.write.sustained_queue.front().unwrap(); + let remaining = &frame[self.write.offset..]; + match Pin::new(&mut self.stream).poll_write(cx, remaining) { + Poll::Ready(Ok(0)) => { + return Poll::Ready(TunnelEvent::WriteError( + std::io::Error::new(std::io::ErrorKind::WriteZero, "write zero"), + )); + } + Poll::Ready(Ok(n)) => { + self.write.offset += n; + self.write.flush_needed = true; + self.write.sustained_bytes_this_period += n; + if self.write.offset >= frame.len() { + self.write.sustained_queue.pop_front(); + self.write.offset = 0; + } + } + Poll::Ready(Err(e)) => { + return Poll::Ready(TunnelEvent::WriteError(e)); + } + Poll::Pending => break, + } + } + } + } + // 2. FLUSH: push encrypted data from TLS session to TCP. if self.write.flush_needed { match Pin::new(&mut self.stream).poll_flush(cx) { @@ -436,6 +520,16 @@ impl TunnelIo { } } } + // Sustained channel: drain when sustained_queue is small (same backpressure pattern). + // Channel close is non-fatal — not all connections have sustained streams. + if self.write.sustained_queue.len() < 64 { + loop { + match sustained_rx.poll_recv(cx) { + Poll::Ready(Some(frame)) => { self.write.sustained_queue.push_back(frame); got_new = true; } + Poll::Ready(None) | Poll::Pending => break, + } + } + } // 5. TIMERS if liveness_deadline.as_mut().poll(cx).is_ready() { diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 710d576..7aecf6a 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@serve.zone/remoteingress', - version: '4.8.19', + version: '4.9.0', description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.' }