fix(rust-userspace-nat): improve TCP session backpressure, buffering, and idle cleanup in userspace NAT

This commit is contained in:
2026-03-31 08:58:27 +00:00
parent fe9c693ac8
commit 99a8a29ff1
3 changed files with 103 additions and 49 deletions

View File

@@ -1,5 +1,13 @@
# Changelog
## 2026-03-31 - 1.16.5 - fix(rust-userspace-nat)
improve TCP session backpressure, buffering, and idle cleanup in userspace NAT
- apply proper bridge-channel backpressure by reserving channel capacity before consuming smoltcp TCP data
- defer bridge sender initialization until the bridge task starts and track TCP session activity timestamps
- cap per-session pending TCP send buffers at 512KB and abort stalled sessions when clients cannot keep up
- add idle TCP session cleanup and switch NAT polling to a dynamic smoltcp-driven delay
## 2026-03-31 - 1.16.4 - fix(server)
register preloaded WireGuard clients as peers on server startup

View File

@@ -17,6 +17,10 @@ use crate::acl;
use crate::server::{DestinationPolicyConfig, ServerState}; use crate::server::{DestinationPolicyConfig, ServerState};
use crate::tunnel; use crate::tunnel;
/// Maximum size of per-session pending send buffer (512KB = 8x socket buffer).
/// Sessions exceeding this are aborted — the client cannot keep up.
/// Enforced in the bridge-message handler: after each append to
/// `pending_send`, a session whose buffer exceeds this cap has its
/// smoltcp socket aborted and is marked `closing`.
const TCP_PENDING_SEND_MAX: usize = 512 * 1024;
// ============================================================================ // ============================================================================
// Virtual IP device for smoltcp // Virtual IP device for smoltcp
// ============================================================================ // ============================================================================
@@ -121,7 +125,8 @@ struct SessionKey {
struct TcpSession { struct TcpSession {
smoltcp_handle: SocketHandle, smoltcp_handle: SocketHandle,
bridge_data_tx: mpsc::Sender<Vec<u8>>, /// Channel to send data to the bridge task. None until bridge starts.
bridge_data_tx: Option<mpsc::Sender<Vec<u8>>>,
#[allow(dead_code)] #[allow(dead_code)]
client_ip: Ipv4Addr, client_ip: Ipv4Addr,
/// Bridge task has been spawned (deferred until handshake completes) /// Bridge task has been spawned (deferred until handshake completes)
@@ -132,6 +137,8 @@ struct TcpSession {
pending_send: Vec<u8>, pending_send: Vec<u8>,
/// Session is closing (FIN in progress), don't accept new SYNs /// Session is closing (FIN in progress), don't accept new SYNs
closing: bool, closing: bool,
/// Last time data flowed through this session (for idle timeout)
last_activity: tokio::time::Instant,
} }
struct UdpSession { struct UdpSession {
@@ -385,17 +392,15 @@ impl NatEngine {
let handle = self.sockets.add(socket); let handle = self.sockets.add(socket);
// Channel for sending data from NAT engine to bridge task
let (data_tx, _data_rx) = mpsc::channel::<Vec<u8>>(256);
let session = TcpSession { let session = TcpSession {
smoltcp_handle: handle, smoltcp_handle: handle,
bridge_data_tx: data_tx, bridge_data_tx: None,
client_ip: key.src_ip, client_ip: key.src_ip,
bridge_started: false, bridge_started: false,
connect_addr, connect_addr,
pending_send: Vec::new(), pending_send: Vec::new(),
closing: false, closing: false,
last_activity: tokio::time::Instant::now(),
}; };
self.tcp_sessions.insert(key.clone(), session); self.tcp_sessions.insert(key.clone(), session);
@@ -470,9 +475,8 @@ impl NatEngine {
let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle); let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
if socket.is_active() { if socket.is_active() {
session.bridge_started = true; session.bridge_started = true;
// Recreate the data channel — the old receiver was dropped
let (data_tx, data_rx) = mpsc::channel::<Vec<u8>>(256); let (data_tx, data_rx) = mpsc::channel::<Vec<u8>>(256);
session.bridge_data_tx = data_tx; session.bridge_data_tx = Some(data_tx);
let btx = bridge_tx_clone.clone(); let btx = bridge_tx_clone.clone();
let k = key.clone(); let k = key.clone();
let addr = session.connect_addr; let addr = session.connect_addr;
@@ -503,15 +507,29 @@ impl NatEngine {
// Bridge: read data from smoltcp TCP sockets → send to bridge tasks // Bridge: read data from smoltcp TCP sockets → send to bridge tasks
let mut closed_tcp: Vec<SessionKey> = Vec::new(); let mut closed_tcp: Vec<SessionKey> = Vec::new();
let mut tcp_outbound: Vec<(mpsc::Sender<Vec<u8>>, Vec<u8>)> = Vec::new(); let mut active_tcp: Vec<SessionKey> = Vec::new();
for (key, session) in &self.tcp_sessions { for (key, session) in &self.tcp_sessions {
let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle); let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
if session.bridge_started && socket.can_recv() { if session.bridge_started && socket.can_recv() {
let sender = session.bridge_data_tx.clone(); if let Some(ref sender) = session.bridge_data_tx {
let _ = socket.recv(|data| { // Reserve channel slot BEFORE consuming from smoltcp.
tcp_outbound.push((sender.clone(), data.to_vec())); // If the channel is full, we don't consume — smoltcp's RX buffer
(data.len(), ()) // fills up, it stops advertising TCP window space, and the VPN
}); // client's TCP stack backs off. Proper end-to-end backpressure.
match sender.try_reserve() {
Ok(permit) => {
let _ = socket.recv(|data| {
permit.send(data.to_vec());
(data.len(), ())
});
active_tcp.push(key.clone());
}
Err(_) => {
debug!("NAT: bridge channel full for {}:{} -> {}:{}, applying backpressure",
key.src_ip, key.src_port, key.dst_ip, key.dst_port);
}
}
}
} }
// Detect closed connections // Detect closed connections
if !socket.is_open() && !socket.is_listening() { if !socket.is_open() && !socket.is_listening() {
@@ -519,10 +537,11 @@ impl NatEngine {
} }
} }
// Send TCP data to bridge tasks (outside borrow of self.tcp_sessions) // Update last_activity for sessions that had data flow
for (sender, data) in tcp_outbound { let now = tokio::time::Instant::now();
if sender.try_send(data).is_err() { for key in active_tcp {
debug!("NAT: bridge channel full, TCP data dropped"); if let Some(session) = self.tcp_sessions.get_mut(&key) {
session.last_activity = now;
} }
} }
@@ -561,39 +580,31 @@ impl NatEngine {
match msg { match msg {
BridgeMessage::TcpData { key, data } => { BridgeMessage::TcpData { key, data } => {
if let Some(session) = self.tcp_sessions.get_mut(&key) { if let Some(session) = self.tcp_sessions.get_mut(&key) {
session.last_activity = tokio::time::Instant::now();
// Append to pending buffer, then flush as much as possible
session.pending_send.extend_from_slice(&data);
let socket = let socket =
self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle); self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
if socket.can_send() { if socket.can_send() && !session.pending_send.is_empty() {
// Try to write directly first match socket.send_slice(&session.pending_send) {
let all_data = if session.pending_send.is_empty() { Ok(written) if written > 0 => {
&data session.pending_send.drain(..written);
} else {
session.pending_send.extend_from_slice(&data);
&session.pending_send.clone()
};
match socket.send_slice(all_data) {
Ok(written) if written < all_data.len() => {
// Partial write — buffer the rest
if session.pending_send.is_empty() {
session.pending_send = data[written..].to_vec();
} else {
session.pending_send.drain(..written);
}
}
Ok(_) => {
// Full write — clear any pending data
session.pending_send.clear();
}
Err(_) => {
// Write failed — buffer everything
if session.pending_send.is_empty() {
session.pending_send = data;
}
} }
_ => {}
} }
} else { }
// Can't send yet — buffer for later // Cap check — abort session if client can't keep up
session.pending_send.extend_from_slice(&data); if session.pending_send.len() > TCP_PENDING_SEND_MAX {
warn!(
"NAT: TCP session {}:{} -> {}:{} pending buffer exceeded {}KB, aborting",
key.src_ip, key.src_port, key.dst_ip, key.dst_port,
TCP_PENDING_SEND_MAX / 1024
);
let socket =
self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
socket.abort();
session.pending_send.clear();
session.closing = true;
} }
} }
} }
@@ -644,6 +655,29 @@ impl NatEngine {
} }
} }
/// Remove TCP sessions that have seen no data flow for longer than the
/// idle timeout, aborting their smoltcp sockets and freeing the handles.
///
/// Invoked from the periodic `cleanup_timer` branch of the engine's
/// `run()` loop. `last_activity` is refreshed both when data is read
/// from the smoltcp socket and when bridge data arrives for the session,
/// so only truly quiescent sessions are collected here.
fn cleanup_idle_tcp_sessions(&mut self) {
    let timeout = Duration::from_secs(300); // 5 minutes
    let now = tokio::time::Instant::now();
    // Collect expired keys first — the map cannot be mutated while iterating.
    let expired: Vec<SessionKey> = self
        .tcp_sessions
        .iter()
        .filter(|(_, s)| now.duration_since(s.last_activity) > timeout)
        .map(|(k, _)| k.clone())
        .collect();
    for key in expired {
        if let Some(session) = self.tcp_sessions.remove(&key) {
            let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
            // NOTE(review): abort() only queues an RST inside smoltcp; since
            // the socket is removed from the set before the next interface
            // poll, that RST is likely never transmitted to the peer — the
            // session simply vanishes. Confirm this is the intended behavior
            // (vs. deferring removal until after one more poll cycle).
            socket.abort();
            self.sockets.remove(session.smoltcp_handle);
            warn!(
                "NAT: TCP session timed out {}:{} -> {}:{}",
                key.src_ip, key.src_port, key.dst_ip, key.dst_port
            );
        }
    }
}
/// Main async event loop for the NAT engine. /// Main async event loop for the NAT engine.
pub async fn run( pub async fn run(
mut self, mut self,
@@ -651,9 +685,13 @@ impl NatEngine {
mut shutdown_rx: mpsc::Receiver<()>, mut shutdown_rx: mpsc::Receiver<()>,
) -> Result<()> { ) -> Result<()> {
info!("Userspace NAT engine started"); info!("Userspace NAT engine started");
let mut timer = tokio::time::interval(Duration::from_millis(50)); let default_poll_delay = Duration::from_millis(50);
let mut cleanup_timer = tokio::time::interval(Duration::from_secs(10)); let mut cleanup_timer = tokio::time::interval(Duration::from_secs(10));
// Dynamic poll timer — reset after each event using smoltcp's poll_delay()
let poll_sleep = tokio::time::sleep(default_poll_delay);
tokio::pin!(poll_sleep);
loop { loop {
tokio::select! { tokio::select! {
Some(packet) = packet_rx.recv() => { Some(packet) = packet_rx.recv() => {
@@ -664,18 +702,26 @@ impl NatEngine {
self.handle_bridge_message(msg); self.handle_bridge_message(msg);
self.process().await; self.process().await;
} }
_ = timer.tick() => { () = &mut poll_sleep => {
// Periodic poll for smoltcp maintenance (TCP retransmit, etc.) // Periodic poll for smoltcp maintenance (TCP retransmit, etc.)
self.process().await; self.process().await;
} }
_ = cleanup_timer.tick() => { _ = cleanup_timer.tick() => {
self.cleanup_idle_udp_sessions(); self.cleanup_idle_udp_sessions();
self.cleanup_idle_tcp_sessions();
} }
_ = shutdown_rx.recv() => { _ = shutdown_rx.recv() => {
info!("Userspace NAT engine shutting down"); info!("Userspace NAT engine shutting down");
break; break;
} }
} }
// Reset poll delay based on smoltcp's actual timer needs
let now = self.smoltcp_now();
let delay = self.iface.poll_delay(now, &self.sockets)
.map(|d| Duration::from_millis(d.total_millis()))
.unwrap_or(default_poll_delay);
poll_sleep.as_mut().reset(tokio::time::Instant::now() + delay);
} }
Ok(()) Ok(())

View File

@@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartvpn', name: '@push.rocks/smartvpn',
version: '1.16.4', version: '1.16.5',
description: 'A VPN solution with TypeScript control plane and Rust data plane daemon' description: 'A VPN solution with TypeScript control plane and Rust data plane daemon'
} }