feat(wireguard): track per-transport server statistics and make WireGuard clients active only after handshake

This commit is contained in:
2026-03-31 10:55:15 +00:00
parent 13d0183e9d
commit 67542f0be7
5 changed files with 158 additions and 21 deletions

View File

@@ -1,5 +1,12 @@
# Changelog # Changelog
## 2026-03-31 - 1.17.0 - feat(wireguard)
track per-transport server statistics and make WireGuard clients active only after handshake
- add websocket, quic, and wireguard active-client and total-connection counters to server statistics
- register WireGuard peers without marking them active until handshake/data is received, and remove them from active clients on expiration or idle timeout
- sync WireGuard byte counters into aggregate server stats independently of active client presence and expose new statistics fields in TypeScript interfaces
## 2026-03-31 - 1.16.5 - fix(rust-userspace-nat) ## 2026-03-31 - 1.16.5 - fix(rust-userspace-nat)
improve TCP session backpressure, buffering, and idle cleanup in userspace NAT improve TCP session backpressure, buffering, and idle cleanup in userspace NAT

View File

@@ -132,6 +132,14 @@ pub struct ServerStatistics {
pub uptime_seconds: u64, pub uptime_seconds: u64,
pub active_clients: u64, pub active_clients: u64,
pub total_connections: u64, pub total_connections: u64,
/// Per-transport active client counts.
pub active_clients_websocket: u64,
pub active_clients_quic: u64,
pub active_clients_wireguard: u64,
/// Per-transport total connection counts.
pub total_connections_websocket: u64,
pub total_connections_quic: u64,
pub total_connections_wireguard: u64,
} }
/// The forwarding engine determines how decrypted IP packets are routed. /// The forwarding engine determines how decrypted IP packets are routed.
@@ -450,7 +458,21 @@ impl VpnServer {
if let Some(ref state) = self.state { if let Some(ref state) = self.state {
let mut stats = state.stats.read().await.clone(); let mut stats = state.stats.read().await.clone();
stats.uptime_seconds = state.started_at.elapsed().as_secs(); stats.uptime_seconds = state.started_at.elapsed().as_secs();
stats.active_clients = state.clients.read().await.len() as u64; let clients = state.clients.read().await;
stats.active_clients = clients.len() as u64;
// Compute per-transport active counts
stats.active_clients_websocket = 0;
stats.active_clients_quic = 0;
stats.active_clients_wireguard = 0;
for info in clients.values() {
match info.transport_type.as_str() {
"websocket" => stats.active_clients_websocket += 1,
"quic" => stats.active_clients_quic += 1,
"wireguard" => stats.active_clients_wireguard += 1,
_ => {}
}
}
drop(clients);
stats stats
} else { } else {
ServerStatistics::default() ServerStatistics::default()
@@ -1303,6 +1325,11 @@ async fn handle_client_connection(
{ {
let mut stats = state.stats.write().await; let mut stats = state.stats.write().await;
stats.total_connections += 1; stats.total_connections += 1;
match transport_type {
"websocket" => stats.total_connections_websocket += 1,
"quic" => stats.total_connections_quic += 1,
_ => {}
}
} }
// Send assigned IP info (encrypted), include effective MTU // Send assigned IP info (encrypted), include effective MTU

View File

@@ -220,6 +220,15 @@ struct PeerState {
#[allow(dead_code)] #[allow(dead_code)]
persistent_keepalive: Option<u16>, persistent_keepalive: Option<u16>,
stats: WgPeerStats, stats: WgPeerStats,
/// Whether this peer has completed a WireGuard handshake and is in state.clients.
is_connected: bool,
/// Last time we received data or handshake activity from this peer.
last_activity_at: Option<tokio::time::Instant>,
/// VPN IP assigned during registration (used for connect/disconnect).
vpn_ip: Option<Ipv4Addr>,
/// Previous synced byte counts for aggregate stats delta tracking.
prev_synced_bytes_sent: u64,
prev_synced_bytes_received: u64,
} }
impl PeerState { impl PeerState {
@@ -276,6 +285,11 @@ fn add_peer_to_loop(
endpoint, endpoint,
persistent_keepalive: config.persistent_keepalive, persistent_keepalive: config.persistent_keepalive,
stats: WgPeerStats::default(), stats: WgPeerStats::default(),
is_connected: false,
last_activity_at: None,
vpn_ip: None,
prev_synced_bytes_sent: 0,
prev_synced_bytes_received: 0,
}); });
info!("Added WireGuard peer: {}", config.public_key); info!("Added WireGuard peer: {}", config.public_key);
@@ -323,8 +337,9 @@ fn wg_timestamp_now() -> String {
format!("{}", duration.as_secs()) format!("{}", duration.as_secs())
} }
/// Register a WG peer in ServerState (tun_routes, clients, ip_pool). /// Register a WG peer in ServerState (tun_routes + ip_pool only).
/// Returns the VPN IP and the per-peer return-packet receiver. /// Does NOT add to state.clients — peers appear there only after handshake.
/// Returns the VPN IP.
async fn register_wg_peer( async fn register_wg_peer(
state: &Arc<ServerState>, state: &Arc<ServerState>,
peer: &PeerState, peer: &PeerState,
@@ -366,13 +381,23 @@ async fn register_wg_peer(
}); });
} }
// Insert ClientInfo info!("WG peer {} registered with IP {} (not yet connected)", peer.public_key_b64, vpn_ip);
Ok(Some(vpn_ip))
}
/// Add a WG peer to state.clients on first successful handshake (data received).
async fn connect_wg_peer(
state: &Arc<ServerState>,
peer: &PeerState,
vpn_ip: Ipv4Addr,
) {
let client_id = format!("wg-{}", &peer.public_key_b64[..8.min(peer.public_key_b64.len())]);
let client_info = ClientInfo { let client_info = ClientInfo {
client_id: client_id.clone(), client_id: client_id.clone(),
assigned_ip: vpn_ip.to_string(), assigned_ip: vpn_ip.to_string(),
connected_since: wg_timestamp_now(), connected_since: wg_timestamp_now(),
bytes_sent: 0, bytes_sent: peer.stats.bytes_sent,
bytes_received: 0, bytes_received: peer.stats.bytes_received,
packets_dropped: 0, packets_dropped: 0,
bytes_dropped: 0, bytes_dropped: 0,
last_keepalive_at: None, last_keepalive_at: None,
@@ -380,13 +405,31 @@ async fn register_wg_peer(
rate_limit_bytes_per_sec: None, rate_limit_bytes_per_sec: None,
burst_bytes: None, burst_bytes: None,
authenticated_key: peer.public_key_b64.clone(), authenticated_key: peer.public_key_b64.clone(),
registered_client_id: client_id, registered_client_id: client_id.clone(),
remote_addr: peer.endpoint.map(|e| e.to_string()), remote_addr: peer.endpoint.map(|e| e.to_string()),
transport_type: "wireguard".to_string(), transport_type: "wireguard".to_string(),
}; };
state.clients.write().await.insert(client_info.client_id.clone(), client_info); state.clients.write().await.insert(client_info.client_id.clone(), client_info);
Ok(Some(vpn_ip)) // Increment total_connections
{
let mut stats = state.stats.write().await;
stats.total_connections += 1;
stats.total_connections_wireguard += 1;
}
info!("WG peer {} connected (IP: {})", peer.public_key_b64, vpn_ip);
}
/// Remove a WG peer from state.clients (disconnect without unregistering).
async fn disconnect_wg_peer(
state: &Arc<ServerState>,
pubkey: &str,
) {
let client_id = format!("wg-{}", &pubkey[..8.min(pubkey.len())]);
if state.clients.write().await.remove(&client_id).is_some() {
info!("WG peer {} disconnected (removed from active clients)", pubkey);
}
} }
/// Unregister a WG peer from ServerState. /// Unregister a WG peer from ServerState.
@@ -460,6 +503,11 @@ pub async fn run_wg_listener(
endpoint, endpoint,
persistent_keepalive: peer_config.persistent_keepalive, persistent_keepalive: peer_config.persistent_keepalive,
stats: WgPeerStats::default(), stats: WgPeerStats::default(),
is_connected: false,
last_activity_at: None,
vpn_ip: None,
prev_synced_bytes_sent: 0,
prev_synced_bytes_received: 0,
}); });
} }
@@ -470,11 +518,12 @@ pub async fn run_wg_listener(
// Merged return-packet channel: all per-peer channels feed into this // Merged return-packet channel: all per-peer channels feed into this
let (wg_return_tx, mut wg_return_rx) = mpsc::channel::<(String, Vec<u8>)>(1024); let (wg_return_tx, mut wg_return_rx) = mpsc::channel::<(String, Vec<u8>)>(1024);
// Register initial peers in ServerState and track their VPN IPs // Register initial peers in ServerState (IP reservation + tun_routes only, NOT state.clients)
let mut peer_vpn_ips: HashMap<String, Ipv4Addr> = HashMap::new(); let mut peer_vpn_ips: HashMap<String, Ipv4Addr> = HashMap::new();
for peer in &peers { for peer in peers.iter_mut() {
if let Ok(Some(ip)) = register_wg_peer(&state, peer, &wg_return_tx).await { if let Ok(Some(ip)) = register_wg_peer(&state, peer, &wg_return_tx).await {
peer_vpn_ips.insert(peer.public_key_b64.clone(), ip); peer_vpn_ips.insert(peer.public_key_b64.clone(), ip);
peer.vpn_ip = Some(ip);
} }
} }
@@ -483,6 +532,7 @@ pub async fn run_wg_listener(
let mut dst_buf = vec![0u8; WG_BUFFER_SIZE]; let mut dst_buf = vec![0u8; WG_BUFFER_SIZE];
let mut timer = tokio::time::interval(std::time::Duration::from_millis(TIMER_TICK_MS)); let mut timer = tokio::time::interval(std::time::Duration::from_millis(TIMER_TICK_MS));
let mut stats_timer = tokio::time::interval(std::time::Duration::from_secs(1)); let mut stats_timer = tokio::time::interval(std::time::Duration::from_secs(1));
let mut idle_check_timer = tokio::time::interval(std::time::Duration::from_secs(10));
loop { loop {
tokio::select! { tokio::select! {
@@ -506,6 +556,8 @@ pub async fn run_wg_listener(
} }
} }
peer.endpoint = Some(src_addr); peer.endpoint = Some(src_addr);
// Handshake response counts as activity
peer.last_activity_at = Some(tokio::time::Instant::now());
handled = true; handled = true;
break; break;
} }
@@ -530,6 +582,15 @@ pub async fn run_wg_listener(
peer.stats.packets_received += 1; peer.stats.packets_received += 1;
} }
peer.endpoint = Some(src_addr); peer.endpoint = Some(src_addr);
// Track activity and detect handshake completion
peer.last_activity_at = Some(tokio::time::Instant::now());
if !peer.is_connected {
peer.is_connected = true;
peer.stats.last_handshake_time = Some(wg_timestamp_now());
if let Some(vpn_ip) = peer.vpn_ip {
connect_wg_peer(&state, peer, vpn_ip).await;
}
}
handled = true; handled = true;
break; break;
} }
@@ -553,6 +614,15 @@ pub async fn run_wg_listener(
peer.stats.packets_received += 1; peer.stats.packets_received += 1;
} }
peer.endpoint = Some(src_addr); peer.endpoint = Some(src_addr);
// Track activity and detect handshake completion
peer.last_activity_at = Some(tokio::time::Instant::now());
if !peer.is_connected {
peer.is_connected = true;
peer.stats.last_handshake_time = Some(wg_timestamp_now());
if let Some(vpn_ip) = peer.vpn_ip {
connect_wg_peer(&state, peer, vpn_ip).await;
}
}
handled = true; handled = true;
break; break;
} }
@@ -603,6 +673,10 @@ pub async fn run_wg_listener(
} }
TunnResult::Err(WireGuardError::ConnectionExpired) => { TunnResult::Err(WireGuardError::ConnectionExpired) => {
warn!("WG peer {} connection expired", peer.public_key_b64); warn!("WG peer {} connection expired", peer.public_key_b64);
if peer.is_connected {
peer.is_connected = false;
disconnect_wg_peer(&state, &peer.public_key_b64).await;
}
} }
TunnResult::Err(e) => { TunnResult::Err(e) => {
debug!("Timer error for WG peer {}: {:?}", debug!("Timer error for WG peer {}: {:?}",
@@ -617,19 +691,39 @@ pub async fn run_wg_listener(
_ = stats_timer.tick() => { _ = stats_timer.tick() => {
let mut clients = state.clients.write().await; let mut clients = state.clients.write().await;
let mut stats = state.stats.write().await; let mut stats = state.stats.write().await;
for peer in peers.iter() { for peer in peers.iter_mut() {
// Always update aggregate stats (regardless of connection state)
let delta_sent = peer.stats.bytes_sent.saturating_sub(peer.prev_synced_bytes_sent);
let delta_recv = peer.stats.bytes_received.saturating_sub(peer.prev_synced_bytes_received);
if delta_sent > 0 || delta_recv > 0 {
stats.bytes_sent += delta_sent;
stats.bytes_received += delta_recv;
peer.prev_synced_bytes_sent = peer.stats.bytes_sent;
peer.prev_synced_bytes_received = peer.stats.bytes_received;
}
// Only update ClientInfo if peer is connected (in state.clients)
let client_id = format!("wg-{}", &peer.public_key_b64[..8.min(peer.public_key_b64.len())]); let client_id = format!("wg-{}", &peer.public_key_b64[..8.min(peer.public_key_b64.len())]);
if let Some(info) = clients.get_mut(&client_id) { if let Some(info) = clients.get_mut(&client_id) {
// Update stats delta
let prev_sent = info.bytes_sent;
let prev_recv = info.bytes_received;
info.bytes_sent = peer.stats.bytes_sent; info.bytes_sent = peer.stats.bytes_sent;
info.bytes_received = peer.stats.bytes_received; info.bytes_received = peer.stats.bytes_received;
info.remote_addr = peer.endpoint.map(|e| e.to_string()); info.remote_addr = peer.endpoint.map(|e| e.to_string());
}
}
}
// Update aggregate stats // --- Idle timeout check (every 10s) ---
stats.bytes_sent += peer.stats.bytes_sent.saturating_sub(prev_sent); _ = idle_check_timer.tick() => {
stats.bytes_received += peer.stats.bytes_received.saturating_sub(prev_recv); let now = tokio::time::Instant::now();
for peer in peers.iter_mut() {
if peer.is_connected {
if let Some(last) = peer.last_activity_at {
if now.duration_since(last) > std::time::Duration::from_secs(180) {
info!("WG peer {} idle timeout (180s), disconnecting", peer.public_key_b64);
peer.is_connected = false;
disconnect_wg_peer(&state, &peer.public_key_b64).await;
}
}
} }
} }
} }
@@ -646,11 +740,12 @@ pub async fn run_wg_listener(
&config.private_key, &config.private_key,
); );
if result.is_ok() { if result.is_ok() {
// Register new peer in ServerState // Register new peer in ServerState (IP + tun_routes only)
let peer = peers.last().unwrap(); let peer = peers.last_mut().unwrap();
match register_wg_peer(&state, peer, &wg_return_tx).await { match register_wg_peer(&state, peer, &wg_return_tx).await {
Ok(Some(ip)) => { Ok(Some(ip)) => {
peer_vpn_ips.insert(peer_config.public_key.clone(), ip); peer_vpn_ips.insert(peer_config.public_key.clone(), ip);
peer.vpn_ip = Some(ip);
} }
Ok(None) => {} Ok(None) => {}
Err(e) => { Err(e) => {
@@ -1239,7 +1334,7 @@ mod tests {
let _ = server_tunn.decapsulate(None, &pkt_copy, &mut buf_b); let _ = server_tunn.decapsulate(None, &pkt_copy, &mut buf_b);
} }
TunnResult::Done => {} TunnResult::Done => {}
other => { _other => {
// Drain // Drain
loop { loop {
match client_tunn.decapsulate(None, &[], &mut buf_a) { match client_tunn.decapsulate(None, &[], &mut buf_a) {

View File

@@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartvpn', name: '@push.rocks/smartvpn',
version: '1.16.5', version: '1.17.0',
description: 'A VPN solution with TypeScript control plane and Rust data plane daemon' description: 'A VPN solution with TypeScript control plane and Rust data plane daemon'
} }

View File

@@ -217,6 +217,14 @@ export interface IVpnClientInfo {
export interface IVpnServerStatistics extends IVpnStatistics { export interface IVpnServerStatistics extends IVpnStatistics {
activeClients: number; activeClients: number;
totalConnections: number; totalConnections: number;
/** Per-transport active client counts. */
activeClientsWebsocket: number;
activeClientsQuic: number;
activeClientsWireguard: number;
/** Per-transport total connection counts. */
totalConnectionsWebsocket: number;
totalConnectionsQuic: number;
totalConnectionsWireguard: number;
} }
export interface IVpnKeypair { export interface IVpnKeypair {