12 Commits

7 changed files with 527 additions and 64 deletions

View File

@@ -1,5 +1,48 @@
# Changelog
## 2026-03-31 - 1.17.0 - feat(wireguard)
track per-transport server statistics and make WireGuard clients active only after handshake
- add websocket, quic, and wireguard active-client and total-connection counters to server statistics
- register WireGuard peers without marking them active until handshake/data is received, and remove them from active clients on expiration or idle timeout
- sync WireGuard byte counters into aggregate server stats independently of active client presence and expose new statistics fields in TypeScript interfaces
## 2026-03-31 - 1.16.5 - fix(rust-userspace-nat)
improve TCP session backpressure, buffering, and idle cleanup in userspace NAT
- apply proper bridge-channel backpressure by reserving channel capacity before consuming smoltcp TCP data
- defer bridge sender initialization until the bridge task starts and track TCP session activity timestamps
- cap per-session pending TCP send buffers at 512KB and abort stalled sessions when clients cannot keep up
- add idle TCP session cleanup and switch NAT polling to a dynamic smoltcp-driven delay
## 2026-03-31 - 1.16.4 - fix(server)
register preloaded WireGuard clients as peers on server startup
- Adds configured clients from the runtime registry to the WireGuard listener when the server starts.
- Ensures clients loaded from config can complete WireGuard handshakes without requiring separate peer registration.
- Logs a warning if automatic peer registration fails for an individual client.
## 2026-03-31 - 1.16.3 - fix(rust-nat)
defer TCP bridge startup until handshake completion and buffer partial NAT socket writes
- Start TCP bridge tasks only after the smoltcp socket becomes active to prevent server data from arriving before the client handshake completes.
- Buffer pending TCP payloads and flush partial writes so bridge-to-socket data is not silently lost under backpressure.
- Keep closing TCP sessions alive until FIN processing completes and add logging for dropped packets when bridge or route channels are full.
## 2026-03-31 - 1.16.2 - fix(wireguard)
sync runtime peer management with client registration and derive the correct server public key from the WireGuard private key
- Register, remove, and rotate WireGuard peers in the running listener when clients are added, deleted, or rekeyed.
- Generate client WireGuard configs with the public key derived from the configured WireGuard private key instead of reusing the generic server public key.
- Handle expired WireGuard sessions by re-initiating handshakes and mark client state as handshaking until the tunnel becomes active.
- Improve allowed IP matching and peer VPN IP extraction for runtime packet routing.
## 2026-03-30 - 1.16.1 - fix(rust/server)
add serde alias for clientAllowedIPs in server config
- Accepts the camelCase clientAllowedIPs field when deserializing server configuration.
- Improves compatibility with existing or external configuration formats without changing runtime behavior.
## 2026-03-30 - 1.16.0 - feat(server)
add configurable client endpoint and allowed IPs for generated VPN configs

View File

