4 Commits

5 changed files with 148 additions and 20 deletions

View File

@@ -1,5 +1,19 @@
# Changelog # Changelog
## 2026-03-31 - 1.16.4 - fix(server)
register preloaded WireGuard clients as peers on server startup
- Adds configured clients from the runtime registry to the WireGuard listener when the server starts.
- Ensures clients loaded from config can complete WireGuard handshakes without requiring separate peer registration.
- Logs a warning if automatic peer registration fails for an individual client.
## 2026-03-31 - 1.16.3 - fix(rust-nat)
defer TCP bridge startup until handshake completion and buffer partial NAT socket writes
- Start TCP bridge tasks only after the smoltcp socket becomes active to prevent server data from arriving before the client handshake completes.
- Buffer pending TCP payloads and flush partial writes so bridge-to-socket data is not silently lost under backpressure.
- Keep closing TCP sessions alive until FIN processing completes and add logging for dropped packets when bridge or route channels are full.
## 2026-03-31 - 1.16.2 - fix(wireguard) ## 2026-03-31 - 1.16.2 - fix(wireguard)
sync runtime peer management with client registration and derive the correct server public key from the WireGuard private key sync runtime peer management with client registration and derive the correct server public key from the WireGuard private key

View File

@@ -1,6 +1,6 @@
{ {
"name": "@push.rocks/smartvpn", "name": "@push.rocks/smartvpn",
"version": "1.16.2", "version": "1.16.4",
"private": false, "private": false,
"description": "A VPN solution with TypeScript control plane and Rust data plane daemon", "description": "A VPN solution with TypeScript control plane and Rust data plane daemon",
"type": "module", "type": "module",

View File

@@ -372,6 +372,28 @@ impl VpnServer {
} }
info!("VPN server started (transport: {})", transport_mode); info!("VPN server started (transport: {})", transport_mode);
// Register pre-loaded clients (from config.clients) as WG peers.
// The WG listener only starts with config.wg_peers; clients loaded into the
// registry need to be dynamically added so WG handshakes work.
if self.wg_command_tx.is_some() {
let registry = state.client_registry.read().await;
for entry in registry.list() {
if let (Some(ref wg_key), Some(ref ip_str)) = (&entry.wg_public_key, &entry.assigned_ip) {
let peer_config = crate::wireguard::WgPeerConfig {
public_key: wg_key.clone(),
preshared_key: None,
allowed_ips: vec![format!("{}/32", ip_str)],
endpoint: None,
persistent_keepalive: Some(25),
};
if let Err(e) = self.add_wg_peer(peer_config).await {
warn!("Failed to register pre-loaded WG peer for {}: {}", entry.client_id, e);
}
}
}
}
Ok(()) Ok(())
} }

View File

