From 99a8a29ff1a21c93b738ac668d7172acb070edd3 Mon Sep 17 00:00:00 2001
From: Juergen Kunz
Date: Tue, 31 Mar 2026 08:58:27 +0000
Subject: [PATCH] fix(rust-userspace-nat): improve TCP session backpressure,
 buffering, and idle cleanup in userspace NAT

---
 changelog.md              |   8 +++
 rust/src/userspace_nat.rs | 142 +++++++++++++++++++++++++-------------
 ts/00_commitinfo_data.ts  |   2 +-
 3 files changed, 103 insertions(+), 49 deletions(-)

diff --git a/changelog.md b/changelog.md
index a1e6d65..bbb0623 100644
--- a/changelog.md
+++ b/changelog.md
@@ -1,5 +1,13 @@
 # Changelog
 
+## 2026-03-31 - 1.16.5 - fix(rust-userspace-nat)
+improve TCP session backpressure, buffering, and idle cleanup in userspace NAT
+
+- apply proper bridge-channel backpressure by reserving channel capacity before consuming smoltcp TCP data
+- defer bridge sender initialization until the bridge task starts and track TCP session activity timestamps
+- cap per-session pending TCP send buffers at 512KB and abort stalled sessions when clients cannot keep up
+- add idle TCP session cleanup and switch NAT polling to a dynamic smoltcp-driven delay
+
 ## 2026-03-31 - 1.16.4 - fix(server)
 register preloaded WireGuard clients as peers on server startup
 
diff --git a/rust/src/userspace_nat.rs b/rust/src/userspace_nat.rs
index c3d961c..f931a0b 100644
--- a/rust/src/userspace_nat.rs
+++ b/rust/src/userspace_nat.rs
@@ -17,6 +17,10 @@ use crate::acl;
 use crate::server::{DestinationPolicyConfig, ServerState};
 use crate::tunnel;
 
+/// Maximum size of per-session pending send buffer (512KB = 8x socket buffer).
+/// Sessions exceeding this are aborted — the client cannot keep up.
+const TCP_PENDING_SEND_MAX: usize = 512 * 1024;
+
 // ============================================================================
 // Virtual IP device for smoltcp
 // ============================================================================