@@ -1,6 +1,6 @@
{
"name": "@push.rocks/smartvpn",
"version": "1.16.0",
"version": "1.17.0",
"private": false,
"description": "A VPN solution with TypeScript control plane and Rust data plane daemon",
"type": "module",

View File

@@ -7,7 +7,7 @@ use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::sync::{mpsc, Mutex, RwLock};
use tracing::{info, error, warn};
use tracing::{debug, info, error, warn};
use crate::acl;
use crate::client_registry::{ClientEntry, ClientRegistry};
@@ -90,6 +90,7 @@ pub struct ServerConfig {
pub server_endpoint: Option<String>,
/// AllowedIPs for generated WireGuard client configs.
/// Defaults to ["0.0.0.0/0"] (full tunnel).
#[serde(alias = "clientAllowedIPs")]
pub client_allowed_ips: Option<Vec<String>>,
}
@@ -131,6 +132,14 @@ pub struct ServerStatistics {
pub uptime_seconds: u64,
pub active_clients: u64,
pub total_connections: u64,
/// Per-transport active client counts.
pub active_clients_websocket: u64,
pub active_clients_quic: u64,
pub active_clients_wireguard: u64,
/// Per-transport total connection counts.
pub total_connections_websocket: u64,
pub total_connections_quic: u64,
pub total_connections_wireguard: u64,
}
/// The forwarding engine determines how decrypted IP packets are routed.
@@ -371,6 +380,28 @@ impl VpnServer {
}
info!("VPN server started (transport: {})", transport_mode);
// Register pre-loaded clients (from config.clients) as WG peers.
// The WG listener only starts with config.wg_peers; clients loaded into the
// registry need to be dynamically added so WG handshakes work.
if self.wg_command_tx.is_some() {
let registry = state.client_registry.read().await;
for entry in registry.list() {
if let (Some(ref wg_key), Some(ref ip_str)) = (&entry.wg_public_key, &entry.assigned_ip) {
let peer_config = crate::wireguard::WgPeerConfig {
public_key: wg_key.clone(),
preshared_key: None,
allowed_ips: vec![format!("{}/32", ip_str)],
endpoint: None,
persistent_keepalive: Some(25),
};
if let Err(e) = self.add_wg_peer(peer_config).await {
warn!("Failed to register pre-loaded WG peer for {}: {}", entry.client_id, e);
}
}
}
}
Ok(())
}
@@ -427,7 +458,21 @@ impl VpnServer {
if let Some(ref state) = self.state {
let mut stats = state.stats.read().await.clone();
stats.uptime_seconds = state.started_at.elapsed().as_secs();
stats.active_clients = state.clients.read().await.len() as u64;
let clients = state.clients.read().await;
stats.active_clients = clients.len() as u64;
// Compute per-transport active counts
stats.active_clients_websocket = 0;
stats.active_clients_quic = 0;
stats.active_clients_wireguard = 0;
for info in clients.values() {
match info.transport_type.as_str() {
"websocket" => stats.active_clients_websocket += 1,
"quic" => stats.active_clients_quic += 1,
"wireguard" => stats.active_clients_wireguard += 1,
_ => {}
}
}
drop(clients);
stats
} else {
ServerStatistics::default()
@@ -593,6 +638,20 @@ impl VpnServer {
// Add to registry
state.client_registry.write().await.add(entry.clone())?;
// Register WG peer with the running WG listener (if active)
if self.wg_command_tx.is_some() {
let wg_peer_config = crate::wireguard::WgPeerConfig {
public_key: wg_pub.clone(),
preshared_key: None,
allowed_ips: vec![format!("{}/32", assigned_ip)],
endpoint: None,
persistent_keepalive: Some(25),
};
if let Err(e) = self.add_wg_peer(wg_peer_config).await {
warn!("Failed to register WG peer for client {}: {}", client_id, e);
}
}
// Build SmartVPN client config
let smartvpn_server_url = format!("wss://{}",
state.config.server_endpoint.as_deref()
@@ -609,6 +668,10 @@ impl VpnServer {
});
// Build WireGuard config string
let wg_server_pubkey = match &state.config.wg_private_key {
Some(wg_priv_key) => crate::wireguard::wg_public_key_from_private(wg_priv_key)?,
None => state.config.public_key.clone(),
};
let wg_endpoint = state.config.server_endpoint.as_deref()
.unwrap_or(&state.config.listen_addr);
let wg_allowed_ips = state.config.client_allowed_ips.as_ref()
@@ -621,7 +684,7 @@ impl VpnServer {
state.config.dns.as_ref()
.map(|d| format!("DNS = {}", d.join(", ")))
.unwrap_or_default(),
state.config.public_key,
wg_server_pubkey,
wg_allowed_ips,
wg_endpoint,
);
@@ -644,6 +707,14 @@ impl VpnServer {
let state = self.state.as_ref()
.ok_or_else(|| anyhow::anyhow!("Server not running"))?;
let entry = state.client_registry.write().await.remove(client_id)?;
// Remove WG peer from running listener
if self.wg_command_tx.is_some() {
if let Some(ref wg_key) = entry.wg_public_key {
if let Err(e) = self.remove_wg_peer(wg_key).await {
debug!("Failed to remove WG peer for client {}: {}", client_id, e);
}
}
}
// Release the IP if assigned
if let Some(ref ip_str) = entry.assigned_ip {
if let Ok(ip) = ip_str.parse::<Ipv4Addr>() {
@@ -730,6 +801,14 @@ impl VpnServer {
let state = self.state.as_ref()
.ok_or_else(|| anyhow::anyhow!("Server not running"))?;
// Capture old WG key before rotation (needed to remove from WG listener)
let old_wg_pub = {
let registry = state.client_registry.read().await;
let entry = registry.get_by_id(client_id)
.ok_or_else(|| anyhow::anyhow!("Client '{}' not found", client_id))?;
entry.wg_public_key.clone()
};
let (noise_pub, noise_priv) = crypto::generate_keypair()?;
let (wg_pub, wg_priv) = crate::wireguard::generate_wg_keypair();
@@ -748,6 +827,25 @@ impl VpnServer {
.and_then(|v| v.as_str())
.unwrap_or("0.0.0.0");
// Update WG listener: remove old peer, add new peer
if self.wg_command_tx.is_some() {
if let Some(ref old_key) = old_wg_pub {
if let Err(e) = self.remove_wg_peer(old_key).await {
debug!("Failed to remove old WG peer during rotation: {}", e);
}
}
let wg_peer_config = crate::wireguard::WgPeerConfig {
public_key: wg_pub.clone(),
preshared_key: None,
allowed_ips: vec![format!("{}/32", assigned_ip)],
endpoint: None,
persistent_keepalive: Some(25),
};
if let Err(e) = self.add_wg_peer(wg_peer_config).await {
warn!("Failed to register new WG peer during rotation: {}", e);
}
}
let smartvpn_server_url = format!("wss://{}",
state.config.server_endpoint.as_deref()
.unwrap_or(&state.config.listen_addr)
@@ -762,6 +860,10 @@ impl VpnServer {
"keepaliveIntervalSecs": state.config.keepalive_interval_secs,
});
let wg_server_pubkey = match &state.config.wg_private_key {
Some(wg_priv_key) => crate::wireguard::wg_public_key_from_private(wg_priv_key)?,
None => state.config.public_key.clone(),
};
let wg_endpoint = state.config.server_endpoint.as_deref()
.unwrap_or(&state.config.listen_addr);
let wg_allowed_ips = state.config.client_allowed_ips.as_ref()
@@ -773,7 +875,7 @@ impl VpnServer {
state.config.dns.as_ref()
.map(|d| format!("DNS = {}", d.join(", ")))
.unwrap_or_default(),
state.config.public_key,
wg_server_pubkey,
wg_allowed_ips,
wg_endpoint,
);
@@ -815,6 +917,10 @@ impl VpnServer {
}))
}
"wireguard" => {
let wg_server_pubkey = match &state.config.wg_private_key {
Some(wg_priv_key) => crate::wireguard::wg_public_key_from_private(wg_priv_key)?,
None => state.config.public_key.clone(),
};
let assigned_ip = entry.assigned_ip.as_deref().unwrap_or("0.0.0.0");
let wg_endpoint = state.config.server_endpoint.as_deref()
.unwrap_or(&state.config.listen_addr);
@@ -827,7 +933,7 @@ impl VpnServer {
state.config.dns.as_ref()
.map(|d| format!("DNS = {}", d.join(", ")))
.unwrap_or_default(),
state.config.public_key,
wg_server_pubkey,
wg_allowed_ips,
wg_endpoint,
);
@@ -1219,6 +1325,11 @@ async fn handle_client_connection(
{
let mut stats = state.stats.write().await;
stats.total_connections += 1;
match transport_type {
"websocket" => stats.total_connections_websocket += 1,
"quic" => stats.total_connections_quic += 1,
_ => {}
}
}
// Send assigned IP info (encrypted), include effective MTU

View File

@@ -17,6 +17,10 @@ use crate::acl;
use crate::server::{DestinationPolicyConfig, ServerState};
use crate::tunnel;
/// Maximum size of per-session pending send buffer (512KB = 8x socket buffer).
/// Sessions exceeding this are aborted — the client cannot keep up.
const TCP_PENDING_SEND_MAX: usize = 512 * 1024;
// ============================================================================
// Virtual IP device for smoltcp
// ============================================================================
@@ -101,7 +105,7 @@ impl Device for VirtualIpDevice {
let mut caps = DeviceCapabilities::default();
caps.medium = Medium::Ip;
caps.max_transmission_unit = self.mtu;
caps.max_burst_size = Some(1);
caps.max_burst_size = None;
caps
}
}
@@ -121,9 +125,20 @@ struct SessionKey {
struct TcpSession {
smoltcp_handle: SocketHandle,
bridge_data_tx: mpsc::Sender<Vec<u8>>,
/// Channel to send data to the bridge task. None until bridge starts.
bridge_data_tx: Option<mpsc::Sender<Vec<u8>>>,
#[allow(dead_code)]
client_ip: Ipv4Addr,
/// Bridge task has been spawned (deferred until handshake completes)
bridge_started: bool,
/// Address to connect the bridge task to (may differ from dst if policy rewrote it)
connect_addr: SocketAddr,
/// Buffered data from bridge waiting to be written to smoltcp socket
pending_send: Vec<u8>,
/// Session is closing (FIN in progress), don't accept new SYNs
closing: bool,
/// Last time data flowed through this session (for idle timeout)
last_activity: tokio::time::Instant,
}
struct UdpSession {
@@ -308,7 +323,9 @@ impl NatEngine {
// SYN without ACK = new connection
let is_syn = (flags & 0x02) != 0 && (flags & 0x10) == 0;
if is_syn && !self.tcp_sessions.contains_key(&key) {
// Skip if session exists (including closing sessions — let FIN complete)
let session_exists = self.tcp_sessions.contains_key(&key);
if is_syn && !session_exists {
match self.evaluate_destination(dst_ip, dst_port) {
DestinationAction::Drop => {
debug!("NAT: destination policy blocked TCP {}:{} -> {}:{}", src_ip, src_port, dst_ip, dst_port);
@@ -375,23 +392,22 @@ impl NatEngine {
let handle = self.sockets.add(socket);
// Channel for sending data from NAT engine to bridge task
let (data_tx, data_rx) = mpsc::channel::<Vec<u8>>(256);
let session = TcpSession {
smoltcp_handle: handle,
bridge_data_tx: data_tx,
bridge_data_tx: None,
client_ip: key.src_ip,
bridge_started: false,
connect_addr,
pending_send: Vec::new(),
closing: false,
last_activity: tokio::time::Instant::now(),
};
self.tcp_sessions.insert(key.clone(), session);
// Spawn bridge task that connects to the resolved destination
let bridge_tx = self.bridge_tx.clone();
let key_clone = key.clone();
let proxy_protocol = self.proxy_protocol;
tokio::spawn(async move {
tcp_bridge_task(key_clone, data_rx, bridge_tx, proxy_protocol, connect_addr).await;
});
// NOTE: Bridge task is NOT spawned here — it will be spawned in process()
// once the smoltcp handshake completes (socket.is_active() == true).
// This prevents data from the real server arriving before the VPN client
// handshake is done, which would cause silent data loss.
debug!(
"NAT: new TCP session {}:{} -> {}:{}",
@@ -451,15 +467,69 @@ impl NatEngine {
self.iface
.poll(now, &mut self.device, &mut self.sockets);
// Start bridge tasks for sessions whose handshake just completed
let bridge_tx_clone = self.bridge_tx.clone();
let proxy_protocol = self.proxy_protocol;
for (key, session) in self.tcp_sessions.iter_mut() {
if !session.bridge_started && !session.closing {
let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
if socket.is_active() {
session.bridge_started = true;
let (data_tx, data_rx) = mpsc::channel::<Vec<u8>>(256);
session.bridge_data_tx = Some(data_tx);
let btx = bridge_tx_clone.clone();
let k = key.clone();
let addr = session.connect_addr;
let pp = proxy_protocol;
tokio::spawn(async move {
tcp_bridge_task(k, data_rx, btx, pp, addr).await;
});
debug!("NAT: TCP handshake complete, starting bridge for {}:{} -> {}:{}",
key.src_ip, key.src_port, key.dst_ip, key.dst_port);
}
}
}
// Flush pending send buffers to smoltcp sockets
for (_key, session) in self.tcp_sessions.iter_mut() {
if !session.pending_send.is_empty() {
let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
if socket.can_send() {
match socket.send_slice(&session.pending_send) {
Ok(written) if written > 0 => {
session.pending_send.drain(..written);
}
_ => {}
}
}
}
}
// Bridge: read data from smoltcp TCP sockets → send to bridge tasks
let mut closed_tcp: Vec<SessionKey> = Vec::new();
let mut active_tcp: Vec<SessionKey> = Vec::new();
for (key, session) in &self.tcp_sessions {
let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
if socket.can_recv() {
let _ = socket.recv(|data| {
let _ = session.bridge_data_tx.try_send(data.to_vec());
(data.len(), ())
});
if session.bridge_started && socket.can_recv() {
if let Some(ref sender) = session.bridge_data_tx {
// Reserve channel slot BEFORE consuming from smoltcp.
// If the channel is full, we don't consume — smoltcp's RX buffer
// fills up, it stops advertising TCP window space, and the VPN
// client's TCP stack backs off. Proper end-to-end backpressure.
match sender.try_reserve() {
Ok(permit) => {
let _ = socket.recv(|data| {
permit.send(data.to_vec());
(data.len(), ())
});
active_tcp.push(key.clone());
}
Err(_) => {
debug!("NAT: bridge channel full for {}:{} -> {}:{}, applying backpressure",
key.src_ip, key.src_port, key.dst_ip, key.dst_port);
}
}
}
}
// Detect closed connections
if !socket.is_open() && !socket.is_listening() {
@@ -467,6 +537,14 @@ impl NatEngine {
}
}
// Update last_activity for sessions that had data flow
let now = tokio::time::Instant::now();
for key in active_tcp {
if let Some(session) = self.tcp_sessions.get_mut(&key) {
session.last_activity = now;
}
}
// Clean up closed TCP sessions
for key in closed_tcp {
if let Some(session) = self.tcp_sessions.remove(&key) {
@@ -479,7 +557,9 @@ impl NatEngine {
for (_key, session) in &self.udp_sessions {
let socket = self.sockets.get_mut::<udp::Socket>(session.smoltcp_handle);
while let Ok((data, _meta)) = socket.recv() {
let _ = session.bridge_data_tx.try_send(data.to_vec());
if session.bridge_data_tx.try_send(data.to_vec()).is_err() {
debug!("NAT: bridge channel full, UDP data dropped");
}
}
}
@@ -488,7 +568,9 @@ impl NatEngine {
for packet in self.device.drain_tx() {
if let Some(std::net::IpAddr::V4(dst_ip)) = tunnel::extract_dst_ip(&packet) {
if let Some(sender) = routes.get(&dst_ip) {
let _ = sender.try_send(packet);
if sender.try_send(packet).is_err() {
debug!("NAT: tun_routes channel full for {}, packet dropped", dst_ip);
}
}
}
}
@@ -497,22 +579,43 @@ impl NatEngine {
fn handle_bridge_message(&mut self, msg: BridgeMessage) {
match msg {
BridgeMessage::TcpData { key, data } => {
if let Some(session) = self.tcp_sessions.get(&key) {
if let Some(session) = self.tcp_sessions.get_mut(&key) {
session.last_activity = tokio::time::Instant::now();
// Append to pending buffer, then flush as much as possible
session.pending_send.extend_from_slice(&data);
let socket =
self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
if socket.can_send() {
let _ = socket.send_slice(&data);
if socket.can_send() && !session.pending_send.is_empty() {
match socket.send_slice(&session.pending_send) {
Ok(written) if written > 0 => {
session.pending_send.drain(..written);
}
_ => {}
}
}
// Cap check — abort session if client can't keep up
if session.pending_send.len() > TCP_PENDING_SEND_MAX {
warn!(
"NAT: TCP session {}:{} -> {}:{} pending buffer exceeded {}KB, aborting",
key.src_ip, key.src_port, key.dst_ip, key.dst_port,
TCP_PENDING_SEND_MAX / 1024
);
let socket =
self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
socket.abort();
session.pending_send.clear();
session.closing = true;
}
}
}
BridgeMessage::TcpClosed { key } => {
if let Some(session) = self.tcp_sessions.remove(&key) {
if let Some(session) = self.tcp_sessions.get_mut(&key) {
let socket =
self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
socket.close();
session.closing = true;
// Don't remove from SocketSet yet — let smoltcp send FIN
// It will be cleaned up in process() when is_open() returns false
self.tcp_sessions.insert(key, session);
}
}
BridgeMessage::UdpData { key, data } => {
@@ -552,6 +655,29 @@ impl NatEngine {
}
}
fn cleanup_idle_tcp_sessions(&mut self) {
    // Sessions with no data flow for this long are forcibly torn down.
    let timeout = Duration::from_secs(300); // 5 minutes
    let now = tokio::time::Instant::now();
    // Collect keys first: we cannot remove from tcp_sessions while iterating it.
    let stale: Vec<SessionKey> = self
        .tcp_sessions
        .iter()
        .filter_map(|(key, session)| {
            if now.duration_since(session.last_activity) > timeout {
                Some(key.clone())
            } else {
                None
            }
        })
        .collect();
    for key in stale {
        if let Some(session) = self.tcp_sessions.remove(&key) {
            // Abort (RST) rather than close (FIN): the peer is unresponsive,
            // so there is no point in a graceful shutdown handshake.
            self.sockets
                .get_mut::<tcp::Socket>(session.smoltcp_handle)
                .abort();
            self.sockets.remove(session.smoltcp_handle);
            warn!(
                "NAT: TCP session timed out {}:{} -> {}:{}",
                key.src_ip, key.src_port, key.dst_ip, key.dst_port
            );
        }
    }
}
/// Main async event loop for the NAT engine.
pub async fn run(
mut self,
@@ -559,9 +685,13 @@ impl NatEngine {
mut shutdown_rx: mpsc::Receiver<()>,
) -> Result<()> {
info!("Userspace NAT engine started");
let mut timer = tokio::time::interval(Duration::from_millis(50));
let default_poll_delay = Duration::from_millis(50);
let mut cleanup_timer = tokio::time::interval(Duration::from_secs(10));
// Dynamic poll timer — reset after each event using smoltcp's poll_delay()
let poll_sleep = tokio::time::sleep(default_poll_delay);
tokio::pin!(poll_sleep);
loop {
tokio::select! {
Some(packet) = packet_rx.recv() => {
@@ -572,18 +702,26 @@ impl NatEngine {
self.handle_bridge_message(msg);
self.process().await;
}
_ = timer.tick() => {
() = &mut poll_sleep => {
// Periodic poll for smoltcp maintenance (TCP retransmit, etc.)
self.process().await;
}
_ = cleanup_timer.tick() => {
self.cleanup_idle_udp_sessions();
self.cleanup_idle_tcp_sessions();
}
_ = shutdown_rx.recv() => {
info!("Userspace NAT engine shutting down");
break;
}
}
// Reset poll delay based on smoltcp's actual timer needs
let now = self.smoltcp_now();
let delay = self.iface.poll_delay(now, &self.sockets)
.map(|d| Duration::from_millis(d.total_millis()))
.unwrap_or(default_poll_delay);
poll_sleep.as_mut().reset(tokio::time::Instant::now() + delay);
}
Ok(())

View File

@@ -5,6 +5,7 @@ use std::sync::Arc;
use anyhow::{anyhow, Result};
use base64::engine::general_purpose::STANDARD as BASE64;
use base64::Engine;
use boringtun::noise::errors::WireGuardError;
use boringtun::noise::rate_limiter::RateLimiter;
use boringtun::noise::{Tunn, TunnResult};
use boringtun::x25519::{PublicKey, StaticSecret};
@@ -99,6 +100,13 @@ pub fn generate_wg_keypair() -> (String, String) {
(pub_b64, priv_b64)
}
/// Derive the WireGuard public key (base64) from a private key (base64).
///
/// Decodes and validates the private key, then performs the X25519
/// base-point multiplication via `PublicKey::from` and re-encodes the
/// result as standard base64.
pub fn wg_public_key_from_private(private_key_b64: &str) -> Result<String> {
    let secret = parse_private_key(private_key_b64)?;
    let derived = PublicKey::from(&secret);
    Ok(BASE64.encode(derived.to_bytes()))
}
fn parse_private_key(b64: &str) -> Result<StaticSecret> {
let bytes = BASE64.decode(b64)?;
if bytes.len() != 32 {
@@ -212,11 +220,20 @@ struct PeerState {
#[allow(dead_code)]
persistent_keepalive: Option<u16>,
stats: WgPeerStats,
/// Whether this peer has completed a WireGuard handshake and is in state.clients.
is_connected: bool,
/// Last time we received data or handshake activity from this peer.
last_activity_at: Option<tokio::time::Instant>,
/// VPN IP assigned during registration (used for connect/disconnect).
vpn_ip: Option<Ipv4Addr>,
/// Previous synced byte counts for aggregate stats delta tracking.
prev_synced_bytes_sent: u64,
prev_synced_bytes_received: u64,
}
impl PeerState {
fn matches_dst(&self, dst_ip: IpAddr) -> bool {
self.allowed_ips.iter().any(|aip| aip.matches(dst_ip))
fn matches_allowed_ips(&self, ip: IpAddr) -> bool {
self.allowed_ips.iter().any(|aip| aip.matches(ip))
}
}
@@ -268,6 +285,11 @@ fn add_peer_to_loop(
endpoint,
persistent_keepalive: config.persistent_keepalive,
stats: WgPeerStats::default(),
is_connected: false,
last_activity_at: None,
vpn_ip: None,
prev_synced_bytes_sent: 0,
prev_synced_bytes_received: 0,
});
info!("Added WireGuard peer: {}", config.public_key);
@@ -286,9 +308,10 @@ pub struct WgListenerConfig {
pub peers: Vec<WgPeerConfig>,
}
/// Extract the first /32 IPv4 address from a list of AllowedIp entries.
/// This is the peer's VPN IP used for return-packet routing.
/// Extract the peer's VPN IP from AllowedIp entries.
/// Prefers /32 entries (exact match); falls back to any IPv4 address.
fn extract_peer_vpn_ip(allowed_ips: &[AllowedIp]) -> Option<Ipv4Addr> {
// Prefer /32 entries (exact peer VPN IP)
for aip in allowed_ips {
if let IpAddr::V4(v4) = aip.addr {
if aip.prefix_len == 32 {
@@ -296,6 +319,12 @@ fn extract_peer_vpn_ip(allowed_ips: &[AllowedIp]) -> Option<Ipv4Addr> {
}
}
}
// Fallback: use the first IPv4 address from any prefix length
for aip in allowed_ips {
if let IpAddr::V4(v4) = aip.addr {
return Some(v4);
}
}
None
}
@@ -308,8 +337,9 @@ fn wg_timestamp_now() -> String {
format!("{}", duration.as_secs())
}
/// Register a WG peer in ServerState (tun_routes, clients, ip_pool).
/// Returns the VPN IP and the per-peer return-packet receiver.
/// Register a WG peer in ServerState (tun_routes + ip_pool only).
/// Does NOT add to state.clients — peers appear there only after handshake.
/// Returns the VPN IP.
async fn register_wg_peer(
state: &Arc<ServerState>,
peer: &PeerState,
@@ -351,13 +381,23 @@ async fn register_wg_peer(
});
}
// Insert ClientInfo
info!("WG peer {} registered with IP {} (not yet connected)", peer.public_key_b64, vpn_ip);
Ok(Some(vpn_ip))
}
/// Add a WG peer to state.clients on first successful handshake (data received).
async fn connect_wg_peer(
state: &Arc<ServerState>,
peer: &PeerState,
vpn_ip: Ipv4Addr,
) {
let client_id = format!("wg-{}", &peer.public_key_b64[..8.min(peer.public_key_b64.len())]);
let client_info = ClientInfo {
client_id: client_id.clone(),
assigned_ip: vpn_ip.to_string(),
connected_since: wg_timestamp_now(),
bytes_sent: 0,
bytes_received: 0,
bytes_sent: peer.stats.bytes_sent,
bytes_received: peer.stats.bytes_received,
packets_dropped: 0,
bytes_dropped: 0,
last_keepalive_at: None,
@@ -365,13 +405,31 @@ async fn register_wg_peer(
rate_limit_bytes_per_sec: None,
burst_bytes: None,
authenticated_key: peer.public_key_b64.clone(),
registered_client_id: client_id,
registered_client_id: client_id.clone(),
remote_addr: peer.endpoint.map(|e| e.to_string()),
transport_type: "wireguard".to_string(),
};
state.clients.write().await.insert(client_info.client_id.clone(), client_info);
Ok(Some(vpn_ip))
// Increment total_connections
{
let mut stats = state.stats.write().await;
stats.total_connections += 1;
stats.total_connections_wireguard += 1;
}
info!("WG peer {} connected (IP: {})", peer.public_key_b64, vpn_ip);
}
/// Remove a WG peer from state.clients (disconnect without unregistering).
///
/// The peer's IP reservation and tun_routes entry are left intact; only its
/// presence in the active-client map is cleared. The client id is derived
/// the same way it was built on connect: "wg-" + first 8 chars of the
/// base64 public key (base64 is ASCII, so byte slicing is safe).
async fn disconnect_wg_peer(
    state: &Arc<ServerState>,
    pubkey: &str,
) {
    let prefix_len = 8.min(pubkey.len());
    let client_id = format!("wg-{}", &pubkey[..prefix_len]);
    let removed = state.clients.write().await.remove(&client_id);
    if removed.is_some() {
        info!("WG peer {} disconnected (removed from active clients)", pubkey);
    }
}
/// Unregister a WG peer from ServerState.
@@ -445,6 +503,11 @@ pub async fn run_wg_listener(
endpoint,
persistent_keepalive: peer_config.persistent_keepalive,
stats: WgPeerStats::default(),
is_connected: false,
last_activity_at: None,
vpn_ip: None,
prev_synced_bytes_sent: 0,
prev_synced_bytes_received: 0,
});
}
@@ -455,11 +518,12 @@ pub async fn run_wg_listener(
// Merged return-packet channel: all per-peer channels feed into this
let (wg_return_tx, mut wg_return_rx) = mpsc::channel::<(String, Vec<u8>)>(1024);
// Register initial peers in ServerState and track their VPN IPs
// Register initial peers in ServerState (IP reservation + tun_routes only, NOT state.clients)
let mut peer_vpn_ips: HashMap<String, Ipv4Addr> = HashMap::new();
for peer in &peers {
for peer in peers.iter_mut() {
if let Ok(Some(ip)) = register_wg_peer(&state, peer, &wg_return_tx).await {
peer_vpn_ips.insert(peer.public_key_b64.clone(), ip);
peer.vpn_ip = Some(ip);
}
}
@@ -468,6 +532,7 @@ pub async fn run_wg_listener(
let mut dst_buf = vec![0u8; WG_BUFFER_SIZE];
let mut timer = tokio::time::interval(std::time::Duration::from_millis(TIMER_TICK_MS));
let mut stats_timer = tokio::time::interval(std::time::Duration::from_secs(1));
let mut idle_check_timer = tokio::time::interval(std::time::Duration::from_secs(10));
loop {
tokio::select! {
@@ -491,11 +556,13 @@ pub async fn run_wg_listener(
}
}
peer.endpoint = Some(src_addr);
// Handshake response counts as activity
peer.last_activity_at = Some(tokio::time::Instant::now());
handled = true;
break;
}
TunnResult::WriteToTunnelV4(packet, addr) => {
if peer.matches_dst(IpAddr::V4(addr)) {
if peer.matches_allowed_ips(IpAddr::V4(addr)) {
let pkt_len = packet.len() as u64;
// Forward via shared forwarding engine
let mut engine = state.forwarding_engine.lock().await;
@@ -515,11 +582,20 @@ pub async fn run_wg_listener(
peer.stats.packets_received += 1;
}
peer.endpoint = Some(src_addr);
// Track activity and detect handshake completion
peer.last_activity_at = Some(tokio::time::Instant::now());
if !peer.is_connected {
peer.is_connected = true;
peer.stats.last_handshake_time = Some(wg_timestamp_now());
if let Some(vpn_ip) = peer.vpn_ip {
connect_wg_peer(&state, peer, vpn_ip).await;
}
}
handled = true;
break;
}
TunnResult::WriteToTunnelV6(packet, addr) => {
if peer.matches_dst(IpAddr::V6(addr)) {
if peer.matches_allowed_ips(IpAddr::V6(addr)) {
let pkt_len = packet.len() as u64;
let mut engine = state.forwarding_engine.lock().await;
match &mut *engine {
@@ -538,6 +614,15 @@ pub async fn run_wg_listener(
peer.stats.packets_received += 1;
}
peer.endpoint = Some(src_addr);
// Track activity and detect handshake completion
peer.last_activity_at = Some(tokio::time::Instant::now());
if !peer.is_connected {
peer.is_connected = true;
peer.stats.last_handshake_time = Some(wg_timestamp_now());
if let Some(vpn_ip) = peer.vpn_ip {
connect_wg_peer(&state, peer, vpn_ip).await;
}
}
handled = true;
break;
}
@@ -586,6 +671,13 @@ pub async fn run_wg_listener(
udp_socket.send_to(packet, endpoint).await?;
}
}
TunnResult::Err(WireGuardError::ConnectionExpired) => {
warn!("WG peer {} connection expired", peer.public_key_b64);
if peer.is_connected {
peer.is_connected = false;
disconnect_wg_peer(&state, &peer.public_key_b64).await;
}
}
TunnResult::Err(e) => {
debug!("Timer error for WG peer {}: {:?}",
peer.public_key_b64, e);
@@ -599,19 +691,39 @@ pub async fn run_wg_listener(
_ = stats_timer.tick() => {
let mut clients = state.clients.write().await;
let mut stats = state.stats.write().await;
for peer in peers.iter() {
for peer in peers.iter_mut() {
// Always update aggregate stats (regardless of connection state)
let delta_sent = peer.stats.bytes_sent.saturating_sub(peer.prev_synced_bytes_sent);
let delta_recv = peer.stats.bytes_received.saturating_sub(peer.prev_synced_bytes_received);
if delta_sent > 0 || delta_recv > 0 {
stats.bytes_sent += delta_sent;
stats.bytes_received += delta_recv;
peer.prev_synced_bytes_sent = peer.stats.bytes_sent;
peer.prev_synced_bytes_received = peer.stats.bytes_received;
}
// Only update ClientInfo if peer is connected (in state.clients)
let client_id = format!("wg-{}", &peer.public_key_b64[..8.min(peer.public_key_b64.len())]);
if let Some(info) = clients.get_mut(&client_id) {
// Update stats delta
let prev_sent = info.bytes_sent;
let prev_recv = info.bytes_received;
info.bytes_sent = peer.stats.bytes_sent;
info.bytes_received = peer.stats.bytes_received;
info.remote_addr = peer.endpoint.map(|e| e.to_string());
}
}
}
// Update aggregate stats
stats.bytes_sent += peer.stats.bytes_sent.saturating_sub(prev_sent);
stats.bytes_received += peer.stats.bytes_received.saturating_sub(prev_recv);
// --- Idle timeout check (every 10s) ---
_ = idle_check_timer.tick() => {
let now = tokio::time::Instant::now();
for peer in peers.iter_mut() {
if peer.is_connected {
if let Some(last) = peer.last_activity_at {
if now.duration_since(last) > std::time::Duration::from_secs(180) {
info!("WG peer {} idle timeout (180s), disconnecting", peer.public_key_b64);
peer.is_connected = false;
disconnect_wg_peer(&state, &peer.public_key_b64).await;
}
}
}
}
}
@@ -628,11 +740,12 @@ pub async fn run_wg_listener(
&config.private_key,
);
if result.is_ok() {
// Register new peer in ServerState
let peer = peers.last().unwrap();
// Register new peer in ServerState (IP + tun_routes only)
let peer = peers.last_mut().unwrap();
match register_wg_peer(&state, peer, &wg_return_tx).await {
Ok(Some(ip)) => {
peer_vpn_ips.insert(peer_config.public_key.clone(), ip);
peer.vpn_ip = Some(ip);
}
Ok(None) => {}
Err(e) => {
@@ -796,12 +909,12 @@ impl WgClient {
let state = self.state.clone();
let assigned_ip = config.address.clone();
// Update state
// Update state — handshake hasn't completed yet
{
let mut s = state.write().await;
s.state = "connected".to_string();
s.state = "handshaking".to_string();
s.assigned_ip = Some(assigned_ip.clone());
s.connected_since = Some(chrono_now());
s.connected_since = None;
}
// Spawn client loop
@@ -868,7 +981,7 @@ async fn wg_client_loop(
endpoint: SocketAddr,
_allowed_ips: Vec<AllowedIp>,
shared_stats: Arc<RwLock<WgPeerStats>>,
_state: Arc<RwLock<WgClientState>>,
state: Arc<RwLock<WgClientState>>,
mut shutdown_rx: oneshot::Receiver<()>,
) -> Result<()> {
let mut udp_buf = vec![0u8; MAX_UDP_PACKET];
@@ -876,6 +989,7 @@ async fn wg_client_loop(
let mut dst_buf = vec![0u8; WG_BUFFER_SIZE];
let mut timer = tokio::time::interval(std::time::Duration::from_millis(TIMER_TICK_MS));
let mut stats_timer = tokio::time::interval(std::time::Duration::from_secs(1));
let mut handshake_complete = false;
let (mut tun_reader, mut tun_writer) = tokio::io::split(tun_device);
@@ -916,14 +1030,37 @@ async fn wg_client_loop(
tun_writer.write_all(packet).await?;
local_stats.bytes_received += pkt_len;
local_stats.packets_received += 1;
if !handshake_complete {
handshake_complete = true;
let mut s = state.write().await;
s.state = "connected".to_string();
s.connected_since = Some(chrono_now());
info!("WireGuard handshake completed, tunnel active");
}
}
TunnResult::WriteToTunnelV6(packet, _addr) => {
let pkt_len = packet.len() as u64;
tun_writer.write_all(packet).await?;
local_stats.bytes_received += pkt_len;
local_stats.packets_received += 1;
if !handshake_complete {
handshake_complete = true;
let mut s = state.write().await;
s.state = "connected".to_string();
s.connected_since = Some(chrono_now());
info!("WireGuard handshake completed, tunnel active");
}
}
TunnResult::Done => {}
TunnResult::Err(WireGuardError::ConnectionExpired) => {
warn!("WireGuard session expired during decapsulate, re-initiating handshake");
match tunn.format_handshake_initiation(&mut dst_buf, true) {
TunnResult::WriteToNetwork(packet) => {
udp_socket.send_to(packet, endpoint).await?;
}
_ => {}
}
}
TunnResult::Err(e) => {
debug!("Client decapsulate error: {:?}", e);
}
@@ -955,6 +1092,19 @@ async fn wg_client_loop(
TunnResult::WriteToNetwork(packet) => {
udp_socket.send_to(packet, endpoint).await?;
}
TunnResult::Err(WireGuardError::ConnectionExpired) => {
warn!("WireGuard connection expired, re-initiating handshake");
match tunn.format_handshake_initiation(&mut dst_buf, true) {
TunnResult::WriteToNetwork(packet) => {
udp_socket.send_to(packet, endpoint).await?;
debug!("Sent handshake re-initiation after expiry");
}
TunnResult::Err(e) => {
warn!("Failed to re-initiate handshake: {:?}", e);
}
_ => {}
}
}
TunnResult::Err(e) => {
debug!("Client timer error: {:?}", e);
}
@@ -1028,6 +1178,19 @@ mod tests {
assert_eq!(public.to_bytes(), derived_public.to_bytes());
}
#[test]
fn test_wg_public_key_from_private() {
    // Deriving the public key from a freshly generated private key must
    // reproduce the public half of the same keypair exactly.
    let (expected_pub, private_b64) = generate_wg_keypair();
    let recovered = wg_public_key_from_private(&private_b64)
        .expect("derivation from a freshly generated private key should succeed");
    assert_eq!(recovered, expected_pub);
}
#[test]
fn test_wg_public_key_from_private_invalid() {
    // Malformed base64 and too-short key material must both be rejected
    // with an error instead of yielding a bogus public key.
    for bad_input in ["not-valid", "AAAA"] {
        assert!(wg_public_key_from_private(bad_input).is_err());
    }
}
#[test]
fn test_parse_invalid_key() {
assert!(parse_private_key("not-valid-base64!!!").is_err());
@@ -1171,7 +1334,7 @@ mod tests {
let _ = server_tunn.decapsulate(None, &pkt_copy, &mut buf_b);
}
TunnResult::Done => {}
other => {
_other => {
// Drain
loop {
match client_tunn.decapsulate(None, &[], &mut buf_a) {

View File

@@ -3,6 +3,6 @@
*/
// NOTE(review): this file looks auto-generated per release (version bumped
// 1.16.0 -> 1.17.0 in this change) — confirm before hand-editing; edits may
// be overwritten by the release tooling.
export const commitinfo = {
name: '@push.rocks/smartvpn',
version: '1.17.0',
description: 'A VPN solution with TypeScript control plane and Rust data plane daemon'
}
}

View File

@@ -217,6 +217,14 @@ export interface IVpnClientInfo {
// Server-side statistics, extending the shared IVpnStatistics byte/packet
// counters with client/connection counts broken down per transport
// (WebSocket, QUIC, WireGuard).
export interface IVpnServerStatistics extends IVpnStatistics {
/** Currently connected clients — presumably the sum of the per-transport active counts; verify against the server implementation. */
activeClients: number;
/** Connections accepted so far — assumed cumulative since server start, TODO confirm reset semantics. */
totalConnections: number;
/** Per-transport active client counts. */
activeClientsWebsocket: number;
activeClientsQuic: number;
activeClientsWireguard: number;
/** Per-transport total connection counts. */
totalConnectionsWebsocket: number;
totalConnectionsQuic: number;
totalConnectionsWireguard: number;
}
export interface IVpnKeypair {