@@ -101,7 +101,7 @@ impl Device for VirtualIpDevice {
let mut caps = DeviceCapabilities::default(); let mut caps = DeviceCapabilities::default();
caps.medium = Medium::Ip; caps.medium = Medium::Ip;
caps.max_transmission_unit = self.mtu; caps.max_transmission_unit = self.mtu;
caps.max_burst_size = Some(1); caps.max_burst_size = None;
caps caps
} }
} }
@@ -124,6 +124,14 @@ struct TcpSession {
bridge_data_tx: mpsc::Sender<Vec<u8>>, bridge_data_tx: mpsc::Sender<Vec<u8>>,
#[allow(dead_code)] #[allow(dead_code)]
client_ip: Ipv4Addr, client_ip: Ipv4Addr,
/// Bridge task has been spawned (deferred until handshake completes)
bridge_started: bool,
/// Address to connect the bridge task to (may differ from dst if policy rewrote it)
connect_addr: SocketAddr,
/// Buffered data from bridge waiting to be written to smoltcp socket
pending_send: Vec<u8>,
/// Session is closing (FIN in progress), don't accept new SYNs
closing: bool,
} }
struct UdpSession { struct UdpSession {
@@ -308,7 +316,9 @@ impl NatEngine {
// SYN without ACK = new connection // SYN without ACK = new connection
let is_syn = (flags & 0x02) != 0 && (flags & 0x10) == 0; let is_syn = (flags & 0x02) != 0 && (flags & 0x10) == 0;
if is_syn && !self.tcp_sessions.contains_key(&key) { // Skip if session exists (including closing sessions — let FIN complete)
let session_exists = self.tcp_sessions.contains_key(&key);
if is_syn && !session_exists {
match self.evaluate_destination(dst_ip, dst_port) { match self.evaluate_destination(dst_ip, dst_port) {
DestinationAction::Drop => { DestinationAction::Drop => {
debug!("NAT: destination policy blocked TCP {}:{} -> {}:{}", src_ip, src_port, dst_ip, dst_port); debug!("NAT: destination policy blocked TCP {}:{} -> {}:{}", src_ip, src_port, dst_ip, dst_port);
@@ -376,22 +386,23 @@ impl NatEngine {
let handle = self.sockets.add(socket); let handle = self.sockets.add(socket);
// Channel for sending data from NAT engine to bridge task // Channel for sending data from NAT engine to bridge task
let (data_tx, data_rx) = mpsc::channel::<Vec<u8>>(256); let (data_tx, _data_rx) = mpsc::channel::<Vec<u8>>(256);
let session = TcpSession { let session = TcpSession {
smoltcp_handle: handle, smoltcp_handle: handle,
bridge_data_tx: data_tx, bridge_data_tx: data_tx,
client_ip: key.src_ip, client_ip: key.src_ip,
bridge_started: false,
connect_addr,
pending_send: Vec::new(),
closing: false,
}; };
self.tcp_sessions.insert(key.clone(), session); self.tcp_sessions.insert(key.clone(), session);
// Spawn bridge task that connects to the resolved destination // NOTE: Bridge task is NOT spawned here — it will be spawned in process()
let bridge_tx = self.bridge_tx.clone(); // once the smoltcp handshake completes (socket.is_active() == true).
let key_clone = key.clone(); // This prevents data from the real server arriving before the VPN client
let proxy_protocol = self.proxy_protocol; // handshake is done, which would cause silent data loss.
tokio::spawn(async move {
tcp_bridge_task(key_clone, data_rx, bridge_tx, proxy_protocol, connect_addr).await;
});
debug!( debug!(
"NAT: new TCP session {}:{} -> {}:{}", "NAT: new TCP session {}:{} -> {}:{}",
@@ -451,13 +462,54 @@ impl NatEngine {
self.iface self.iface
.poll(now, &mut self.device, &mut self.sockets); .poll(now, &mut self.device, &mut self.sockets);
// Start bridge tasks for sessions whose handshake just completed
let bridge_tx_clone = self.bridge_tx.clone();
let proxy_protocol = self.proxy_protocol;
for (key, session) in self.tcp_sessions.iter_mut() {
if !session.bridge_started && !session.closing {
let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
if socket.is_active() {
session.bridge_started = true;
// Recreate the data channel — the old receiver was dropped
let (data_tx, data_rx) = mpsc::channel::<Vec<u8>>(256);
session.bridge_data_tx = data_tx;
let btx = bridge_tx_clone.clone();
let k = key.clone();
let addr = session.connect_addr;
let pp = proxy_protocol;
tokio::spawn(async move {
tcp_bridge_task(k, data_rx, btx, pp, addr).await;
});
debug!("NAT: TCP handshake complete, starting bridge for {}:{} -> {}:{}",
key.src_ip, key.src_port, key.dst_ip, key.dst_port);
}
}
}
// Flush pending send buffers to smoltcp sockets
for (_key, session) in self.tcp_sessions.iter_mut() {
if !session.pending_send.is_empty() {
let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
if socket.can_send() {
match socket.send_slice(&session.pending_send) {
Ok(written) if written > 0 => {
session.pending_send.drain(..written);
}
_ => {}
}
}
}
}
// Bridge: read data from smoltcp TCP sockets → send to bridge tasks // Bridge: read data from smoltcp TCP sockets → send to bridge tasks
let mut closed_tcp: Vec<SessionKey> = Vec::new(); let mut closed_tcp: Vec<SessionKey> = Vec::new();
let mut tcp_outbound: Vec<(mpsc::Sender<Vec<u8>>, Vec<u8>)> = Vec::new();
for (key, session) in &self.tcp_sessions { for (key, session) in &self.tcp_sessions {
let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle); let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
if socket.can_recv() { if session.bridge_started && socket.can_recv() {
let sender = session.bridge_data_tx.clone();
let _ = socket.recv(|data| { let _ = socket.recv(|data| {
let _ = session.bridge_data_tx.try_send(data.to_vec()); tcp_outbound.push((sender.clone(), data.to_vec()));
(data.len(), ()) (data.len(), ())
}); });
} }
@@ -467,6 +519,13 @@ impl NatEngine {
} }
} }
// Send TCP data to bridge tasks (outside borrow of self.tcp_sessions)
for (sender, data) in tcp_outbound {
if sender.try_send(data).is_err() {
debug!("NAT: bridge channel full, TCP data dropped");
}
}
// Clean up closed TCP sessions // Clean up closed TCP sessions
for key in closed_tcp { for key in closed_tcp {
if let Some(session) = self.tcp_sessions.remove(&key) { if let Some(session) = self.tcp_sessions.remove(&key) {
@@ -479,7 +538,9 @@ impl NatEngine {
for (_key, session) in &self.udp_sessions { for (_key, session) in &self.udp_sessions {
let socket = self.sockets.get_mut::<udp::Socket>(session.smoltcp_handle); let socket = self.sockets.get_mut::<udp::Socket>(session.smoltcp_handle);
while let Ok((data, _meta)) = socket.recv() { while let Ok((data, _meta)) = socket.recv() {
let _ = session.bridge_data_tx.try_send(data.to_vec()); if session.bridge_data_tx.try_send(data.to_vec()).is_err() {
debug!("NAT: bridge channel full, UDP data dropped");
}
} }
} }
@@ -488,7 +549,9 @@ impl NatEngine {
for packet in self.device.drain_tx() { for packet in self.device.drain_tx() {
if let Some(std::net::IpAddr::V4(dst_ip)) = tunnel::extract_dst_ip(&packet) { if let Some(std::net::IpAddr::V4(dst_ip)) = tunnel::extract_dst_ip(&packet) {
if let Some(sender) = routes.get(&dst_ip) { if let Some(sender) = routes.get(&dst_ip) {
let _ = sender.try_send(packet); if sender.try_send(packet).is_err() {
debug!("NAT: tun_routes channel full for {}, packet dropped", dst_ip);
}
} }
} }
} }
@@ -497,22 +560,51 @@ impl NatEngine {
fn handle_bridge_message(&mut self, msg: BridgeMessage) { fn handle_bridge_message(&mut self, msg: BridgeMessage) {
match msg { match msg {
BridgeMessage::TcpData { key, data } => { BridgeMessage::TcpData { key, data } => {
if let Some(session) = self.tcp_sessions.get(&key) { if let Some(session) = self.tcp_sessions.get_mut(&key) {
let socket = let socket =
self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle); self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
if socket.can_send() { if socket.can_send() {
let _ = socket.send_slice(&data); // Try to write directly first
let all_data = if session.pending_send.is_empty() {
&data
} else {
session.pending_send.extend_from_slice(&data);
&session.pending_send.clone()
};
match socket.send_slice(all_data) {
Ok(written) if written < all_data.len() => {
// Partial write — buffer the rest
if session.pending_send.is_empty() {
session.pending_send = data[written..].to_vec();
} else {
session.pending_send.drain(..written);
}
}
Ok(_) => {
// Full write — clear any pending data
session.pending_send.clear();
}
Err(_) => {
// Write failed — buffer everything
if session.pending_send.is_empty() {
session.pending_send = data;
}
}
}
} else {
// Can't send yet — buffer for later
session.pending_send.extend_from_slice(&data);
} }
} }
} }
BridgeMessage::TcpClosed { key } => { BridgeMessage::TcpClosed { key } => {
if let Some(session) = self.tcp_sessions.remove(&key) { if let Some(session) = self.tcp_sessions.get_mut(&key) {
let socket = let socket =
self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle); self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
socket.close(); socket.close();
session.closing = true;
// Don't remove from SocketSet yet — let smoltcp send FIN // Don't remove from SocketSet yet — let smoltcp send FIN
// It will be cleaned up in process() when is_open() returns false // It will be cleaned up in process() when is_open() returns false
self.tcp_sessions.insert(key, session);
} }
} }
BridgeMessage::UdpData { key, data } => { BridgeMessage::UdpData { key, data } => {

View File

@@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartvpn', name: '@push.rocks/smartvpn',
version: '1.16.2', version: '1.16.4',
description: 'A VPN solution with TypeScript control plane and Rust data plane daemon' description: 'A VPN solution with TypeScript control plane and Rust data plane daemon'
} }