diff --git a/changelog.md b/changelog.md index bbb0623..1f99617 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-03-31 - 1.17.0 - feat(wireguard) +track per-transport server statistics and make WireGuard clients active only after handshake + +- add websocket, quic, and wireguard active-client and total-connection counters to server statistics +- register WireGuard peers without marking them active until handshake/data is received, and remove them from active clients on expiration or idle timeout +- sync WireGuard byte counters into aggregate server stats independently of active client presence and expose new statistics fields in TypeScript interfaces + ## 2026-03-31 - 1.16.5 - fix(rust-userspace-nat) improve TCP session backpressure, buffering, and idle cleanup in userspace NAT diff --git a/rust/src/server.rs b/rust/src/server.rs index 76f5ae6..311cc2d 100644 --- a/rust/src/server.rs +++ b/rust/src/server.rs @@ -132,6 +132,14 @@ pub struct ServerStatistics { pub uptime_seconds: u64, pub active_clients: u64, pub total_connections: u64, + /// Per-transport active client counts. + pub active_clients_websocket: u64, + pub active_clients_quic: u64, + pub active_clients_wireguard: u64, + /// Per-transport total connection counts. + pub total_connections_websocket: u64, + pub total_connections_quic: u64, + pub total_connections_wireguard: u64, } /// The forwarding engine determines how decrypted IP packets are routed. @@ -450,7 +458,21 @@ impl VpnServer { if let Some(ref state) = self.state { let mut stats = state.stats.read().await.clone(); stats.uptime_seconds = state.started_at.elapsed().as_secs(); - stats.active_clients = state.clients.read().await.len() as u64; + let clients = state.clients.read().await; + stats.active_clients = clients.len() as u64; + // Compute per-transport active counts + stats.active_clients_websocket = 0; + stats.active_clients_quic = 0; + stats.active_clients_wireguard = 0; + for info in clients.values() { + match info.transport_type.as_str() { + "websocket" => stats.active_clients_websocket += 1, + "quic" => stats.active_clients_quic += 1, + "wireguard" => stats.active_clients_wireguard += 1, + _ => {} + } + } + drop(clients); stats } else { ServerStatistics::default() @@ -1303,6 +1325,11 @@ async fn handle_client_connection( { let mut stats = state.stats.write().await; stats.total_connections += 1; + match transport_type { + "websocket" => stats.total_connections_websocket += 1, + "quic" => stats.total_connections_quic += 1, + _ => {} + } } // Send assigned IP info (encrypted), include effective MTU diff --git a/rust/src/wireguard.rs b/rust/src/wireguard.rs index d698360..899069a 100644 --- a/rust/src/wireguard.rs +++ b/rust/src/wireguard.rs @@ -220,6 +220,15 @@ struct PeerState { #[allow(dead_code)] persistent_keepalive: Option, stats: WgPeerStats, + /// Whether this peer has completed a WireGuard handshake and is in state.clients. + is_connected: bool, + /// Last time we received data or handshake activity from this peer. + last_activity_at: Option, + /// VPN IP assigned during registration (used for connect/disconnect). + vpn_ip: Option, + /// Previous synced byte counts for aggregate stats delta tracking. + prev_synced_bytes_sent: u64, + prev_synced_bytes_received: u64, } impl PeerState { @@ -276,6 +285,11 @@ fn add_peer_to_loop( endpoint, persistent_keepalive: config.persistent_keepalive, stats: WgPeerStats::default(), + is_connected: false, + last_activity_at: None, + vpn_ip: None, + prev_synced_bytes_sent: 0, + prev_synced_bytes_received: 0, }); info!("Added WireGuard peer: {}", config.public_key); @@ -323,8 +337,9 @@ fn wg_timestamp_now() -> String { format!("{}", duration.as_secs()) } -/// Register a WG peer in ServerState (tun_routes, clients, ip_pool). -/// Returns the VPN IP and the per-peer return-packet receiver. +/// Register a WG peer in ServerState (tun_routes + ip_pool only). +/// Does NOT add to state.clients — peers appear there only after handshake. +/// Returns the VPN IP. async fn register_wg_peer( state: &Arc, peer: &PeerState, @@ -366,13 +381,23 @@ async fn register_wg_peer( }); } - // Insert ClientInfo + info!("WG peer {} registered with IP {} (not yet connected)", peer.public_key_b64, vpn_ip); + Ok(Some(vpn_ip)) +} + +/// Add a WG peer to state.clients on first successful handshake (data received). +async fn connect_wg_peer( + state: &Arc, + peer: &PeerState, + vpn_ip: Ipv4Addr, +) { + let client_id = format!("wg-{}", &peer.public_key_b64[..8.min(peer.public_key_b64.len())]); let client_info = ClientInfo { client_id: client_id.clone(), assigned_ip: vpn_ip.to_string(), connected_since: wg_timestamp_now(), - bytes_sent: 0, - bytes_received: 0, + bytes_sent: peer.stats.bytes_sent, + bytes_received: peer.stats.bytes_received, packets_dropped: 0, bytes_dropped: 0, last_keepalive_at: None, @@ -380,13 +405,31 @@ async fn register_wg_peer( rate_limit_bytes_per_sec: None, burst_bytes: None, authenticated_key: peer.public_key_b64.clone(), - registered_client_id: client_id, + registered_client_id: client_id.clone(), remote_addr: peer.endpoint.map(|e| e.to_string()), transport_type: "wireguard".to_string(), }; state.clients.write().await.insert(client_info.client_id.clone(), client_info); - Ok(Some(vpn_ip)) + // Increment total_connections + { + let mut stats = state.stats.write().await; + stats.total_connections += 1; + stats.total_connections_wireguard += 1; + } + + info!("WG peer {} connected (IP: {})", peer.public_key_b64, vpn_ip); +} + +/// Remove a WG peer from state.clients (disconnect without unregistering). +async fn disconnect_wg_peer( + state: &Arc, + pubkey: &str, +) { + let client_id = format!("wg-{}", &pubkey[..8.min(pubkey.len())]); + if state.clients.write().await.remove(&client_id).is_some() { + info!("WG peer {} disconnected (removed from active clients)", pubkey); + } } /// Unregister a WG peer from ServerState. @@ -460,6 +503,11 @@ pub async fn run_wg_listener( endpoint, persistent_keepalive: peer_config.persistent_keepalive, stats: WgPeerStats::default(), + is_connected: false, + last_activity_at: None, + vpn_ip: None, + prev_synced_bytes_sent: 0, + prev_synced_bytes_received: 0, }); } @@ -470,11 +518,12 @@ pub async fn run_wg_listener( // Merged return-packet channel: all per-peer channels feed into this let (wg_return_tx, mut wg_return_rx) = mpsc::channel::<(String, Vec)>(1024); - // Register initial peers in ServerState and track their VPN IPs + // Register initial peers in ServerState (IP reservation + tun_routes only, NOT state.clients) let mut peer_vpn_ips: HashMap = HashMap::new(); - for peer in &peers { + for peer in peers.iter_mut() { if let Ok(Some(ip)) = register_wg_peer(&state, peer, &wg_return_tx).await { peer_vpn_ips.insert(peer.public_key_b64.clone(), ip); + peer.vpn_ip = Some(ip); } } @@ -483,6 +532,7 @@ pub async fn run_wg_listener( let mut dst_buf = vec![0u8; WG_BUFFER_SIZE]; let mut timer = tokio::time::interval(std::time::Duration::from_millis(TIMER_TICK_MS)); let mut stats_timer = tokio::time::interval(std::time::Duration::from_secs(1)); + let mut idle_check_timer = tokio::time::interval(std::time::Duration::from_secs(10)); loop { tokio::select! { @@ -506,6 +556,8 @@ pub async fn run_wg_listener( } } peer.endpoint = Some(src_addr); + // Handshake response counts as activity + peer.last_activity_at = Some(tokio::time::Instant::now()); handled = true; break; } @@ -530,6 +582,15 @@ pub async fn run_wg_listener( peer.stats.packets_received += 1; } peer.endpoint = Some(src_addr); + // Track activity and detect handshake completion + peer.last_activity_at = Some(tokio::time::Instant::now()); + if !peer.is_connected { + peer.is_connected = true; + peer.stats.last_handshake_time = Some(wg_timestamp_now()); + if let Some(vpn_ip) = peer.vpn_ip { + connect_wg_peer(&state, peer, vpn_ip).await; + } + } handled = true; break; } @@ -553,6 +614,15 @@ pub async fn run_wg_listener( peer.stats.packets_received += 1; } peer.endpoint = Some(src_addr); + // Track activity and detect handshake completion + peer.last_activity_at = Some(tokio::time::Instant::now()); + if !peer.is_connected { + peer.is_connected = true; + peer.stats.last_handshake_time = Some(wg_timestamp_now()); + if let Some(vpn_ip) = peer.vpn_ip { + connect_wg_peer(&state, peer, vpn_ip).await; + } + } handled = true; break; } @@ -603,6 +673,10 @@ pub async fn run_wg_listener( } TunnResult::Err(WireGuardError::ConnectionExpired) => { warn!("WG peer {} connection expired", peer.public_key_b64); + if peer.is_connected { + peer.is_connected = false; + disconnect_wg_peer(&state, &peer.public_key_b64).await; + } } TunnResult::Err(e) => { debug!("Timer error for WG peer {}: {:?}", @@ -617,19 +691,39 @@ pub async fn run_wg_listener( _ = stats_timer.tick() => { let mut clients = state.clients.write().await; let mut stats = state.stats.write().await; - for peer in peers.iter() { + for peer in peers.iter_mut() { + // Always update aggregate stats (regardless of connection state) + let delta_sent = peer.stats.bytes_sent.saturating_sub(peer.prev_synced_bytes_sent); + let delta_recv = peer.stats.bytes_received.saturating_sub(peer.prev_synced_bytes_received); + if delta_sent > 0 || delta_recv > 0 { + stats.bytes_sent += delta_sent; + stats.bytes_received += delta_recv; + peer.prev_synced_bytes_sent = peer.stats.bytes_sent; + peer.prev_synced_bytes_received = peer.stats.bytes_received; + } + + // Only update ClientInfo if peer is connected (in state.clients) let client_id = format!("wg-{}", &peer.public_key_b64[..8.min(peer.public_key_b64.len())]); if let Some(info) = clients.get_mut(&client_id) { - // Update stats delta - let prev_sent = info.bytes_sent; - let prev_recv = info.bytes_received; info.bytes_sent = peer.stats.bytes_sent; info.bytes_received = peer.stats.bytes_received; info.remote_addr = peer.endpoint.map(|e| e.to_string()); + } + } + } - // Update aggregate stats - stats.bytes_sent += peer.stats.bytes_sent.saturating_sub(prev_sent); - stats.bytes_received += peer.stats.bytes_received.saturating_sub(prev_recv); + // --- Idle timeout check (every 10s) --- + _ = idle_check_timer.tick() => { + let now = tokio::time::Instant::now(); + for peer in peers.iter_mut() { + if peer.is_connected { + if let Some(last) = peer.last_activity_at { + if now.duration_since(last) > std::time::Duration::from_secs(180) { + info!("WG peer {} idle timeout (180s), disconnecting", peer.public_key_b64); + peer.is_connected = false; + disconnect_wg_peer(&state, &peer.public_key_b64).await; + } + } } } } @@ -646,11 +740,12 @@ pub async fn run_wg_listener( &config.private_key, ); if result.is_ok() { - // Register new peer in ServerState - let peer = peers.last().unwrap(); + // Register new peer in ServerState (IP + tun_routes only) + let peer = peers.last_mut().unwrap(); match register_wg_peer(&state, peer, &wg_return_tx).await { Ok(Some(ip)) => { peer_vpn_ips.insert(peer_config.public_key.clone(), ip); + peer.vpn_ip = Some(ip); } Ok(None) => {} Err(e) => { @@ -1239,7 +1334,7 @@ mod tests { let _ = server_tunn.decapsulate(None, &pkt_copy, &mut buf_b); } TunnResult::Done => {} - other => { + _other => { // Drain loop { match client_tunn.decapsulate(None, &[], &mut buf_a) { diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index e683453..745980b 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartvpn', - version: '1.16.5', + version: '1.17.0', description: 'A VPN solution with TypeScript control plane and Rust data plane daemon' } diff --git a/ts/smartvpn.interfaces.ts b/ts/smartvpn.interfaces.ts index e436a5a..f990cdc 100644 --- a/ts/smartvpn.interfaces.ts +++ b/ts/smartvpn.interfaces.ts @@ -217,6 +217,14 @@ export interface IVpnClientInfo { export interface IVpnServerStatistics extends IVpnStatistics { activeClients: number; totalConnections: number; + /** Per-transport active client counts. */ + activeClientsWebsocket: number; + activeClientsQuic: number; + activeClientsWireguard: number; + /** Per-transport total connection counts. */ + totalConnectionsWebsocket: number; + totalConnectionsQuic: number; + totalConnectionsWireguard: number; } export interface IVpnKeypair {