16 Commits

Author SHA1 Message Date
fbfbe0db51 v1.17.0 2026-03-31 10:55:15 +00:00
67542f0be7 feat(wireguard): track per-transport server statistics and make WireGuard clients active only after handshake 2026-03-31 10:55:15 +00:00
13d0183e9d v1.16.5 2026-03-31 08:58:27 +00:00
99a8a29ff1 fix(rust-userspace-nat): improve TCP session backpressure, buffering, and idle cleanup in userspace NAT 2026-03-31 08:58:27 +00:00
fe9c693ac8 v1.16.4 2026-03-31 03:35:54 +00:00
20ef92599b fix(server): register preloaded WireGuard clients as peers on server startup 2026-03-31 03:35:54 +00:00
c3f180e264 v1.16.3 2026-03-31 03:21:04 +00:00
667e5ff3de fix(rust-nat): defer TCP bridge startup until handshake completion and buffer partial NAT socket writes 2026-03-31 03:21:04 +00:00
ef5856bd3a v1.16.2 2026-03-31 02:11:29 +00:00
6e4cafe3c5 fix(wireguard): sync runtime peer management with client registration and derive the correct server public key from the WireGuard private key 2026-03-31 02:11:29 +00:00
42949b1233 v1.16.1 2026-03-30 18:06:16 +00:00
7ae7d389dd fix(rust/server): add serde alias for clientAllowedIPs in server config 2026-03-30 18:06:16 +00:00
414edf7038 v1.16.0 2026-03-30 17:55:27 +00:00
a1b62f6b62 feat(server): add configurable client endpoint and allowed IPs for generated VPN configs 2026-03-30 17:55:27 +00:00
cfa91fd419 v1.15.0 2026-03-30 14:32:02 +00:00
8eb26e1920 feat(vpnserver): add nftables-backed destination policy enforcement for TUN mode 2026-03-30 14:32:02 +00:00
11 changed files with 889 additions and 98 deletions

View File

@@ -1,5 +1,63 @@
# Changelog
## 2026-03-31 - 1.17.0 - feat(wireguard)
track per-transport server statistics and make WireGuard clients active only after handshake
- add websocket, quic, and wireguard active-client and total-connection counters to server statistics
- register WireGuard peers without marking them active until handshake/data is received, and remove them from active clients on expiration or idle timeout
- sync WireGuard byte counters into aggregate server stats independently of active client presence and expose new statistics fields in TypeScript interfaces
## 2026-03-31 - 1.16.5 - fix(rust-userspace-nat)
improve TCP session backpressure, buffering, and idle cleanup in userspace NAT
- apply proper bridge-channel backpressure by reserving channel capacity before consuming smoltcp TCP data
- defer bridge sender initialization until the bridge task starts and track TCP session activity timestamps
- cap per-session pending TCP send buffers at 512KB and abort stalled sessions when clients cannot keep up
- add idle TCP session cleanup and switch NAT polling to a dynamic smoltcp-driven delay
## 2026-03-31 - 1.16.4 - fix(server)
register preloaded WireGuard clients as peers on server startup
- Adds configured clients from the runtime registry to the WireGuard listener when the server starts.
- Ensures clients loaded from config can complete WireGuard handshakes without requiring separate peer registration.
- Logs a warning if automatic peer registration fails for an individual client.
## 2026-03-31 - 1.16.3 - fix(rust-nat)
defer TCP bridge startup until handshake completion and buffer partial NAT socket writes
- Start TCP bridge tasks only after the smoltcp socket becomes active to prevent server data from arriving before the client handshake completes.
- Buffer pending TCP payloads and flush partial writes so bridge-to-socket data is not silently lost under backpressure.
- Keep closing TCP sessions alive until FIN processing completes and add logging for dropped packets when bridge or route channels are full.
## 2026-03-31 - 1.16.2 - fix(wireguard)
sync runtime peer management with client registration and derive the correct server public key from the WireGuard private key
- Register, remove, and rotate WireGuard peers in the running listener when clients are added, deleted, or rekeyed.
- Generate client WireGuard configs with the public key derived from the configured WireGuard private key instead of reusing the generic server public key.
- Handle expired WireGuard sessions by re-initiating handshakes and mark client state as handshaking until the tunnel becomes active.
- Improve allowed IP matching and peer VPN IP extraction for runtime packet routing.
## 2026-03-30 - 1.16.1 - fix(rust/server)
add serde alias for clientAllowedIPs in server config
- Accepts the camelCase clientAllowedIPs field when deserializing server configuration.
- Improves compatibility with existing or external configuration formats without changing runtime behavior.
## 2026-03-30 - 1.16.0 - feat(server)
add configurable client endpoint and allowed IPs for generated VPN configs
- adds serverEndpoint to generated SmartVPN and WireGuard client configs so remote clients can use a public address instead of the listen address
- adds clientAllowedIPs to generated WireGuard configs to support full-tunnel or split-tunnel routing
- updates TypeScript interfaces to expose the new server configuration options
## 2026-03-30 - 1.15.0 - feat(vpnserver)
add nftables-backed destination policy enforcement for TUN mode
- add @push.rocks/smartnftables dependency and export it through the plugin layer
- apply destination policy rules via nftables when starting the server in TUN mode
- add periodic nftables health checks and best-effort cleanup on server stop
- update documentation for destination routing policy, socket transport mode, trusted client tags, events, and service generation
## 2026-03-30 - 1.14.0 - feat(nat)
add destination routing policy support for socket-mode VPN traffic

View File

@@ -1,6 +1,6 @@
{
"name": "@push.rocks/smartvpn",
"version": "1.14.0",
"version": "1.17.0",
"private": false,
"description": "A VPN solution with TypeScript control plane and Rust data plane daemon",
"type": "module",
@@ -29,6 +29,7 @@
],
"license": "MIT",
"dependencies": {
"@push.rocks/smartnftables": "1.1.0",
"@push.rocks/smartpath": "^6.0.0",
"@push.rocks/smartrust": "^1.3.2"
},

11
pnpm-lock.yaml generated
View File

@@ -8,6 +8,9 @@ importers:
.:
dependencies:
'@push.rocks/smartnftables':
specifier: 1.1.0
version: 1.1.0
'@push.rocks/smartpath':
specifier: ^6.0.0
version: 6.0.0
@@ -1132,6 +1135,9 @@ packages:
'@push.rocks/smartnetwork@4.5.2':
resolution: {integrity: sha512-lbMMyc2f/WWd5+qzZyF1ynXndjCtasxPWmj/d8GUuis9rDrW7sLIT1PlAPC2F6Qsy4H/K32JrYU+01d/6sWObg==}
'@push.rocks/smartnftables@1.1.0':
resolution: {integrity: sha512-7JNzerlW20HEl2wKMBIHltwneCQRpXiD2lJkXZZc02ctnfjgFejXVDIeWomhPx6PZ0Z6zmqdF6rrFDtDHyqqfA==}
'@push.rocks/smartnpm@2.0.6':
resolution: {integrity: sha512-7anKDOjX6gXWs1IAc+YWz9ZZ8gDsTwaLh+CxRnGHjAawOmK788NrrgVCg2Fb3qojrPnoxecc46F8Ivp1BT7Izw==}
@@ -5335,6 +5341,11 @@ snapshots:
transitivePeerDependencies:
- supports-color
'@push.rocks/smartnftables@1.1.0':
dependencies:
'@push.rocks/smartlog': 3.2.1
'@push.rocks/smartpromise': 4.2.3
'@push.rocks/smartnpm@2.0.6':
dependencies:
'@push.rocks/consolecolor': 2.0.3

190
readme.md
View File

