Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 13d0183e9d | |||
| 99a8a29ff1 | |||
| fe9c693ac8 | |||
| 20ef92599b | |||
| c3f180e264 | |||
| 667e5ff3de |
22
changelog.md
22
changelog.md
@@ -1,5 +1,27 @@
|
||||
# Changelog
|
||||
|
||||
## 2026-03-31 - 1.16.5 - fix(rust-userspace-nat)
|
||||
improve TCP session backpressure, buffering, and idle cleanup in userspace NAT
|
||||
|
||||
- apply proper bridge-channel backpressure by reserving channel capacity before consuming smoltcp TCP data
|
||||
- defer bridge sender initialization until the bridge task starts and track TCP session activity timestamps
|
||||
- cap per-session pending TCP send buffers at 512KB and abort stalled sessions when clients cannot keep up
|
||||
- add idle TCP session cleanup and switch NAT polling to a dynamic smoltcp-driven delay
|
||||
|
||||
## 2026-03-31 - 1.16.4 - fix(server)
|
||||
register preloaded WireGuard clients as peers on server startup
|
||||
|
||||
- Adds configured clients from the runtime registry to the WireGuard listener when the server starts.
|
||||
- Ensures clients loaded from config can complete WireGuard handshakes without requiring separate peer registration.
|
||||
- Logs a warning if automatic peer registration fails for an individual client.
|
||||
|
||||
## 2026-03-31 - 1.16.3 - fix(rust-nat)
|
||||
defer TCP bridge startup until handshake completion and buffer partial NAT socket writes
|
||||
|
||||
- Start TCP bridge tasks only after the smoltcp socket becomes active to prevent server data from arriving before the client handshake completes.
|
||||
- Buffer pending TCP payloads and flush partial writes so bridge-to-socket data is not silently lost under backpressure.
|
||||
- Keep closing TCP sessions alive until FIN processing completes and add logging for dropped packets when bridge or route channels are full.
|
||||
|
||||
## 2026-03-31 - 1.16.2 - fix(wireguard)
|
||||
sync runtime peer management with client registration and derive the correct server public key from the WireGuard private key
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@push.rocks/smartvpn",
|
||||
"version": "1.16.2",
|
||||
"version": "1.16.5",
|
||||
"private": false,
|
||||
"description": "A VPN solution with TypeScript control plane and Rust data plane daemon",
|
||||
"type": "module",
|
||||
|
||||
@@ -372,6 +372,28 @@ impl VpnServer {
|
||||
}
|
||||
|
||||
info!("VPN server started (transport: {})", transport_mode);
|
||||
|
||||
// Register pre-loaded clients (from config.clients) as WG peers.
|
||||
// The WG listener only starts with config.wg_peers; clients loaded into the
|
||||
// registry need to be dynamically added so WG handshakes work.
|
||||
if self.wg_command_tx.is_some() {
|
||||
let registry = state.client_registry.read().await;
|
||||
for entry in registry.list() {
|
||||
if let (Some(ref wg_key), Some(ref ip_str)) = (&entry.wg_public_key, &entry.assigned_ip) {
|
||||
let peer_config = crate::wireguard::WgPeerConfig {
|
||||
public_key: wg_key.clone(),
|
||||
preshared_key: None,
|
||||
allowed_ips: vec![format!("{}/32", ip_str)],
|
||||
endpoint: None,
|
||||
persistent_keepalive: Some(25),
|
||||
};
|
||||
if let Err(e) = self.add_wg_peer(peer_config).await {
|
||||
warn!("Failed to register pre-loaded WG peer for {}: {}", entry.client_id, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -17,6 +17,10 @@ use crate::acl;
|
||||
use crate::server::{DestinationPolicyConfig, ServerState};
|
||||
use crate::tunnel;
|
||||
|
||||
/// Maximum size of per-session pending send buffer (512KB = 8x socket buffer).
|
||||
/// Sessions exceeding this are aborted — the client cannot keep up.
|
||||
const TCP_PENDING_SEND_MAX: usize = 512 * 1024;
|
||||
|
||||
// ============================================================================
|
||||
// Virtual IP device for smoltcp
|
||||
// ============================================================================
|
||||
@@ -101,7 +105,7 @@ impl Device for VirtualIpDevice {
|
||||
let mut caps = DeviceCapabilities::default();
|
||||
caps.medium = Medium::Ip;
|
||||
caps.max_transmission_unit = self.mtu;
|
||||
caps.max_burst_size = Some(1);
|
||||
caps.max_burst_size = None;
|
||||
caps
|
||||
}
|
||||
}
|
||||
@@ -121,9 +125,20 @@ struct SessionKey {
|
||||
|
||||
struct TcpSession {
|
||||
smoltcp_handle: SocketHandle,
|
||||
bridge_data_tx: mpsc::Sender<Vec<u8>>,
|
||||
/// Channel to send data to the bridge task. None until bridge starts.
|
||||
bridge_data_tx: Option<mpsc::Sender<Vec<u8>>>,
|
||||
#[allow(dead_code)]
|
||||
client_ip: Ipv4Addr,
|
||||
/// Bridge task has been spawned (deferred until handshake completes)
|
||||
bridge_started: bool,
|
||||
/// Address to connect the bridge task to (may differ from dst if policy rewrote it)
|
||||
connect_addr: SocketAddr,
|
||||
/// Buffered data from bridge waiting to be written to smoltcp socket
|
||||
pending_send: Vec<u8>,
|
||||
/// Session is closing (FIN in progress), don't accept new SYNs
|
||||
closing: bool,
|
||||
/// Last time data flowed through this session (for idle timeout)
|
||||
last_activity: tokio::time::Instant,
|
||||
}
|
||||
|
||||
struct UdpSession {
|
||||
@@ -308,7 +323,9 @@ impl NatEngine {
|
||||
|
||||
// SYN without ACK = new connection
|
||||
let is_syn = (flags & 0x02) != 0 && (flags & 0x10) == 0;
|
||||
if is_syn && !self.tcp_sessions.contains_key(&key) {
|
||||
// Skip if session exists (including closing sessions — let FIN complete)
|
||||
let session_exists = self.tcp_sessions.contains_key(&key);
|
||||
if is_syn && !session_exists {
|
||||
match self.evaluate_destination(dst_ip, dst_port) {
|
||||
DestinationAction::Drop => {
|
||||
debug!("NAT: destination policy blocked TCP {}:{} -> {}:{}", src_ip, src_port, dst_ip, dst_port);
|
||||
@@ -375,23 +392,22 @@ impl NatEngine {
|
||||
|
||||
let handle = self.sockets.add(socket);
|
||||
|
||||
// Channel for sending data from NAT engine to bridge task
|
||||
let (data_tx, data_rx) = mpsc::channel::<Vec<u8>>(256);
|
||||
|
||||
let session = TcpSession {
|
||||
smoltcp_handle: handle,
|
||||
bridge_data_tx: data_tx,
|
||||
bridge_data_tx: None,
|
||||
client_ip: key.src_ip,
|
||||
bridge_started: false,
|
||||
connect_addr,
|
||||
pending_send: Vec::new(),
|
||||
closing: false,
|
||||
last_activity: tokio::time::Instant::now(),
|
||||
};
|
||||
self.tcp_sessions.insert(key.clone(), session);
|
||||
|
||||
// Spawn bridge task that connects to the resolved destination
|
||||
let bridge_tx = self.bridge_tx.clone();
|
||||
let key_clone = key.clone();
|
||||
let proxy_protocol = self.proxy_protocol;
|
||||
tokio::spawn(async move {
|
||||
tcp_bridge_task(key_clone, data_rx, bridge_tx, proxy_protocol, connect_addr).await;
|
||||
});
|
||||
// NOTE: Bridge task is NOT spawned here — it will be spawned in process()
|
||||
// once the smoltcp handshake completes (socket.is_active() == true).
|
||||
// This prevents data from the real server arriving before the VPN client
|
||||
// handshake is done, which would cause silent data loss.
|
||||
|
||||
debug!(
|
||||
"NAT: new TCP session {}:{} -> {}:{}",
|
||||
@@ -451,15 +467,69 @@ impl NatEngine {
|
||||
self.iface
|
||||
.poll(now, &mut self.device, &mut self.sockets);
|
||||
|
||||
// Start bridge tasks for sessions whose handshake just completed
|
||||
let bridge_tx_clone = self.bridge_tx.clone();
|
||||
let proxy_protocol = self.proxy_protocol;
|
||||
for (key, session) in self.tcp_sessions.iter_mut() {
|
||||
if !session.bridge_started && !session.closing {
|
||||
let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
|
||||
if socket.is_active() {
|
||||
session.bridge_started = true;
|
||||
let (data_tx, data_rx) = mpsc::channel::<Vec<u8>>(256);
|
||||
session.bridge_data_tx = Some(data_tx);
|
||||
let btx = bridge_tx_clone.clone();
|
||||
let k = key.clone();
|
||||
let addr = session.connect_addr;
|
||||
let pp = proxy_protocol;
|
||||
tokio::spawn(async move {
|
||||
tcp_bridge_task(k, data_rx, btx, pp, addr).await;
|
||||
});
|
||||
debug!("NAT: TCP handshake complete, starting bridge for {}:{} -> {}:{}",
|
||||
key.src_ip, key.src_port, key.dst_ip, key.dst_port);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Flush pending send buffers to smoltcp sockets
|
||||
for (_key, session) in self.tcp_sessions.iter_mut() {
|
||||
if !session.pending_send.is_empty() {
|
||||
let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
|
||||
if socket.can_send() {
|
||||
match socket.send_slice(&session.pending_send) {
|
||||
Ok(written) if written > 0 => {
|
||||
session.pending_send.drain(..written);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Bridge: read data from smoltcp TCP sockets → send to bridge tasks
|
||||
let mut closed_tcp: Vec<SessionKey> = Vec::new();
|
||||
let mut active_tcp: Vec<SessionKey> = Vec::new();
|
||||
for (key, session) in &self.tcp_sessions {
|
||||
let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
|
||||
if socket.can_recv() {
|
||||
let _ = socket.recv(|data| {
|
||||
let _ = session.bridge_data_tx.try_send(data.to_vec());
|
||||
(data.len(), ())
|
||||
});
|
||||
if session.bridge_started && socket.can_recv() {
|
||||
if let Some(ref sender) = session.bridge_data_tx {
|
||||
// Reserve channel slot BEFORE consuming from smoltcp.
|
||||
// If the channel is full, we don't consume — smoltcp's RX buffer
|
||||
// fills up, it stops advertising TCP window space, and the VPN
|
||||
// client's TCP stack backs off. Proper end-to-end backpressure.
|
||||
match sender.try_reserve() {
|
||||
Ok(permit) => {
|
||||
let _ = socket.recv(|data| {
|
||||
permit.send(data.to_vec());
|
||||
(data.len(), ())
|
||||
});
|
||||
active_tcp.push(key.clone());
|
||||
}
|
||||
Err(_) => {
|
||||
debug!("NAT: bridge channel full for {}:{} -> {}:{}, applying backpressure",
|
||||
key.src_ip, key.src_port, key.dst_ip, key.dst_port);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Detect closed connections
|
||||
if !socket.is_open() && !socket.is_listening() {
|
||||
@@ -467,6 +537,14 @@ impl NatEngine {
|
||||
}
|
||||
}
|
||||
|
||||
// Update last_activity for sessions that had data flow
|
||||
let now = tokio::time::Instant::now();
|
||||
for key in active_tcp {
|
||||
if let Some(session) = self.tcp_sessions.get_mut(&key) {
|
||||
session.last_activity = now;
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up closed TCP sessions
|
||||
for key in closed_tcp {
|
||||
if let Some(session) = self.tcp_sessions.remove(&key) {
|
||||
@@ -479,7 +557,9 @@ impl NatEngine {
|
||||
for (_key, session) in &self.udp_sessions {
|
||||
let socket = self.sockets.get_mut::<udp::Socket>(session.smoltcp_handle);
|
||||
while let Ok((data, _meta)) = socket.recv() {
|
||||
let _ = session.bridge_data_tx.try_send(data.to_vec());
|
||||
if session.bridge_data_tx.try_send(data.to_vec()).is_err() {
|
||||
debug!("NAT: bridge channel full, UDP data dropped");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -488,7 +568,9 @@ impl NatEngine {
|
||||
for packet in self.device.drain_tx() {
|
||||
if let Some(std::net::IpAddr::V4(dst_ip)) = tunnel::extract_dst_ip(&packet) {
|
||||
if let Some(sender) = routes.get(&dst_ip) {
|
||||
let _ = sender.try_send(packet);
|
||||
if sender.try_send(packet).is_err() {
|
||||
debug!("NAT: tun_routes channel full for {}, packet dropped", dst_ip);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -497,22 +579,43 @@ impl NatEngine {
|
||||
fn handle_bridge_message(&mut self, msg: BridgeMessage) {
|
||||
match msg {
|
||||
BridgeMessage::TcpData { key, data } => {
|
||||
if let Some(session) = self.tcp_sessions.get(&key) {
|
||||
if let Some(session) = self.tcp_sessions.get_mut(&key) {
|
||||
session.last_activity = tokio::time::Instant::now();
|
||||
// Append to pending buffer, then flush as much as possible
|
||||
session.pending_send.extend_from_slice(&data);
|
||||
let socket =
|
||||
self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
|
||||
if socket.can_send() {
|
||||
let _ = socket.send_slice(&data);
|
||||
if socket.can_send() && !session.pending_send.is_empty() {
|
||||
match socket.send_slice(&session.pending_send) {
|
||||
Ok(written) if written > 0 => {
|
||||
session.pending_send.drain(..written);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
// Cap check — abort session if client can't keep up
|
||||
if session.pending_send.len() > TCP_PENDING_SEND_MAX {
|
||||
warn!(
|
||||
"NAT: TCP session {}:{} -> {}:{} pending buffer exceeded {}KB, aborting",
|
||||
key.src_ip, key.src_port, key.dst_ip, key.dst_port,
|
||||
TCP_PENDING_SEND_MAX / 1024
|
||||
);
|
||||
let socket =
|
||||
self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
|
||||
socket.abort();
|
||||
session.pending_send.clear();
|
||||
session.closing = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
BridgeMessage::TcpClosed { key } => {
|
||||
if let Some(session) = self.tcp_sessions.remove(&key) {
|
||||
if let Some(session) = self.tcp_sessions.get_mut(&key) {
|
||||
let socket =
|
||||
self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
|
||||
socket.close();
|
||||
session.closing = true;
|
||||
// Don't remove from SocketSet yet — let smoltcp send FIN
|
||||
// It will be cleaned up in process() when is_open() returns false
|
||||
self.tcp_sessions.insert(key, session);
|
||||
}
|
||||
}
|
||||
BridgeMessage::UdpData { key, data } => {
|
||||
@@ -552,6 +655,29 @@ impl NatEngine {
|
||||
}
|
||||
}
|
||||
|
||||
fn cleanup_idle_tcp_sessions(&mut self) {
|
||||
let timeout = Duration::from_secs(300); // 5 minutes
|
||||
let now = tokio::time::Instant::now();
|
||||
let expired: Vec<SessionKey> = self
|
||||
.tcp_sessions
|
||||
.iter()
|
||||
.filter(|(_, s)| now.duration_since(s.last_activity) > timeout)
|
||||
.map(|(k, _)| k.clone())
|
||||
.collect();
|
||||
|
||||
for key in expired {
|
||||
if let Some(session) = self.tcp_sessions.remove(&key) {
|
||||
let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
|
||||
socket.abort();
|
||||
self.sockets.remove(session.smoltcp_handle);
|
||||
warn!(
|
||||
"NAT: TCP session timed out {}:{} -> {}:{}",
|
||||
key.src_ip, key.src_port, key.dst_ip, key.dst_port
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Main async event loop for the NAT engine.
|
||||
pub async fn run(
|
||||
mut self,
|
||||
@@ -559,9 +685,13 @@ impl NatEngine {
|
||||
mut shutdown_rx: mpsc::Receiver<()>,
|
||||
) -> Result<()> {
|
||||
info!("Userspace NAT engine started");
|
||||
let mut timer = tokio::time::interval(Duration::from_millis(50));
|
||||
let default_poll_delay = Duration::from_millis(50);
|
||||
let mut cleanup_timer = tokio::time::interval(Duration::from_secs(10));
|
||||
|
||||
// Dynamic poll timer — reset after each event using smoltcp's poll_delay()
|
||||
let poll_sleep = tokio::time::sleep(default_poll_delay);
|
||||
tokio::pin!(poll_sleep);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(packet) = packet_rx.recv() => {
|
||||
@@ -572,18 +702,26 @@ impl NatEngine {
|
||||
self.handle_bridge_message(msg);
|
||||
self.process().await;
|
||||
}
|
||||
_ = timer.tick() => {
|
||||
() = &mut poll_sleep => {
|
||||
// Periodic poll for smoltcp maintenance (TCP retransmit, etc.)
|
||||
self.process().await;
|
||||
}
|
||||
_ = cleanup_timer.tick() => {
|
||||
self.cleanup_idle_udp_sessions();
|
||||
self.cleanup_idle_tcp_sessions();
|
||||
}
|
||||
_ = shutdown_rx.recv() => {
|
||||
info!("Userspace NAT engine shutting down");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Reset poll delay based on smoltcp's actual timer needs
|
||||
let now = self.smoltcp_now();
|
||||
let delay = self.iface.poll_delay(now, &self.sockets)
|
||||
.map(|d| Duration::from_millis(d.total_millis()))
|
||||
.unwrap_or(default_poll_delay);
|
||||
poll_sleep.as_mut().reset(tokio::time::Instant::now() + delay);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartvpn',
|
||||
version: '1.16.2',
|
||||
version: '1.16.5',
|
||||
description: 'A VPN solution with TypeScript control plane and Rust data plane daemon'
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user