diff --git a/changelog.md b/changelog.md index bdbee88..2d46e31 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-03-17 - 4.8.13 - fix(remoteingress-protocol) +require a flush after each written frame to bound TLS buffer growth + +- Remove the unflushed byte threshold and stop queueing additional writes while a flush is pending +- Simplify write and flush error logging after dropping unflushed byte tracking +- Update tunnel I/O comments to reflect the stricter flush behavior that avoids OOM and connection resets + ## 2026-03-17 - 4.8.12 - fix(tunnel) prevent tunnel backpressure buffering from exhausting memory and cancel stream handlers before TLS shutdown diff --git a/rust/crates/remoteingress-protocol/src/lib.rs b/rust/crates/remoteingress-protocol/src/lib.rs index 26f60a9..627e1c7 100644 --- a/rust/crates/remoteingress-protocol/src/lib.rs +++ b/rust/crates/remoteingress-protocol/src/lib.rs @@ -183,11 +183,6 @@ pub enum TunnelEvent { Cancelled, } -/// Maximum bytes written to TLS session buffer before requiring a flush. -/// Prevents unbounded buffer growth (OOM) while keeping the pipe saturated. -/// 256KB ≈ typical TCP send buffer — enough to fill the socket on each flush. -const MAX_UNFLUSHED_BYTES: usize = 256 * 1024; - /// Write state extracted into a sub-struct so the borrow checker can see /// disjoint field access between `self.write` and `self.stream`. struct WriteState { @@ -195,7 +190,6 @@ struct WriteState { data_queue: VecDeque>, // DATA, DATA_BACK — only when ctrl is empty offset: usize, // progress within current frame being written flush_needed: bool, - unflushed_bytes: usize, // bytes written to TLS since last successful flush } impl WriteState { @@ -237,7 +231,6 @@ impl TunnelIo { data_queue: VecDeque::new(), offset: 0, flush_needed: false, - unflushed_bytes: 0, }, } } @@ -319,15 +312,11 @@ impl TunnelIo { cancel_token: &tokio_util::sync::CancellationToken, ) -> Poll { // 1. WRITE: drain ctrl queue first, then data queue. - // Allow up to MAX_UNFLUSHED_BYTES in the TLS session buffer before - // requiring a flush. This keeps the pipe saturated (unlike waiting for - // flush to complete) while preventing unbounded buffer growth (OOM). + // Write one frame, set flush_needed, then flush must complete before + // writing more. This prevents unbounded TLS session buffer growth. // Safe: `self.write` and `self.stream` are disjoint fields. let mut writes = 0; - while self.write.has_work() && writes < 16 - && self.write.unflushed_bytes < MAX_UNFLUSHED_BYTES - && !self.write.flush_needed - { + while self.write.has_work() && writes < 16 && !self.write.flush_needed { let from_ctrl = !self.write.ctrl_queue.is_empty(); let frame = if from_ctrl { self.write.ctrl_queue.front().unwrap() @@ -338,8 +327,8 @@ impl TunnelIo { match Pin::new(&mut self.stream).poll_write(cx, remaining) { Poll::Ready(Ok(0)) => { - log::error!("TunnelIo: poll_write returned 0 (write zero), ctrl_q={} data_q={} unflushed={}", - self.write.ctrl_queue.len(), self.write.data_queue.len(), self.write.unflushed_bytes); + log::error!("TunnelIo: poll_write returned 0 (write zero), ctrl_q={} data_q={}", + self.write.ctrl_queue.len(), self.write.data_queue.len()); return Poll::Ready(TunnelEvent::WriteError( std::io::Error::new(std::io::ErrorKind::WriteZero, "write zero"), )); @@ -347,7 +336,6 @@ impl TunnelIo { Poll::Ready(Ok(n)) => { self.write.offset += n; self.write.flush_needed = true; - self.write.unflushed_bytes += n; if self.write.offset >= frame.len() { if from_ctrl { self.write.ctrl_queue.pop_front(); } else { self.write.data_queue.pop_front(); } @@ -356,8 +344,8 @@ impl TunnelIo { } } Poll::Ready(Err(e)) => { - log::error!("TunnelIo: poll_write error: {} (ctrl_q={} data_q={} unflushed={})", - e, self.write.ctrl_queue.len(), self.write.data_queue.len(), self.write.unflushed_bytes); + log::error!("TunnelIo: poll_write error: {} (ctrl_q={} data_q={})", + e, self.write.ctrl_queue.len(), self.write.data_queue.len()); return Poll::Ready(TunnelEvent::WriteError(e)); } Poll::Pending => break, @@ -369,10 +357,9 @@ impl TunnelIo { match Pin::new(&mut self.stream).poll_flush(cx) { Poll::Ready(Ok(())) => { self.write.flush_needed = false; - self.write.unflushed_bytes = 0; } Poll::Ready(Err(e)) => { - log::error!("TunnelIo: poll_flush error: {} (unflushed={})", e, self.write.unflushed_bytes); + log::error!("TunnelIo: poll_flush error: {}", e); return Poll::Ready(TunnelEvent::WriteError(e)); } Poll::Pending => {} // TCP waker will notify us @@ -461,9 +448,7 @@ impl TunnelIo { // When flush is Pending, the TCP write-readiness waker will notify us. // CRITICAL: do NOT self-wake when flush_needed — poll_write always returns // Ready (TLS buffers in-memory), so self-waking causes a tight spin loop - // that fills the TLS session buffer unboundedly → OOM → ECONNRESET. - // The write loop's MAX_UNFLUSHED_BYTES gate allows up to 64KB per poll_step - // even across flush boundaries, keeping the pipe saturated without spinning. + // that fills the TLS session buffer unboundedly -> OOM -> ECONNRESET. if !self.write.flush_needed && (got_new || self.write.has_work()) { cx.waker().wake_by_ref(); } diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 2fe93d8..f4a73a9 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@serve.zone/remoteingress', - version: '4.8.12', + version: '4.8.13', description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.' }