Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c3afb83470 | |||
| 2d7a507cf2 | |||
| a757a4bb73 | |||
| 5bf21ab4ac | |||
| af46dc9b39 | |||
| 79d9928485 | |||
| 70e838c8ff | |||
| dbcfdb1fb6 | |||
| c97beed6e0 | |||
| c3cc237db5 | |||
| 17c27a92d6 | |||
| 9d105e8034 |
45
changelog.md
45
changelog.md
@@ -1,5 +1,50 @@
|
||||
# Changelog
|
||||
|
||||
## 2026-03-30 - 1.13.0 - feat(client-registry)
|
||||
separate trusted server-defined client tags from client-reported tags with legacy tag compatibility
|
||||
|
||||
- Adds distinct serverDefinedClientTags and clientDefinedClientTags fields to client registry and TypeScript interfaces.
|
||||
- Treats legacy tags values as serverDefinedClientTags during deserialization and server-side create/update flows for backward compatibility.
|
||||
- Clarifies that only server-defined tags are trusted for access control while client-defined tags are informational only.
|
||||
|
||||
## 2026-03-30 - 1.12.0 - feat(server)
|
||||
add optional PROXY protocol v2 headers for socket-based userspace NAT forwarding
|
||||
|
||||
- introduce a socketForwardProxyProtocol server option in Rust and TypeScript interfaces
|
||||
- pass the new setting into the userspace NAT engine and TCP bridge tasks
|
||||
- prepend PROXY protocol v2 headers on outbound TCP connections when socket forwarding is enabled
|
||||
|
||||
## 2026-03-30 - 1.11.0 - feat(server)
|
||||
unify WireGuard into the shared server transport pipeline
|
||||
|
||||
- add integrated WireGuard server support to VpnServer with shared startup, shutdown, status, statistics, and peer management
|
||||
- introduce transportMode 'all' as the default and add server config support for wgPrivateKey, wgListenPort, and preconfigured peers
|
||||
- register WireGuard peers in the shared client registry and IP pool so they use the same forwarding engine, routing, and monitoring as WebSocket and QUIC clients
|
||||
- expose transportType in server client info and update TypeScript interfaces and documentation to reflect unified multi-transport forwarding
|
||||
|
||||
## 2026-03-30 - 1.10.2 - fix(client)
|
||||
wait for the connection task to shut down cleanly before disconnecting and increase test timeout
|
||||
|
||||
- store the spawned client connection task handle and await it during disconnect with a 5 second timeout so the disconnect frame can be sent before closing
|
||||
- increase the test script timeout from 60 seconds to 90 seconds to reduce flaky test runs
|
||||
|
||||
## 2026-03-29 - 1.10.1 - fix(test, docs, scripts)
|
||||
correct test command verbosity, shorten load test timings, and document forwarding modes
|
||||
|
||||
- Fixes the test script by removing the duplicated verbose flag in package.json.
|
||||
- Reduces load test delays and burst sizes to keep keepalive and connection tests faster and more stable.
|
||||
- Updates the README to describe forwardingMode options, userspace NAT support, and related configuration examples.
|
||||
|
||||
## 2026-03-29 - 1.10.0 - feat(rust-server, rust-client, ts-interfaces)
|
||||
add configurable packet forwarding with TUN and userspace NAT modes
|
||||
|
||||
- introduce forwardingMode options for client and server configuration interfaces
|
||||
- add server-side forwarding engines for kernel TUN, userspace socket NAT, and testing mode
|
||||
- add a smoltcp-based userspace NAT implementation for packet forwarding without root-only TUN routing
|
||||
- enable client-side TUN forwarding support with route setup, packet I/O, and cleanup
|
||||
- centralize raw packet destination IP extraction in tunnel utilities for shared routing logic
|
||||
- update test command timeout and logging flags
|
||||
|
||||
## 2026-03-29 - 1.9.0 - feat(server)
|
||||
add PROXY protocol v2 support for real client IP handling and connection ACLs
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@push.rocks/smartvpn",
|
||||
"version": "1.9.0",
|
||||
"version": "1.13.0",
|
||||
"private": false,
|
||||
"description": "A VPN solution with TypeScript control plane and Rust data plane daemon",
|
||||
"type": "module",
|
||||
@@ -12,7 +12,7 @@
|
||||
"scripts": {
|
||||
"build": "(tsbuild tsfolders) && (tsrust)",
|
||||
"test:before": "(tsrust)",
|
||||
"test": "tstest test/ --verbose",
|
||||
"test": "tstest test/ --verbose --logfile --timeout 90",
|
||||
"buildDocs": "tsdoc"
|
||||
},
|
||||
"repository": {
|
||||
|
||||
56
readme.md
56
readme.md
@@ -9,6 +9,7 @@ A high-performance VPN solution with a **TypeScript control plane** and a **Rust
|
||||
📊 **Adaptive QoS**: per-client rate limiting, priority queues, connection quality tracking
|
||||
🔄 **Hub API**: one `createClient()` call generates keys, assigns IP, returns both SmartVPN + WireGuard configs
|
||||
📡 **Real-time telemetry**: RTT, jitter, loss ratio, link health — all via typed APIs
|
||||
🌐 **Unified forwarding pipeline**: all transports share the same engine — TUN (kernel), userspace NAT (no root), or testing mode
|
||||
|
||||
## Issue Reporting and Security
|
||||
|
||||
@@ -53,7 +54,9 @@ await server.start({
|
||||
privateKey: '<server-noise-private-key-base64>',
|
||||
publicKey: '<server-noise-public-key-base64>',
|
||||
subnet: '10.8.0.0/24',
|
||||
transportMode: 'both', // WebSocket + QUIC simultaneously
|
||||
transportMode: 'all', // WebSocket + QUIC + WireGuard simultaneously (default)
|
||||
forwardingMode: 'tun', // 'tun' (kernel), 'socket' (userspace NAT), or 'testing'
|
||||
wgPrivateKey: '<server-wg-private-key-base64>', // required for WireGuard transport
|
||||
enableNat: true,
|
||||
dns: ['1.1.1.1', '8.8.8.8'],
|
||||
});
|
||||
@@ -107,7 +110,7 @@ Every client authenticates with a **Noise IK handshake** (`Noise_IK_25519_ChaCha
|
||||
| **QUIC** | UDP (via quinn) | Low latency, datagram support for IP packets |
|
||||
| **WireGuard** | UDP (via boringtun) | Standard WG clients (iOS, Android, wg-quick) |
|
||||
|
||||
The server can run **all three simultaneously** with `transportMode: 'both'` (WS + QUIC) or `'wireguard'`. Clients auto-negotiate with `transport: 'auto'` (tries QUIC first, falls back to WS).
|
||||
The server runs **all three simultaneously** by default with `transportMode: 'all'`. All transports share the same unified forwarding pipeline (`ForwardingEngine`), IP pool, client registry, and stats — so WireGuard peers get the same userspace NAT, rate limiting, and monitoring as WS/QUIC clients. Clients auto-negotiate with `transport: 'auto'` (tries QUIC first, falls back to WS).
|
||||
|
||||
### 🛡️ ACL Engine (SmartProxy-Aligned)
|
||||
|
||||
@@ -152,6 +155,33 @@ await server.start({
|
||||
- `remoteAddr` field on `IVpnClientInfo` exposes the real client IP for monitoring
|
||||
- **Security**: must be `false` (default) when accepting direct connections — only enable behind a trusted proxy
|
||||
|
||||
### 📦 Packet Forwarding Modes
|
||||
|
||||
SmartVPN supports three forwarding modes, configurable per-server and per-client:
|
||||
|
||||
| Mode | Flag | Description | Root Required |
|
||||
|------|------|-------------|---------------|
|
||||
| **TUN** | `'tun'` | Kernel TUN device — real packet forwarding with system routing | ✅ Yes |
|
||||
| **Userspace NAT** | `'socket'` | Userspace TCP/UDP proxy via `connect(2)` — no TUN, no root needed | ❌ No |
|
||||
| **Testing** | `'testing'` | Monitoring only — packets are counted but not forwarded | ❌ No |
|
||||
|
||||
```typescript
|
||||
// Server with userspace NAT (no root required)
|
||||
await server.start({
|
||||
// ...
|
||||
forwardingMode: 'socket',
|
||||
enableNat: true,
|
||||
});
|
||||
|
||||
// Client with TUN device
|
||||
const { assignedIp } = await client.connect({
|
||||
// ...
|
||||
forwardingMode: 'tun',
|
||||
});
|
||||
```
|
||||
|
||||
The userspace NAT mode extracts destination IP/port from IP packets, opens a real socket to the destination, and relays data — supporting both TCP streams and UDP datagrams without requiring `CAP_NET_ADMIN` or root privileges.
|
||||
|
||||
### 📊 Telemetry & QoS
|
||||
|
||||
- **Connection quality**: Smoothed RTT, jitter, min/max RTT, loss ratio, link health (`healthy` / `degraded` / `critical`)
|
||||
@@ -244,8 +274,8 @@ const unit = VpnInstaller.generateServiceUnit({
|
||||
|
||||
| Interface | Purpose |
|
||||
|-----------|---------|
|
||||
| `IVpnServerConfig` | Server configuration (listen addr, keys, subnet, transport mode, clients, proxy protocol) |
|
||||
| `IVpnClientConfig` | Client configuration (server URL, keys, transport, WG options) |
|
||||
| `IVpnServerConfig` | Server configuration (listen addr, keys, subnet, transport mode, forwarding mode, clients, proxy protocol) |
|
||||
| `IVpnClientConfig` | Client configuration (server URL, keys, transport, forwarding mode, WG options) |
|
||||
| `IClientEntry` | Server-side client definition (ID, keys, security, priority, tags, expiry) |
|
||||
| `IClientSecurity` | Per-client ACLs and rate limits (SmartProxy-aligned naming) |
|
||||
| `IClientRateLimit` | Rate limiting config (bytesPerSec, burstBytes) |
|
||||
@@ -284,19 +314,24 @@ const unit = VpnInstaller.generateServiceUnit({
|
||||
### Server Configuration
|
||||
|
||||
```typescript
|
||||
// All transports simultaneously (default) — WS + QUIC + WireGuard
|
||||
{ transportMode: 'all', listenAddr: '0.0.0.0:443', wgPrivateKey: '...', wgListenPort: 51820 }
|
||||
|
||||
// WS + QUIC only (backward compat)
|
||||
{ transportMode: 'both', listenAddr: '0.0.0.0:443', quicListenAddr: '0.0.0.0:4433' }
|
||||
|
||||
// WebSocket only
|
||||
{ transportMode: 'websocket', listenAddr: '0.0.0.0:443' }
|
||||
|
||||
// QUIC only
|
||||
{ transportMode: 'quic', listenAddr: '0.0.0.0:443' }
|
||||
|
||||
// Both (WS + QUIC on same or different ports)
|
||||
{ transportMode: 'both', listenAddr: '0.0.0.0:443', quicListenAddr: '0.0.0.0:4433' }
|
||||
|
||||
// WireGuard
|
||||
{ transportMode: 'wireguard', wgListenPort: 51820, wgPeers: [...] }
|
||||
// WireGuard only
|
||||
{ transportMode: 'wireguard', wgPrivateKey: '...', wgListenPort: 51820, wgPeers: [...] }
|
||||
```
|
||||
|
||||
All transport modes share the same `forwardingMode` — WireGuard peers can use `'socket'` (userspace NAT) just like WS/QUIC clients.
|
||||
|
||||
### Client Configuration
|
||||
|
||||
```typescript
|
||||
@@ -341,7 +376,7 @@ pnpm install
|
||||
# Build (TypeScript + Rust cross-compile)
|
||||
pnpm build
|
||||
|
||||
# Run all tests (79 TS + 129 Rust = 208 tests)
|
||||
# Run all tests (79 TS + 132 Rust = 211 tests)
|
||||
pnpm test
|
||||
|
||||
# Run Rust tests directly
|
||||
@@ -380,6 +415,7 @@ smartvpn/
|
||||
│ ├── codec.rs # Binary frame protocol
|
||||
│ ├── keepalive.rs # Adaptive keepalives
|
||||
│ ├── ratelimit.rs # Token bucket
|
||||
│ ├── userspace_nat.rs # Userspace TCP/UDP NAT proxy
|
||||
│ └── ... # tunnel, network, telemetry, qos, mtu, reconnect
|
||||
├── test/ # 9 test files (79 tests)
|
||||
├── dist_ts/ # Compiled TypeScript
|
||||
|
||||
115
rust/Cargo.lock
generated
115
rust/Cargo.lock
generated
@@ -237,6 +237,12 @@ version = "3.20.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb"
|
||||
|
||||
[[package]]
|
||||
name = "byteorder"
|
||||
version = "1.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
|
||||
|
||||
[[package]]
|
||||
name = "bytes"
|
||||
version = "1.11.1"
|
||||
@@ -488,6 +494,47 @@ version = "2.10.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea"
|
||||
|
||||
[[package]]
|
||||
name = "defmt"
|
||||
version = "0.3.100"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f0963443817029b2024136fc4dd07a5107eb8f977eaf18fcd1fdeb11306b64ad"
|
||||
dependencies = [
|
||||
"defmt 1.0.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "defmt"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "548d977b6da32fa1d1fda2876453da1e7df63ad0304c8b3dae4dbe7b96f39b78"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"defmt-macros",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "defmt-macros"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3d4fc12a85bcf441cfe44344c4b72d58493178ce635338a3f3b78943aceb258e"
|
||||
dependencies = [
|
||||
"defmt-parser",
|
||||
"proc-macro-error2",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "defmt-parser"
|
||||
version = "1.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "10d60334b3b2e7c9d91ef8150abfb6fa4c1c39ebbcf4a81c2e346aad939fee3e"
|
||||
dependencies = [
|
||||
"thiserror 2.0.18",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "deranged"
|
||||
version = "0.5.8"
|
||||
@@ -714,6 +761,25 @@ dependencies = [
|
||||
"polyval",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hash32"
|
||||
version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "47d60b12902ba28e2730cd37e95b8c9223af2808df9e902d4df49588d1470606"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "heapless"
|
||||
version = "0.9.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2af2455f757db2b292a9b1768c4b70186d443bcb3b316252d6b540aec1cd89ed"
|
||||
dependencies = [
|
||||
"hash32",
|
||||
"stable_deref_trait",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "heck"
|
||||
version = "0.5.0"
|
||||
@@ -915,6 +981,12 @@ version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154"
|
||||
|
||||
[[package]]
|
||||
name = "managed"
|
||||
version = "0.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0ca88d725a0a943b096803bd34e73a4437208b6077654cc4ecb2947a5f91618d"
|
||||
|
||||
[[package]]
|
||||
name = "matchers"
|
||||
version = "0.2.0"
|
||||
@@ -1116,6 +1188,28 @@ dependencies = [
|
||||
"zerocopy",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-error-attr2"
|
||||
version = "2.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "96de42df36bb9bba5542fe9f1a054b8cc87e172759a1868aa05c1f3acc89dfc5"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-error2"
|
||||
version = "2.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "11ec05c52be0a07b08061f7dd003e7d7092e0472bc731b4af7bb1ef876109802"
|
||||
dependencies = [
|
||||
"proc-macro-error-attr2",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro2"
|
||||
version = "1.0.106"
|
||||
@@ -1598,6 +1692,7 @@ dependencies = [
|
||||
"rustls-pki-types",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"smoltcp",
|
||||
"snow",
|
||||
"thiserror 2.0.18",
|
||||
"tokio",
|
||||
@@ -1609,6 +1704,20 @@ dependencies = [
|
||||
"webpki-roots 1.0.6",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "smoltcp"
|
||||
version = "0.13.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ac729b0a77bd092a3f06ddaddc59fe0d67f48ba0de45a9abe707c2842c7f8767"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"byteorder",
|
||||
"cfg-if",
|
||||
"defmt 0.3.100",
|
||||
"heapless",
|
||||
"managed",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "snow"
|
||||
version = "0.9.6"
|
||||
@@ -1635,6 +1744,12 @@ dependencies = [
|
||||
"windows-sys 0.60.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "stable_deref_trait"
|
||||
version = "1.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596"
|
||||
|
||||
[[package]]
|
||||
name = "strsim"
|
||||
version = "0.11.1"
|
||||
|
||||
@@ -35,6 +35,7 @@ rustls-pemfile = "2"
|
||||
webpki-roots = "1"
|
||||
mimalloc = "0.1"
|
||||
boringtun = "0.7"
|
||||
smoltcp = { version = "0.13", default-features = false, features = ["medium-ip", "proto-ipv4", "socket-tcp", "socket-udp", "alloc"] }
|
||||
chrono = { version = "0.4", features = ["serde"] }
|
||||
ipnet = "2"
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use anyhow::Result;
|
||||
use bytes::BytesMut;
|
||||
use serde::Deserialize;
|
||||
use std::net::Ipv4Addr;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{mpsc, watch, RwLock};
|
||||
use tracing::{info, error, warn, debug};
|
||||
@@ -12,6 +13,7 @@ use crate::telemetry::ConnectionQuality;
|
||||
use crate::transport;
|
||||
use crate::transport_trait::{self, TransportSink, TransportStream};
|
||||
use crate::quic_transport;
|
||||
use crate::tunnel::{self, TunConfig};
|
||||
|
||||
/// Client configuration (matches TS IVpnClientConfig).
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
@@ -30,6 +32,9 @@ pub struct ClientConfig {
|
||||
pub transport: Option<String>,
|
||||
/// For QUIC: SHA-256 hash of server certificate (base64) for cert pinning.
|
||||
pub server_cert_hash: Option<String>,
|
||||
/// Forwarding mode: "tun" (TUN device, requires root) or "testing" (no TUN).
|
||||
/// Default: "testing".
|
||||
pub forwarding_mode: Option<String>,
|
||||
}
|
||||
|
||||
/// Client statistics.
|
||||
@@ -76,6 +81,7 @@ pub struct VpnClient {
|
||||
connected_since: Arc<RwLock<Option<std::time::Instant>>>,
|
||||
quality_rx: Option<watch::Receiver<ConnectionQuality>>,
|
||||
link_health: Arc<RwLock<LinkHealth>>,
|
||||
connection_handle: Option<tokio::task::JoinHandle<()>>,
|
||||
}
|
||||
|
||||
impl VpnClient {
|
||||
@@ -88,6 +94,7 @@ impl VpnClient {
|
||||
connected_since: Arc::new(RwLock::new(None)),
|
||||
quality_rx: None,
|
||||
link_health: Arc::new(RwLock::new(LinkHealth::Degraded)),
|
||||
connection_handle: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -234,6 +241,31 @@ impl VpnClient {
|
||||
|
||||
info!("Connected to VPN, assigned IP: {}", assigned_ip);
|
||||
|
||||
// Optionally create TUN device for IP packet forwarding (requires root)
|
||||
let tun_enabled = config.forwarding_mode.as_deref() == Some("tun");
|
||||
let (tun_reader, tun_writer, tun_subnet) = if tun_enabled {
|
||||
let client_tun_ip: Ipv4Addr = assigned_ip.parse()?;
|
||||
let mtu = ip_info["mtu"].as_u64().unwrap_or(1420) as u16;
|
||||
let tun_config = TunConfig {
|
||||
name: "svpn-client0".to_string(),
|
||||
address: client_tun_ip,
|
||||
netmask: Ipv4Addr::new(255, 255, 255, 0),
|
||||
mtu,
|
||||
};
|
||||
let tun_device = tunnel::create_tun(&tun_config)?;
|
||||
|
||||
// Add route for VPN subnet through the TUN device
|
||||
let gateway_str = ip_info["gateway"].as_str().unwrap_or("10.8.0.1");
|
||||
let gateway: Ipv4Addr = gateway_str.parse().unwrap_or(Ipv4Addr::new(10, 8, 0, 1));
|
||||
let subnet = format!("{}/24", Ipv4Addr::from(u32::from(gateway) & 0xFFFFFF00));
|
||||
tunnel::add_route(&subnet, &tun_config.name).await?;
|
||||
|
||||
let (reader, writer) = tokio::io::split(tun_device);
|
||||
(Some(reader), Some(writer), Some(subnet))
|
||||
} else {
|
||||
(None, None, None)
|
||||
};
|
||||
|
||||
// Create adaptive keepalive monitor (use custom interval if configured)
|
||||
let ka_config = config.keepalive_interval_secs.map(|secs| {
|
||||
let mut cfg = keepalive::AdaptiveKeepaliveConfig::default();
|
||||
@@ -250,7 +282,7 @@ impl VpnClient {
|
||||
|
||||
// Spawn packet forwarding loop
|
||||
let assigned_ip_clone = assigned_ip.clone();
|
||||
tokio::spawn(client_loop(
|
||||
let join_handle = tokio::spawn(client_loop(
|
||||
sink,
|
||||
stream,
|
||||
noise_transport,
|
||||
@@ -260,7 +292,11 @@ impl VpnClient {
|
||||
handle.signal_rx,
|
||||
handle.ack_tx,
|
||||
link_health,
|
||||
tun_reader,
|
||||
tun_writer,
|
||||
tun_subnet,
|
||||
));
|
||||
self.connection_handle = Some(join_handle);
|
||||
|
||||
Ok(assigned_ip_clone)
|
||||
}
|
||||
@@ -270,6 +306,13 @@ impl VpnClient {
|
||||
if let Some(tx) = self.shutdown_tx.take() {
|
||||
let _ = tx.send(()).await;
|
||||
}
|
||||
// Wait for the connection task to send the Disconnect frame and close
|
||||
if let Some(handle) = self.connection_handle.take() {
|
||||
let _ = tokio::time::timeout(
|
||||
std::time::Duration::from_secs(5),
|
||||
handle,
|
||||
).await;
|
||||
}
|
||||
*self.assigned_ip.write().await = None;
|
||||
*self.connected_since.write().await = None;
|
||||
*self.state.write().await = ClientState::Disconnected;
|
||||
@@ -356,8 +399,14 @@ async fn client_loop(
|
||||
mut signal_rx: mpsc::Receiver<KeepaliveSignal>,
|
||||
ack_tx: mpsc::Sender<()>,
|
||||
link_health: Arc<RwLock<LinkHealth>>,
|
||||
mut tun_reader: Option<tokio::io::ReadHalf<tun::AsyncDevice>>,
|
||||
mut tun_writer: Option<tokio::io::WriteHalf<tun::AsyncDevice>>,
|
||||
tun_subnet: Option<String>,
|
||||
) {
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
let mut buf = vec![0u8; 65535];
|
||||
let mut tun_buf = vec![0u8; 65536];
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
@@ -373,6 +422,14 @@ async fn client_loop(
|
||||
let mut s = stats.write().await;
|
||||
s.bytes_received += len as u64;
|
||||
s.packets_received += 1;
|
||||
drop(s);
|
||||
|
||||
// Write decrypted packet to TUN device (if enabled)
|
||||
if let Some(ref mut writer) = tun_writer {
|
||||
if let Err(e) = writer.write_all(&buf[..len]).await {
|
||||
warn!("TUN write error: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Decrypt error: {}", e);
|
||||
@@ -407,6 +464,50 @@ async fn client_loop(
|
||||
}
|
||||
}
|
||||
}
|
||||
// Read outbound packets from TUN and send to server (only when TUN enabled)
|
||||
result = async {
|
||||
match tun_reader {
|
||||
Some(ref mut reader) => reader.read(&mut tun_buf).await,
|
||||
None => std::future::pending::<std::io::Result<usize>>().await,
|
||||
}
|
||||
} => {
|
||||
match result {
|
||||
Ok(0) => {
|
||||
info!("TUN device closed");
|
||||
break;
|
||||
}
|
||||
Ok(n) => {
|
||||
match noise_transport.write_message(&tun_buf[..n], &mut buf) {
|
||||
Ok(len) => {
|
||||
let frame = Frame {
|
||||
packet_type: PacketType::IpPacket,
|
||||
payload: buf[..len].to_vec(),
|
||||
};
|
||||
let mut frame_bytes = BytesMut::new();
|
||||
if <FrameCodec as tokio_util::codec::Encoder<Frame>>::encode(
|
||||
&mut FrameCodec, frame, &mut frame_bytes
|
||||
).is_ok() {
|
||||
if sink.send_reliable(frame_bytes.to_vec()).await.is_err() {
|
||||
warn!("Failed to send TUN packet to server");
|
||||
break;
|
||||
}
|
||||
let mut s = stats.write().await;
|
||||
s.bytes_sent += n as u64;
|
||||
s.packets_sent += 1;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Noise encrypt error: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("TUN read error: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
signal = signal_rx.recv() => {
|
||||
match signal {
|
||||
Some(KeepaliveSignal::SendPing(timestamp_ms)) => {
|
||||
@@ -456,6 +557,13 @@ async fn client_loop(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Cleanup: remove TUN route if enabled
|
||||
if let Some(ref subnet) = tun_subnet {
|
||||
if let Err(e) = tunnel::remove_route(subnet, "svpn-client0").await {
|
||||
warn!("Failed to remove client TUN route: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Try to connect via QUIC. Returns transport halves on success.
|
||||
|
||||
@@ -44,7 +44,12 @@ pub struct ClientEntry {
|
||||
pub priority: Option<u32>,
|
||||
/// Whether this client is enabled (default: true).
|
||||
pub enabled: Option<bool>,
|
||||
/// Tags for grouping.
|
||||
/// Tags assigned by the server admin — trusted, used for access control.
|
||||
pub server_defined_client_tags: Option<Vec<String>>,
|
||||
/// Tags reported by the connecting client — informational only.
|
||||
pub client_defined_client_tags: Option<Vec<String>>,
|
||||
/// Legacy tags field — treated as serverDefinedClientTags during deserialization.
|
||||
#[serde(default)]
|
||||
pub tags: Option<Vec<String>>,
|
||||
/// Optional description.
|
||||
pub description: Option<String>,
|
||||
@@ -90,7 +95,11 @@ impl ClientRegistry {
|
||||
/// Build a registry from a list of client entries.
|
||||
pub fn from_entries(entries: Vec<ClientEntry>) -> Result<Self> {
|
||||
let mut registry = Self::new();
|
||||
for entry in entries {
|
||||
for mut entry in entries {
|
||||
// Migrate legacy `tags` → `serverDefinedClientTags`
|
||||
if entry.server_defined_client_tags.is_none() && entry.tags.is_some() {
|
||||
entry.server_defined_client_tags = entry.tags.take();
|
||||
}
|
||||
registry.add(entry)?;
|
||||
}
|
||||
Ok(registry)
|
||||
@@ -193,6 +202,8 @@ mod tests {
|
||||
security: None,
|
||||
priority: None,
|
||||
enabled: None,
|
||||
server_defined_client_tags: None,
|
||||
client_defined_client_tags: None,
|
||||
tags: None,
|
||||
description: None,
|
||||
expires_at: None,
|
||||
|
||||
@@ -21,3 +21,4 @@ pub mod wireguard;
|
||||
pub mod client_registry;
|
||||
pub mod acl;
|
||||
pub mod proxy_protocol;
|
||||
pub mod userspace_nat;
|
||||
|
||||
@@ -7,7 +7,7 @@ use tracing::{info, error, warn};
|
||||
use crate::client::{ClientConfig, VpnClient};
|
||||
use crate::crypto;
|
||||
use crate::server::{ServerConfig, VpnServer};
|
||||
use crate::wireguard::{self, WgClient, WgClientConfig, WgPeerConfig, WgServer, WgServerConfig};
|
||||
use crate::wireguard::{self, WgClient, WgClientConfig, WgPeerConfig};
|
||||
|
||||
// ============================================================================
|
||||
// IPC protocol types
|
||||
@@ -95,7 +95,6 @@ pub async fn management_loop_stdio(mode: &str) -> Result<()> {
|
||||
let mut vpn_client = VpnClient::new();
|
||||
let mut vpn_server = VpnServer::new();
|
||||
let mut wg_client = WgClient::new();
|
||||
let mut wg_server = WgServer::new();
|
||||
|
||||
send_event_stdout("ready", serde_json::json!({ "mode": mode }));
|
||||
|
||||
@@ -131,7 +130,7 @@ pub async fn management_loop_stdio(mode: &str) -> Result<()> {
|
||||
|
||||
let response = match mode {
|
||||
"client" => handle_client_request(&request, &mut vpn_client, &mut wg_client).await,
|
||||
"server" => handle_server_request(&request, &mut vpn_server, &mut wg_server).await,
|
||||
"server" => handle_server_request(&request, &mut vpn_server).await,
|
||||
_ => ManagementResponse::err(request.id.clone(), format!("Unknown mode: {}", mode)),
|
||||
};
|
||||
send_response_stdout(&response);
|
||||
@@ -154,7 +153,6 @@ pub async fn management_loop_socket(socket_path: &str, mode: &str) -> Result<()>
|
||||
let vpn_client = std::sync::Arc::new(Mutex::new(VpnClient::new()));
|
||||
let vpn_server = std::sync::Arc::new(Mutex::new(VpnServer::new()));
|
||||
let wg_client = std::sync::Arc::new(Mutex::new(WgClient::new()));
|
||||
let wg_server = std::sync::Arc::new(Mutex::new(WgServer::new()));
|
||||
|
||||
loop {
|
||||
match listener.accept().await {
|
||||
@@ -163,10 +161,9 @@ pub async fn management_loop_socket(socket_path: &str, mode: &str) -> Result<()>
|
||||
let client = vpn_client.clone();
|
||||
let server = vpn_server.clone();
|
||||
let wg_c = wg_client.clone();
|
||||
let wg_s = wg_server.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) =
|
||||
handle_socket_connection(stream, &mode, client, server, wg_c, wg_s).await
|
||||
handle_socket_connection(stream, &mode, client, server, wg_c).await
|
||||
{
|
||||
warn!("Socket connection error: {}", e);
|
||||
}
|
||||
@@ -185,7 +182,6 @@ async fn handle_socket_connection(
|
||||
vpn_client: std::sync::Arc<Mutex<VpnClient>>,
|
||||
vpn_server: std::sync::Arc<Mutex<VpnServer>>,
|
||||
wg_client: std::sync::Arc<Mutex<WgClient>>,
|
||||
wg_server: std::sync::Arc<Mutex<WgServer>>,
|
||||
) -> Result<()> {
|
||||
let (reader, mut writer) = stream.into_split();
|
||||
let buf_reader = BufReader::new(reader);
|
||||
@@ -241,8 +237,7 @@ async fn handle_socket_connection(
|
||||
}
|
||||
"server" => {
|
||||
let mut server = vpn_server.lock().await;
|
||||
let mut wg_s = wg_server.lock().await;
|
||||
handle_server_request(&request, &mut server, &mut wg_s).await
|
||||
handle_server_request(&request, &mut server).await
|
||||
}
|
||||
_ => ManagementResponse::err(request.id.clone(), format!("Unknown mode: {}", mode)),
|
||||
};
|
||||
@@ -381,92 +376,46 @@ async fn handle_client_request(
|
||||
async fn handle_server_request(
|
||||
request: &ManagementRequest,
|
||||
vpn_server: &mut VpnServer,
|
||||
wg_server: &mut WgServer,
|
||||
) -> ManagementResponse {
|
||||
let id = request.id.clone();
|
||||
|
||||
match request.method.as_str() {
|
||||
"start" => {
|
||||
// Check if transportMode is "wireguard"
|
||||
let transport_mode = request.params
|
||||
.get("config")
|
||||
.and_then(|c| c.get("transportMode"))
|
||||
.and_then(|t| t.as_str())
|
||||
.unwrap_or("");
|
||||
|
||||
if transport_mode == "wireguard" {
|
||||
let config: WgServerConfig = match serde_json::from_value(
|
||||
request.params.get("config").cloned().unwrap_or_default(),
|
||||
) {
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
return ManagementResponse::err(id, format!("Invalid WG config: {}", e));
|
||||
}
|
||||
};
|
||||
match wg_server.start(config).await {
|
||||
Ok(()) => ManagementResponse::ok(id, serde_json::json!({})),
|
||||
Err(e) => ManagementResponse::err(id, format!("WG start failed: {}", e)),
|
||||
}
|
||||
} else {
|
||||
let config: ServerConfig = match serde_json::from_value(
|
||||
request.params.get("config").cloned().unwrap_or_default(),
|
||||
) {
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
return ManagementResponse::err(id, format!("Invalid config: {}", e));
|
||||
}
|
||||
};
|
||||
match vpn_server.start(config).await {
|
||||
Ok(()) => ManagementResponse::ok(id, serde_json::json!({})),
|
||||
Err(e) => ManagementResponse::err(id, format!("Start failed: {}", e)),
|
||||
let config: ServerConfig = match serde_json::from_value(
|
||||
request.params.get("config").cloned().unwrap_or_default(),
|
||||
) {
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
return ManagementResponse::err(id, format!("Invalid config: {}", e));
|
||||
}
|
||||
};
|
||||
match vpn_server.start(config).await {
|
||||
Ok(()) => ManagementResponse::ok(id, serde_json::json!({})),
|
||||
Err(e) => ManagementResponse::err(id, format!("Start failed: {}", e)),
|
||||
}
|
||||
}
|
||||
"stop" => {
|
||||
if wg_server.is_running() {
|
||||
match wg_server.stop().await {
|
||||
Ok(()) => ManagementResponse::ok(id, serde_json::json!({})),
|
||||
Err(e) => ManagementResponse::err(id, format!("WG stop failed: {}", e)),
|
||||
}
|
||||
} else {
|
||||
match vpn_server.stop().await {
|
||||
Ok(()) => ManagementResponse::ok(id, serde_json::json!({})),
|
||||
Err(e) => ManagementResponse::err(id, format!("Stop failed: {}", e)),
|
||||
}
|
||||
match vpn_server.stop().await {
|
||||
Ok(()) => ManagementResponse::ok(id, serde_json::json!({})),
|
||||
Err(e) => ManagementResponse::err(id, format!("Stop failed: {}", e)),
|
||||
}
|
||||
}
|
||||
"getStatus" => {
|
||||
if wg_server.is_running() {
|
||||
ManagementResponse::ok(id, wg_server.get_status())
|
||||
} else {
|
||||
let status = vpn_server.get_status();
|
||||
ManagementResponse::ok(id, status)
|
||||
}
|
||||
let status = vpn_server.get_status();
|
||||
ManagementResponse::ok(id, status)
|
||||
}
|
||||
"getStatistics" => {
|
||||
if wg_server.is_running() {
|
||||
ManagementResponse::ok(id, wg_server.get_statistics().await)
|
||||
} else {
|
||||
let stats = vpn_server.get_statistics().await;
|
||||
match serde_json::to_value(&stats) {
|
||||
Ok(v) => ManagementResponse::ok(id, v),
|
||||
Err(e) => ManagementResponse::err(id, format!("Serialize error: {}", e)),
|
||||
}
|
||||
let stats = vpn_server.get_statistics().await;
|
||||
match serde_json::to_value(&stats) {
|
||||
Ok(v) => ManagementResponse::ok(id, v),
|
||||
Err(e) => ManagementResponse::err(id, format!("Serialize error: {}", e)),
|
||||
}
|
||||
}
|
||||
"listClients" => {
|
||||
if wg_server.is_running() {
|
||||
let peers = wg_server.list_peers().await;
|
||||
match serde_json::to_value(&peers) {
|
||||
Ok(v) => ManagementResponse::ok(id, serde_json::json!({ "clients": v })),
|
||||
Err(e) => ManagementResponse::err(id, format!("Serialize error: {}", e)),
|
||||
}
|
||||
} else {
|
||||
let clients = vpn_server.list_clients().await;
|
||||
match serde_json::to_value(&clients) {
|
||||
Ok(v) => ManagementResponse::ok(id, serde_json::json!({ "clients": v })),
|
||||
Err(e) => ManagementResponse::err(id, format!("Serialize error: {}", e)),
|
||||
}
|
||||
let clients = vpn_server.list_clients().await;
|
||||
match serde_json::to_value(&clients) {
|
||||
Ok(v) => ManagementResponse::ok(id, serde_json::json!({ "clients": v })),
|
||||
Err(e) => ManagementResponse::err(id, format!("Serialize error: {}", e)),
|
||||
}
|
||||
}
|
||||
"disconnectClient" => {
|
||||
@@ -546,9 +495,6 @@ async fn handle_server_request(
|
||||
)
|
||||
}
|
||||
"addWgPeer" => {
|
||||
if !wg_server.is_running() {
|
||||
return ManagementResponse::err(id, "WireGuard server not running".to_string());
|
||||
}
|
||||
let config: WgPeerConfig = match serde_json::from_value(
|
||||
request.params.get("peer").cloned().unwrap_or_default(),
|
||||
) {
|
||||
@@ -557,29 +503,23 @@ async fn handle_server_request(
|
||||
return ManagementResponse::err(id, format!("Invalid peer config: {}", e));
|
||||
}
|
||||
};
|
||||
match wg_server.add_peer(config).await {
|
||||
match vpn_server.add_wg_peer(config).await {
|
||||
Ok(()) => ManagementResponse::ok(id, serde_json::json!({})),
|
||||
Err(e) => ManagementResponse::err(id, format!("Add peer failed: {}", e)),
|
||||
}
|
||||
}
|
||||
"removeWgPeer" => {
|
||||
if !wg_server.is_running() {
|
||||
return ManagementResponse::err(id, "WireGuard server not running".to_string());
|
||||
}
|
||||
let public_key = match request.params.get("publicKey").and_then(|v| v.as_str()) {
|
||||
Some(k) => k.to_string(),
|
||||
None => return ManagementResponse::err(id, "Missing publicKey".to_string()),
|
||||
};
|
||||
match wg_server.remove_peer(&public_key).await {
|
||||
match vpn_server.remove_wg_peer(&public_key).await {
|
||||
Ok(()) => ManagementResponse::ok(id, serde_json::json!({})),
|
||||
Err(e) => ManagementResponse::err(id, format!("Remove peer failed: {}", e)),
|
||||
}
|
||||
}
|
||||
"listWgPeers" => {
|
||||
if !wg_server.is_running() {
|
||||
return ManagementResponse::err(id, "WireGuard server not running".to_string());
|
||||
}
|
||||
let peers = wg_server.list_peers().await;
|
||||
let peers = vpn_server.list_wg_peers().await;
|
||||
match serde_json::to_value(&peers) {
|
||||
Ok(v) => ManagementResponse::ok(id, serde_json::json!({ "peers": v })),
|
||||
Err(e) => ManagementResponse::err(id, format!("Serialize error: {}", e)),
|
||||
|
||||
@@ -86,6 +86,16 @@ impl IpPool {
|
||||
client_id
|
||||
}
|
||||
|
||||
/// Reserve a specific IP for a client (e.g., WireGuard static IP from allowed_ips).
|
||||
pub fn reserve(&mut self, ip: Ipv4Addr, client_id: &str) -> Result<()> {
|
||||
if self.allocated.contains_key(&ip) {
|
||||
anyhow::bail!("IP {} is already allocated", ip);
|
||||
}
|
||||
self.allocated.insert(ip, client_id.to_string());
|
||||
info!("Reserved IP {} for client {}", ip, client_id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Number of currently allocated IPs.
|
||||
pub fn allocated_count(&self) -> usize {
|
||||
self.allocated.len()
|
||||
|
||||
@@ -19,6 +19,7 @@ use crate::ratelimit::TokenBucket;
|
||||
use crate::transport;
|
||||
use crate::transport_trait::{self, TransportSink, TransportStream};
|
||||
use crate::quic_transport;
|
||||
use crate::tunnel::{self, TunConfig};
|
||||
|
||||
/// Dead-peer timeout: 3x max keepalive interval (Healthy=60s).
|
||||
const DEAD_PEER_TIMEOUT: Duration = Duration::from_secs(180);
|
||||
@@ -37,6 +38,9 @@ pub struct ServerConfig {
|
||||
pub mtu: Option<u16>,
|
||||
pub keepalive_interval_secs: Option<u64>,
|
||||
pub enable_nat: Option<bool>,
|
||||
/// Forwarding mode: "tun" (kernel TUN, requires root), "socket" (userspace NAT),
|
||||
/// or "testing" (monitoring only, no forwarding). Default: "testing".
|
||||
pub forwarding_mode: Option<String>,
|
||||
/// Default rate limit for new clients (bytes/sec). None = unlimited.
|
||||
pub default_rate_limit_bytes_per_sec: Option<u64>,
|
||||
/// Default burst size for new clients (bytes). None = unlimited.
|
||||
@@ -54,6 +58,16 @@ pub struct ServerConfig {
|
||||
pub proxy_protocol: Option<bool>,
|
||||
/// Server-level IP block list — applied at TCP accept, before Noise handshake.
|
||||
pub connection_ip_block_list: Option<Vec<String>>,
|
||||
/// When true and forwarding_mode is "socket", the userspace NAT engine prepends
|
||||
/// PROXY protocol v2 headers on outbound TCP connections, conveying the VPN client's
|
||||
/// tunnel IP as the source address.
|
||||
pub socket_forward_proxy_protocol: Option<bool>,
|
||||
/// WireGuard: server X25519 private key (base64). Required when transport includes WG.
|
||||
pub wg_private_key: Option<String>,
|
||||
/// WireGuard: UDP listen port (default: 51820).
|
||||
pub wg_listen_port: Option<u16>,
|
||||
/// WireGuard: pre-configured peers.
|
||||
pub wg_peers: Option<Vec<crate::wireguard::WgPeerConfig>>,
|
||||
}
|
||||
|
||||
/// Information about a connected client.
|
||||
@@ -77,6 +91,8 @@ pub struct ClientInfo {
|
||||
pub registered_client_id: String,
|
||||
/// Real client IP:port (from PROXY protocol header or direct TCP connection).
|
||||
pub remote_addr: Option<String>,
|
||||
/// Transport used for this connection: "websocket", "quic", or "wireguard".
|
||||
pub transport_type: String,
|
||||
}
|
||||
|
||||
/// Server statistics.
|
||||
@@ -94,6 +110,16 @@ pub struct ServerStatistics {
|
||||
pub total_connections: u64,
|
||||
}
|
||||
|
||||
/// The forwarding engine determines how decrypted IP packets are routed.
|
||||
pub enum ForwardingEngine {
|
||||
/// Kernel TUN device — packets written to the TUN, kernel handles routing.
|
||||
Tun(tokio::io::WriteHalf<tun::AsyncDevice>),
|
||||
/// Userspace NAT — packets sent to smoltcp-based NAT engine via channel.
|
||||
Socket(mpsc::Sender<Vec<u8>>),
|
||||
/// Testing/monitoring — packets are counted but not forwarded.
|
||||
Testing,
|
||||
}
|
||||
|
||||
/// Shared server state.
|
||||
pub struct ServerState {
|
||||
pub config: ServerConfig,
|
||||
@@ -104,12 +130,19 @@ pub struct ServerState {
|
||||
pub mtu_config: MtuConfig,
|
||||
pub started_at: std::time::Instant,
|
||||
pub client_registry: RwLock<ClientRegistry>,
|
||||
/// The forwarding engine for decrypted IP packets.
|
||||
pub forwarding_engine: Mutex<ForwardingEngine>,
|
||||
/// Routing table: assigned VPN IP → channel sender for return packets.
|
||||
pub tun_routes: RwLock<HashMap<Ipv4Addr, mpsc::Sender<Vec<u8>>>>,
|
||||
/// Shutdown signal for the forwarding background task (TUN reader or NAT engine).
|
||||
pub tun_shutdown: mpsc::Sender<()>,
|
||||
}
|
||||
|
||||
/// The VPN server.
|
||||
pub struct VpnServer {
|
||||
state: Option<Arc<ServerState>>,
|
||||
shutdown_tx: Option<mpsc::Sender<()>>,
|
||||
wg_command_tx: Option<mpsc::Sender<crate::wireguard::WgCommand>>,
|
||||
}
|
||||
|
||||
impl VpnServer {
|
||||
@@ -117,6 +150,7 @@ impl VpnServer {
|
||||
Self {
|
||||
state: None,
|
||||
shutdown_tx: None,
|
||||
wg_command_tx: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -139,6 +173,51 @@ impl VpnServer {
|
||||
}
|
||||
|
||||
let link_mtu = config.mtu.unwrap_or(1420);
|
||||
let mode = config.forwarding_mode.as_deref().unwrap_or("testing");
|
||||
let gateway_ip = ip_pool.gateway_addr();
|
||||
|
||||
// Create forwarding engine based on mode
|
||||
enum ForwardingSetup {
|
||||
Tun {
|
||||
writer: tokio::io::WriteHalf<tun::AsyncDevice>,
|
||||
reader: tokio::io::ReadHalf<tun::AsyncDevice>,
|
||||
shutdown_rx: mpsc::Receiver<()>,
|
||||
},
|
||||
Socket {
|
||||
packet_tx: mpsc::Sender<Vec<u8>>,
|
||||
packet_rx: mpsc::Receiver<Vec<u8>>,
|
||||
shutdown_rx: mpsc::Receiver<()>,
|
||||
},
|
||||
Testing,
|
||||
}
|
||||
|
||||
let (setup, fwd_shutdown_tx) = match mode {
|
||||
"tun" => {
|
||||
let tun_config = TunConfig {
|
||||
name: "svpn0".to_string(),
|
||||
address: gateway_ip,
|
||||
netmask: Ipv4Addr::new(255, 255, 255, 0),
|
||||
mtu: link_mtu,
|
||||
};
|
||||
let tun_device = tunnel::create_tun(&tun_config)?;
|
||||
tunnel::add_route(&config.subnet, &tun_config.name).await?;
|
||||
let (reader, writer) = tokio::io::split(tun_device);
|
||||
let (tx, rx) = mpsc::channel::<()>(1);
|
||||
(ForwardingSetup::Tun { writer, reader, shutdown_rx: rx }, tx)
|
||||
}
|
||||
"socket" => {
|
||||
info!("Starting userspace NAT forwarding (no root required)");
|
||||
let (packet_tx, packet_rx) = mpsc::channel::<Vec<u8>>(4096);
|
||||
let (tx, rx) = mpsc::channel::<()>(1);
|
||||
(ForwardingSetup::Socket { packet_tx, packet_rx, shutdown_rx: rx }, tx)
|
||||
}
|
||||
_ => {
|
||||
info!("Forwarding disabled (testing/monitoring mode)");
|
||||
let (tx, _rx) = mpsc::channel::<()>(1);
|
||||
(ForwardingSetup::Testing, tx)
|
||||
}
|
||||
};
|
||||
|
||||
// Compute effective MTU from overhead
|
||||
let overhead = TunnelOverhead::default_overhead();
|
||||
let mtu_config = MtuConfig::new(overhead.effective_tun_mtu(1500).max(link_mtu));
|
||||
@@ -158,61 +237,113 @@ impl VpnServer {
|
||||
mtu_config,
|
||||
started_at: std::time::Instant::now(),
|
||||
client_registry: RwLock::new(registry),
|
||||
forwarding_engine: Mutex::new(ForwardingEngine::Testing),
|
||||
tun_routes: RwLock::new(HashMap::new()),
|
||||
tun_shutdown: fwd_shutdown_tx,
|
||||
});
|
||||
|
||||
let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
|
||||
self.state = Some(state.clone());
|
||||
self.shutdown_tx = Some(shutdown_tx);
|
||||
// Spawn the forwarding background task and set the engine
|
||||
match setup {
|
||||
ForwardingSetup::Tun { writer, reader, shutdown_rx } => {
|
||||
*state.forwarding_engine.lock().await = ForwardingEngine::Tun(writer);
|
||||
let tun_state = state.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = run_tun_reader(tun_state, reader, shutdown_rx).await {
|
||||
error!("TUN reader error: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
ForwardingSetup::Socket { packet_tx, packet_rx, shutdown_rx } => {
|
||||
*state.forwarding_engine.lock().await = ForwardingEngine::Socket(packet_tx);
|
||||
let proxy_protocol = config.socket_forward_proxy_protocol.unwrap_or(false);
|
||||
let nat_engine = crate::userspace_nat::NatEngine::new(
|
||||
gateway_ip,
|
||||
link_mtu as usize,
|
||||
state.clone(),
|
||||
proxy_protocol,
|
||||
);
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = nat_engine.run(packet_rx, shutdown_rx).await {
|
||||
error!("NAT engine error: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
ForwardingSetup::Testing => {}
|
||||
}
|
||||
|
||||
let transport_mode = config.transport_mode.as_deref().unwrap_or("both");
|
||||
self.state = Some(state.clone());
|
||||
|
||||
let transport_mode = config.transport_mode.as_deref().unwrap_or("all");
|
||||
let listen_addr = config.listen_addr.clone();
|
||||
|
||||
match transport_mode {
|
||||
"quic" => {
|
||||
let quic_addr = config.quic_listen_addr.clone().unwrap_or_else(|| listen_addr.clone());
|
||||
let idle_timeout = config.quic_idle_timeout_secs.unwrap_or(30);
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = run_quic_listener(state, quic_addr, idle_timeout, &mut shutdown_rx).await {
|
||||
error!("QUIC listener error: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
"both" => {
|
||||
let quic_addr = config.quic_listen_addr.clone().unwrap_or_else(|| listen_addr.clone());
|
||||
let idle_timeout = config.quic_idle_timeout_secs.unwrap_or(30);
|
||||
let state2 = state.clone();
|
||||
let (shutdown_tx2, mut shutdown_rx2) = mpsc::channel::<()>(1);
|
||||
// Store second shutdown sender so both listeners stop
|
||||
let shutdown_tx_orig = self.shutdown_tx.take().unwrap();
|
||||
let (combined_tx, mut combined_rx) = mpsc::channel::<()>(1);
|
||||
self.shutdown_tx = Some(combined_tx);
|
||||
// Determine if WG should be included
|
||||
let include_wg = config.wg_private_key.is_some()
|
||||
&& matches!(transport_mode, "all" | "wireguard");
|
||||
|
||||
// Forward combined shutdown to both listeners
|
||||
tokio::spawn(async move {
|
||||
combined_rx.recv().await;
|
||||
let _ = shutdown_tx_orig.send(()).await;
|
||||
let _ = shutdown_tx2.send(()).await;
|
||||
});
|
||||
// Collect shutdown senders for all listeners
|
||||
let mut listener_shutdown_txs: Vec<mpsc::Sender<()>> = Vec::new();
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = run_ws_listener(state, listen_addr, &mut shutdown_rx).await {
|
||||
error!("WebSocket listener error: {}", e);
|
||||
}
|
||||
});
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = run_quic_listener(state2, quic_addr, idle_timeout, &mut shutdown_rx2).await {
|
||||
error!("QUIC listener error: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
_ => {
|
||||
// "websocket" (default)
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = run_ws_listener(state, listen_addr, &mut shutdown_rx).await {
|
||||
error!("Server listener error: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
// Spawn transport listeners based on mode
|
||||
let spawn_ws = matches!(transport_mode, "all" | "both" | "websocket");
|
||||
let spawn_quic = matches!(transport_mode, "all" | "both" | "quic");
|
||||
|
||||
if spawn_ws {
|
||||
let (tx, mut rx) = mpsc::channel::<()>(1);
|
||||
listener_shutdown_txs.push(tx);
|
||||
let ws_state = state.clone();
|
||||
let ws_addr = listen_addr.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = run_ws_listener(ws_state, ws_addr, &mut rx).await {
|
||||
error!("WebSocket listener error: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if spawn_quic {
|
||||
let quic_addr = config.quic_listen_addr.clone().unwrap_or_else(|| listen_addr.clone());
|
||||
let idle_timeout = config.quic_idle_timeout_secs.unwrap_or(30);
|
||||
let (tx, mut rx) = mpsc::channel::<()>(1);
|
||||
listener_shutdown_txs.push(tx);
|
||||
let quic_state = state.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = run_quic_listener(quic_state, quic_addr, idle_timeout, &mut rx).await {
|
||||
error!("QUIC listener error: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if include_wg {
|
||||
let wg_config = crate::wireguard::WgListenerConfig {
|
||||
private_key: config.wg_private_key.clone().unwrap(),
|
||||
listen_port: config.wg_listen_port.unwrap_or(51820),
|
||||
peers: config.wg_peers.clone().unwrap_or_default(),
|
||||
};
|
||||
let (tx, rx) = mpsc::channel::<()>(1);
|
||||
listener_shutdown_txs.push(tx);
|
||||
let (cmd_tx, cmd_rx) = mpsc::channel::<crate::wireguard::WgCommand>(32);
|
||||
self.wg_command_tx = Some(cmd_tx);
|
||||
let wg_state = state.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = crate::wireguard::run_wg_listener(wg_state, wg_config, rx, cmd_rx).await {
|
||||
error!("WireGuard listener error: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Replace self.shutdown_tx with a combined sender that fans out to all listeners
|
||||
if listener_shutdown_txs.len() > 1 {
|
||||
let (combined_tx, mut combined_rx) = mpsc::channel::<()>(1);
|
||||
// Take the original shutdown_tx (from line above)
|
||||
let _ = self.shutdown_tx.take();
|
||||
self.shutdown_tx = Some(combined_tx);
|
||||
tokio::spawn(async move {
|
||||
combined_rx.recv().await;
|
||||
for tx in listener_shutdown_txs {
|
||||
let _ = tx.send(()).await;
|
||||
}
|
||||
});
|
||||
} else if let Some(single_tx) = listener_shutdown_txs.into_iter().next() {
|
||||
self.shutdown_tx = Some(single_tx);
|
||||
}
|
||||
|
||||
info!("VPN server started (transport: {})", transport_mode);
|
||||
@@ -220,9 +351,38 @@ impl VpnServer {
|
||||
}
|
||||
|
||||
pub async fn stop(&mut self) -> Result<()> {
|
||||
if let Some(ref state) = self.state {
|
||||
let mode = state.config.forwarding_mode.as_deref().unwrap_or("testing");
|
||||
|
||||
match mode {
|
||||
"tun" => {
|
||||
let _ = state.tun_shutdown.send(()).await;
|
||||
*state.forwarding_engine.lock().await = ForwardingEngine::Testing;
|
||||
if let Err(e) = tunnel::remove_route(&state.config.subnet, "svpn0").await {
|
||||
warn!("Failed to remove TUN route: {}", e);
|
||||
}
|
||||
}
|
||||
"socket" => {
|
||||
let _ = state.tun_shutdown.send(()).await;
|
||||
*state.forwarding_engine.lock().await = ForwardingEngine::Testing;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
// Clean up NAT rules
|
||||
if state.config.enable_nat.unwrap_or(false) {
|
||||
if let Ok(iface) = crate::network::get_default_interface() {
|
||||
if let Err(e) = crate::network::remove_nat(&state.config.subnet, &iface).await {
|
||||
warn!("Failed to remove NAT rules: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(tx) = self.shutdown_tx.take() {
|
||||
let _ = tx.send(()).await;
|
||||
}
|
||||
self.wg_command_tx = None;
|
||||
self.state = None;
|
||||
info!("VPN server stopped");
|
||||
Ok(())
|
||||
@@ -311,6 +471,54 @@ impl VpnServer {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ── WireGuard Peer Management ────────────────────────────────────────
|
||||
|
||||
/// Add a WireGuard peer dynamically (delegates to the WG event loop).
|
||||
pub async fn add_wg_peer(&self, config: crate::wireguard::WgPeerConfig) -> Result<()> {
|
||||
let tx = self.wg_command_tx.as_ref()
|
||||
.ok_or_else(|| anyhow::anyhow!("WireGuard listener not running"))?;
|
||||
let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();
|
||||
tx.send(crate::wireguard::WgCommand::AddPeer(config, resp_tx))
|
||||
.await
|
||||
.map_err(|_| anyhow::anyhow!("WG event loop closed"))?;
|
||||
resp_rx.await.map_err(|_| anyhow::anyhow!("No response from WG loop"))?
|
||||
}
|
||||
|
||||
/// Remove a WireGuard peer dynamically (delegates to the WG event loop).
|
||||
pub async fn remove_wg_peer(&self, public_key: &str) -> Result<()> {
|
||||
let tx = self.wg_command_tx.as_ref()
|
||||
.ok_or_else(|| anyhow::anyhow!("WireGuard listener not running"))?;
|
||||
let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();
|
||||
tx.send(crate::wireguard::WgCommand::RemovePeer(public_key.to_string(), resp_tx))
|
||||
.await
|
||||
.map_err(|_| anyhow::anyhow!("WG event loop closed"))?;
|
||||
resp_rx.await.map_err(|_| anyhow::anyhow!("No response from WG loop"))?
|
||||
}
|
||||
|
||||
/// List WireGuard peers from the unified client list.
|
||||
pub async fn list_wg_peers(&self) -> Vec<crate::wireguard::WgPeerInfo> {
|
||||
if let Some(ref state) = self.state {
|
||||
state.clients.read().await.values()
|
||||
.filter(|c| c.transport_type == "wireguard")
|
||||
.map(|c| crate::wireguard::WgPeerInfo {
|
||||
public_key: c.authenticated_key.clone(),
|
||||
allowed_ips: vec![format!("{}/32", c.assigned_ip)],
|
||||
endpoint: c.remote_addr.clone(),
|
||||
persistent_keepalive: None,
|
||||
stats: crate::wireguard::WgPeerStats {
|
||||
bytes_sent: c.bytes_sent,
|
||||
bytes_received: c.bytes_received,
|
||||
packets_sent: 0,
|
||||
packets_received: 0,
|
||||
last_handshake_time: None,
|
||||
},
|
||||
})
|
||||
.collect()
|
||||
} else {
|
||||
Vec::new()
|
||||
}
|
||||
}
|
||||
|
||||
// ── Client Registry (Hub) Methods ───────────────────────────────────
|
||||
|
||||
/// Create a new client entry. Generates keypairs and assigns an IP.
|
||||
@@ -343,9 +551,16 @@ impl VpnServer {
|
||||
).ok(),
|
||||
priority: partial.get("priority").and_then(|v| v.as_u64()).map(|v| v as u32),
|
||||
enabled: partial.get("enabled").and_then(|v| v.as_bool()).or(Some(true)),
|
||||
tags: partial.get("tags").and_then(|v| {
|
||||
server_defined_client_tags: partial.get("serverDefinedClientTags").and_then(|v| {
|
||||
v.as_array().map(|a| a.iter().filter_map(|s| s.as_str().map(String::from)).collect())
|
||||
}).or_else(|| {
|
||||
// Legacy: accept "tags" as serverDefinedClientTags
|
||||
partial.get("tags").and_then(|v| {
|
||||
v.as_array().map(|a| a.iter().filter_map(|s| s.as_str().map(String::from)).collect())
|
||||
})
|
||||
}),
|
||||
client_defined_client_tags: None, // Only set by connecting client
|
||||
tags: None, // Legacy field — not used for new entries
|
||||
description: partial.get("description").and_then(|v| v.as_str()).map(String::from),
|
||||
expires_at: partial.get("expiresAt").and_then(|v| v.as_str()).map(String::from),
|
||||
assigned_ip: Some(assigned_ip.to_string()),
|
||||
@@ -440,8 +655,11 @@ impl VpnServer {
|
||||
if let Some(enabled) = update.get("enabled").and_then(|v| v.as_bool()) {
|
||||
entry.enabled = Some(enabled);
|
||||
}
|
||||
if let Some(tags) = update.get("tags").and_then(|v| v.as_array()) {
|
||||
entry.tags = Some(tags.iter().filter_map(|s| s.as_str().map(String::from)).collect());
|
||||
if let Some(tags) = update.get("serverDefinedClientTags").and_then(|v| v.as_array()) {
|
||||
entry.server_defined_client_tags = Some(tags.iter().filter_map(|s| s.as_str().map(String::from)).collect());
|
||||
} else if let Some(tags) = update.get("tags").and_then(|v| v.as_array()) {
|
||||
// Legacy: accept "tags" as serverDefinedClientTags
|
||||
entry.server_defined_client_tags = Some(tags.iter().filter_map(|s| s.as_str().map(String::from)).collect());
|
||||
}
|
||||
if let Some(desc) = update.get("description").and_then(|v| v.as_str()) {
|
||||
entry.description = Some(desc.to_string());
|
||||
@@ -628,6 +846,7 @@ async fn run_ws_listener(
|
||||
Box::new(sink),
|
||||
Box::new(stream),
|
||||
remote_addr,
|
||||
"websocket",
|
||||
).await {
|
||||
warn!("Client connection error: {}", e);
|
||||
}
|
||||
@@ -704,6 +923,7 @@ async fn run_quic_listener(
|
||||
Box::new(sink),
|
||||
Box::new(stream),
|
||||
Some(remote),
|
||||
"quic",
|
||||
).await {
|
||||
warn!("QUIC client error: {}", e);
|
||||
}
|
||||
@@ -736,6 +956,56 @@ async fn run_quic_listener(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// TUN reader task: reads IP packets from the TUN device and dispatches them
|
||||
/// to the correct client via the routing table.
|
||||
async fn run_tun_reader(
|
||||
state: Arc<ServerState>,
|
||||
mut tun_reader: tokio::io::ReadHalf<tun::AsyncDevice>,
|
||||
mut shutdown_rx: mpsc::Receiver<()>,
|
||||
) -> Result<()> {
|
||||
use tokio::io::AsyncReadExt;
|
||||
|
||||
let mut buf = vec![0u8; 65536];
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
result = tun_reader.read(&mut buf) => {
|
||||
let n = match result {
|
||||
Ok(0) => {
|
||||
info!("TUN reader: device closed");
|
||||
break;
|
||||
}
|
||||
Ok(n) => n,
|
||||
Err(e) => {
|
||||
error!("TUN reader error: {}", e);
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
// Extract destination IP from the raw IP packet
|
||||
let dst_ip = match tunnel::extract_dst_ip(&buf[..n]) {
|
||||
Some(std::net::IpAddr::V4(v4)) => v4,
|
||||
_ => continue, // IPv6 or malformed — skip
|
||||
};
|
||||
|
||||
// Look up client by destination IP
|
||||
let routes = state.tun_routes.read().await;
|
||||
if let Some(sender) = routes.get(&dst_ip) {
|
||||
if sender.try_send(buf[..n].to_vec()).is_err() {
|
||||
// Channel full or closed — drop packet (correct for IP best-effort)
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = shutdown_rx.recv() => {
|
||||
info!("TUN reader shutting down");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Transport-agnostic client handler. Performs the Noise IK handshake, authenticates
|
||||
/// the client against the registry, and runs the main packet forwarding loop.
|
||||
async fn handle_client_connection(
|
||||
@@ -743,6 +1013,7 @@ async fn handle_client_connection(
|
||||
mut sink: Box<dyn TransportSink>,
|
||||
mut stream: Box<dyn TransportStream>,
|
||||
remote_addr: Option<std::net::SocketAddr>,
|
||||
transport_type: &str,
|
||||
) -> Result<()> {
|
||||
let server_private_key = base64::Engine::decode(
|
||||
&base64::engine::general_purpose::STANDARD,
|
||||
@@ -846,6 +1117,14 @@ async fn handle_client_connection(
|
||||
// Allocate IP
|
||||
let assigned_ip = state.ip_pool.lock().await.allocate(&client_id)?;
|
||||
|
||||
// Create return-packet channel for forwarding engine -> client
|
||||
let (tun_return_tx, mut tun_return_rx) = mpsc::channel::<Vec<u8>>(256);
|
||||
let fwd_mode = state.config.forwarding_mode.as_deref().unwrap_or("testing");
|
||||
let forwarding_active = fwd_mode == "tun" || fwd_mode == "socket";
|
||||
if forwarding_active {
|
||||
state.tun_routes.write().await.insert(assigned_ip, tun_return_tx);
|
||||
}
|
||||
|
||||
// Determine rate limits: per-client security overrides server defaults
|
||||
let (rate_limit, burst) = if let Some(ref sec) = client_security {
|
||||
if let Some(ref rl) = sec.rate_limit {
|
||||
@@ -873,6 +1152,7 @@ async fn handle_client_connection(
|
||||
authenticated_key: client_pub_key_b64.clone(),
|
||||
registered_client_id: registered_client_id.clone(),
|
||||
remote_addr: remote_addr.map(|a| a.to_string()),
|
||||
transport_type: transport_type.to_string(),
|
||||
};
|
||||
state.clients.write().await.insert(client_id.clone(), client_info);
|
||||
|
||||
@@ -973,6 +1253,24 @@ async fn handle_client_connection(
|
||||
if let Some(info) = clients.get_mut(&client_id) {
|
||||
info.bytes_received += len as u64;
|
||||
}
|
||||
drop(clients);
|
||||
|
||||
// Forward decrypted packet via the active engine
|
||||
{
|
||||
let mut engine = state.forwarding_engine.lock().await;
|
||||
match &mut *engine {
|
||||
ForwardingEngine::Tun(writer) => {
|
||||
use tokio::io::AsyncWriteExt;
|
||||
if let Err(e) = writer.write_all(&buf[..len]).await {
|
||||
warn!("TUN write error for client {}: {}", client_id, e);
|
||||
}
|
||||
}
|
||||
ForwardingEngine::Socket(sender) => {
|
||||
let _ = sender.try_send(buf[..len].to_vec());
|
||||
}
|
||||
ForwardingEngine::Testing => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Decrypt error from {}: {}", client_id, e);
|
||||
@@ -1029,6 +1327,37 @@ async fn handle_client_connection(
|
||||
}
|
||||
}
|
||||
}
|
||||
// Return packets from TUN device destined for this client
|
||||
Some(packet) = tun_return_rx.recv() => {
|
||||
let pkt_len = packet.len();
|
||||
match noise_transport.write_message(&packet, &mut buf) {
|
||||
Ok(len) => {
|
||||
let frame = Frame {
|
||||
packet_type: PacketType::IpPacket,
|
||||
payload: buf[..len].to_vec(),
|
||||
};
|
||||
let mut frame_bytes = BytesMut::new();
|
||||
<FrameCodec as tokio_util::codec::Encoder<Frame>>::encode(
|
||||
&mut FrameCodec, frame, &mut frame_bytes
|
||||
)?;
|
||||
sink.send_reliable(frame_bytes.to_vec()).await?;
|
||||
|
||||
// Update stats
|
||||
let mut stats = state.stats.write().await;
|
||||
stats.bytes_sent += pkt_len as u64;
|
||||
stats.packets_sent += 1;
|
||||
drop(stats);
|
||||
let mut clients = state.clients.write().await;
|
||||
if let Some(info) = clients.get_mut(&client_id) {
|
||||
info.bytes_sent += pkt_len as u64;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Noise encrypt error for return packet to {}: {}", client_id, e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = tokio::time::sleep_until(last_activity + DEAD_PEER_TIMEOUT) => {
|
||||
warn!("Client {} dead-peer timeout ({}s inactivity)", client_id, DEAD_PEER_TIMEOUT.as_secs());
|
||||
break;
|
||||
@@ -1037,6 +1366,9 @@ async fn handle_client_connection(
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
if forwarding_active {
|
||||
state.tun_routes.write().await.remove(&assigned_ip);
|
||||
}
|
||||
state.clients.write().await.remove(&client_id);
|
||||
state.ip_pool.lock().await.release(&assigned_ip);
|
||||
state.rate_limiters.lock().await.remove(&client_id);
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use anyhow::Result;
|
||||
use std::net::Ipv4Addr;
|
||||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
|
||||
use tracing::info;
|
||||
|
||||
/// Configuration for creating a TUN device.
|
||||
@@ -80,6 +80,26 @@ pub fn check_tun_mtu(packet: &[u8], mtu_config: &crate::mtu::MtuConfig) -> TunMt
|
||||
}
|
||||
}
|
||||
|
||||
/// Extract destination IP from a raw IP packet header.
|
||||
pub fn extract_dst_ip(packet: &[u8]) -> Option<IpAddr> {
|
||||
if packet.is_empty() {
|
||||
return None;
|
||||
}
|
||||
let version = packet[0] >> 4;
|
||||
match version {
|
||||
4 if packet.len() >= 20 => {
|
||||
let dst = Ipv4Addr::new(packet[16], packet[17], packet[18], packet[19]);
|
||||
Some(IpAddr::V4(dst))
|
||||
}
|
||||
6 if packet.len() >= 40 => {
|
||||
let mut octets = [0u8; 16];
|
||||
octets.copy_from_slice(&packet[24..40]);
|
||||
Some(IpAddr::V6(Ipv6Addr::from(octets)))
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove a route.
|
||||
pub async fn remove_route(subnet: &str, device_name: &str) -> Result<()> {
|
||||
let output = tokio::process::Command::new("ip")
|
||||
|
||||
658
rust/src/userspace_nat.rs
Normal file
658
rust/src/userspace_nat.rs
Normal file
@@ -0,0 +1,658 @@
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::net::{Ipv4Addr, SocketAddr};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Result;
|
||||
use smoltcp::iface::{Config, Interface, SocketHandle, SocketSet};
|
||||
use smoltcp::phy::{self, Device, DeviceCapabilities, Medium};
|
||||
use smoltcp::socket::{tcp, udp};
|
||||
use smoltcp::wire::{HardwareAddress, IpAddress, IpCidr, IpEndpoint};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::{TcpStream, UdpSocket};
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
use crate::server::ServerState;
|
||||
use crate::tunnel;
|
||||
|
||||
// ============================================================================
|
||||
// Virtual IP device for smoltcp
|
||||
// ============================================================================
|
||||
|
||||
pub struct VirtualIpDevice {
|
||||
rx_queue: VecDeque<Vec<u8>>,
|
||||
tx_queue: VecDeque<Vec<u8>>,
|
||||
mtu: usize,
|
||||
}
|
||||
|
||||
impl VirtualIpDevice {
|
||||
pub fn new(mtu: usize) -> Self {
|
||||
Self {
|
||||
rx_queue: VecDeque::new(),
|
||||
tx_queue: VecDeque::new(),
|
||||
mtu,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn inject_packet(&mut self, packet: Vec<u8>) {
|
||||
self.rx_queue.push_back(packet);
|
||||
}
|
||||
|
||||
pub fn drain_tx(&mut self) -> impl Iterator<Item = Vec<u8>> + '_ {
|
||||
self.tx_queue.drain(..)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct VirtualRxToken {
|
||||
buffer: Vec<u8>,
|
||||
}
|
||||
|
||||
impl phy::RxToken for VirtualRxToken {
|
||||
fn consume<R, F>(self, f: F) -> R
|
||||
where
|
||||
F: FnOnce(&[u8]) -> R,
|
||||
{
|
||||
f(&self.buffer)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct VirtualTxToken<'a> {
|
||||
queue: &'a mut VecDeque<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl<'a> phy::TxToken for VirtualTxToken<'a> {
|
||||
fn consume<R, F>(self, len: usize, f: F) -> R
|
||||
where
|
||||
F: FnOnce(&mut [u8]) -> R,
|
||||
{
|
||||
let mut buffer = vec![0u8; len];
|
||||
let result = f(&mut buffer);
|
||||
self.queue.push_back(buffer);
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
impl Device for VirtualIpDevice {
|
||||
type RxToken<'a> = VirtualRxToken;
|
||||
type TxToken<'a> = VirtualTxToken<'a>;
|
||||
|
||||
fn receive(
|
||||
&mut self,
|
||||
_timestamp: smoltcp::time::Instant,
|
||||
) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> {
|
||||
self.rx_queue.pop_front().map(|buffer| {
|
||||
let rx = VirtualRxToken { buffer };
|
||||
let tx = VirtualTxToken {
|
||||
queue: &mut self.tx_queue,
|
||||
};
|
||||
(rx, tx)
|
||||
})
|
||||
}
|
||||
|
||||
fn transmit(&mut self, _timestamp: smoltcp::time::Instant) -> Option<Self::TxToken<'_>> {
|
||||
Some(VirtualTxToken {
|
||||
queue: &mut self.tx_queue,
|
||||
})
|
||||
}
|
||||
|
||||
fn capabilities(&self) -> DeviceCapabilities {
|
||||
let mut caps = DeviceCapabilities::default();
|
||||
caps.medium = Medium::Ip;
|
||||
caps.max_transmission_unit = self.mtu;
|
||||
caps.max_burst_size = Some(1);
|
||||
caps
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Session tracking
|
||||
// ============================================================================
|
||||
|
||||
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
|
||||
struct SessionKey {
|
||||
src_ip: Ipv4Addr,
|
||||
src_port: u16,
|
||||
dst_ip: Ipv4Addr,
|
||||
dst_port: u16,
|
||||
protocol: u8,
|
||||
}
|
||||
|
||||
struct TcpSession {
|
||||
smoltcp_handle: SocketHandle,
|
||||
bridge_data_tx: mpsc::Sender<Vec<u8>>,
|
||||
#[allow(dead_code)]
|
||||
client_ip: Ipv4Addr,
|
||||
}
|
||||
|
||||
struct UdpSession {
|
||||
smoltcp_handle: SocketHandle,
|
||||
bridge_data_tx: mpsc::Sender<Vec<u8>>,
|
||||
#[allow(dead_code)]
|
||||
client_ip: Ipv4Addr,
|
||||
last_activity: tokio::time::Instant,
|
||||
}
|
||||
|
||||
enum BridgeMessage {
|
||||
TcpData { key: SessionKey, data: Vec<u8> },
|
||||
TcpClosed { key: SessionKey },
|
||||
UdpData { key: SessionKey, data: Vec<u8> },
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// IP packet parsing helpers
|
||||
// ============================================================================
|
||||
|
||||
fn parse_ipv4_header(packet: &[u8]) -> Option<(u8, Ipv4Addr, Ipv4Addr, u8)> {
|
||||
if packet.len() < 20 {
|
||||
return None;
|
||||
}
|
||||
let version = packet[0] >> 4;
|
||||
if version != 4 {
|
||||
return None;
|
||||
}
|
||||
let ihl = (packet[0] & 0x0F) as usize * 4;
|
||||
let protocol = packet[9];
|
||||
let src = Ipv4Addr::new(packet[12], packet[13], packet[14], packet[15]);
|
||||
let dst = Ipv4Addr::new(packet[16], packet[17], packet[18], packet[19]);
|
||||
Some((ihl as u8, src, dst, protocol))
|
||||
}
|
||||
|
||||
fn parse_tcp_ports(packet: &[u8], ihl: usize) -> Option<(u16, u16, u8)> {
|
||||
if packet.len() < ihl + 14 {
|
||||
return None;
|
||||
}
|
||||
let src_port = u16::from_be_bytes([packet[ihl], packet[ihl + 1]]);
|
||||
let dst_port = u16::from_be_bytes([packet[ihl + 2], packet[ihl + 3]]);
|
||||
let flags = packet[ihl + 13];
|
||||
Some((src_port, dst_port, flags))
|
||||
}
|
||||
|
||||
fn parse_udp_ports(packet: &[u8], ihl: usize) -> Option<(u16, u16)> {
|
||||
if packet.len() < ihl + 4 {
|
||||
return None;
|
||||
}
|
||||
let src_port = u16::from_be_bytes([packet[ihl], packet[ihl + 1]]);
|
||||
let dst_port = u16::from_be_bytes([packet[ihl + 2], packet[ihl + 3]]);
|
||||
Some((src_port, dst_port))
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// NAT Engine
|
||||
// ============================================================================
|
||||
|
||||
pub struct NatEngine {
|
||||
device: VirtualIpDevice,
|
||||
iface: Interface,
|
||||
sockets: SocketSet<'static>,
|
||||
tcp_sessions: HashMap<SessionKey, TcpSession>,
|
||||
udp_sessions: HashMap<SessionKey, UdpSession>,
|
||||
state: Arc<ServerState>,
|
||||
bridge_rx: mpsc::Receiver<BridgeMessage>,
|
||||
bridge_tx: mpsc::Sender<BridgeMessage>,
|
||||
start_time: std::time::Instant,
|
||||
/// When true, outbound TCP connections prepend PROXY protocol v2 headers
|
||||
/// with the VPN client's tunnel IP as source address.
|
||||
proxy_protocol: bool,
|
||||
}
|
||||
|
||||
impl NatEngine {
|
||||
pub fn new(gateway_ip: Ipv4Addr, mtu: usize, state: Arc<ServerState>, proxy_protocol: bool) -> Self {
|
||||
let mut device = VirtualIpDevice::new(mtu);
|
||||
let config = Config::new(HardwareAddress::Ip);
|
||||
let now = smoltcp::time::Instant::from_millis(0);
|
||||
let mut iface = Interface::new(config, &mut device, now);
|
||||
|
||||
// Accept packets to ANY destination IP (essential for NAT)
|
||||
iface.set_any_ip(true);
|
||||
|
||||
// Assign the gateway IP as the interface address
|
||||
iface.update_ip_addrs(|addrs| {
|
||||
addrs
|
||||
.push(IpCidr::new(IpAddress::Ipv4(gateway_ip.into()), 24))
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
// Add a default route so smoltcp knows where to send packets
|
||||
iface.routes_mut().add_default_ipv4_route(gateway_ip.into()).unwrap();
|
||||
|
||||
let sockets = SocketSet::new(Vec::with_capacity(256));
|
||||
let (bridge_tx, bridge_rx) = mpsc::channel(4096);
|
||||
|
||||
Self {
|
||||
device,
|
||||
iface,
|
||||
sockets,
|
||||
tcp_sessions: HashMap::new(),
|
||||
udp_sessions: HashMap::new(),
|
||||
state,
|
||||
bridge_rx,
|
||||
bridge_tx,
|
||||
start_time: std::time::Instant::now(),
|
||||
proxy_protocol,
|
||||
}
|
||||
}
|
||||
|
||||
fn smoltcp_now(&self) -> smoltcp::time::Instant {
|
||||
smoltcp::time::Instant::from_millis(self.start_time.elapsed().as_millis() as i64)
|
||||
}
|
||||
|
||||
/// Inject a raw IP packet from a VPN client and handle new session creation.
|
||||
fn inject_packet(&mut self, packet: Vec<u8>) {
|
||||
let Some((ihl, src_ip, dst_ip, protocol)) = parse_ipv4_header(&packet) else {
|
||||
return;
|
||||
};
|
||||
let ihl = ihl as usize;
|
||||
|
||||
match protocol {
|
||||
6 => {
|
||||
// TCP
|
||||
let Some((src_port, dst_port, flags)) = parse_tcp_ports(&packet, ihl) else {
|
||||
return;
|
||||
};
|
||||
let key = SessionKey {
|
||||
src_ip,
|
||||
src_port,
|
||||
dst_ip,
|
||||
dst_port,
|
||||
protocol: 6,
|
||||
};
|
||||
|
||||
// SYN without ACK = new connection
|
||||
let is_syn = (flags & 0x02) != 0 && (flags & 0x10) == 0;
|
||||
if is_syn && !self.tcp_sessions.contains_key(&key) {
|
||||
self.create_tcp_session(&key);
|
||||
}
|
||||
}
|
||||
17 => {
|
||||
// UDP
|
||||
let Some((src_port, dst_port)) = parse_udp_ports(&packet, ihl) else {
|
||||
return;
|
||||
};
|
||||
let key = SessionKey {
|
||||
src_ip,
|
||||
src_port,
|
||||
dst_ip,
|
||||
dst_port,
|
||||
protocol: 17,
|
||||
};
|
||||
|
||||
if !self.udp_sessions.contains_key(&key) {
|
||||
self.create_udp_session(&key);
|
||||
}
|
||||
|
||||
// Update last_activity for existing sessions
|
||||
if let Some(session) = self.udp_sessions.get_mut(&key) {
|
||||
session.last_activity = tokio::time::Instant::now();
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
// ICMP and other protocols — not forwarded in socket mode
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
self.device.inject_packet(packet);
|
||||
}
|
||||
|
||||
fn create_tcp_session(&mut self, key: &SessionKey) {
|
||||
// Create smoltcp TCP socket
|
||||
let tcp_rx_buf = tcp::SocketBuffer::new(vec![0u8; 65535]);
|
||||
let tcp_tx_buf = tcp::SocketBuffer::new(vec![0u8; 65535]);
|
||||
let mut socket = tcp::Socket::new(tcp_rx_buf, tcp_tx_buf);
|
||||
|
||||
// Listen on the destination address so smoltcp accepts the SYN
|
||||
let endpoint = IpEndpoint::new(
|
||||
IpAddress::Ipv4(key.dst_ip.into()),
|
||||
key.dst_port,
|
||||
);
|
||||
if socket.listen(endpoint).is_err() {
|
||||
warn!("NAT: failed to listen on {:?}", endpoint);
|
||||
return;
|
||||
}
|
||||
|
||||
let handle = self.sockets.add(socket);
|
||||
|
||||
// Channel for sending data from NAT engine to bridge task
|
||||
let (data_tx, data_rx) = mpsc::channel::<Vec<u8>>(256);
|
||||
|
||||
let session = TcpSession {
|
||||
smoltcp_handle: handle,
|
||||
bridge_data_tx: data_tx,
|
||||
client_ip: key.src_ip,
|
||||
};
|
||||
self.tcp_sessions.insert(key.clone(), session);
|
||||
|
||||
// Spawn bridge task that connects to the real destination
|
||||
let bridge_tx = self.bridge_tx.clone();
|
||||
let key_clone = key.clone();
|
||||
let proxy_protocol = self.proxy_protocol;
|
||||
tokio::spawn(async move {
|
||||
tcp_bridge_task(key_clone, data_rx, bridge_tx, proxy_protocol).await;
|
||||
});
|
||||
|
||||
debug!(
|
||||
"NAT: new TCP session {}:{} -> {}:{}",
|
||||
key.src_ip, key.src_port, key.dst_ip, key.dst_port
|
||||
);
|
||||
}
|
||||
|
||||
fn create_udp_session(&mut self, key: &SessionKey) {
|
||||
// Create smoltcp UDP socket
|
||||
let udp_rx_buf = udp::PacketBuffer::new(
|
||||
vec![udp::PacketMetadata::EMPTY; 32],
|
||||
vec![0u8; 65535],
|
||||
);
|
||||
let udp_tx_buf = udp::PacketBuffer::new(
|
||||
vec![udp::PacketMetadata::EMPTY; 32],
|
||||
vec![0u8; 65535],
|
||||
);
|
||||
let mut socket = udp::Socket::new(udp_rx_buf, udp_tx_buf);
|
||||
|
||||
let endpoint = IpEndpoint::new(
|
||||
IpAddress::Ipv4(key.dst_ip.into()),
|
||||
key.dst_port,
|
||||
);
|
||||
if socket.bind(endpoint).is_err() {
|
||||
warn!("NAT: failed to bind UDP on {:?}", endpoint);
|
||||
return;
|
||||
}
|
||||
|
||||
let handle = self.sockets.add(socket);
|
||||
|
||||
let (data_tx, data_rx) = mpsc::channel::<Vec<u8>>(256);
|
||||
|
||||
let session = UdpSession {
|
||||
smoltcp_handle: handle,
|
||||
bridge_data_tx: data_tx,
|
||||
client_ip: key.src_ip,
|
||||
last_activity: tokio::time::Instant::now(),
|
||||
};
|
||||
self.udp_sessions.insert(key.clone(), session);
|
||||
|
||||
let bridge_tx = self.bridge_tx.clone();
|
||||
let key_clone = key.clone();
|
||||
tokio::spawn(async move {
|
||||
udp_bridge_task(key_clone, data_rx, bridge_tx).await;
|
||||
});
|
||||
|
||||
debug!(
|
||||
"NAT: new UDP session {}:{} -> {}:{}",
|
||||
key.src_ip, key.src_port, key.dst_ip, key.dst_port
|
||||
);
|
||||
}
|
||||
|
||||
/// Poll smoltcp, bridge data between smoltcp sockets and bridge tasks,
|
||||
/// and dispatch outgoing packets to VPN clients.
|
||||
async fn process(&mut self) {
|
||||
let now = self.smoltcp_now();
|
||||
self.iface
|
||||
.poll(now, &mut self.device, &mut self.sockets);
|
||||
|
||||
// Bridge: read data from smoltcp TCP sockets → send to bridge tasks
|
||||
let mut closed_tcp: Vec<SessionKey> = Vec::new();
|
||||
for (key, session) in &self.tcp_sessions {
|
||||
let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
|
||||
if socket.can_recv() {
|
||||
let _ = socket.recv(|data| {
|
||||
let _ = session.bridge_data_tx.try_send(data.to_vec());
|
||||
(data.len(), ())
|
||||
});
|
||||
}
|
||||
// Detect closed connections
|
||||
if !socket.is_open() && !socket.is_listening() {
|
||||
closed_tcp.push(key.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up closed TCP sessions
|
||||
for key in closed_tcp {
|
||||
if let Some(session) = self.tcp_sessions.remove(&key) {
|
||||
self.sockets.remove(session.smoltcp_handle);
|
||||
debug!("NAT: TCP session closed {}:{} -> {}:{}", key.src_ip, key.src_port, key.dst_ip, key.dst_port);
|
||||
}
|
||||
}
|
||||
|
||||
// Bridge: read data from smoltcp UDP sockets → send to bridge tasks
|
||||
for (_key, session) in &self.udp_sessions {
|
||||
let socket = self.sockets.get_mut::<udp::Socket>(session.smoltcp_handle);
|
||||
while let Ok((data, _meta)) = socket.recv() {
|
||||
let _ = session.bridge_data_tx.try_send(data.to_vec());
|
||||
}
|
||||
}
|
||||
|
||||
// Dispatch outgoing packets from smoltcp to VPN clients
|
||||
let routes = self.state.tun_routes.read().await;
|
||||
for packet in self.device.drain_tx() {
|
||||
if let Some(std::net::IpAddr::V4(dst_ip)) = tunnel::extract_dst_ip(&packet) {
|
||||
if let Some(sender) = routes.get(&dst_ip) {
|
||||
let _ = sender.try_send(packet);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_bridge_message(&mut self, msg: BridgeMessage) {
|
||||
match msg {
|
||||
BridgeMessage::TcpData { key, data } => {
|
||||
if let Some(session) = self.tcp_sessions.get(&key) {
|
||||
let socket =
|
||||
self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
|
||||
if socket.can_send() {
|
||||
let _ = socket.send_slice(&data);
|
||||
}
|
||||
}
|
||||
}
|
||||
BridgeMessage::TcpClosed { key } => {
|
||||
if let Some(session) = self.tcp_sessions.remove(&key) {
|
||||
let socket =
|
||||
self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
|
||||
socket.close();
|
||||
// Don't remove from SocketSet yet — let smoltcp send FIN
|
||||
// It will be cleaned up in process() when is_open() returns false
|
||||
self.tcp_sessions.insert(key, session);
|
||||
}
|
||||
}
|
||||
BridgeMessage::UdpData { key, data } => {
|
||||
if let Some(session) = self.udp_sessions.get_mut(&key) {
|
||||
session.last_activity = tokio::time::Instant::now();
|
||||
let socket =
|
||||
self.sockets.get_mut::<udp::Socket>(session.smoltcp_handle);
|
||||
let dst_endpoint = IpEndpoint::new(
|
||||
IpAddress::Ipv4(key.src_ip.into()),
|
||||
key.src_port,
|
||||
);
|
||||
// Send response: from the "server" (dst) back to the "client" (src)
|
||||
let _ = socket.send_slice(&data, dst_endpoint);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn cleanup_idle_udp_sessions(&mut self) {
|
||||
let timeout = Duration::from_secs(60);
|
||||
let now = tokio::time::Instant::now();
|
||||
let expired: Vec<SessionKey> = self
|
||||
.udp_sessions
|
||||
.iter()
|
||||
.filter(|(_, s)| now.duration_since(s.last_activity) > timeout)
|
||||
.map(|(k, _)| k.clone())
|
||||
.collect();
|
||||
|
||||
for key in expired {
|
||||
if let Some(session) = self.udp_sessions.remove(&key) {
|
||||
self.sockets.remove(session.smoltcp_handle);
|
||||
debug!(
|
||||
"NAT: UDP session timed out {}:{} -> {}:{}",
|
||||
key.src_ip, key.src_port, key.dst_ip, key.dst_port
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Main async event loop for the NAT engine.
|
||||
pub async fn run(
|
||||
mut self,
|
||||
mut packet_rx: mpsc::Receiver<Vec<u8>>,
|
||||
mut shutdown_rx: mpsc::Receiver<()>,
|
||||
) -> Result<()> {
|
||||
info!("Userspace NAT engine started");
|
||||
let mut timer = tokio::time::interval(Duration::from_millis(50));
|
||||
let mut cleanup_timer = tokio::time::interval(Duration::from_secs(10));
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(packet) = packet_rx.recv() => {
|
||||
self.inject_packet(packet);
|
||||
self.process().await;
|
||||
}
|
||||
Some(msg) = self.bridge_rx.recv() => {
|
||||
self.handle_bridge_message(msg);
|
||||
self.process().await;
|
||||
}
|
||||
_ = timer.tick() => {
|
||||
// Periodic poll for smoltcp maintenance (TCP retransmit, etc.)
|
||||
self.process().await;
|
||||
}
|
||||
_ = cleanup_timer.tick() => {
|
||||
self.cleanup_idle_udp_sessions();
|
||||
}
|
||||
_ = shutdown_rx.recv() => {
|
||||
info!("Userspace NAT engine shutting down");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Bridge tasks
|
||||
// ============================================================================
|
||||
|
||||
async fn tcp_bridge_task(
|
||||
key: SessionKey,
|
||||
mut data_rx: mpsc::Receiver<Vec<u8>>,
|
||||
bridge_tx: mpsc::Sender<BridgeMessage>,
|
||||
proxy_protocol: bool,
|
||||
) {
|
||||
let addr = SocketAddr::new(key.dst_ip.into(), key.dst_port);
|
||||
|
||||
// Connect to real destination with timeout
|
||||
let stream = match tokio::time::timeout(Duration::from_secs(30), TcpStream::connect(addr)).await
|
||||
{
|
||||
Ok(Ok(s)) => s,
|
||||
Ok(Err(e)) => {
|
||||
debug!("NAT TCP connect to {} failed: {}", addr, e);
|
||||
let _ = bridge_tx.send(BridgeMessage::TcpClosed { key }).await;
|
||||
return;
|
||||
}
|
||||
Err(_) => {
|
||||
debug!("NAT TCP connect to {} timed out", addr);
|
||||
let _ = bridge_tx.send(BridgeMessage::TcpClosed { key }).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let (mut reader, mut writer) = stream.into_split();
|
||||
|
||||
// Send PROXY protocol v2 header with VPN client's tunnel IP as source
|
||||
if proxy_protocol {
|
||||
let src = SocketAddr::new(key.src_ip.into(), key.src_port);
|
||||
let dst = SocketAddr::new(key.dst_ip.into(), key.dst_port);
|
||||
let pp_header = crate::proxy_protocol::build_pp_v2_header(src, dst);
|
||||
if let Err(e) = writer.write_all(&pp_header).await {
|
||||
debug!("NAT: failed to send PP v2 header to {}: {}", addr, e);
|
||||
let _ = bridge_tx.send(BridgeMessage::TcpClosed { key }).await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Read from real socket → send to NAT engine
|
||||
let bridge_tx2 = bridge_tx.clone();
|
||||
let key2 = key.clone();
|
||||
let read_task = tokio::spawn(async move {
|
||||
let mut buf = vec![0u8; 65536];
|
||||
loop {
|
||||
match reader.read(&mut buf).await {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
if bridge_tx2
|
||||
.send(BridgeMessage::TcpData {
|
||||
key: key2.clone(),
|
||||
data: buf[..n].to_vec(),
|
||||
})
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
let _ = bridge_tx2
|
||||
.send(BridgeMessage::TcpClosed { key: key2 })
|
||||
.await;
|
||||
});
|
||||
|
||||
// Receive from NAT engine → write to real socket
|
||||
while let Some(data) = data_rx.recv().await {
|
||||
if writer.write_all(&data).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
read_task.abort();
|
||||
}
|
||||
|
||||
async fn udp_bridge_task(
|
||||
key: SessionKey,
|
||||
mut data_rx: mpsc::Receiver<Vec<u8>>,
|
||||
bridge_tx: mpsc::Sender<BridgeMessage>,
|
||||
) {
|
||||
let socket = match UdpSocket::bind("0.0.0.0:0").await {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
warn!("NAT UDP bind failed: {}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let dest = SocketAddr::new(key.dst_ip.into(), key.dst_port);
|
||||
|
||||
let socket = Arc::new(socket);
|
||||
let socket2 = socket.clone();
|
||||
let bridge_tx2 = bridge_tx.clone();
|
||||
let key2 = key.clone();
|
||||
|
||||
// Read responses from real socket
|
||||
let read_task = tokio::spawn(async move {
|
||||
let mut buf = vec![0u8; 65536];
|
||||
loop {
|
||||
match socket2.recv_from(&mut buf).await {
|
||||
Ok((n, _src)) => {
|
||||
if bridge_tx2
|
||||
.send(BridgeMessage::UdpData {
|
||||
key: key2.clone(),
|
||||
data: buf[..n].to_vec(),
|
||||
})
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Forward data from NAT engine to real socket
|
||||
while let Some(data) = data_rx.recv().await {
|
||||
let _ = socket.send_to(&data, dest).await;
|
||||
}
|
||||
|
||||
read_task.abort();
|
||||
}
|
||||
@@ -1,9 +1,7 @@
|
||||
use std::collections::HashMap;
|
||||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use anyhow::{anyhow, Result};
|
||||
use base64::engine::general_purpose::STANDARD as BASE64;
|
||||
use base64::Engine;
|
||||
@@ -17,7 +15,7 @@ use tokio::net::UdpSocket;
|
||||
use tokio::sync::{mpsc, oneshot, RwLock};
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::network;
|
||||
use crate::server::{ClientInfo, ForwardingEngine, ServerState};
|
||||
use crate::tunnel::{self, TunConfig};
|
||||
|
||||
// ============================================================================
|
||||
@@ -29,9 +27,6 @@ const WG_BUFFER_SIZE: usize = MAX_UDP_PACKET;
|
||||
/// Minimum dst buffer size for boringtun encapsulate/decapsulate
|
||||
const _MIN_DST_BUF: usize = 148;
|
||||
const TIMER_TICK_MS: u64 = 100;
|
||||
const DEFAULT_WG_PORT: u16 = 51820;
|
||||
const DEFAULT_TUN_ADDRESS: &str = "10.8.0.1";
|
||||
const DEFAULT_TUN_NETMASK: &str = "255.255.255.0";
|
||||
const DEFAULT_MTU: u16 = 1420;
|
||||
|
||||
// ============================================================================
|
||||
@@ -51,27 +46,6 @@ pub struct WgPeerConfig {
|
||||
pub persistent_keepalive: Option<u16>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct WgServerConfig {
|
||||
pub private_key: String,
|
||||
#[serde(default)]
|
||||
pub listen_port: Option<u16>,
|
||||
#[serde(default)]
|
||||
pub tun_address: Option<String>,
|
||||
#[serde(default)]
|
||||
pub tun_netmask: Option<String>,
|
||||
#[serde(default)]
|
||||
pub mtu: Option<u16>,
|
||||
pub peers: Vec<WgPeerConfig>,
|
||||
#[serde(default)]
|
||||
pub dns: Option<Vec<String>>,
|
||||
#[serde(default)]
|
||||
pub enable_nat: Option<bool>,
|
||||
#[serde(default)]
|
||||
pub subnet: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct WgClientConfig {
|
||||
@@ -111,17 +85,6 @@ pub struct WgPeerInfo {
|
||||
pub stats: WgPeerStats,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct WgServerStats {
|
||||
pub total_bytes_sent: u64,
|
||||
pub total_bytes_received: u64,
|
||||
pub total_packets_sent: u64,
|
||||
pub total_packets_received: u64,
|
||||
pub active_peers: usize,
|
||||
pub uptime_seconds: f64,
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Key generation and parsing
|
||||
// ============================================================================
|
||||
@@ -228,31 +191,11 @@ impl AllowedIp {
|
||||
}
|
||||
}
|
||||
|
||||
/// Extract destination IP from an IP packet header.
|
||||
fn extract_dst_ip(packet: &[u8]) -> Option<IpAddr> {
|
||||
if packet.is_empty() {
|
||||
return None;
|
||||
}
|
||||
let version = packet[0] >> 4;
|
||||
match version {
|
||||
4 if packet.len() >= 20 => {
|
||||
let dst = Ipv4Addr::new(packet[16], packet[17], packet[18], packet[19]);
|
||||
Some(IpAddr::V4(dst))
|
||||
}
|
||||
6 if packet.len() >= 40 => {
|
||||
let mut octets = [0u8; 16];
|
||||
octets.copy_from_slice(&packet[24..40]);
|
||||
Some(IpAddr::V6(Ipv6Addr::from(octets)))
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Dynamic peer management commands
|
||||
// ============================================================================
|
||||
|
||||
enum WgCommand {
|
||||
pub enum WgCommand {
|
||||
AddPeer(WgPeerConfig, oneshot::Sender<Result<()>>),
|
||||
RemovePeer(String, oneshot::Sender<Result<()>>),
|
||||
}
|
||||
@@ -277,451 +220,6 @@ impl PeerState {
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// WgServer
|
||||
// ============================================================================
|
||||
|
||||
pub struct WgServer {
|
||||
shutdown_tx: Option<oneshot::Sender<()>>,
|
||||
command_tx: Option<mpsc::Sender<WgCommand>>,
|
||||
shared_stats: Arc<RwLock<HashMap<String, WgPeerStats>>>,
|
||||
server_stats: Arc<RwLock<WgServerStats>>,
|
||||
started_at: Option<Instant>,
|
||||
listen_port: Option<u16>,
|
||||
}
|
||||
|
||||
impl WgServer {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
shutdown_tx: None,
|
||||
command_tx: None,
|
||||
shared_stats: Arc::new(RwLock::new(HashMap::new())),
|
||||
server_stats: Arc::new(RwLock::new(WgServerStats::default())),
|
||||
started_at: None,
|
||||
listen_port: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_running(&self) -> bool {
|
||||
self.shutdown_tx.is_some()
|
||||
}
|
||||
|
||||
pub async fn start(&mut self, config: WgServerConfig) -> Result<()> {
|
||||
if self.is_running() {
|
||||
return Err(anyhow!("WireGuard server is already running"));
|
||||
}
|
||||
|
||||
let listen_port = config.listen_port.unwrap_or(DEFAULT_WG_PORT);
|
||||
let tun_address = config
|
||||
.tun_address
|
||||
.as_deref()
|
||||
.unwrap_or(DEFAULT_TUN_ADDRESS);
|
||||
let tun_netmask = config
|
||||
.tun_netmask
|
||||
.as_deref()
|
||||
.unwrap_or(DEFAULT_TUN_NETMASK);
|
||||
let mtu = config.mtu.unwrap_or(DEFAULT_MTU);
|
||||
|
||||
// Parse server private key
|
||||
let server_private = parse_private_key(&config.private_key)?;
|
||||
let server_public = PublicKey::from(&server_private);
|
||||
|
||||
// Create rate limiter for DDoS protection
|
||||
let rate_limiter = Arc::new(RateLimiter::new(&server_public, TIMER_TICK_MS as u64));
|
||||
|
||||
// Build peer state
|
||||
let peer_index = AtomicU32::new(0);
|
||||
let mut peers: Vec<PeerState> = Vec::with_capacity(config.peers.len());
|
||||
|
||||
for peer_config in &config.peers {
|
||||
let peer_public = parse_public_key(&peer_config.public_key)?;
|
||||
let psk = match &peer_config.preshared_key {
|
||||
Some(k) => Some(parse_preshared_key(k)?),
|
||||
None => None,
|
||||
};
|
||||
let idx = peer_index.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// Clone the private key for each Tunn (StaticSecret doesn't implement Clone,
|
||||
// so re-parse from config)
|
||||
let priv_copy = parse_private_key(&config.private_key)?;
|
||||
|
||||
let tunn = Tunn::new(
|
||||
priv_copy,
|
||||
peer_public,
|
||||
psk,
|
||||
peer_config.persistent_keepalive,
|
||||
idx,
|
||||
Some(rate_limiter.clone()),
|
||||
);
|
||||
|
||||
let allowed_ips: Vec<AllowedIp> = peer_config
|
||||
.allowed_ips
|
||||
.iter()
|
||||
.map(|cidr| AllowedIp::parse(cidr))
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
let endpoint = match &peer_config.endpoint {
|
||||
Some(ep) => Some(ep.parse::<SocketAddr>()?),
|
||||
None => None,
|
||||
};
|
||||
|
||||
peers.push(PeerState {
|
||||
tunn,
|
||||
public_key_b64: peer_config.public_key.clone(),
|
||||
allowed_ips,
|
||||
endpoint,
|
||||
persistent_keepalive: peer_config.persistent_keepalive,
|
||||
stats: WgPeerStats::default(),
|
||||
});
|
||||
}
|
||||
|
||||
// Create TUN device
|
||||
let tun_config = TunConfig {
|
||||
name: "wg0".to_string(),
|
||||
address: tun_address.parse()?,
|
||||
netmask: tun_netmask.parse()?,
|
||||
mtu,
|
||||
};
|
||||
let tun_device = tunnel::create_tun(&tun_config)?;
|
||||
info!("WireGuard TUN device created: {}", tun_config.name);
|
||||
|
||||
// Bind UDP socket
|
||||
let udp_socket = UdpSocket::bind(format!("0.0.0.0:{}", listen_port)).await?;
|
||||
info!("WireGuard server listening on UDP port {}", listen_port);
|
||||
|
||||
// Enable IP forwarding and NAT if requested
|
||||
if config.enable_nat.unwrap_or(false) {
|
||||
network::enable_ip_forwarding()?;
|
||||
let subnet = config
|
||||
.subnet
|
||||
.as_deref()
|
||||
.unwrap_or("10.8.0.0/24");
|
||||
let iface = network::get_default_interface()?;
|
||||
network::setup_nat(subnet, &iface).await?;
|
||||
info!("NAT enabled for subnet {} via {}", subnet, iface);
|
||||
}
|
||||
|
||||
// Channels
|
||||
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
|
||||
let (command_tx, command_rx) = mpsc::channel::<WgCommand>(32);
|
||||
|
||||
let shared_stats = self.shared_stats.clone();
|
||||
let server_stats = self.server_stats.clone();
|
||||
let started_at = Instant::now();
|
||||
|
||||
// Initialize shared stats
|
||||
{
|
||||
let mut stats = shared_stats.write().await;
|
||||
for peer in &peers {
|
||||
stats.insert(peer.public_key_b64.clone(), WgPeerStats::default());
|
||||
}
|
||||
}
|
||||
|
||||
// Spawn the event loop
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = wg_server_loop(
|
||||
udp_socket,
|
||||
tun_device,
|
||||
peers,
|
||||
peer_index,
|
||||
rate_limiter,
|
||||
config.private_key.clone(),
|
||||
shared_stats,
|
||||
server_stats,
|
||||
started_at,
|
||||
shutdown_rx,
|
||||
command_rx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
error!("WireGuard server loop error: {}", e);
|
||||
}
|
||||
info!("WireGuard server loop exited");
|
||||
});
|
||||
|
||||
self.shutdown_tx = Some(shutdown_tx);
|
||||
self.command_tx = Some(command_tx);
|
||||
self.started_at = Some(started_at);
|
||||
self.listen_port = Some(listen_port);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn stop(&mut self) -> Result<()> {
|
||||
if let Some(tx) = self.shutdown_tx.take() {
|
||||
let _ = tx.send(());
|
||||
}
|
||||
self.command_tx = None;
|
||||
self.started_at = None;
|
||||
self.listen_port = None;
|
||||
info!("WireGuard server stopped");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_status(&self) -> serde_json::Value {
|
||||
if self.is_running() {
|
||||
serde_json::json!({
|
||||
"state": "running",
|
||||
"listenPort": self.listen_port,
|
||||
"uptimeSeconds": self.started_at.map(|t| t.elapsed().as_secs_f64()).unwrap_or(0.0),
|
||||
})
|
||||
} else {
|
||||
serde_json::json!({ "state": "stopped" })
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_statistics(&self) -> serde_json::Value {
|
||||
let mut stats = self.server_stats.write().await;
|
||||
if let Some(started) = self.started_at {
|
||||
stats.uptime_seconds = started.elapsed().as_secs_f64();
|
||||
}
|
||||
// Aggregate from peer stats
|
||||
let peer_stats = self.shared_stats.read().await;
|
||||
stats.active_peers = peer_stats.len();
|
||||
stats.total_bytes_sent = peer_stats.values().map(|s| s.bytes_sent).sum();
|
||||
stats.total_bytes_received = peer_stats.values().map(|s| s.bytes_received).sum();
|
||||
stats.total_packets_sent = peer_stats.values().map(|s| s.packets_sent).sum();
|
||||
stats.total_packets_received = peer_stats.values().map(|s| s.packets_received).sum();
|
||||
serde_json::to_value(&*stats).unwrap_or_default()
|
||||
}
|
||||
|
||||
pub async fn list_peers(&self) -> Vec<WgPeerInfo> {
|
||||
let stats = self.shared_stats.read().await;
|
||||
stats
|
||||
.iter()
|
||||
.map(|(key, s)| WgPeerInfo {
|
||||
public_key: key.clone(),
|
||||
allowed_ips: vec![], // populated from event loop snapshots
|
||||
endpoint: None,
|
||||
persistent_keepalive: None,
|
||||
stats: s.clone(),
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub async fn add_peer(&self, config: WgPeerConfig) -> Result<()> {
|
||||
let tx = self
|
||||
.command_tx
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow!("Server not running"))?;
|
||||
let (resp_tx, resp_rx) = oneshot::channel();
|
||||
tx.send(WgCommand::AddPeer(config, resp_tx))
|
||||
.await
|
||||
.map_err(|_| anyhow!("Server event loop closed"))?;
|
||||
resp_rx.await.map_err(|_| anyhow!("No response"))?
|
||||
}
|
||||
|
||||
pub async fn remove_peer(&self, public_key: &str) -> Result<()> {
|
||||
let tx = self
|
||||
.command_tx
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow!("Server not running"))?;
|
||||
let (resp_tx, resp_rx) = oneshot::channel();
|
||||
tx.send(WgCommand::RemovePeer(public_key.to_string(), resp_tx))
|
||||
.await
|
||||
.map_err(|_| anyhow!("Server event loop closed"))?;
|
||||
resp_rx.await.map_err(|_| anyhow!("No response"))?
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Server event loop
|
||||
// ============================================================================
|
||||
|
||||
async fn wg_server_loop(
|
||||
udp_socket: UdpSocket,
|
||||
tun_device: tun::AsyncDevice,
|
||||
mut peers: Vec<PeerState>,
|
||||
peer_index: AtomicU32,
|
||||
rate_limiter: Arc<RateLimiter>,
|
||||
server_private_key_b64: String,
|
||||
shared_stats: Arc<RwLock<HashMap<String, WgPeerStats>>>,
|
||||
_server_stats: Arc<RwLock<WgServerStats>>,
|
||||
_started_at: Instant,
|
||||
mut shutdown_rx: oneshot::Receiver<()>,
|
||||
mut command_rx: mpsc::Receiver<WgCommand>,
|
||||
) -> Result<()> {
|
||||
let mut udp_buf = vec![0u8; MAX_UDP_PACKET];
|
||||
let mut tun_buf = vec![0u8; MAX_UDP_PACKET];
|
||||
let mut dst_buf = vec![0u8; WG_BUFFER_SIZE];
|
||||
let mut timer = tokio::time::interval(std::time::Duration::from_millis(TIMER_TICK_MS));
|
||||
|
||||
// Split TUN for concurrent read/write in select
|
||||
let (mut tun_reader, mut tun_writer) = tokio::io::split(tun_device);
|
||||
|
||||
// Stats sync interval
|
||||
let mut stats_timer =
|
||||
tokio::time::interval(std::time::Duration::from_secs(1));
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
// --- UDP receive ---
|
||||
result = udp_socket.recv_from(&mut udp_buf) => {
|
||||
let (n, src_addr) = result?;
|
||||
if n == 0 { continue; }
|
||||
|
||||
// Find which peer this packet belongs to by trying decapsulate
|
||||
let mut handled = false;
|
||||
for peer in peers.iter_mut() {
|
||||
match peer.tunn.decapsulate(Some(src_addr.ip()), &udp_buf[..n], &mut dst_buf) {
|
||||
TunnResult::WriteToNetwork(packet) => {
|
||||
udp_socket.send_to(packet, src_addr).await?;
|
||||
// Drain loop
|
||||
loop {
|
||||
match peer.tunn.decapsulate(None, &[], &mut dst_buf) {
|
||||
TunnResult::WriteToNetwork(pkt) => {
|
||||
let ep = peer.endpoint.unwrap_or(src_addr);
|
||||
udp_socket.send_to(pkt, ep).await?;
|
||||
}
|
||||
_ => break,
|
||||
}
|
||||
}
|
||||
peer.endpoint = Some(src_addr);
|
||||
handled = true;
|
||||
break;
|
||||
}
|
||||
TunnResult::WriteToTunnelV4(packet, addr) => {
|
||||
if peer.matches_dst(IpAddr::V4(addr)) {
|
||||
let pkt_len = packet.len() as u64;
|
||||
tun_writer.write_all(packet).await?;
|
||||
peer.stats.bytes_received += pkt_len;
|
||||
peer.stats.packets_received += 1;
|
||||
}
|
||||
peer.endpoint = Some(src_addr);
|
||||
handled = true;
|
||||
break;
|
||||
}
|
||||
TunnResult::WriteToTunnelV6(packet, addr) => {
|
||||
if peer.matches_dst(IpAddr::V6(addr)) {
|
||||
let pkt_len = packet.len() as u64;
|
||||
tun_writer.write_all(packet).await?;
|
||||
peer.stats.bytes_received += pkt_len;
|
||||
peer.stats.packets_received += 1;
|
||||
}
|
||||
peer.endpoint = Some(src_addr);
|
||||
handled = true;
|
||||
break;
|
||||
}
|
||||
TunnResult::Done => {
|
||||
// This peer didn't recognize the packet, try next
|
||||
continue;
|
||||
}
|
||||
TunnResult::Err(e) => {
|
||||
debug!("decapsulate error from {}: {:?}", src_addr, e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
if !handled {
|
||||
debug!("No peer matched UDP packet from {}", src_addr);
|
||||
}
|
||||
}
|
||||
|
||||
// --- TUN read ---
|
||||
result = tun_reader.read(&mut tun_buf) => {
|
||||
let n = result?;
|
||||
if n == 0 { continue; }
|
||||
|
||||
let dst_ip = match extract_dst_ip(&tun_buf[..n]) {
|
||||
Some(ip) => ip,
|
||||
None => { continue; }
|
||||
};
|
||||
|
||||
// Find peer whose AllowedIPs match the destination
|
||||
for peer in peers.iter_mut() {
|
||||
if !peer.matches_dst(dst_ip) {
|
||||
continue;
|
||||
}
|
||||
match peer.tunn.encapsulate(&tun_buf[..n], &mut dst_buf) {
|
||||
TunnResult::WriteToNetwork(packet) => {
|
||||
if let Some(endpoint) = peer.endpoint {
|
||||
let pkt_len = n as u64;
|
||||
udp_socket.send_to(packet, endpoint).await?;
|
||||
peer.stats.bytes_sent += pkt_len;
|
||||
peer.stats.packets_sent += 1;
|
||||
} else {
|
||||
debug!("No endpoint for peer {}, dropping packet", peer.public_key_b64);
|
||||
}
|
||||
}
|
||||
TunnResult::Err(e) => {
|
||||
debug!("encapsulate error for peer {}: {:?}", peer.public_key_b64, e);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// --- Timer tick (100ms) for WireGuard timers ---
|
||||
_ = timer.tick() => {
|
||||
for peer in peers.iter_mut() {
|
||||
match peer.tunn.update_timers(&mut dst_buf) {
|
||||
TunnResult::WriteToNetwork(packet) => {
|
||||
if let Some(endpoint) = peer.endpoint {
|
||||
udp_socket.send_to(packet, endpoint).await?;
|
||||
}
|
||||
}
|
||||
TunnResult::Err(e) => {
|
||||
debug!("Timer error for peer {}: {:?}", peer.public_key_b64, e);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// --- Sync stats to shared state ---
|
||||
_ = stats_timer.tick() => {
|
||||
let mut shared = shared_stats.write().await;
|
||||
for peer in peers.iter() {
|
||||
shared.insert(peer.public_key_b64.clone(), peer.stats.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// --- Dynamic peer commands ---
|
||||
cmd = command_rx.recv() => {
|
||||
match cmd {
|
||||
Some(WgCommand::AddPeer(config, resp_tx)) => {
|
||||
let result = add_peer_to_loop(
|
||||
&mut peers,
|
||||
&config,
|
||||
&peer_index,
|
||||
&rate_limiter,
|
||||
&server_private_key_b64,
|
||||
);
|
||||
if result.is_ok() {
|
||||
let mut shared = shared_stats.write().await;
|
||||
shared.insert(config.public_key.clone(), WgPeerStats::default());
|
||||
}
|
||||
let _ = resp_tx.send(result);
|
||||
}
|
||||
Some(WgCommand::RemovePeer(pubkey, resp_tx)) => {
|
||||
let prev_len = peers.len();
|
||||
peers.retain(|p| p.public_key_b64 != pubkey);
|
||||
if peers.len() < prev_len {
|
||||
let mut shared = shared_stats.write().await;
|
||||
shared.remove(&pubkey);
|
||||
let _ = resp_tx.send(Ok(()));
|
||||
} else {
|
||||
let _ = resp_tx.send(Err(anyhow!("Peer not found: {}", pubkey)));
|
||||
}
|
||||
}
|
||||
None => {
|
||||
info!("Command channel closed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// --- Shutdown ---
|
||||
_ = &mut shutdown_rx => {
|
||||
info!("WireGuard server shutdown signal received");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn add_peer_to_loop(
|
||||
peers: &mut Vec<PeerState>,
|
||||
@@ -776,6 +274,410 @@ fn add_peer_to_loop(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Integrated WG listener (shares ServerState with WS/QUIC)
|
||||
// ============================================================================
|
||||
|
||||
/// Configuration for the integrated WireGuard listener.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct WgListenerConfig {
|
||||
pub private_key: String,
|
||||
pub listen_port: u16,
|
||||
pub peers: Vec<WgPeerConfig>,
|
||||
}
|
||||
|
||||
/// Extract the first /32 IPv4 address from a list of AllowedIp entries.
|
||||
/// This is the peer's VPN IP used for return-packet routing.
|
||||
fn extract_peer_vpn_ip(allowed_ips: &[AllowedIp]) -> Option<Ipv4Addr> {
|
||||
for aip in allowed_ips {
|
||||
if let IpAddr::V4(v4) = aip.addr {
|
||||
if aip.prefix_len == 32 {
|
||||
return Some(v4);
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Timestamp helper (mirrors server.rs timestamp_now).
|
||||
fn wg_timestamp_now() -> String {
|
||||
use std::time::SystemTime;
|
||||
let duration = SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.unwrap_or_default();
|
||||
format!("{}", duration.as_secs())
|
||||
}
|
||||
|
||||
/// Register a WG peer in ServerState (tun_routes, clients, ip_pool).
|
||||
/// Returns the VPN IP and the per-peer return-packet receiver.
|
||||
async fn register_wg_peer(
|
||||
state: &Arc<ServerState>,
|
||||
peer: &PeerState,
|
||||
wg_return_tx: &mpsc::Sender<(String, Vec<u8>)>,
|
||||
) -> Result<Option<Ipv4Addr>> {
|
||||
let vpn_ip = match extract_peer_vpn_ip(&peer.allowed_ips) {
|
||||
Some(ip) => ip,
|
||||
None => {
|
||||
warn!("WG peer {} has no /32 IPv4 in allowed_ips, skipping registration",
|
||||
peer.public_key_b64);
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
|
||||
let client_id = format!("wg-{}", &peer.public_key_b64[..8.min(peer.public_key_b64.len())]);
|
||||
|
||||
// Reserve IP in the pool
|
||||
if let Err(e) = state.ip_pool.lock().await.reserve(vpn_ip, &client_id) {
|
||||
warn!("Failed to reserve IP {} for WG peer {}: {}", vpn_ip, client_id, e);
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// Create per-peer return channel and register in tun_routes
|
||||
let fwd_mode = state.config.forwarding_mode.as_deref().unwrap_or("testing");
|
||||
let forwarding_active = fwd_mode == "tun" || fwd_mode == "socket";
|
||||
if forwarding_active {
|
||||
let (peer_return_tx, mut peer_return_rx) = mpsc::channel::<Vec<u8>>(256);
|
||||
state.tun_routes.write().await.insert(vpn_ip, peer_return_tx);
|
||||
|
||||
// Spawn relay task: per-peer channel → merged channel tagged with pubkey
|
||||
let relay_tx = wg_return_tx.clone();
|
||||
let pubkey = peer.public_key_b64.clone();
|
||||
tokio::spawn(async move {
|
||||
while let Some(packet) = peer_return_rx.recv().await {
|
||||
if relay_tx.send((pubkey.clone(), packet)).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Insert ClientInfo
|
||||
let client_info = ClientInfo {
|
||||
client_id: client_id.clone(),
|
||||
assigned_ip: vpn_ip.to_string(),
|
||||
connected_since: wg_timestamp_now(),
|
||||
bytes_sent: 0,
|
||||
bytes_received: 0,
|
||||
packets_dropped: 0,
|
||||
bytes_dropped: 0,
|
||||
last_keepalive_at: None,
|
||||
keepalives_received: 0,
|
||||
rate_limit_bytes_per_sec: None,
|
||||
burst_bytes: None,
|
||||
authenticated_key: peer.public_key_b64.clone(),
|
||||
registered_client_id: client_id,
|
||||
remote_addr: peer.endpoint.map(|e| e.to_string()),
|
||||
transport_type: "wireguard".to_string(),
|
||||
};
|
||||
state.clients.write().await.insert(client_info.client_id.clone(), client_info);
|
||||
|
||||
Ok(Some(vpn_ip))
|
||||
}
|
||||
|
||||
/// Unregister a WG peer from ServerState.
|
||||
async fn unregister_wg_peer(
|
||||
state: &Arc<ServerState>,
|
||||
pubkey: &str,
|
||||
vpn_ip: Option<Ipv4Addr>,
|
||||
) {
|
||||
let client_id = format!("wg-{}", &pubkey[..8.min(pubkey.len())]);
|
||||
|
||||
if let Some(ip) = vpn_ip {
|
||||
state.tun_routes.write().await.remove(&ip);
|
||||
state.ip_pool.lock().await.release(&ip);
|
||||
}
|
||||
state.clients.write().await.remove(&client_id);
|
||||
state.rate_limiters.lock().await.remove(&client_id);
|
||||
}
|
||||
|
||||
/// Integrated WireGuard listener that shares ServerState with WS/QUIC listeners.
|
||||
/// Uses the shared ForwardingEngine for packet routing instead of its own TUN device.
|
||||
pub async fn run_wg_listener(
|
||||
state: Arc<ServerState>,
|
||||
config: WgListenerConfig,
|
||||
mut shutdown_rx: mpsc::Receiver<()>,
|
||||
mut command_rx: mpsc::Receiver<WgCommand>,
|
||||
) -> Result<()> {
|
||||
// Parse server private key
|
||||
let server_private = parse_private_key(&config.private_key)?;
|
||||
let server_public = PublicKey::from(&server_private);
|
||||
|
||||
// Create rate limiter for DDoS protection
|
||||
let rate_limiter = Arc::new(RateLimiter::new(&server_public, TIMER_TICK_MS as u64));
|
||||
|
||||
// Build initial peer state
|
||||
let peer_index = AtomicU32::new(0);
|
||||
let mut peers: Vec<PeerState> = Vec::with_capacity(config.peers.len());
|
||||
|
||||
for peer_config in &config.peers {
|
||||
let peer_public = parse_public_key(&peer_config.public_key)?;
|
||||
let psk = match &peer_config.preshared_key {
|
||||
Some(k) => Some(parse_preshared_key(k)?),
|
||||
None => None,
|
||||
};
|
||||
let idx = peer_index.fetch_add(1, Ordering::Relaxed);
|
||||
let priv_copy = parse_private_key(&config.private_key)?;
|
||||
|
||||
let tunn = Tunn::new(
|
||||
priv_copy,
|
||||
peer_public,
|
||||
psk,
|
||||
peer_config.persistent_keepalive,
|
||||
idx,
|
||||
Some(rate_limiter.clone()),
|
||||
);
|
||||
|
||||
let allowed_ips: Vec<AllowedIp> = peer_config
|
||||
.allowed_ips
|
||||
.iter()
|
||||
.map(|cidr| AllowedIp::parse(cidr))
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
let endpoint = match &peer_config.endpoint {
|
||||
Some(ep) => Some(ep.parse::<SocketAddr>()?),
|
||||
None => None,
|
||||
};
|
||||
|
||||
peers.push(PeerState {
|
||||
tunn,
|
||||
public_key_b64: peer_config.public_key.clone(),
|
||||
allowed_ips,
|
||||
endpoint,
|
||||
persistent_keepalive: peer_config.persistent_keepalive,
|
||||
stats: WgPeerStats::default(),
|
||||
});
|
||||
}
|
||||
|
||||
// Bind UDP socket
|
||||
let udp_socket = UdpSocket::bind(format!("0.0.0.0:{}", config.listen_port)).await?;
|
||||
info!("WireGuard listener started on UDP port {}", config.listen_port);
|
||||
|
||||
// Merged return-packet channel: all per-peer channels feed into this
|
||||
let (wg_return_tx, mut wg_return_rx) = mpsc::channel::<(String, Vec<u8>)>(1024);
|
||||
|
||||
// Register initial peers in ServerState and track their VPN IPs
|
||||
let mut peer_vpn_ips: HashMap<String, Ipv4Addr> = HashMap::new();
|
||||
for peer in &peers {
|
||||
if let Ok(Some(ip)) = register_wg_peer(&state, peer, &wg_return_tx).await {
|
||||
peer_vpn_ips.insert(peer.public_key_b64.clone(), ip);
|
||||
}
|
||||
}
|
||||
|
||||
// Buffers
|
||||
let mut udp_buf = vec![0u8; MAX_UDP_PACKET];
|
||||
let mut dst_buf = vec![0u8; WG_BUFFER_SIZE];
|
||||
let mut timer = tokio::time::interval(std::time::Duration::from_millis(TIMER_TICK_MS));
|
||||
let mut stats_timer = tokio::time::interval(std::time::Duration::from_secs(1));
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
// --- UDP receive → decapsulate → ForwardingEngine ---
|
||||
result = udp_socket.recv_from(&mut udp_buf) => {
|
||||
let (n, src_addr) = result?;
|
||||
if n == 0 { continue; }
|
||||
|
||||
let mut handled = false;
|
||||
for peer in peers.iter_mut() {
|
||||
match peer.tunn.decapsulate(Some(src_addr.ip()), &udp_buf[..n], &mut dst_buf) {
|
||||
TunnResult::WriteToNetwork(packet) => {
|
||||
udp_socket.send_to(packet, src_addr).await?;
|
||||
loop {
|
||||
match peer.tunn.decapsulate(None, &[], &mut dst_buf) {
|
||||
TunnResult::WriteToNetwork(pkt) => {
|
||||
let ep = peer.endpoint.unwrap_or(src_addr);
|
||||
udp_socket.send_to(pkt, ep).await?;
|
||||
}
|
||||
_ => break,
|
||||
}
|
||||
}
|
||||
peer.endpoint = Some(src_addr);
|
||||
handled = true;
|
||||
break;
|
||||
}
|
||||
TunnResult::WriteToTunnelV4(packet, addr) => {
|
||||
if peer.matches_dst(IpAddr::V4(addr)) {
|
||||
let pkt_len = packet.len() as u64;
|
||||
// Forward via shared forwarding engine
|
||||
let mut engine = state.forwarding_engine.lock().await;
|
||||
match &mut *engine {
|
||||
ForwardingEngine::Tun(writer) => {
|
||||
use tokio::io::AsyncWriteExt;
|
||||
if let Err(e) = writer.write_all(packet).await {
|
||||
warn!("TUN write error for WG peer: {}", e);
|
||||
}
|
||||
}
|
||||
ForwardingEngine::Socket(sender) => {
|
||||
let _ = sender.try_send(packet.to_vec());
|
||||
}
|
||||
ForwardingEngine::Testing => {}
|
||||
}
|
||||
peer.stats.bytes_received += pkt_len;
|
||||
peer.stats.packets_received += 1;
|
||||
}
|
||||
peer.endpoint = Some(src_addr);
|
||||
handled = true;
|
||||
break;
|
||||
}
|
||||
TunnResult::WriteToTunnelV6(packet, addr) => {
|
||||
if peer.matches_dst(IpAddr::V6(addr)) {
|
||||
let pkt_len = packet.len() as u64;
|
||||
let mut engine = state.forwarding_engine.lock().await;
|
||||
match &mut *engine {
|
||||
ForwardingEngine::Tun(writer) => {
|
||||
use tokio::io::AsyncWriteExt;
|
||||
if let Err(e) = writer.write_all(packet).await {
|
||||
warn!("TUN write error for WG peer: {}", e);
|
||||
}
|
||||
}
|
||||
ForwardingEngine::Socket(sender) => {
|
||||
let _ = sender.try_send(packet.to_vec());
|
||||
}
|
||||
ForwardingEngine::Testing => {}
|
||||
}
|
||||
peer.stats.bytes_received += pkt_len;
|
||||
peer.stats.packets_received += 1;
|
||||
}
|
||||
peer.endpoint = Some(src_addr);
|
||||
handled = true;
|
||||
break;
|
||||
}
|
||||
TunnResult::Done => { continue; }
|
||||
TunnResult::Err(e) => {
|
||||
debug!("decapsulate error from {}: {:?}", src_addr, e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
if !handled {
|
||||
debug!("No WG peer matched UDP packet from {}", src_addr);
|
||||
}
|
||||
}
|
||||
|
||||
// --- Return packets from tun_routes → encapsulate → UDP ---
|
||||
Some((pubkey, packet)) = wg_return_rx.recv() => {
|
||||
if let Some(peer) = peers.iter_mut().find(|p| p.public_key_b64 == pubkey) {
|
||||
match peer.tunn.encapsulate(&packet, &mut dst_buf) {
|
||||
TunnResult::WriteToNetwork(out) => {
|
||||
if let Some(endpoint) = peer.endpoint {
|
||||
let pkt_len = packet.len() as u64;
|
||||
udp_socket.send_to(out, endpoint).await?;
|
||||
peer.stats.bytes_sent += pkt_len;
|
||||
peer.stats.packets_sent += 1;
|
||||
} else {
|
||||
debug!("No endpoint for WG peer {}, dropping return packet",
|
||||
peer.public_key_b64);
|
||||
}
|
||||
}
|
||||
TunnResult::Err(e) => {
|
||||
debug!("encapsulate error for WG peer {}: {:?}",
|
||||
peer.public_key_b64, e);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// --- WireGuard protocol timers (100ms) ---
|
||||
_ = timer.tick() => {
|
||||
for peer in peers.iter_mut() {
|
||||
match peer.tunn.update_timers(&mut dst_buf) {
|
||||
TunnResult::WriteToNetwork(packet) => {
|
||||
if let Some(endpoint) = peer.endpoint {
|
||||
udp_socket.send_to(packet, endpoint).await?;
|
||||
}
|
||||
}
|
||||
TunnResult::Err(e) => {
|
||||
debug!("Timer error for WG peer {}: {:?}",
|
||||
peer.public_key_b64, e);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// --- Sync stats to ServerState (every 1s) ---
|
||||
_ = stats_timer.tick() => {
|
||||
let mut clients = state.clients.write().await;
|
||||
let mut stats = state.stats.write().await;
|
||||
for peer in peers.iter() {
|
||||
let client_id = format!("wg-{}", &peer.public_key_b64[..8.min(peer.public_key_b64.len())]);
|
||||
if let Some(info) = clients.get_mut(&client_id) {
|
||||
// Update stats delta
|
||||
let prev_sent = info.bytes_sent;
|
||||
let prev_recv = info.bytes_received;
|
||||
info.bytes_sent = peer.stats.bytes_sent;
|
||||
info.bytes_received = peer.stats.bytes_received;
|
||||
info.remote_addr = peer.endpoint.map(|e| e.to_string());
|
||||
|
||||
// Update aggregate stats
|
||||
stats.bytes_sent += peer.stats.bytes_sent.saturating_sub(prev_sent);
|
||||
stats.bytes_received += peer.stats.bytes_received.saturating_sub(prev_recv);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// --- Dynamic peer commands ---
|
||||
cmd = command_rx.recv() => {
|
||||
match cmd {
|
||||
Some(WgCommand::AddPeer(peer_config, resp_tx)) => {
|
||||
let result = add_peer_to_loop(
|
||||
&mut peers,
|
||||
&peer_config,
|
||||
&peer_index,
|
||||
&rate_limiter,
|
||||
&config.private_key,
|
||||
);
|
||||
if result.is_ok() {
|
||||
// Register new peer in ServerState
|
||||
let peer = peers.last().unwrap();
|
||||
match register_wg_peer(&state, peer, &wg_return_tx).await {
|
||||
Ok(Some(ip)) => {
|
||||
peer_vpn_ips.insert(peer_config.public_key.clone(), ip);
|
||||
}
|
||||
Ok(None) => {}
|
||||
Err(e) => {
|
||||
warn!("Failed to register WG peer: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
let _ = resp_tx.send(result);
|
||||
}
|
||||
Some(WgCommand::RemovePeer(pubkey, resp_tx)) => {
|
||||
let prev_len = peers.len();
|
||||
peers.retain(|p| p.public_key_b64 != pubkey);
|
||||
if peers.len() < prev_len {
|
||||
let vpn_ip = peer_vpn_ips.remove(&pubkey);
|
||||
unregister_wg_peer(&state, &pubkey, vpn_ip).await;
|
||||
let _ = resp_tx.send(Ok(()));
|
||||
} else {
|
||||
let _ = resp_tx.send(Err(anyhow!("Peer not found: {}", pubkey)));
|
||||
}
|
||||
}
|
||||
None => {
|
||||
info!("WG command channel closed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// --- Shutdown ---
|
||||
_ = shutdown_rx.recv() => {
|
||||
info!("WireGuard listener shutdown signal received");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Cleanup: unregister all peers from ServerState
|
||||
for peer in &peers {
|
||||
let vpn_ip = peer_vpn_ips.get(&peer.public_key_b64).copied();
|
||||
unregister_wg_peer(&state, &peer.public_key_b64, vpn_ip).await;
|
||||
}
|
||||
|
||||
info!("WireGuard listener stopped");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// WgClient
|
||||
// ============================================================================
|
||||
@@ -1096,6 +998,8 @@ fn chrono_now() -> String {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::tunnel::extract_dst_ip;
|
||||
use std::net::Ipv6Addr;
|
||||
|
||||
#[test]
|
||||
fn test_generate_wg_keypair() {
|
||||
|
||||
@@ -211,8 +211,8 @@ tap.test('throttled connection: handshake succeeds through throttle', async () =
|
||||
});
|
||||
|
||||
tap.test('sustained keepalive under throttle', async () => {
|
||||
// Wait for at least 2 keepalive cycles (3s interval)
|
||||
await delay(8000);
|
||||
// Wait for at least 1 keepalive cycle (3s interval)
|
||||
await delay(4000);
|
||||
|
||||
const client = allClients[0];
|
||||
const stats = await client.getStatistics();
|
||||
@@ -262,14 +262,14 @@ tap.test('rate limiting combined with network throttle', async () => {
|
||||
await server.removeClientRateLimit(targetId);
|
||||
});
|
||||
|
||||
tap.test('burst waves: 3 waves of 3 clients', async () => {
|
||||
tap.test('burst waves: 2 waves of 2 clients', async () => {
|
||||
const initialCount = (await server.listClients()).length;
|
||||
|
||||
for (let wave = 0; wave < 3; wave++) {
|
||||
for (let wave = 0; wave < 2; wave++) {
|
||||
const waveClients: VpnClient[] = [];
|
||||
|
||||
// Connect 3 clients
|
||||
for (let i = 0; i < 3; i++) {
|
||||
// Connect 2 clients
|
||||
for (let i = 0; i < 2; i++) {
|
||||
const c = await createConnectedClient(proxyPort);
|
||||
waveClients.push(c);
|
||||
}
|
||||
@@ -277,7 +277,7 @@ tap.test('burst waves: 3 waves of 3 clients', async () => {
|
||||
// Verify all connected
|
||||
await waitFor(async () => {
|
||||
const all = await server.listClients();
|
||||
return all.length === initialCount + 3;
|
||||
return all.length === initialCount + 2;
|
||||
});
|
||||
|
||||
// Disconnect all wave clients
|
||||
@@ -296,7 +296,7 @@ tap.test('burst waves: 3 waves of 3 clients', async () => {
|
||||
|
||||
// Verify total connections accumulated
|
||||
const stats = await server.getStatistics();
|
||||
expect(stats.totalConnections).toBeGreaterThanOrEqual(9 + initialCount);
|
||||
expect(stats.totalConnections).toBeGreaterThanOrEqual(4 + initialCount);
|
||||
|
||||
// Original clients still connected
|
||||
const remaining = await server.listClients();
|
||||
@@ -315,7 +315,7 @@ tap.test('aggressive throttle: 10 KB/s', async () => {
|
||||
expect(status.state).toEqual('connected');
|
||||
|
||||
// Wait for keepalive exchange (might take longer due to throttle)
|
||||
await delay(10000);
|
||||
await delay(4000);
|
||||
|
||||
const stats = await client.getStatistics();
|
||||
expect(stats.keepalivesSent).toBeGreaterThanOrEqual(1);
|
||||
@@ -332,7 +332,7 @@ tap.test('post-load health: direct connection still works', async () => {
|
||||
const status = await directClient.getStatus();
|
||||
expect(status.state).toEqual('connected');
|
||||
|
||||
await delay(5000);
|
||||
await delay(3500);
|
||||
|
||||
const stats = await directClient.getStatistics();
|
||||
expect(stats.keepalivesSent).toBeGreaterThanOrEqual(1);
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartvpn',
|
||||
version: '1.9.0',
|
||||
version: '1.13.0',
|
||||
description: 'A VPN solution with TypeScript control plane and Rust data plane daemon'
|
||||
}
|
||||
|
||||
@@ -40,6 +40,9 @@ export interface IVpnClientConfig {
|
||||
transport?: 'auto' | 'websocket' | 'quic' | 'wireguard';
|
||||
/** For QUIC: SHA-256 hash of server certificate (base64) for cert pinning */
|
||||
serverCertHash?: string;
|
||||
/** Forwarding mode: 'tun' (TUN device, requires root) or 'testing' (no TUN).
|
||||
* Default: 'testing'. */
|
||||
forwardingMode?: 'tun' | 'testing';
|
||||
/** WireGuard: client private key (base64, X25519) */
|
||||
wgPrivateKey?: string;
|
||||
/** WireGuard: client TUN address (e.g. 10.8.0.2) */
|
||||
@@ -54,6 +57,8 @@ export interface IVpnClientConfig {
|
||||
wgEndpoint?: string;
|
||||
/** WireGuard: allowed IPs (CIDR strings, e.g. ['0.0.0.0/0']) */
|
||||
wgAllowedIps?: string[];
|
||||
/** Client-defined tags reported to the server after connection (informational, not for access control) */
|
||||
clientDefinedClientTags?: string[];
|
||||
}
|
||||
|
||||
export interface IVpnClientOptions {
|
||||
@@ -86,16 +91,22 @@ export interface IVpnServerConfig {
|
||||
keepaliveIntervalSecs?: number;
|
||||
/** Enable NAT/masquerade for client traffic */
|
||||
enableNat?: boolean;
|
||||
/** Forwarding mode: 'tun' (kernel TUN, requires root), 'socket' (userspace NAT),
|
||||
* or 'testing' (monitoring only). Default: 'testing'. */
|
||||
forwardingMode?: 'tun' | 'socket' | 'testing';
|
||||
/** Default rate limit for new clients (bytes/sec). Omit for unlimited. */
|
||||
defaultRateLimitBytesPerSec?: number;
|
||||
/** Default burst size for new clients (bytes). Omit for unlimited. */
|
||||
defaultBurstBytes?: number;
|
||||
/** Transport mode: 'both' (default, WS+QUIC), 'websocket', 'quic', or 'wireguard' */
|
||||
transportMode?: 'websocket' | 'quic' | 'both' | 'wireguard';
|
||||
/** Transport mode: 'all' (default, WS+QUIC+WG if configured), 'both' (WS+QUIC),
|
||||
* 'websocket', 'quic', or 'wireguard' */
|
||||
transportMode?: 'websocket' | 'quic' | 'both' | 'all' | 'wireguard';
|
||||
/** QUIC listen address (host:port). Defaults to listenAddr. */
|
||||
quicListenAddr?: string;
|
||||
/** QUIC idle timeout in seconds (default: 30) */
|
||||
quicIdleTimeoutSecs?: number;
|
||||
/** WireGuard: server X25519 private key (base64). Required when transport includes WG. */
|
||||
wgPrivateKey?: string;
|
||||
/** WireGuard: UDP listen port (default: 51820) */
|
||||
wgListenPort?: number;
|
||||
/** WireGuard: configured peers */
|
||||
@@ -109,6 +120,11 @@ export interface IVpnServerConfig {
|
||||
/** Server-level IP block list — applied at TCP accept, before Noise handshake.
|
||||
* Supports exact IPs, CIDR, wildcards, ranges. */
|
||||
connectionIpBlockList?: string[];
|
||||
/** When true and forwardingMode is 'socket', the userspace NAT engine prepends
|
||||
* PROXY protocol v2 headers on outbound TCP connections, conveying the VPN client's
|
||||
* tunnel IP as the source address. This allows downstream services (e.g. SmartProxy)
|
||||
* to see the real VPN client identity instead of 127.0.0.1. */
|
||||
socketForwardProxyProtocol?: boolean;
|
||||
}
|
||||
|
||||
export interface IVpnServerOptions {
|
||||
@@ -165,6 +181,8 @@ export interface IVpnClientInfo {
|
||||
registeredClientId: string;
|
||||
/** Real client IP:port (from PROXY protocol or direct TCP connection) */
|
||||
remoteAddr?: string;
|
||||
/** Transport used: "websocket", "quic", or "wireguard" */
|
||||
transportType: string;
|
||||
}
|
||||
|
||||
export interface IVpnServerStatistics extends IVpnStatistics {
|
||||
@@ -274,7 +292,11 @@ export interface IClientEntry {
|
||||
priority?: number;
|
||||
/** Whether this client is enabled (default: true) */
|
||||
enabled?: boolean;
|
||||
/** Tags for grouping (e.g. ["engineering", "office"]) */
|
||||
/** Tags assigned by the server admin — trusted, used for access control (e.g. ["engineering", "office"]) */
|
||||
serverDefinedClientTags?: string[];
|
||||
/** Tags reported by the connecting client — informational only, never used for access control */
|
||||
clientDefinedClientTags?: string[];
|
||||
/** @deprecated Use serverDefinedClientTags instead. Legacy field kept for backward compatibility. */
|
||||
tags?: string[];
|
||||
/** Optional description */
|
||||
description?: string;
|
||||
|
||||
Reference in New Issue
Block a user