Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| fbfbe0db51 | |||
| 67542f0be7 | |||
| 13d0183e9d | |||
| 99a8a29ff1 | |||
| fe9c693ac8 | |||
| 20ef92599b |
22
changelog.md
22
changelog.md
@@ -1,5 +1,27 @@
|
||||
# Changelog
|
||||
|
||||
## 2026-03-31 - 1.17.0 - feat(wireguard)
|
||||
track per-transport server statistics and make WireGuard clients active only after handshake
|
||||
|
||||
- add websocket, quic, and wireguard active-client and total-connection counters to server statistics
|
||||
- register WireGuard peers without marking them active until handshake/data is received, and remove them from active clients on expiration or idle timeout
|
||||
- sync WireGuard byte counters into aggregate server stats independently of active client presence and expose new statistics fields in TypeScript interfaces
|
||||
|
||||
## 2026-03-31 - 1.16.5 - fix(rust-userspace-nat)
|
||||
improve TCP session backpressure, buffering, and idle cleanup in userspace NAT
|
||||
|
||||
- apply proper bridge-channel backpressure by reserving channel capacity before consuming smoltcp TCP data
|
||||
- defer bridge sender initialization until the bridge task starts and track TCP session activity timestamps
|
||||
- cap per-session pending TCP send buffers at 512KB and abort stalled sessions when clients cannot keep up
|
||||
- add idle TCP session cleanup and switch NAT polling to a dynamic smoltcp-driven delay
|
||||
|
||||
## 2026-03-31 - 1.16.4 - fix(server)
|
||||
register preloaded WireGuard clients as peers on server startup
|
||||
|
||||
- Adds configured clients from the runtime registry to the WireGuard listener when the server starts.
|
||||
- Ensures clients loaded from config can complete WireGuard handshakes without requiring separate peer registration.
|
||||
- Logs a warning if automatic peer registration fails for an individual client.
|
||||
|
||||
## 2026-03-31 - 1.16.3 - fix(rust-nat)
|
||||
defer TCP bridge startup until handshake completion and buffer partial NAT socket writes
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@push.rocks/smartvpn",
|
||||
"version": "1.16.3",
|
||||
"version": "1.17.0",
|
||||
"private": false,
|
||||
"description": "A VPN solution with TypeScript control plane and Rust data plane daemon",
|
||||
"type": "module",
|
||||
|
||||
@@ -132,6 +132,14 @@ pub struct ServerStatistics {
|
||||
pub uptime_seconds: u64,
|
||||
pub active_clients: u64,
|
||||
pub total_connections: u64,
|
||||
/// Per-transport active client counts.
|
||||
pub active_clients_websocket: u64,
|
||||
pub active_clients_quic: u64,
|
||||
pub active_clients_wireguard: u64,
|
||||
/// Per-transport total connection counts.
|
||||
pub total_connections_websocket: u64,
|
||||
pub total_connections_quic: u64,
|
||||
pub total_connections_wireguard: u64,
|
||||
}
|
||||
|
||||
/// The forwarding engine determines how decrypted IP packets are routed.
|
||||
@@ -372,6 +380,28 @@ impl VpnServer {
|
||||
}
|
||||
|
||||
info!("VPN server started (transport: {})", transport_mode);
|
||||
|
||||
// Register pre-loaded clients (from config.clients) as WG peers.
|
||||
// The WG listener only starts with config.wg_peers; clients loaded into the
|
||||
// registry need to be dynamically added so WG handshakes work.
|
||||
if self.wg_command_tx.is_some() {
|
||||
let registry = state.client_registry.read().await;
|
||||
for entry in registry.list() {
|
||||
if let (Some(ref wg_key), Some(ref ip_str)) = (&entry.wg_public_key, &entry.assigned_ip) {
|
||||
let peer_config = crate::wireguard::WgPeerConfig {
|
||||
public_key: wg_key.clone(),
|
||||
preshared_key: None,
|
||||
allowed_ips: vec![format!("{}/32", ip_str)],
|
||||
endpoint: None,
|
||||
persistent_keepalive: Some(25),
|
||||
};
|
||||
if let Err(e) = self.add_wg_peer(peer_config).await {
|
||||
warn!("Failed to register pre-loaded WG peer for {}: {}", entry.client_id, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -428,7 +458,21 @@ impl VpnServer {
|
||||
if let Some(ref state) = self.state {
|
||||
let mut stats = state.stats.read().await.clone();
|
||||
stats.uptime_seconds = state.started_at.elapsed().as_secs();
|
||||
stats.active_clients = state.clients.read().await.len() as u64;
|
||||
let clients = state.clients.read().await;
|
||||
stats.active_clients = clients.len() as u64;
|
||||
// Compute per-transport active counts
|
||||
stats.active_clients_websocket = 0;
|
||||
stats.active_clients_quic = 0;
|
||||
stats.active_clients_wireguard = 0;
|
||||
for info in clients.values() {
|
||||
match info.transport_type.as_str() {
|
||||
"websocket" => stats.active_clients_websocket += 1,
|
||||
"quic" => stats.active_clients_quic += 1,
|
||||
"wireguard" => stats.active_clients_wireguard += 1,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
drop(clients);
|
||||
stats
|
||||
} else {
|
||||
ServerStatistics::default()
|
||||
@@ -1281,6 +1325,11 @@ async fn handle_client_connection(
|
||||
{
|
||||
let mut stats = state.stats.write().await;
|
||||
stats.total_connections += 1;
|
||||
match transport_type {
|
||||
"websocket" => stats.total_connections_websocket += 1,
|
||||
"quic" => stats.total_connections_quic += 1,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
// Send assigned IP info (encrypted), include effective MTU
|
||||
|
||||
@@ -17,6 +17,10 @@ use crate::acl;
|
||||
use crate::server::{DestinationPolicyConfig, ServerState};
|
||||
use crate::tunnel;
|
||||
|
||||
/// Maximum size of per-session pending send buffer (512KB = 8x socket buffer).
|
||||
/// Sessions exceeding this are aborted — the client cannot keep up.
|
||||
const TCP_PENDING_SEND_MAX: usize = 512 * 1024;
|
||||
|
||||
// ============================================================================
|
||||
// Virtual IP device for smoltcp
|
||||
// ============================================================================
|
||||
@@ -121,7 +125,8 @@ struct SessionKey {
|
||||
|
||||
struct TcpSession {
|
||||
smoltcp_handle: SocketHandle,
|
||||
bridge_data_tx: mpsc::Sender<Vec<u8>>,
|
||||
/// Channel to send data to the bridge task. None until bridge starts.
|
||||
bridge_data_tx: Option<mpsc::Sender<Vec<u8>>>,
|
||||
#[allow(dead_code)]
|
||||
client_ip: Ipv4Addr,
|
||||
/// Bridge task has been spawned (deferred until handshake completes)
|
||||
@@ -132,6 +137,8 @@ struct TcpSession {
|
||||
pending_send: Vec<u8>,
|
||||
/// Session is closing (FIN in progress), don't accept new SYNs
|
||||
closing: bool,
|
||||
/// Last time data flowed through this session (for idle timeout)
|
||||
last_activity: tokio::time::Instant,
|
||||
}
|
||||
|
||||
struct UdpSession {
|
||||
@@ -385,17 +392,15 @@ impl NatEngine {
|
||||
|
||||
let handle = self.sockets.add(socket);
|
||||
|
||||
// Channel for sending data from NAT engine to bridge task
|
||||
let (data_tx, _data_rx) = mpsc::channel::<Vec<u8>>(256);
|
||||
|
||||
let session = TcpSession {
|
||||
smoltcp_handle: handle,
|
||||
bridge_data_tx: data_tx,
|
||||
bridge_data_tx: None,
|
||||
client_ip: key.src_ip,
|
||||
bridge_started: false,
|
||||
connect_addr,
|
||||
pending_send: Vec::new(),
|
||||
closing: false,
|
||||
last_activity: tokio::time::Instant::now(),
|
||||
};
|
||||
self.tcp_sessions.insert(key.clone(), session);
|
||||
|
||||
@@ -470,9 +475,8 @@ impl NatEngine {
|
||||
let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
|
||||
if socket.is_active() {
|
||||
session.bridge_started = true;
|
||||
// Recreate the data channel — the old receiver was dropped
|
||||
let (data_tx, data_rx) = mpsc::channel::<Vec<u8>>(256);
|
||||
session.bridge_data_tx = data_tx;
|
||||
session.bridge_data_tx = Some(data_tx);
|
||||
let btx = bridge_tx_clone.clone();
|
||||
let k = key.clone();
|
||||
let addr = session.connect_addr;
|
||||
@@ -503,15 +507,29 @@ impl NatEngine {
|
||||
|
||||
// Bridge: read data from smoltcp TCP sockets → send to bridge tasks
|
||||
let mut closed_tcp: Vec<SessionKey> = Vec::new();
|
||||
let mut tcp_outbound: Vec<(mpsc::Sender<Vec<u8>>, Vec<u8>)> = Vec::new();
|
||||
let mut active_tcp: Vec<SessionKey> = Vec::new();
|
||||
for (key, session) in &self.tcp_sessions {
|
||||
let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
|
||||
if session.bridge_started && socket.can_recv() {
|
||||
let sender = session.bridge_data_tx.clone();
|
||||
let _ = socket.recv(|data| {
|
||||
tcp_outbound.push((sender.clone(), data.to_vec()));
|
||||
(data.len(), ())
|
||||
});
|
||||
if let Some(ref sender) = session.bridge_data_tx {
|
||||
// Reserve channel slot BEFORE consuming from smoltcp.
|
||||
// If the channel is full, we don't consume — smoltcp's RX buffer
|
||||
// fills up, it stops advertising TCP window space, and the VPN
|
||||
// client's TCP stack backs off. Proper end-to-end backpressure.
|
||||
match sender.try_reserve() {
|
||||
Ok(permit) => {
|
||||
let _ = socket.recv(|data| {
|
||||
permit.send(data.to_vec());
|
||||
(data.len(), ())
|
||||
});
|
||||
active_tcp.push(key.clone());
|
||||
}
|
||||
Err(_) => {
|
||||
debug!("NAT: bridge channel full for {}:{} -> {}:{}, applying backpressure",
|
||||
key.src_ip, key.src_port, key.dst_ip, key.dst_port);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Detect closed connections
|
||||
if !socket.is_open() && !socket.is_listening() {
|
||||
@@ -519,10 +537,11 @@ impl NatEngine {
|
||||
}
|
||||
}
|
||||
|
||||
// Send TCP data to bridge tasks (outside borrow of self.tcp_sessions)
|
||||
for (sender, data) in tcp_outbound {
|
||||
if sender.try_send(data).is_err() {
|
||||
debug!("NAT: bridge channel full, TCP data dropped");
|
||||
// Update last_activity for sessions that had data flow
|
||||
let now = tokio::time::Instant::now();
|
||||
for key in active_tcp {
|
||||
if let Some(session) = self.tcp_sessions.get_mut(&key) {
|
||||
session.last_activity = now;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -561,39 +580,31 @@ impl NatEngine {
|
||||
match msg {
|
||||
BridgeMessage::TcpData { key, data } => {
|
||||
if let Some(session) = self.tcp_sessions.get_mut(&key) {
|
||||
session.last_activity = tokio::time::Instant::now();
|
||||
// Append to pending buffer, then flush as much as possible
|
||||
session.pending_send.extend_from_slice(&data);
|
||||
let socket =
|
||||
self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
|
||||
if socket.can_send() {
|
||||
// Try to write directly first
|
||||
let all_data = if session.pending_send.is_empty() {
|
||||
&data
|
||||
} else {
|
||||
session.pending_send.extend_from_slice(&data);
|
||||
&session.pending_send.clone()
|
||||
};
|
||||
match socket.send_slice(all_data) {
|
||||
Ok(written) if written < all_data.len() => {
|
||||
// Partial write — buffer the rest
|
||||
if session.pending_send.is_empty() {
|
||||
session.pending_send = data[written..].to_vec();
|
||||
} else {
|
||||
session.pending_send.drain(..written);
|
||||
}
|
||||
}
|
||||
Ok(_) => {
|
||||
// Full write — clear any pending data
|
||||
session.pending_send.clear();
|
||||
}
|
||||
Err(_) => {
|
||||
// Write failed — buffer everything
|
||||
if session.pending_send.is_empty() {
|
||||
session.pending_send = data;
|
||||
}
|
||||
if socket.can_send() && !session.pending_send.is_empty() {
|
||||
match socket.send_slice(&session.pending_send) {
|
||||
Ok(written) if written > 0 => {
|
||||
session.pending_send.drain(..written);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
} else {
|
||||
// Can't send yet — buffer for later
|
||||
session.pending_send.extend_from_slice(&data);
|
||||
}
|
||||
// Cap check — abort session if client can't keep up
|
||||
if session.pending_send.len() > TCP_PENDING_SEND_MAX {
|
||||
warn!(
|
||||
"NAT: TCP session {}:{} -> {}:{} pending buffer exceeded {}KB, aborting",
|
||||
key.src_ip, key.src_port, key.dst_ip, key.dst_port,
|
||||
TCP_PENDING_SEND_MAX / 1024
|
||||
);
|
||||
let socket =
|
||||
self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
|
||||
socket.abort();
|
||||
session.pending_send.clear();
|
||||
session.closing = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -644,6 +655,29 @@ impl NatEngine {
|
||||
}
|
||||
}
|
||||
|
||||
fn cleanup_idle_tcp_sessions(&mut self) {
|
||||
let timeout = Duration::from_secs(300); // 5 minutes
|
||||
let now = tokio::time::Instant::now();
|
||||
let expired: Vec<SessionKey> = self
|
||||
.tcp_sessions
|
||||
.iter()
|
||||
.filter(|(_, s)| now.duration_since(s.last_activity) > timeout)
|
||||
.map(|(k, _)| k.clone())
|
||||
.collect();
|
||||
|
||||
for key in expired {
|
||||
if let Some(session) = self.tcp_sessions.remove(&key) {
|
||||
let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
|
||||
socket.abort();
|
||||
self.sockets.remove(session.smoltcp_handle);
|
||||
warn!(
|
||||
"NAT: TCP session timed out {}:{} -> {}:{}",
|
||||
key.src_ip, key.src_port, key.dst_ip, key.dst_port
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Main async event loop for the NAT engine.
|
||||
pub async fn run(
|
||||
mut self,
|
||||
@@ -651,9 +685,13 @@ impl NatEngine {
|
||||
mut shutdown_rx: mpsc::Receiver<()>,
|
||||
) -> Result<()> {
|
||||
info!("Userspace NAT engine started");
|
||||
let mut timer = tokio::time::interval(Duration::from_millis(50));
|
||||
let default_poll_delay = Duration::from_millis(50);
|
||||
let mut cleanup_timer = tokio::time::interval(Duration::from_secs(10));
|
||||
|
||||
// Dynamic poll timer — reset after each event using smoltcp's poll_delay()
|
||||
let poll_sleep = tokio::time::sleep(default_poll_delay);
|
||||
tokio::pin!(poll_sleep);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(packet) = packet_rx.recv() => {
|
||||
@@ -664,18 +702,26 @@ impl NatEngine {
|
||||
self.handle_bridge_message(msg);
|
||||
self.process().await;
|
||||
}
|
||||
_ = timer.tick() => {
|
||||
() = &mut poll_sleep => {
|
||||
// Periodic poll for smoltcp maintenance (TCP retransmit, etc.)
|
||||
self.process().await;
|
||||
}
|
||||
_ = cleanup_timer.tick() => {
|
||||
self.cleanup_idle_udp_sessions();
|
||||
self.cleanup_idle_tcp_sessions();
|
||||
}
|
||||
_ = shutdown_rx.recv() => {
|
||||
info!("Userspace NAT engine shutting down");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Reset poll delay based on smoltcp's actual timer needs
|
||||
let now = self.smoltcp_now();
|
||||
let delay = self.iface.poll_delay(now, &self.sockets)
|
||||
.map(|d| Duration::from_millis(d.total_millis()))
|
||||
.unwrap_or(default_poll_delay);
|
||||
poll_sleep.as_mut().reset(tokio::time::Instant::now() + delay);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -220,6 +220,15 @@ struct PeerState {
|
||||
#[allow(dead_code)]
|
||||
persistent_keepalive: Option<u16>,
|
||||
stats: WgPeerStats,
|
||||
/// Whether this peer has completed a WireGuard handshake and is in state.clients.
|
||||
is_connected: bool,
|
||||
/// Last time we received data or handshake activity from this peer.
|
||||
last_activity_at: Option<tokio::time::Instant>,
|
||||
/// VPN IP assigned during registration (used for connect/disconnect).
|
||||
vpn_ip: Option<Ipv4Addr>,
|
||||
/// Previous synced byte counts for aggregate stats delta tracking.
|
||||
prev_synced_bytes_sent: u64,
|
||||
prev_synced_bytes_received: u64,
|
||||
}
|
||||
|
||||
impl PeerState {
|
||||
@@ -276,6 +285,11 @@ fn add_peer_to_loop(
|
||||
endpoint,
|
||||
persistent_keepalive: config.persistent_keepalive,
|
||||
stats: WgPeerStats::default(),
|
||||
is_connected: false,
|
||||
last_activity_at: None,
|
||||
vpn_ip: None,
|
||||
prev_synced_bytes_sent: 0,
|
||||
prev_synced_bytes_received: 0,
|
||||
});
|
||||
|
||||
info!("Added WireGuard peer: {}", config.public_key);
|
||||
@@ -323,8 +337,9 @@ fn wg_timestamp_now() -> String {
|
||||
format!("{}", duration.as_secs())
|
||||
}
|
||||
|
||||
/// Register a WG peer in ServerState (tun_routes, clients, ip_pool).
|
||||
/// Returns the VPN IP and the per-peer return-packet receiver.
|
||||
/// Register a WG peer in ServerState (tun_routes + ip_pool only).
|
||||
/// Does NOT add to state.clients — peers appear there only after handshake.
|
||||
/// Returns the VPN IP.
|
||||
async fn register_wg_peer(
|
||||
state: &Arc<ServerState>,
|
||||
peer: &PeerState,
|
||||
@@ -366,13 +381,23 @@ async fn register_wg_peer(
|
||||
});
|
||||
}
|
||||
|
||||
// Insert ClientInfo
|
||||
info!("WG peer {} registered with IP {} (not yet connected)", peer.public_key_b64, vpn_ip);
|
||||
Ok(Some(vpn_ip))
|
||||
}
|
||||
|
||||
/// Add a WG peer to state.clients on first successful handshake (data received).
|
||||
async fn connect_wg_peer(
|
||||
state: &Arc<ServerState>,
|
||||
peer: &PeerState,
|
||||
vpn_ip: Ipv4Addr,
|
||||
) {
|
||||
let client_id = format!("wg-{}", &peer.public_key_b64[..8.min(peer.public_key_b64.len())]);
|
||||
let client_info = ClientInfo {
|
||||
client_id: client_id.clone(),
|
||||
assigned_ip: vpn_ip.to_string(),
|
||||
connected_since: wg_timestamp_now(),
|
||||
bytes_sent: 0,
|
||||
bytes_received: 0,
|
||||
bytes_sent: peer.stats.bytes_sent,
|
||||
bytes_received: peer.stats.bytes_received,
|
||||
packets_dropped: 0,
|
||||
bytes_dropped: 0,
|
||||
last_keepalive_at: None,
|
||||
@@ -380,13 +405,31 @@ async fn register_wg_peer(
|
||||
rate_limit_bytes_per_sec: None,
|
||||
burst_bytes: None,
|
||||
authenticated_key: peer.public_key_b64.clone(),
|
||||
registered_client_id: client_id,
|
||||
registered_client_id: client_id.clone(),
|
||||
remote_addr: peer.endpoint.map(|e| e.to_string()),
|
||||
transport_type: "wireguard".to_string(),
|
||||
};
|
||||
state.clients.write().await.insert(client_info.client_id.clone(), client_info);
|
||||
|
||||
Ok(Some(vpn_ip))
|
||||
// Increment total_connections
|
||||
{
|
||||
let mut stats = state.stats.write().await;
|
||||
stats.total_connections += 1;
|
||||
stats.total_connections_wireguard += 1;
|
||||
}
|
||||
|
||||
info!("WG peer {} connected (IP: {})", peer.public_key_b64, vpn_ip);
|
||||
}
|
||||
|
||||
/// Remove a WG peer from state.clients (disconnect without unregistering).
|
||||
async fn disconnect_wg_peer(
|
||||
state: &Arc<ServerState>,
|
||||
pubkey: &str,
|
||||
) {
|
||||
let client_id = format!("wg-{}", &pubkey[..8.min(pubkey.len())]);
|
||||
if state.clients.write().await.remove(&client_id).is_some() {
|
||||
info!("WG peer {} disconnected (removed from active clients)", pubkey);
|
||||
}
|
||||
}
|
||||
|
||||
/// Unregister a WG peer from ServerState.
|
||||
@@ -460,6 +503,11 @@ pub async fn run_wg_listener(
|
||||
endpoint,
|
||||
persistent_keepalive: peer_config.persistent_keepalive,
|
||||
stats: WgPeerStats::default(),
|
||||
is_connected: false,
|
||||
last_activity_at: None,
|
||||
vpn_ip: None,
|
||||
prev_synced_bytes_sent: 0,
|
||||
prev_synced_bytes_received: 0,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -470,11 +518,12 @@ pub async fn run_wg_listener(
|
||||
// Merged return-packet channel: all per-peer channels feed into this
|
||||
let (wg_return_tx, mut wg_return_rx) = mpsc::channel::<(String, Vec<u8>)>(1024);
|
||||
|
||||
// Register initial peers in ServerState and track their VPN IPs
|
||||
// Register initial peers in ServerState (IP reservation + tun_routes only, NOT state.clients)
|
||||
let mut peer_vpn_ips: HashMap<String, Ipv4Addr> = HashMap::new();
|
||||
for peer in &peers {
|
||||
for peer in peers.iter_mut() {
|
||||
if let Ok(Some(ip)) = register_wg_peer(&state, peer, &wg_return_tx).await {
|
||||
peer_vpn_ips.insert(peer.public_key_b64.clone(), ip);
|
||||
peer.vpn_ip = Some(ip);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -483,6 +532,7 @@ pub async fn run_wg_listener(
|
||||
let mut dst_buf = vec![0u8; WG_BUFFER_SIZE];
|
||||
let mut timer = tokio::time::interval(std::time::Duration::from_millis(TIMER_TICK_MS));
|
||||
let mut stats_timer = tokio::time::interval(std::time::Duration::from_secs(1));
|
||||
let mut idle_check_timer = tokio::time::interval(std::time::Duration::from_secs(10));
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
@@ -506,6 +556,8 @@ pub async fn run_wg_listener(
|
||||
}
|
||||
}
|
||||
peer.endpoint = Some(src_addr);
|
||||
// Handshake response counts as activity
|
||||
peer.last_activity_at = Some(tokio::time::Instant::now());
|
||||
handled = true;
|
||||
break;
|
||||
}
|
||||
@@ -530,6 +582,15 @@ pub async fn run_wg_listener(
|
||||
peer.stats.packets_received += 1;
|
||||
}
|
||||
peer.endpoint = Some(src_addr);
|
||||
// Track activity and detect handshake completion
|
||||
peer.last_activity_at = Some(tokio::time::Instant::now());
|
||||
if !peer.is_connected {
|
||||
peer.is_connected = true;
|
||||
peer.stats.last_handshake_time = Some(wg_timestamp_now());
|
||||
if let Some(vpn_ip) = peer.vpn_ip {
|
||||
connect_wg_peer(&state, peer, vpn_ip).await;
|
||||
}
|
||||
}
|
||||
handled = true;
|
||||
break;
|
||||
}
|
||||
@@ -553,6 +614,15 @@ pub async fn run_wg_listener(
|
||||
peer.stats.packets_received += 1;
|
||||
}
|
||||
peer.endpoint = Some(src_addr);
|
||||
// Track activity and detect handshake completion
|
||||
peer.last_activity_at = Some(tokio::time::Instant::now());
|
||||
if !peer.is_connected {
|
||||
peer.is_connected = true;
|
||||
peer.stats.last_handshake_time = Some(wg_timestamp_now());
|
||||
if let Some(vpn_ip) = peer.vpn_ip {
|
||||
connect_wg_peer(&state, peer, vpn_ip).await;
|
||||
}
|
||||
}
|
||||
handled = true;
|
||||
break;
|
||||
}
|
||||
@@ -603,6 +673,10 @@ pub async fn run_wg_listener(
|
||||
}
|
||||
TunnResult::Err(WireGuardError::ConnectionExpired) => {
|
||||
warn!("WG peer {} connection expired", peer.public_key_b64);
|
||||
if peer.is_connected {
|
||||
peer.is_connected = false;
|
||||
disconnect_wg_peer(&state, &peer.public_key_b64).await;
|
||||
}
|
||||
}
|
||||
TunnResult::Err(e) => {
|
||||
debug!("Timer error for WG peer {}: {:?}",
|
||||
@@ -617,19 +691,39 @@ pub async fn run_wg_listener(
|
||||
_ = stats_timer.tick() => {
|
||||
let mut clients = state.clients.write().await;
|
||||
let mut stats = state.stats.write().await;
|
||||
for peer in peers.iter() {
|
||||
for peer in peers.iter_mut() {
|
||||
// Always update aggregate stats (regardless of connection state)
|
||||
let delta_sent = peer.stats.bytes_sent.saturating_sub(peer.prev_synced_bytes_sent);
|
||||
let delta_recv = peer.stats.bytes_received.saturating_sub(peer.prev_synced_bytes_received);
|
||||
if delta_sent > 0 || delta_recv > 0 {
|
||||
stats.bytes_sent += delta_sent;
|
||||
stats.bytes_received += delta_recv;
|
||||
peer.prev_synced_bytes_sent = peer.stats.bytes_sent;
|
||||
peer.prev_synced_bytes_received = peer.stats.bytes_received;
|
||||
}
|
||||
|
||||
// Only update ClientInfo if peer is connected (in state.clients)
|
||||
let client_id = format!("wg-{}", &peer.public_key_b64[..8.min(peer.public_key_b64.len())]);
|
||||
if let Some(info) = clients.get_mut(&client_id) {
|
||||
// Update stats delta
|
||||
let prev_sent = info.bytes_sent;
|
||||
let prev_recv = info.bytes_received;
|
||||
info.bytes_sent = peer.stats.bytes_sent;
|
||||
info.bytes_received = peer.stats.bytes_received;
|
||||
info.remote_addr = peer.endpoint.map(|e| e.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update aggregate stats
|
||||
stats.bytes_sent += peer.stats.bytes_sent.saturating_sub(prev_sent);
|
||||
stats.bytes_received += peer.stats.bytes_received.saturating_sub(prev_recv);
|
||||
// --- Idle timeout check (every 10s) ---
|
||||
_ = idle_check_timer.tick() => {
|
||||
let now = tokio::time::Instant::now();
|
||||
for peer in peers.iter_mut() {
|
||||
if peer.is_connected {
|
||||
if let Some(last) = peer.last_activity_at {
|
||||
if now.duration_since(last) > std::time::Duration::from_secs(180) {
|
||||
info!("WG peer {} idle timeout (180s), disconnecting", peer.public_key_b64);
|
||||
peer.is_connected = false;
|
||||
disconnect_wg_peer(&state, &peer.public_key_b64).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -646,11 +740,12 @@ pub async fn run_wg_listener(
|
||||
&config.private_key,
|
||||
);
|
||||
if result.is_ok() {
|
||||
// Register new peer in ServerState
|
||||
let peer = peers.last().unwrap();
|
||||
// Register new peer in ServerState (IP + tun_routes only)
|
||||
let peer = peers.last_mut().unwrap();
|
||||
match register_wg_peer(&state, peer, &wg_return_tx).await {
|
||||
Ok(Some(ip)) => {
|
||||
peer_vpn_ips.insert(peer_config.public_key.clone(), ip);
|
||||
peer.vpn_ip = Some(ip);
|
||||
}
|
||||
Ok(None) => {}
|
||||
Err(e) => {
|
||||
@@ -1239,7 +1334,7 @@ mod tests {
|
||||
let _ = server_tunn.decapsulate(None, &pkt_copy, &mut buf_b);
|
||||
}
|
||||
TunnResult::Done => {}
|
||||
other => {
|
||||
_other => {
|
||||
// Drain
|
||||
loop {
|
||||
match client_tunn.decapsulate(None, &[], &mut buf_a) {
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartvpn',
|
||||
version: '1.16.3',
|
||||
version: '1.17.0',
|
||||
description: 'A VPN solution with TypeScript control plane and Rust data plane daemon'
|
||||
}
|
||||
|
||||
@@ -217,6 +217,14 @@ export interface IVpnClientInfo {
|
||||
export interface IVpnServerStatistics extends IVpnStatistics {
|
||||
activeClients: number;
|
||||
totalConnections: number;
|
||||
/** Per-transport active client counts. */
|
||||
activeClientsWebsocket: number;
|
||||
activeClientsQuic: number;
|
||||
activeClientsWireguard: number;
|
||||
/** Per-transport total connection counts. */
|
||||
totalConnectionsWebsocket: number;
|
||||
totalConnectionsQuic: number;
|
||||
totalConnectionsWireguard: number;
|
||||
}
|
||||
|
||||
export interface IVpnKeypair {
|
||||
|
||||
Reference in New Issue
Block a user