@@ -10,6 +10,7 @@ A high-performance VPN solution with a **TypeScript control plane** and a **Rust
🔄 **Hub API**: one `createClient()` call generates keys, assigns IP, returns both SmartVPN + WireGuard configs
📡 **Real-time telemetry**: RTT, jitter, loss ratio, link health — all via typed APIs
🌐 **Unified forwarding pipeline**: all transports share the same engine — TUN (kernel), userspace NAT (no root), or testing mode
🎯 **Destination routing policy**: force-target, block, or allow traffic per destination with nftables integration
## Issue Reporting and Security
@@ -36,11 +37,38 @@ The package ships with pre-compiled Rust binaries for **linux/amd64** and **linu
│ Config validation │ │ WS + QUIC + WireGuard │
│ Hub: client management │ │ TUN device, IP pool, NAT │
│ WireGuard .conf generation │ │ Rate limiting, ACLs, QoS │
│ nftables destination policy │ │ Destination routing, nftables│
└──────────────────────────────┘ └───────────────────────────────┘
```
**Split-plane design** — TypeScript handles orchestration, config, and DX; Rust handles every hot-path byte with zero-copy async I/O (tokio, mimalloc).
### IPC Transport Modes
The bridge between TypeScript and Rust supports two transport modes:
| Mode | Use Case | How It Works |
|------|----------|-------------|
| **stdio** | Development, testing | Spawns the Rust daemon as a child process, communicates over stdin/stdout |
| **socket** | Production | Connects to an already-running daemon via Unix domain socket, with optional auto-reconnect |
```typescript
// Development: spawn the daemon
const server = new VpnServer({ transport: { transport: 'stdio' } });
// Production: connect to running daemon
const server = new VpnServer({
transport: {
transport: 'socket',
socketPath: '/var/run/smartvpn.sock',
autoReconnect: true,
reconnectBaseDelayMs: 100,
reconnectMaxDelayMs: 5000,
maxReconnectAttempts: 10,
},
});
```
## Quick Start 🚀
### 1. Start a VPN Server (Hub)
@@ -54,8 +82,8 @@ await server.start({
privateKey: '<server-noise-private-key-base64>',
publicKey: '<server-noise-public-key-base64>',
subnet: '10.8.0.0/24',
transportMode: 'all', // WebSocket + QUIC + WireGuard simultaneously (default)
forwardingMode: 'tun', // 'tun' (kernel), 'socket' (userspace NAT), or 'testing'
transportMode: 'all', // WebSocket + QUIC + WireGuard simultaneously (default)
forwardingMode: 'tun', // 'tun' (kernel), 'socket' (userspace NAT), or 'testing'
wgPrivateKey: '<server-wg-private-key-base64>', // required for WireGuard transport
enableNat: true,
dns: ['1.1.1.1', '8.8.8.8'],
@@ -67,7 +95,7 @@ await server.start({
```typescript
const bundle = await server.createClient({
clientId: 'alice-laptop',
tags: ['engineering'],
serverDefinedClientTags: ['engineering'], // trusted tags for access control
security: {
destinationAllowList: ['10.0.0.0/8'], // can only reach internal network
destinationBlockList: ['10.0.0.99'], // except this host
@@ -155,6 +183,47 @@ await server.start({
- `remoteAddr` field on `IVpnClientInfo` exposes the real client IP for monitoring
- **Security**: must be `false` (default) when accepting direct connections — only enable behind a trusted proxy
### 🎯 Destination Routing Policy
Control where decrypted VPN client traffic goes — force it to a specific target, block it, or allow it through. Evaluated per-packet before per-client ACLs.
```typescript
await server.start({
// ...
forwardingMode: 'socket', // userspace NAT mode
destinationPolicy: {
default: 'forceTarget', // redirect all traffic to a target
target: '127.0.0.1', // target IP for 'forceTarget' mode
allowList: ['10.0.0.0/8'], // these destinations pass through directly
blockList: ['10.0.0.99'], // always blocked (deny overrides allow)
},
});
```
**Policy modes:**
| Mode | Behavior |
|------|----------|
| `'forceTarget'` | Rewrites destination IP to `target` — funnels all traffic through a single endpoint |
| `'block'` | Drops all traffic not explicitly in `allowList` |
| `'allow'` | Passes all traffic through (default, backward compatible) |
In **TUN mode**, destination policies are enforced via **nftables** rules (using `@push.rocks/smartnftables`). A 60-second health check automatically re-applies rules if they're removed externally.
In **socket mode**, the policy is evaluated in the userspace NAT engine before per-client ACLs.
### 🔗 Socket Forward Proxy Protocol
When using `forwardingMode: 'socket'` (userspace NAT), you can prepend **PROXY protocol v2 headers** on outbound TCP connections. This conveys the VPN client's tunnel IP as the source address to downstream services (e.g., SmartProxy):
```typescript
await server.start({
// ...
forwardingMode: 'socket',
socketForwardProxyProtocol: true, // downstream sees VPN client IP, not 127.0.0.1
});
```
### 📦 Packet Forwarding Modes
SmartVPN supports three forwarding modes, configurable per-server and per-client:
@@ -190,6 +259,30 @@ The userspace NAT mode extracts destination IP/port from IP packets, opens a rea
- **Dead-peer detection**: 180s inactivity timeout
- **MTU management**: Automatic overhead calculation (IP+TCP+WS+Noise = 79 bytes)
### 🏷️ Client Tags (Trusted vs Informational)
SmartVPN separates server-managed tags from client-reported tags:
| Field | Set By | Trust Level | Use For |
|-------|--------|-------------|---------|
| `serverDefinedClientTags` | Server admin (via `createClient` / `updateClient`) | ✅ Trusted | Access control, routing, billing |
| `clientDefinedClientTags` | Client (reported after connection) | ⚠️ Informational | Diagnostics, client self-identification |
| `tags` | *(deprecated)* | — | Legacy alias for `serverDefinedClientTags` |
```typescript
// Server-side: trusted tags
await server.createClient({
clientId: 'alice-laptop',
serverDefinedClientTags: ['engineering', 'office-berlin'],
});
// Client-side: informational tags (reported to server)
await client.connect({
// ...
clientDefinedClientTags: ['macOS', 'v2.1.0'],
});
```
### 🔄 Hub Client Management
The server acts as a **hub** — one API to manage all clients:
@@ -205,7 +298,7 @@ const all = await server.listRegisteredClients();
// Update (ACLs, tags, description, rate limits...)
await server.updateClient('bob-phone', {
security: { destinationAllowList: ['0.0.0.0/0'] },
tags: ['mobile', 'field-ops'],
serverDefinedClientTags: ['mobile', 'field-ops'],
});
// Enable / Disable
@@ -243,46 +336,100 @@ const conf = WgConfigGenerator.generateClientConfig({
// → standard WireGuard .conf compatible with wg-quick, iOS, Android
```
Server configs too:
```typescript
const serverConf = WgConfigGenerator.generateServerConfig({
privateKey: '<server-wg-private-key>',
address: '10.8.0.1/24',
listenPort: 51820,
enableNat: true,
natInterface: 'eth0',
peers: [
{ publicKey: '<client-wg-public-key>', allowedIps: ['10.8.0.2/32'] },
],
});
```
### 🖥️ System Service Installation
Generate systemd (Linux) or launchd (macOS) service units:
```typescript
import { VpnInstaller } from '@push.rocks/smartvpn';
const unit = VpnInstaller.generateServiceUnit({
binaryPath: '/usr/local/bin/smartvpn_daemon',
socketPath: '/var/run/smartvpn.sock',
mode: 'server',
configPath: '/etc/smartvpn/server.json',
});
// unit.platform → 'linux' | 'macos'
// unit.content → systemd unit file or launchd plist
// unit.platform → 'linux' | 'macos'
// unit.content → systemd unit file or launchd plist
// unit.installPath → /etc/systemd/system/smartvpn-server.service
```
You can also call `generateSystemdUnit()` or `generateLaunchdPlist()` directly for platform-specific options like custom descriptions.
### 📢 Events
Both `VpnServer` and `VpnClient` extend `EventEmitter` and emit typed events:
```typescript
server.on('client-connected', (info: IVpnClientInfo) => {
console.log(`${info.registeredClientId} connected from ${info.remoteAddr} via ${info.transportType}`);
});
server.on('client-disconnected', ({ clientId, reason }) => {
console.log(`${clientId} disconnected: ${reason}`);
});
client.on('status', (status: IVpnStatus) => {
console.log(`State: ${status.state}, IP: ${status.assignedIp}`);
});
// Both server and client emit:
server.on('exit', ({ code, signal }) => { /* daemon process exited */ });
server.on('reconnected', () => { /* socket transport reconnected */ });
```
| Event | Emitted By | Payload |
|-------|-----------|---------|
| `status` | Both | `IVpnStatus` — connection state changes |
| `error` | Both | `{ message, code? }` |
| `client-connected` | Server | `IVpnClientInfo` — full client info including transport type |
| `client-disconnected` | Server | `{ clientId, reason? }` |
| `exit` | Both | `{ code, signal }` — daemon process exited |
| `reconnected` | Both | `void` — socket transport reconnected |
## API Reference 📖
### Classes
| Class | Description |
|-------|-------------|
| `VpnServer` | Manages the Rust daemon in server mode. Hub methods for client CRUD. |
| `VpnClient` | Manages the Rust daemon in client mode. Connect, disconnect, telemetry. |
| `VpnBridge<T>` | Low-level typed IPC bridge (stdio or Unix socket). |
| `VpnConfig` | Static config validation and file I/O. |
| `VpnInstaller` | Generates systemd/launchd service files. |
| `WgConfigGenerator` | Generates standard WireGuard `.conf` files. |
| `VpnServer` | Manages the Rust daemon in server mode. Hub methods for client CRUD, telemetry, rate limits, WireGuard peer management. |
| `VpnClient` | Manages the Rust daemon in client mode. Connect, disconnect, status, telemetry. |
| `VpnBridge<T>` | Low-level typed IPC bridge (stdio or Unix socket). Handles spawn, connect, reconnect, and typed command dispatch. |
| `VpnConfig` | Static config validation and JSON file I/O. Validates keys, addresses, CIDRs, MTU, etc. |
| `VpnInstaller` | Generates systemd/launchd service files for daemon deployment. |
| `WgConfigGenerator` | Generates standard WireGuard `.conf` files (client and server). |
### Key Interfaces
| Interface | Purpose |
|-----------|---------|
| `IVpnServerConfig` | Server configuration (listen addr, keys, subnet, transport mode, forwarding mode, clients, proxy protocol) |
| `IVpnClientConfig` | Client configuration (server URL, keys, transport, forwarding mode, WG options) |
| `IClientEntry` | Server-side client definition (ID, keys, security, priority, tags, expiry) |
| `IVpnServerConfig` | Server configuration (listen addr, keys, subnet, transport mode, forwarding mode, clients, proxy protocol, destination policy) |
| `IVpnClientConfig` | Client configuration (server URL, keys, transport, forwarding mode, WG options, client-defined tags) |
| `IClientEntry` | Server-side client definition (ID, keys, security, priority, server/client tags, expiry) |
| `IClientSecurity` | Per-client ACLs and rate limits (SmartProxy-aligned naming) |
| `IClientRateLimit` | Rate limiting config (bytesPerSec, burstBytes) |
| `IClientConfigBundle` | Full config bundle returned by `createClient()` |
| `IVpnClientInfo` | Connected client info (IP, stats, authenticated key, remote addr) |
| `IClientConfigBundle` | Full config bundle returned by `createClient()` — includes SmartVPN config, WireGuard .conf, and secrets |
| `IVpnClientInfo` | Connected client info (IP, stats, authenticated key, remote addr, transport type) |
| `IVpnConnectionQuality` | RTT, jitter, loss ratio, link health |
| `IVpnMtuInfo` | TUN MTU, effective MTU, overhead bytes, oversized packet stats |
| `IVpnKeypair` | Base64-encoded public/private key pair |
| `IDestinationPolicy` | Destination routing policy (forceTarget / block / allow with allow/block lists) |
| `IVpnEventMap` | Typed event map for server and client EventEmitter |
### Server IPC Commands
@@ -317,7 +464,7 @@ const unit = VpnInstaller.generateServiceUnit({
// All transports simultaneously (default) — WS + QUIC + WireGuard
{ transportMode: 'all', listenAddr: '0.0.0.0:443', wgPrivateKey: '...', wgListenPort: 51820 }
// WS + QUIC only (backward compat)
// WS + QUIC only
{ transportMode: 'both', listenAddr: '0.0.0.0:443', quicListenAddr: '0.0.0.0:4433' }
// WebSocket only
@@ -376,7 +523,7 @@ pnpm install
# Build (TypeScript + Rust cross-compile)
pnpm build
# Run all tests (79 TS + 132 Rust = 211 tests)
# Run all tests
pnpm test
# Run Rust tests directly
@@ -393,6 +540,7 @@ smartvpn/
├── ts/ # TypeScript control plane
│ ├── index.ts # All exports
│ ├── smartvpn.interfaces.ts # Interfaces, types, IPC command maps
│ ├── smartvpn.plugins.ts # Dependency imports
│ ├── smartvpn.classes.vpnserver.ts
│ ├── smartvpn.classes.vpnclient.ts
│ ├── smartvpn.classes.vpnbridge.ts
@@ -417,7 +565,7 @@ smartvpn/
│ ├── ratelimit.rs # Token bucket
│ ├── userspace_nat.rs # Userspace TCP/UDP NAT proxy
│ └── ... # tunnel, network, telemetry, qos, mtu, reconnect
├── test/ # 9 test files (79 tests)
├── test/ # Test files
├── dist_ts/ # Compiled TypeScript
└── dist_rust/ # Cross-compiled binaries (linux amd64 + arm64)
```

View File

@@ -7,7 +7,7 @@ use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::sync::{mpsc, Mutex, RwLock};
use tracing::{info, error, warn};
use tracing::{debug, info, error, warn};
use crate::acl;
use crate::client_registry::{ClientEntry, ClientRegistry};
@@ -84,6 +84,14 @@ pub struct ServerConfig {
pub wg_listen_port: Option<u16>,
/// WireGuard: pre-configured peers.
pub wg_peers: Option<Vec<crate::wireguard::WgPeerConfig>>,
/// Public endpoint address for generated client configs (e.g. "vpn.example.com:51820").
/// Used as WireGuard `Endpoint` and SmartVPN `serverUrl` host.
/// Defaults to listen_addr.
pub server_endpoint: Option<String>,
/// AllowedIPs for generated WireGuard client configs.
/// Defaults to ["0.0.0.0/0"] (full tunnel).
#[serde(alias = "clientAllowedIPs")]
pub client_allowed_ips: Option<Vec<String>>,
}
/// Information about a connected client.
@@ -124,6 +132,14 @@ pub struct ServerStatistics {
pub uptime_seconds: u64,
pub active_clients: u64,
pub total_connections: u64,
/// Per-transport active client counts.
pub active_clients_websocket: u64,
pub active_clients_quic: u64,
pub active_clients_wireguard: u64,
/// Per-transport total connection counts.
pub total_connections_websocket: u64,
pub total_connections_quic: u64,
pub total_connections_wireguard: u64,
}
/// The forwarding engine determines how decrypted IP packets are routed.
@@ -364,6 +380,28 @@ impl VpnServer {
}
info!("VPN server started (transport: {})", transport_mode);
// Register pre-loaded clients (from config.clients) as WG peers.
// The WG listener only starts with config.wg_peers; clients loaded into the
// registry need to be dynamically added so WG handshakes work.
if self.wg_command_tx.is_some() {
let registry = state.client_registry.read().await;
for entry in registry.list() {
if let (Some(ref wg_key), Some(ref ip_str)) = (&entry.wg_public_key, &entry.assigned_ip) {
let peer_config = crate::wireguard::WgPeerConfig {
public_key: wg_key.clone(),
preshared_key: None,
allowed_ips: vec![format!("{}/32", ip_str)],
endpoint: None,
persistent_keepalive: Some(25),
};
if let Err(e) = self.add_wg_peer(peer_config).await {
warn!("Failed to register pre-loaded WG peer for {}: {}", entry.client_id, e);
}
}
}
}
Ok(())
}
@@ -420,7 +458,21 @@ impl VpnServer {
if let Some(ref state) = self.state {
let mut stats = state.stats.read().await.clone();
stats.uptime_seconds = state.started_at.elapsed().as_secs();
stats.active_clients = state.clients.read().await.len() as u64;
let clients = state.clients.read().await;
stats.active_clients = clients.len() as u64;
// Compute per-transport active counts
stats.active_clients_websocket = 0;
stats.active_clients_quic = 0;
stats.active_clients_wireguard = 0;
for info in clients.values() {
match info.transport_type.as_str() {
"websocket" => stats.active_clients_websocket += 1,
"quic" => stats.active_clients_quic += 1,
"wireguard" => stats.active_clients_wireguard += 1,
_ => {}
}
}
drop(clients);
stats
} else {
ServerStatistics::default()
@@ -586,10 +638,27 @@ impl VpnServer {
// Add to registry
state.client_registry.write().await.add(entry.clone())?;
// Register WG peer with the running WG listener (if active)
if self.wg_command_tx.is_some() {
let wg_peer_config = crate::wireguard::WgPeerConfig {
public_key: wg_pub.clone(),
preshared_key: None,
allowed_ips: vec![format!("{}/32", assigned_ip)],
endpoint: None,
persistent_keepalive: Some(25),
};
if let Err(e) = self.add_wg_peer(wg_peer_config).await {
warn!("Failed to register WG peer for client {}: {}", client_id, e);
}
}
// Build SmartVPN client config
let smartvpn_server_url = format!("wss://{}",
state.config.server_endpoint.as_deref()
.unwrap_or(&state.config.listen_addr)
.replace("0.0.0.0", "localhost"));
let smartvpn_config = serde_json::json!({
"serverUrl": format!("wss://{}",
state.config.listen_addr.replace("0.0.0.0", "localhost")),
"serverUrl": smartvpn_server_url,
"serverPublicKey": state.config.public_key,
"clientPrivateKey": noise_priv,
"clientPublicKey": noise_pub,
@@ -599,15 +668,25 @@ impl VpnServer {
});
// Build WireGuard config string
let wg_server_pubkey = match &state.config.wg_private_key {
Some(wg_priv_key) => crate::wireguard::wg_public_key_from_private(wg_priv_key)?,
None => state.config.public_key.clone(),
};
let wg_endpoint = state.config.server_endpoint.as_deref()
.unwrap_or(&state.config.listen_addr);
let wg_allowed_ips = state.config.client_allowed_ips.as_ref()
.map(|ips| ips.join(", "))
.unwrap_or_else(|| "0.0.0.0/0".to_string());
let wg_config = format!(
"[Interface]\nPrivateKey = {}\nAddress = {}/24\n{}\n[Peer]\nPublicKey = {}\nAllowedIPs = 0.0.0.0/0\nEndpoint = {}\nPersistentKeepalive = 25\n",
"[Interface]\nPrivateKey = {}\nAddress = {}/24\n{}\n[Peer]\nPublicKey = {}\nAllowedIPs = {}\nEndpoint = {}\nPersistentKeepalive = 25\n",
wg_priv,
assigned_ip,
state.config.dns.as_ref()
.map(|d| format!("DNS = {}", d.join(", ")))
.unwrap_or_default(),
state.config.public_key,
state.config.listen_addr,
wg_server_pubkey,
wg_allowed_ips,
wg_endpoint,
);
let entry_json = serde_json::to_value(&entry)?;
@@ -628,6 +707,14 @@ impl VpnServer {
let state = self.state.as_ref()
.ok_or_else(|| anyhow::anyhow!("Server not running"))?;
let entry = state.client_registry.write().await.remove(client_id)?;
// Remove WG peer from running listener
if self.wg_command_tx.is_some() {
if let Some(ref wg_key) = entry.wg_public_key {
if let Err(e) = self.remove_wg_peer(wg_key).await {
debug!("Failed to remove WG peer for client {}: {}", client_id, e);
}
}
}
// Release the IP if assigned
if let Some(ref ip_str) = entry.assigned_ip {
if let Ok(ip) = ip_str.parse::<Ipv4Addr>() {
@@ -714,6 +801,14 @@ impl VpnServer {
let state = self.state.as_ref()
.ok_or_else(|| anyhow::anyhow!("Server not running"))?;
// Capture old WG key before rotation (needed to remove from WG listener)
let old_wg_pub = {
let registry = state.client_registry.read().await;
let entry = registry.get_by_id(client_id)
.ok_or_else(|| anyhow::anyhow!("Client '{}' not found", client_id))?;
entry.wg_public_key.clone()
};
let (noise_pub, noise_priv) = crypto::generate_keypair()?;
let (wg_pub, wg_priv) = crate::wireguard::generate_wg_keypair();
@@ -732,9 +827,31 @@ impl VpnServer {
.and_then(|v| v.as_str())
.unwrap_or("0.0.0.0");
// Update WG listener: remove old peer, add new peer
if self.wg_command_tx.is_some() {
if let Some(ref old_key) = old_wg_pub {
if let Err(e) = self.remove_wg_peer(old_key).await {
debug!("Failed to remove old WG peer during rotation: {}", e);
}
}
let wg_peer_config = crate::wireguard::WgPeerConfig {
public_key: wg_pub.clone(),
preshared_key: None,
allowed_ips: vec![format!("{}/32", assigned_ip)],
endpoint: None,
persistent_keepalive: Some(25),
};
if let Err(e) = self.add_wg_peer(wg_peer_config).await {
warn!("Failed to register new WG peer during rotation: {}", e);
}
}
let smartvpn_server_url = format!("wss://{}",
state.config.server_endpoint.as_deref()
.unwrap_or(&state.config.listen_addr)
.replace("0.0.0.0", "localhost"));
let smartvpn_config = serde_json::json!({
"serverUrl": format!("wss://{}",
state.config.listen_addr.replace("0.0.0.0", "localhost")),
"serverUrl": smartvpn_server_url,
"serverPublicKey": state.config.public_key,
"clientPrivateKey": noise_priv,
"clientPublicKey": noise_pub,
@@ -743,14 +860,24 @@ impl VpnServer {
"keepaliveIntervalSecs": state.config.keepalive_interval_secs,
});
let wg_server_pubkey = match &state.config.wg_private_key {
Some(wg_priv_key) => crate::wireguard::wg_public_key_from_private(wg_priv_key)?,
None => state.config.public_key.clone(),
};
let wg_endpoint = state.config.server_endpoint.as_deref()
.unwrap_or(&state.config.listen_addr);
let wg_allowed_ips = state.config.client_allowed_ips.as_ref()
.map(|ips| ips.join(", "))
.unwrap_or_else(|| "0.0.0.0/0".to_string());
let wg_config = format!(
"[Interface]\nPrivateKey = {}\nAddress = {}/24\n{}\n[Peer]\nPublicKey = {}\nAllowedIPs = 0.0.0.0/0\nEndpoint = {}\nPersistentKeepalive = 25\n",
"[Interface]\nPrivateKey = {}\nAddress = {}/24\n{}\n[Peer]\nPublicKey = {}\nAllowedIPs = {}\nEndpoint = {}\nPersistentKeepalive = 25\n",
wg_priv, assigned_ip,
state.config.dns.as_ref()
.map(|d| format!("DNS = {}", d.join(", ")))
.unwrap_or_default(),
state.config.public_key,
state.config.listen_addr,
wg_server_pubkey,
wg_allowed_ips,
wg_endpoint,
);
Ok(serde_json::json!({
@@ -774,10 +901,13 @@ impl VpnServer {
match format {
"smartvpn" => {
let smartvpn_server_url = format!("wss://{}",
state.config.server_endpoint.as_deref()
.unwrap_or(&state.config.listen_addr)
.replace("0.0.0.0", "localhost"));
Ok(serde_json::json!({
"config": {
"serverUrl": format!("wss://{}",
state.config.listen_addr.replace("0.0.0.0", "localhost")),
"serverUrl": smartvpn_server_url,
"serverPublicKey": state.config.public_key,
"clientPublicKey": entry.public_key,
"dns": state.config.dns,
@@ -787,15 +917,25 @@ impl VpnServer {
}))
}
"wireguard" => {
let wg_server_pubkey = match &state.config.wg_private_key {
Some(wg_priv_key) => crate::wireguard::wg_public_key_from_private(wg_priv_key)?,
None => state.config.public_key.clone(),
};
let assigned_ip = entry.assigned_ip.as_deref().unwrap_or("0.0.0.0");
let wg_endpoint = state.config.server_endpoint.as_deref()
.unwrap_or(&state.config.listen_addr);
let wg_allowed_ips = state.config.client_allowed_ips.as_ref()
.map(|ips| ips.join(", "))
.unwrap_or_else(|| "0.0.0.0/0".to_string());
let config = format!(
"[Interface]\nAddress = {}/24\n{}\n[Peer]\nPublicKey = {}\nAllowedIPs = 0.0.0.0/0\nEndpoint = {}\nPersistentKeepalive = 25\n",
"[Interface]\nAddress = {}/24\n{}\n[Peer]\nPublicKey = {}\nAllowedIPs = {}\nEndpoint = {}\nPersistentKeepalive = 25\n",
assigned_ip,
state.config.dns.as_ref()
.map(|d| format!("DNS = {}", d.join(", ")))
.unwrap_or_default(),
state.config.public_key,
state.config.listen_addr,
wg_server_pubkey,
wg_allowed_ips,
wg_endpoint,
);
Ok(serde_json::json!({ "config": config }))
}
@@ -1185,6 +1325,11 @@ async fn handle_client_connection(
{
let mut stats = state.stats.write().await;
stats.total_connections += 1;
match transport_type {
"websocket" => stats.total_connections_websocket += 1,
"quic" => stats.total_connections_quic += 1,
_ => {}
}
}
// Send assigned IP info (encrypted), include effective MTU

View File

@@ -17,6 +17,10 @@ use crate::acl;
use crate::server::{DestinationPolicyConfig, ServerState};
use crate::tunnel;
/// Maximum size of per-session pending send buffer (512KB = 8x socket buffer).
/// Sessions exceeding this are aborted — the client cannot keep up.
const TCP_PENDING_SEND_MAX: usize = 512 * 1024;
// ============================================================================
// Virtual IP device for smoltcp
// ============================================================================
@@ -101,7 +105,7 @@ impl Device for VirtualIpDevice {
let mut caps = DeviceCapabilities::default();
caps.medium = Medium::Ip;
caps.max_transmission_unit = self.mtu;
caps.max_burst_size = Some(1);
caps.max_burst_size = None;
caps
}
}
@@ -121,9 +125,20 @@ struct SessionKey {
struct TcpSession {
smoltcp_handle: SocketHandle,
bridge_data_tx: mpsc::Sender<Vec<u8>>,
/// Channel to send data to the bridge task. None until bridge starts.
bridge_data_tx: Option<mpsc::Sender<Vec<u8>>>,
#[allow(dead_code)]
client_ip: Ipv4Addr,
/// Bridge task has been spawned (deferred until handshake completes)
bridge_started: bool,
/// Address to connect the bridge task to (may differ from dst if policy rewrote it)
connect_addr: SocketAddr,
/// Buffered data from bridge waiting to be written to smoltcp socket
pending_send: Vec<u8>,
/// Session is closing (FIN in progress), don't accept new SYNs
closing: bool,
/// Last time data flowed through this session (for idle timeout)
last_activity: tokio::time::Instant,
}
struct UdpSession {
@@ -308,7 +323,9 @@ impl NatEngine {
// SYN without ACK = new connection
let is_syn = (flags & 0x02) != 0 && (flags & 0x10) == 0;
if is_syn && !self.tcp_sessions.contains_key(&key) {
// Skip if session exists (including closing sessions — let FIN complete)
let session_exists = self.tcp_sessions.contains_key(&key);
if is_syn && !session_exists {
match self.evaluate_destination(dst_ip, dst_port) {
DestinationAction::Drop => {
debug!("NAT: destination policy blocked TCP {}:{} -> {}:{}", src_ip, src_port, dst_ip, dst_port);
@@ -375,23 +392,22 @@ impl NatEngine {
let handle = self.sockets.add(socket);
// Channel for sending data from NAT engine to bridge task
let (data_tx, data_rx) = mpsc::channel::<Vec<u8>>(256);
let session = TcpSession {
smoltcp_handle: handle,
bridge_data_tx: data_tx,
bridge_data_tx: None,
client_ip: key.src_ip,
bridge_started: false,
connect_addr,
pending_send: Vec::new(),
closing: false,
last_activity: tokio::time::Instant::now(),
};
self.tcp_sessions.insert(key.clone(), session);
// Spawn bridge task that connects to the resolved destination
let bridge_tx = self.bridge_tx.clone();
let key_clone = key.clone();
let proxy_protocol = self.proxy_protocol;
tokio::spawn(async move {
tcp_bridge_task(key_clone, data_rx, bridge_tx, proxy_protocol, connect_addr).await;
});
// NOTE: Bridge task is NOT spawned here — it will be spawned in process()
// once the smoltcp handshake completes (socket.is_active() == true).
// This prevents data from the real server arriving before the VPN client
// handshake is done, which would cause silent data loss.
debug!(
"NAT: new TCP session {}:{} -> {}:{}",
@@ -451,15 +467,69 @@ impl NatEngine {
self.iface
.poll(now, &mut self.device, &mut self.sockets);
// Start bridge tasks for sessions whose handshake just completed
let bridge_tx_clone = self.bridge_tx.clone();
let proxy_protocol = self.proxy_protocol;
for (key, session) in self.tcp_sessions.iter_mut() {
if !session.bridge_started && !session.closing {
let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
if socket.is_active() {
session.bridge_started = true;
let (data_tx, data_rx) = mpsc::channel::<Vec<u8>>(256);
session.bridge_data_tx = Some(data_tx);
let btx = bridge_tx_clone.clone();
let k = key.clone();
let addr = session.connect_addr;
let pp = proxy_protocol;
tokio::spawn(async move {
tcp_bridge_task(k, data_rx, btx, pp, addr).await;
});
debug!("NAT: TCP handshake complete, starting bridge for {}:{} -> {}:{}",
key.src_ip, key.src_port, key.dst_ip, key.dst_port);
}
}
}
// Flush pending send buffers to smoltcp sockets
for (_key, session) in self.tcp_sessions.iter_mut() {
if !session.pending_send.is_empty() {
let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
if socket.can_send() {
match socket.send_slice(&session.pending_send) {
Ok(written) if written > 0 => {
session.pending_send.drain(..written);
}
_ => {}
}
}
}
}
// Bridge: read data from smoltcp TCP sockets → send to bridge tasks
let mut closed_tcp: Vec<SessionKey> = Vec::new();
let mut active_tcp: Vec<SessionKey> = Vec::new();
for (key, session) in &self.tcp_sessions {
let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
if socket.can_recv() {
let _ = socket.recv(|data| {
let _ = session.bridge_data_tx.try_send(data.to_vec());
(data.len(), ())
});
if session.bridge_started && socket.can_recv() {
if let Some(ref sender) = session.bridge_data_tx {
// Reserve channel slot BEFORE consuming from smoltcp.
// If the channel is full, we don't consume — smoltcp's RX buffer
// fills up, it stops advertising TCP window space, and the VPN
// client's TCP stack backs off. Proper end-to-end backpressure.
match sender.try_reserve() {
Ok(permit) => {
let _ = socket.recv(|data| {
permit.send(data.to_vec());
(data.len(), ())
});
active_tcp.push(key.clone());
}
Err(_) => {
debug!("NAT: bridge channel full for {}:{} -> {}:{}, applying backpressure",
key.src_ip, key.src_port, key.dst_ip, key.dst_port);
}
}
}
}
// Detect closed connections
if !socket.is_open() && !socket.is_listening() {
@@ -467,6 +537,14 @@ impl NatEngine {
}
}
// Update last_activity for sessions that had data flow
let now = tokio::time::Instant::now();
for key in active_tcp {
if let Some(session) = self.tcp_sessions.get_mut(&key) {
session.last_activity = now;
}
}
// Clean up closed TCP sessions
for key in closed_tcp {
if let Some(session) = self.tcp_sessions.remove(&key) {
@@ -479,7 +557,9 @@ impl NatEngine {
for (_key, session) in &self.udp_sessions {
let socket = self.sockets.get_mut::<udp::Socket>(session.smoltcp_handle);
while let Ok((data, _meta)) = socket.recv() {
let _ = session.bridge_data_tx.try_send(data.to_vec());
if session.bridge_data_tx.try_send(data.to_vec()).is_err() {
debug!("NAT: bridge channel full, UDP data dropped");
}
}
}
@@ -488,7 +568,9 @@ impl NatEngine {
for packet in self.device.drain_tx() {
if let Some(std::net::IpAddr::V4(dst_ip)) = tunnel::extract_dst_ip(&packet) {
if let Some(sender) = routes.get(&dst_ip) {
let _ = sender.try_send(packet);
if sender.try_send(packet).is_err() {
debug!("NAT: tun_routes channel full for {}, packet dropped", dst_ip);
}
}
}
}
@@ -497,22 +579,43 @@ impl NatEngine {
fn handle_bridge_message(&mut self, msg: BridgeMessage) {
match msg {
BridgeMessage::TcpData { key, data } => {
if let Some(session) = self.tcp_sessions.get(&key) {
if let Some(session) = self.tcp_sessions.get_mut(&key) {
session.last_activity = tokio::time::Instant::now();
// Append to pending buffer, then flush as much as possible
session.pending_send.extend_from_slice(&data);
let socket =
self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
if socket.can_send() {
let _ = socket.send_slice(&data);
if socket.can_send() && !session.pending_send.is_empty() {
match socket.send_slice(&session.pending_send) {
Ok(written) if written > 0 => {
session.pending_send.drain(..written);
}
_ => {}
}
}
// Cap check — abort session if client can't keep up
if session.pending_send.len() > TCP_PENDING_SEND_MAX {
warn!(
"NAT: TCP session {}:{} -> {}:{} pending buffer exceeded {}KB, aborting",
key.src_ip, key.src_port, key.dst_ip, key.dst_port,
TCP_PENDING_SEND_MAX / 1024
);
let socket =
self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
socket.abort();
session.pending_send.clear();
session.closing = true;
}
}
}
BridgeMessage::TcpClosed { key } => {
if let Some(session) = self.tcp_sessions.remove(&key) {
if let Some(session) = self.tcp_sessions.get_mut(&key) {
let socket =
self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
socket.close();
session.closing = true;
// Don't remove from SocketSet yet — let smoltcp send FIN
// It will be cleaned up in process() when is_open() returns false
self.tcp_sessions.insert(key, session);
}
}
BridgeMessage::UdpData { key, data } => {
@@ -552,6 +655,29 @@ impl NatEngine {
}
}
fn cleanup_idle_tcp_sessions(&mut self) {
let timeout = Duration::from_secs(300); // 5 minutes
let now = tokio::time::Instant::now();
let expired: Vec<SessionKey> = self
.tcp_sessions
.iter()
.filter(|(_, s)| now.duration_since(s.last_activity) > timeout)
.map(|(k, _)| k.clone())
.collect();
for key in expired {
if let Some(session) = self.tcp_sessions.remove(&key) {
let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
socket.abort();
self.sockets.remove(session.smoltcp_handle);
warn!(
"NAT: TCP session timed out {}:{} -> {}:{}",
key.src_ip, key.src_port, key.dst_ip, key.dst_port
);
}
}
}
/// Main async event loop for the NAT engine.
pub async fn run(
mut self,
@@ -559,9 +685,13 @@ impl NatEngine {
mut shutdown_rx: mpsc::Receiver<()>,
) -> Result<()> {
info!("Userspace NAT engine started");
let mut timer = tokio::time::interval(Duration::from_millis(50));
let default_poll_delay = Duration::from_millis(50);
let mut cleanup_timer = tokio::time::interval(Duration::from_secs(10));
// Dynamic poll timer — reset after each event using smoltcp's poll_delay()
let poll_sleep = tokio::time::sleep(default_poll_delay);
tokio::pin!(poll_sleep);
loop {
tokio::select! {
Some(packet) = packet_rx.recv() => {
@@ -572,18 +702,26 @@ impl NatEngine {
self.handle_bridge_message(msg);
self.process().await;
}
_ = timer.tick() => {
() = &mut poll_sleep => {
// Periodic poll for smoltcp maintenance (TCP retransmit, etc.)
self.process().await;
}
_ = cleanup_timer.tick() => {
self.cleanup_idle_udp_sessions();
self.cleanup_idle_tcp_sessions();
}
_ = shutdown_rx.recv() => {
info!("Userspace NAT engine shutting down");
break;
}
}
// Reset poll delay based on smoltcp's actual timer needs
let now = self.smoltcp_now();
let delay = self.iface.poll_delay(now, &self.sockets)
.map(|d| Duration::from_millis(d.total_millis()))
.unwrap_or(default_poll_delay);
poll_sleep.as_mut().reset(tokio::time::Instant::now() + delay);
}
Ok(())

View File

@@ -5,6 +5,7 @@ use std::sync::Arc;
use anyhow::{anyhow, Result};
use base64::engine::general_purpose::STANDARD as BASE64;
use base64::Engine;
use boringtun::noise::errors::WireGuardError;
use boringtun::noise::rate_limiter::RateLimiter;
use boringtun::noise::{Tunn, TunnResult};
use boringtun::x25519::{PublicKey, StaticSecret};
@@ -99,6 +100,13 @@ pub fn generate_wg_keypair() -> (String, String) {
(pub_b64, priv_b64)
}
/// Derive the WireGuard public key (base64) from a private key (base64).
pub fn wg_public_key_from_private(private_key_b64: &str) -> Result<String> {
let private = parse_private_key(private_key_b64)?;
let public = PublicKey::from(&private);
Ok(BASE64.encode(public.to_bytes()))
}
fn parse_private_key(b64: &str) -> Result<StaticSecret> {
let bytes = BASE64.decode(b64)?;
if bytes.len() != 32 {
@@ -212,11 +220,20 @@ struct PeerState {
#[allow(dead_code)]
persistent_keepalive: Option<u16>,
stats: WgPeerStats,
/// Whether this peer has completed a WireGuard handshake and is in state.clients.
is_connected: bool,
/// Last time we received data or handshake activity from this peer.
last_activity_at: Option<tokio::time::Instant>,
/// VPN IP assigned during registration (used for connect/disconnect).
vpn_ip: Option<Ipv4Addr>,
/// Previous synced byte counts for aggregate stats delta tracking.
prev_synced_bytes_sent: u64,
prev_synced_bytes_received: u64,
}
impl PeerState {
fn matches_dst(&self, dst_ip: IpAddr) -> bool {
self.allowed_ips.iter().any(|aip| aip.matches(dst_ip))
fn matches_allowed_ips(&self, ip: IpAddr) -> bool {
self.allowed_ips.iter().any(|aip| aip.matches(ip))
}
}
@@ -268,6 +285,11 @@ fn add_peer_to_loop(
endpoint,
persistent_keepalive: config.persistent_keepalive,
stats: WgPeerStats::default(),
is_connected: false,
last_activity_at: None,
vpn_ip: None,
prev_synced_bytes_sent: 0,
prev_synced_bytes_received: 0,
});
info!("Added WireGuard peer: {}", config.public_key);
@@ -286,9 +308,10 @@ pub struct WgListenerConfig {
pub peers: Vec<WgPeerConfig>,
}
/// Extract the first /32 IPv4 address from a list of AllowedIp entries.
/// This is the peer's VPN IP used for return-packet routing.
/// Extract the peer's VPN IP from AllowedIp entries.
/// Prefers /32 entries (exact match); falls back to any IPv4 address.
fn extract_peer_vpn_ip(allowed_ips: &[AllowedIp]) -> Option<Ipv4Addr> {
// Prefer /32 entries (exact peer VPN IP)
for aip in allowed_ips {
if let IpAddr::V4(v4) = aip.addr {
if aip.prefix_len == 32 {
@@ -296,6 +319,12 @@ fn extract_peer_vpn_ip(allowed_ips: &[AllowedIp]) -> Option<Ipv4Addr> {
}
}
}
// Fallback: use the first IPv4 address from any prefix length
for aip in allowed_ips {
if let IpAddr::V4(v4) = aip.addr {
return Some(v4);
}
}
None
}
@@ -308,8 +337,9 @@ fn wg_timestamp_now() -> String {
format!("{}", duration.as_secs())
}
/// Register a WG peer in ServerState (tun_routes, clients, ip_pool).
/// Returns the VPN IP and the per-peer return-packet receiver.
/// Register a WG peer in ServerState (tun_routes + ip_pool only).
/// Does NOT add to state.clients — peers appear there only after handshake.
/// Returns the VPN IP.
async fn register_wg_peer(
state: &Arc<ServerState>,
peer: &PeerState,
@@ -351,13 +381,23 @@ async fn register_wg_peer(
});
}
// Insert ClientInfo
info!("WG peer {} registered with IP {} (not yet connected)", peer.public_key_b64, vpn_ip);
Ok(Some(vpn_ip))
}
/// Add a WG peer to state.clients on first successful handshake (data received).
async fn connect_wg_peer(
state: &Arc<ServerState>,
peer: &PeerState,
vpn_ip: Ipv4Addr,
) {
let client_id = format!("wg-{}", &peer.public_key_b64[..8.min(peer.public_key_b64.len())]);
let client_info = ClientInfo {
client_id: client_id.clone(),
assigned_ip: vpn_ip.to_string(),
connected_since: wg_timestamp_now(),
bytes_sent: 0,
bytes_received: 0,
bytes_sent: peer.stats.bytes_sent,
bytes_received: peer.stats.bytes_received,
packets_dropped: 0,
bytes_dropped: 0,
last_keepalive_at: None,
@@ -365,13 +405,31 @@ async fn register_wg_peer(
rate_limit_bytes_per_sec: None,
burst_bytes: None,
authenticated_key: peer.public_key_b64.clone(),
registered_client_id: client_id,
registered_client_id: client_id.clone(),
remote_addr: peer.endpoint.map(|e| e.to_string()),
transport_type: "wireguard".to_string(),
};
state.clients.write().await.insert(client_info.client_id.clone(), client_info);
Ok(Some(vpn_ip))
// Increment total_connections
{
let mut stats = state.stats.write().await;
stats.total_connections += 1;
stats.total_connections_wireguard += 1;
}
info!("WG peer {} connected (IP: {})", peer.public_key_b64, vpn_ip);
}
/// Remove a WG peer from state.clients (disconnect without unregistering).
async fn disconnect_wg_peer(
state: &Arc<ServerState>,
pubkey: &str,
) {
let client_id = format!("wg-{}", &pubkey[..8.min(pubkey.len())]);
if state.clients.write().await.remove(&client_id).is_some() {
info!("WG peer {} disconnected (removed from active clients)", pubkey);
}
}
/// Unregister a WG peer from ServerState.
@@ -445,6 +503,11 @@ pub async fn run_wg_listener(
endpoint,
persistent_keepalive: peer_config.persistent_keepalive,
stats: WgPeerStats::default(),
is_connected: false,
last_activity_at: None,
vpn_ip: None,
prev_synced_bytes_sent: 0,
prev_synced_bytes_received: 0,
});
}
@@ -455,11 +518,12 @@ pub async fn run_wg_listener(
// Merged return-packet channel: all per-peer channels feed into this
let (wg_return_tx, mut wg_return_rx) = mpsc::channel::<(String, Vec<u8>)>(1024);
// Register initial peers in ServerState and track their VPN IPs
// Register initial peers in ServerState (IP reservation + tun_routes only, NOT state.clients)
let mut peer_vpn_ips: HashMap<String, Ipv4Addr> = HashMap::new();
for peer in &peers {
for peer in peers.iter_mut() {
if let Ok(Some(ip)) = register_wg_peer(&state, peer, &wg_return_tx).await {
peer_vpn_ips.insert(peer.public_key_b64.clone(), ip);
peer.vpn_ip = Some(ip);
}
}
@@ -468,6 +532,7 @@ pub async fn run_wg_listener(
let mut dst_buf = vec![0u8; WG_BUFFER_SIZE];
let mut timer = tokio::time::interval(std::time::Duration::from_millis(TIMER_TICK_MS));
let mut stats_timer = tokio::time::interval(std::time::Duration::from_secs(1));
let mut idle_check_timer = tokio::time::interval(std::time::Duration::from_secs(10));
loop {
tokio::select! {
@@ -491,11 +556,13 @@ pub async fn run_wg_listener(
}
}
peer.endpoint = Some(src_addr);
// Handshake response counts as activity
peer.last_activity_at = Some(tokio::time::Instant::now());
handled = true;
break;
}
TunnResult::WriteToTunnelV4(packet, addr) => {
if peer.matches_dst(IpAddr::V4(addr)) {
if peer.matches_allowed_ips(IpAddr::V4(addr)) {
let pkt_len = packet.len() as u64;
// Forward via shared forwarding engine
let mut engine = state.forwarding_engine.lock().await;
@@ -515,11 +582,20 @@ pub async fn run_wg_listener(
peer.stats.packets_received += 1;
}
peer.endpoint = Some(src_addr);
// Track activity and detect handshake completion
peer.last_activity_at = Some(tokio::time::Instant::now());
if !peer.is_connected {
peer.is_connected = true;
peer.stats.last_handshake_time = Some(wg_timestamp_now());
if let Some(vpn_ip) = peer.vpn_ip {
connect_wg_peer(&state, peer, vpn_ip).await;
}
}
handled = true;
break;
}
TunnResult::WriteToTunnelV6(packet, addr) => {
if peer.matches_dst(IpAddr::V6(addr)) {
if peer.matches_allowed_ips(IpAddr::V6(addr)) {
let pkt_len = packet.len() as u64;
let mut engine = state.forwarding_engine.lock().await;
match &mut *engine {
@@ -538,6 +614,15 @@ pub async fn run_wg_listener(
peer.stats.packets_received += 1;
}
peer.endpoint = Some(src_addr);
// Track activity and detect handshake completion
peer.last_activity_at = Some(tokio::time::Instant::now());
if !peer.is_connected {
peer.is_connected = true;
peer.stats.last_handshake_time = Some(wg_timestamp_now());
if let Some(vpn_ip) = peer.vpn_ip {
connect_wg_peer(&state, peer, vpn_ip).await;
}
}
handled = true;
break;
}
@@ -586,6 +671,13 @@ pub async fn run_wg_listener(
udp_socket.send_to(packet, endpoint).await?;
}
}
TunnResult::Err(WireGuardError::ConnectionExpired) => {
warn!("WG peer {} connection expired", peer.public_key_b64);
if peer.is_connected {
peer.is_connected = false;
disconnect_wg_peer(&state, &peer.public_key_b64).await;
}
}
TunnResult::Err(e) => {
debug!("Timer error for WG peer {}: {:?}",
peer.public_key_b64, e);
@@ -599,19 +691,39 @@ pub async fn run_wg_listener(
_ = stats_timer.tick() => {
let mut clients = state.clients.write().await;
let mut stats = state.stats.write().await;
for peer in peers.iter() {
for peer in peers.iter_mut() {
// Always update aggregate stats (regardless of connection state)
let delta_sent = peer.stats.bytes_sent.saturating_sub(peer.prev_synced_bytes_sent);
let delta_recv = peer.stats.bytes_received.saturating_sub(peer.prev_synced_bytes_received);
if delta_sent > 0 || delta_recv > 0 {
stats.bytes_sent += delta_sent;
stats.bytes_received += delta_recv;
peer.prev_synced_bytes_sent = peer.stats.bytes_sent;
peer.prev_synced_bytes_received = peer.stats.bytes_received;
}
// Only update ClientInfo if peer is connected (in state.clients)
let client_id = format!("wg-{}", &peer.public_key_b64[..8.min(peer.public_key_b64.len())]);
if let Some(info) = clients.get_mut(&client_id) {
// Update stats delta
let prev_sent = info.bytes_sent;
let prev_recv = info.bytes_received;
info.bytes_sent = peer.stats.bytes_sent;
info.bytes_received = peer.stats.bytes_received;
info.remote_addr = peer.endpoint.map(|e| e.to_string());
}
}
}
// Update aggregate stats
stats.bytes_sent += peer.stats.bytes_sent.saturating_sub(prev_sent);
stats.bytes_received += peer.stats.bytes_received.saturating_sub(prev_recv);
// --- Idle timeout check (every 10s) ---
_ = idle_check_timer.tick() => {
let now = tokio::time::Instant::now();
for peer in peers.iter_mut() {
if peer.is_connected {
if let Some(last) = peer.last_activity_at {
if now.duration_since(last) > std::time::Duration::from_secs(180) {
info!("WG peer {} idle timeout (180s), disconnecting", peer.public_key_b64);
peer.is_connected = false;
disconnect_wg_peer(&state, &peer.public_key_b64).await;
}
}
}
}
}
@@ -628,11 +740,12 @@ pub async fn run_wg_listener(
&config.private_key,
);
if result.is_ok() {
// Register new peer in ServerState
let peer = peers.last().unwrap();
// Register new peer in ServerState (IP + tun_routes only)
let peer = peers.last_mut().unwrap();
match register_wg_peer(&state, peer, &wg_return_tx).await {
Ok(Some(ip)) => {
peer_vpn_ips.insert(peer_config.public_key.clone(), ip);
peer.vpn_ip = Some(ip);
}
Ok(None) => {}
Err(e) => {
@@ -796,12 +909,12 @@ impl WgClient {
let state = self.state.clone();
let assigned_ip = config.address.clone();
// Update state
// Update state — handshake hasn't completed yet
{
let mut s = state.write().await;
s.state = "connected".to_string();
s.state = "handshaking".to_string();
s.assigned_ip = Some(assigned_ip.clone());
s.connected_since = Some(chrono_now());
s.connected_since = None;
}
// Spawn client loop
@@ -868,7 +981,7 @@ async fn wg_client_loop(
endpoint: SocketAddr,
_allowed_ips: Vec<AllowedIp>,
shared_stats: Arc<RwLock<WgPeerStats>>,
_state: Arc<RwLock<WgClientState>>,
state: Arc<RwLock<WgClientState>>,
mut shutdown_rx: oneshot::Receiver<()>,
) -> Result<()> {
let mut udp_buf = vec![0u8; MAX_UDP_PACKET];
@@ -876,6 +989,7 @@ async fn wg_client_loop(
let mut dst_buf = vec![0u8; WG_BUFFER_SIZE];
let mut timer = tokio::time::interval(std::time::Duration::from_millis(TIMER_TICK_MS));
let mut stats_timer = tokio::time::interval(std::time::Duration::from_secs(1));
let mut handshake_complete = false;
let (mut tun_reader, mut tun_writer) = tokio::io::split(tun_device);
@@ -916,14 +1030,37 @@ async fn wg_client_loop(
tun_writer.write_all(packet).await?;
local_stats.bytes_received += pkt_len;
local_stats.packets_received += 1;
if !handshake_complete {
handshake_complete = true;
let mut s = state.write().await;
s.state = "connected".to_string();
s.connected_since = Some(chrono_now());
info!("WireGuard handshake completed, tunnel active");
}
}
TunnResult::WriteToTunnelV6(packet, _addr) => {
let pkt_len = packet.len() as u64;
tun_writer.write_all(packet).await?;
local_stats.bytes_received += pkt_len;
local_stats.packets_received += 1;
if !handshake_complete {
handshake_complete = true;
let mut s = state.write().await;
s.state = "connected".to_string();
s.connected_since = Some(chrono_now());
info!("WireGuard handshake completed, tunnel active");
}
}
TunnResult::Done => {}
TunnResult::Err(WireGuardError::ConnectionExpired) => {
warn!("WireGuard session expired during decapsulate, re-initiating handshake");
match tunn.format_handshake_initiation(&mut dst_buf, true) {
TunnResult::WriteToNetwork(packet) => {
udp_socket.send_to(packet, endpoint).await?;
}
_ => {}
}
}
TunnResult::Err(e) => {
debug!("Client decapsulate error: {:?}", e);
}
@@ -955,6 +1092,19 @@ async fn wg_client_loop(
TunnResult::WriteToNetwork(packet) => {
udp_socket.send_to(packet, endpoint).await?;
}
TunnResult::Err(WireGuardError::ConnectionExpired) => {
warn!("WireGuard connection expired, re-initiating handshake");
match tunn.format_handshake_initiation(&mut dst_buf, true) {
TunnResult::WriteToNetwork(packet) => {
udp_socket.send_to(packet, endpoint).await?;
debug!("Sent handshake re-initiation after expiry");
}
TunnResult::Err(e) => {
warn!("Failed to re-initiate handshake: {:?}", e);
}
_ => {}
}
}
TunnResult::Err(e) => {
debug!("Client timer error: {:?}", e);
}
@@ -1028,6 +1178,19 @@ mod tests {
assert_eq!(public.to_bytes(), derived_public.to_bytes());
}
#[test]
fn test_wg_public_key_from_private() {
let (pub_b64, priv_b64) = generate_wg_keypair();
let derived = wg_public_key_from_private(&priv_b64).unwrap();
assert_eq!(derived, pub_b64);
}
#[test]
fn test_wg_public_key_from_private_invalid() {
assert!(wg_public_key_from_private("not-valid").is_err());
assert!(wg_public_key_from_private("AAAA").is_err());
}
#[test]
fn test_parse_invalid_key() {
assert!(parse_private_key("not-valid-base64!!!").is_err());
@@ -1171,7 +1334,7 @@ mod tests {
let _ = server_tunn.decapsulate(None, &pkt_copy, &mut buf_b);
}
TunnResult::Done => {}
other => {
_other => {
// Drain
loop {
match client_tunn.decapsulate(None, &[], &mut buf_a) {

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartvpn',
version: '1.14.0',
version: '1.17.0',
description: 'A VPN solution with TypeScript control plane and Rust data plane daemon'
}

View File

@@ -12,6 +12,7 @@ import type {
IWgPeerInfo,
IClientEntry,
IClientConfigBundle,
IDestinationPolicy,
TVpnServerCommands,
} from './smartvpn.interfaces.js';
@@ -21,6 +22,10 @@ import type {
export class VpnServer extends plugins.events.EventEmitter {
private bridge: VpnBridge<TVpnServerCommands>;
private options: IVpnServerOptions;
private nft?: plugins.smartnftables.SmartNftables;
private nftHealthInterval?: ReturnType<typeof setInterval>;
private nftSubnet?: string;
private nftPolicy?: IDestinationPolicy;
constructor(options: IVpnServerOptions) {
super();
@@ -50,6 +55,11 @@ export class VpnServer extends plugins.events.EventEmitter {
const cfg = config || this.options.config;
if (cfg) {
await this.bridge.sendCommand('start', { config: cfg });
// For TUN mode with a destination policy, set up nftables rules
if (cfg.forwardingMode === 'tun' && cfg.destinationPolicy) {
await this.setupTunDestinationPolicy(cfg.subnet, cfg.destinationPolicy);
}
}
}
@@ -229,10 +239,110 @@ export class VpnServer extends plugins.events.EventEmitter {
return this.bridge.sendCommand('generateClientKeypair', {} as Record<string, never>);
}
// ── TUN Destination Policy via nftables ──────────────────────────────
/**
* Set up nftables rules for TUN mode destination policy.
* Also starts a 60-second health check interval to re-apply if rules are removed externally.
*/
private async setupTunDestinationPolicy(subnet: string, policy: IDestinationPolicy): Promise<void> {
this.nftSubnet = subnet;
this.nftPolicy = policy;
this.nft = new plugins.smartnftables.SmartNftables({
tableName: 'smartvpn_tun',
dryRun: process.getuid?.() !== 0,
});
await this.nft.initialize();
await this.applyDestinationPolicyRules();
// Health check: re-apply rules if they disappear
this.nftHealthInterval = setInterval(async () => {
if (!this.nft) return;
try {
const exists = await this.nft.tableExists();
if (!exists) {
console.warn('[smartvpn] nftables rules missing, re-applying destination policy');
this.nft = new plugins.smartnftables.SmartNftables({
tableName: 'smartvpn_tun',
});
await this.nft.initialize();
await this.applyDestinationPolicyRules();
}
} catch (err) {
console.warn(`[smartvpn] nftables health check failed: ${err}`);
}
}, 60_000);
}
/**
* Apply destination policy as nftables rules.
* Order: blockList (drop) → allowList (accept) → default action.
*/
private async applyDestinationPolicyRules(): Promise<void> {
if (!this.nft || !this.nftSubnet || !this.nftPolicy) return;
const subnet = this.nftSubnet;
const policy = this.nftPolicy;
const family = 'ip';
const table = 'smartvpn_tun';
const commands: string[] = [];
// 1. Block list (deny wins — evaluated first)
if (policy.blockList) {
for (const dest of policy.blockList) {
commands.push(
`nft add rule ${family} ${table} prerouting ip saddr ${subnet} ip daddr ${dest} drop`
);
}
}
// 2. Allow list (pass through directly — skip DNAT)
if (policy.allowList) {
for (const dest of policy.allowList) {
commands.push(
`nft add rule ${family} ${table} prerouting ip saddr ${subnet} ip daddr ${dest} accept`
);
}
}
// 3. Default action
switch (policy.default) {
case 'forceTarget': {
const target = policy.target || '127.0.0.1';
commands.push(
`nft add rule ${family} ${table} prerouting ip saddr ${subnet} dnat to ${target}`
);
break;
}
case 'block':
commands.push(
`nft add rule ${family} ${table} prerouting ip saddr ${subnet} drop`
);
break;
case 'allow':
// No rule needed — kernel default allows
break;
}
if (commands.length > 0) {
await this.nft.applyRuleGroup('vpn-destination-policy', commands);
}
}
/**
* Stop the daemon bridge.
*/
public stop(): void {
// Clean up nftables rules
if (this.nftHealthInterval) {
clearInterval(this.nftHealthInterval);
this.nftHealthInterval = undefined;
}
if (this.nft) {
this.nft.cleanup().catch(() => {}); // best-effort cleanup
this.nft = undefined;
}
this.bridge.stop();
}

View File

@@ -129,6 +129,14 @@ export interface IVpnServerConfig {
* Controls where decrypted traffic goes: allow through, block, or redirect to a target.
* Default: all traffic passes through (backward compatible). */
destinationPolicy?: IDestinationPolicy;
/** Public endpoint address for generated client configs (e.g. 'vpn.example.com:51820').
* Used as the WireGuard `Endpoint =` and SmartVPN `serverUrl` host.
* Defaults to listenAddr (which is typically wrong for remote clients). */
serverEndpoint?: string;
/** AllowedIPs for generated WireGuard client configs.
* Controls what traffic the client routes through the VPN tunnel.
* Defaults to ['0.0.0.0/0'] (full tunnel). Set to e.g. ['10.8.0.0/24'] for split tunnel. */
clientAllowedIPs?: string[];
}
/**
@@ -209,6 +217,14 @@ export interface IVpnClientInfo {
export interface IVpnServerStatistics extends IVpnStatistics {
activeClients: number;
totalConnections: number;
/** Per-transport active client counts. */
activeClientsWebsocket: number;
activeClientsQuic: number;
activeClientsWireguard: number;
/** Per-transport total connection counts. */
totalConnectionsWebsocket: number;
totalConnectionsQuic: number;
totalConnectionsWireguard: number;
}
export interface IVpnKeypair {

View File

@@ -8,7 +8,8 @@ import * as events from 'events';
export { path, fs, os, url, events };
// @push.rocks
import * as smartnftables from '@push.rocks/smartnftables';
import * as smartpath from '@push.rocks/smartpath';
import * as smartrust from '@push.rocks/smartrust';
export { smartpath, smartrust };
export { smartnftables, smartpath, smartrust };