@@ -121,7 +125,8 @@ struct SessionKey {
 
 struct TcpSession {
     smoltcp_handle: SocketHandle,
-    bridge_data_tx: mpsc::Sender<Vec<u8>>,
+    /// Channel to send data to the bridge task. None until bridge starts.
+    bridge_data_tx: Option<mpsc::Sender<Vec<u8>>>,
     #[allow(dead_code)]
     client_ip: Ipv4Addr,
     /// Bridge task has been spawned (deferred until handshake completes)
@@ -132,6 +137,8 @@ struct TcpSession {
     pending_send: Vec<u8>,
     /// Session is closing (FIN in progress), don't accept new SYNs
     closing: bool,
+    /// Last time data flowed through this session (for idle timeout)
+    last_activity: tokio::time::Instant,
 }
 
 struct UdpSession {
@@ -385,17 +392,15 @@ impl NatEngine {
 
         let handle = self.sockets.add(socket);
 
-        // Channel for sending data from NAT engine to bridge task
-        let (data_tx, _data_rx) = mpsc::channel::<Vec<u8>>(256);
-
         let session = TcpSession {
             smoltcp_handle: handle,
-            bridge_data_tx: data_tx,
+            bridge_data_tx: None,
             client_ip: key.src_ip,
             bridge_started: false,
             connect_addr,
             pending_send: Vec::new(),
             closing: false,
+            last_activity: tokio::time::Instant::now(),
         };
 
         self.tcp_sessions.insert(key.clone(), session);
@@ -470,9 +475,8 @@ impl NatEngine {
             let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
             if socket.is_active() {
                 session.bridge_started = true;
-                // Recreate the data channel — the old receiver was dropped
                 let (data_tx, data_rx) = mpsc::channel::<Vec<u8>>(256);
-                session.bridge_data_tx = data_tx;
+                session.bridge_data_tx = Some(data_tx);
                 let btx = bridge_tx_clone.clone();
                 let k = key.clone();
                 let addr = session.connect_addr;
@@ -503,15 +507,29 @@ impl NatEngine {
 
         // Bridge: read data from smoltcp TCP sockets → send to bridge tasks
         let mut closed_tcp: Vec<SessionKey> = Vec::new();
-        let mut tcp_outbound: Vec<(mpsc::Sender<Vec<u8>>, Vec<u8>)> = Vec::new();
+        let mut active_tcp: Vec<SessionKey> = Vec::new();
         for (key, session) in &self.tcp_sessions {
             let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
             if session.bridge_started && socket.can_recv() {
-                let sender = session.bridge_data_tx.clone();
-                let _ = socket.recv(|data| {
-                    tcp_outbound.push((sender.clone(), data.to_vec()));
-                    (data.len(), ())
-                });
+                if let Some(ref sender) = session.bridge_data_tx {
+                    // Reserve channel slot BEFORE consuming from smoltcp.
+                    // If the channel is full, we don't consume — smoltcp's RX buffer
+                    // fills up, it stops advertising TCP window space, and the VPN
+                    // client's TCP stack backs off. Proper end-to-end backpressure.
+                    match sender.try_reserve() {
+                        Ok(permit) => {
+                            let _ = socket.recv(|data| {
+                                permit.send(data.to_vec());
+                                (data.len(), ())
+                            });
+                            active_tcp.push(key.clone());
+                        }
+                        Err(_) => {
+                            debug!("NAT: bridge channel full for {}:{} -> {}:{}, applying backpressure",
+                                key.src_ip, key.src_port, key.dst_ip, key.dst_port);
+                        }
+                    }
+                }
             }
             // Detect closed connections
             if !socket.is_open() && !socket.is_listening() {
@@ -519,10 +537,11 @@ impl NatEngine {
             }
         }
 
-        // Send TCP data to bridge tasks (outside borrow of self.tcp_sessions)
-        for (sender, data) in tcp_outbound {
-            if sender.try_send(data).is_err() {
-                debug!("NAT: bridge channel full, TCP data dropped");
+        // Update last_activity for sessions that had data flow
+        let now = tokio::time::Instant::now();
+        for key in active_tcp {
+            if let Some(session) = self.tcp_sessions.get_mut(&key) {
+                session.last_activity = now;
             }
         }
 
@@ -561,39 +580,31 @@ impl NatEngine {
         match msg {
             BridgeMessage::TcpData { key, data } => {
                 if let Some(session) = self.tcp_sessions.get_mut(&key) {
+                    session.last_activity = tokio::time::Instant::now();
+                    // Append to pending buffer, then flush as much as possible
+                    session.pending_send.extend_from_slice(&data);
                     let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
-                    if socket.can_send() {
-                        // Try to write directly first
-                        let all_data = if session.pending_send.is_empty() {
-                            &data
-                        } else {
-                            session.pending_send.extend_from_slice(&data);
-                            &session.pending_send.clone()
-                        };
-                        match socket.send_slice(all_data) {
-                            Ok(written) if written < all_data.len() => {
-                                // Partial write — buffer the rest
-                                if session.pending_send.is_empty() {
-                                    session.pending_send = data[written..].to_vec();
-                                } else {
-                                    session.pending_send.drain(..written);
-                                }
-                            }
-                            Ok(_) => {
-                                // Full write — clear any pending data
-                                session.pending_send.clear();
-                            }
-                            Err(_) => {
-                                // Write failed — buffer everything
-                                if session.pending_send.is_empty() {
-                                    session.pending_send = data;
-                                }
+                    if socket.can_send() && !session.pending_send.is_empty() {
+                        match socket.send_slice(&session.pending_send) {
+                            Ok(written) if written > 0 => {
+                                session.pending_send.drain(..written);
                             }
+                            _ => {}
                         }
-                    } else {
-                        // Can't send yet — buffer for later
-                        session.pending_send.extend_from_slice(&data);
                     }
+                    // Cap check — abort session if client can't keep up
+                    if session.pending_send.len() > TCP_PENDING_SEND_MAX {
+                        warn!(
+                            "NAT: TCP session {}:{} -> {}:{} pending buffer exceeded {}KB, aborting",
+                            key.src_ip, key.src_port, key.dst_ip, key.dst_port,
+                            TCP_PENDING_SEND_MAX / 1024
+                        );
+                        let socket =
+                            self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
+                        socket.abort();
+                        session.pending_send.clear();
+                        session.closing = true;
+                    }
                 }
             }
         }
@@ -644,6 +655,29 @@ impl NatEngine {
         }
     }
 
+    fn cleanup_idle_tcp_sessions(&mut self) {
+        let timeout = Duration::from_secs(300); // 5 minutes
+        let now = tokio::time::Instant::now();
+        let expired: Vec<SessionKey> = self
+            .tcp_sessions
+            .iter()
+            .filter(|(_, s)| now.duration_since(s.last_activity) > timeout)
+            .map(|(k, _)| k.clone())
+            .collect();
+
+        for key in expired {
+            if let Some(session) = self.tcp_sessions.remove(&key) {
+                let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
+                socket.abort();
+                self.sockets.remove(session.smoltcp_handle);
+                warn!(
+                    "NAT: TCP session timed out {}:{} -> {}:{}",
+                    key.src_ip, key.src_port, key.dst_ip, key.dst_port
+                );
+            }
+        }
+    }
+
     /// Main async event loop for the NAT engine.
     pub async fn run(
         mut self,
@@ -651,9 +685,13 @@ impl NatEngine {
         mut shutdown_rx: mpsc::Receiver<()>,
     ) -> Result<()> {
         info!("Userspace NAT engine started");
-        let mut timer = tokio::time::interval(Duration::from_millis(50));
+        let default_poll_delay = Duration::from_millis(50);
         let mut cleanup_timer = tokio::time::interval(Duration::from_secs(10));
 
+        // Dynamic poll timer — reset after each event using smoltcp's poll_delay()
+        let poll_sleep = tokio::time::sleep(default_poll_delay);
+        tokio::pin!(poll_sleep);
+
         loop {
             tokio::select! {
                 Some(packet) = packet_rx.recv() => {
@@ -664,18 +702,26 @@ impl NatEngine {
                     self.handle_bridge_message(msg);
                     self.process().await;
                 }
-                _ = timer.tick() => {
+                () = &mut poll_sleep => {
                     // Periodic poll for smoltcp maintenance (TCP retransmit, etc.)
                     self.process().await;
                 }
                 _ = cleanup_timer.tick() => {
                     self.cleanup_idle_udp_sessions();
+                    self.cleanup_idle_tcp_sessions();
                 }
                 _ = shutdown_rx.recv() => {
                     info!("Userspace NAT engine shutting down");
                     break;
                 }
             }
+
+            // Reset poll delay based on smoltcp's actual timer needs
+            let now = self.smoltcp_now();
+            let delay = self.iface.poll_delay(now, &self.sockets)
+                .map(|d| Duration::from_millis(d.total_millis()))
+                .unwrap_or(default_poll_delay);
+            poll_sleep.as_mut().reset(tokio::time::Instant::now() + delay);
         }
 
         Ok(())
diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts
index 17b6713..e683453 100644
--- a/ts/00_commitinfo_data.ts
+++ b/ts/00_commitinfo_data.ts
@@ -3,6 +3,6 @@
  */
 export const commitinfo = {
   name: '@push.rocks/smartvpn',
-  version: '1.16.4',
+  version: '1.16.5',
   description: 'A VPN solution with TypeScript control plane and Rust data plane daemon'
 }