10 Commits

15 changed files with 1108 additions and 678 deletions

View File

@@ -1,5 +1,42 @@
# Changelog
## 2026-03-30 - 1.15.0 - feat(vpnserver)
add nftables-backed destination policy enforcement for TUN mode
- add @push.rocks/smartnftables dependency and export it through the plugin layer
- apply destination policy rules via nftables when starting the server in TUN mode
- add periodic nftables health checks and best-effort cleanup on server stop
- update documentation for destination routing policy, socket transport mode, trusted client tags, events, and service generation
## 2026-03-30 - 1.14.0 - feat(nat)
add destination routing policy support for socket-mode VPN traffic
- introduce configurable destinationPolicy settings in server and TypeScript interfaces
- apply allow, block, and forceTarget routing decisions when creating TCP and UDP NAT sessions
- export ACL IP matching helper for destination policy evaluation
## 2026-03-30 - 1.13.0 - feat(client-registry)
separate trusted server-defined client tags from client-reported tags with legacy tag compatibility
- Adds distinct serverDefinedClientTags and clientDefinedClientTags fields to client registry and TypeScript interfaces.
- Treats legacy tags values as serverDefinedClientTags during deserialization and server-side create/update flows for backward compatibility.
- Clarifies that only server-defined tags are trusted for access control while client-defined tags are informational only.
## 2026-03-30 - 1.12.0 - feat(server)
add optional PROXY protocol v2 headers for socket-based userspace NAT forwarding
- introduce a socketForwardProxyProtocol server option in Rust and TypeScript interfaces
- pass the new setting into the userspace NAT engine and TCP bridge tasks
- prepend PROXY protocol v2 headers on outbound TCP connections when socket forwarding is enabled
## 2026-03-30 - 1.11.0 - feat(server)
unify WireGuard into the shared server transport pipeline
- add integrated WireGuard server support to VpnServer with shared startup, shutdown, status, statistics, and peer management
- introduce transportMode 'all' as the default and add server config support for wgPrivateKey, wgListenPort, and preconfigured peers
- register WireGuard peers in the shared client registry and IP pool so they use the same forwarding engine, routing, and monitoring as WebSocket and QUIC clients
- expose transportType in server client info and update TypeScript interfaces and documentation to reflect unified multi-transport forwarding
## 2026-03-30 - 1.10.2 - fix(client)
wait for the connection task to shut down cleanly before disconnecting and increase test timeout

View File

@@ -1,6 +1,6 @@
{ {
"name": "@push.rocks/smartvpn", "name": "@push.rocks/smartvpn",
"version": "1.10.2", "version": "1.15.0",
"private": false, "private": false,
"description": "A VPN solution with TypeScript control plane and Rust data plane daemon", "description": "A VPN solution with TypeScript control plane and Rust data plane daemon",
"type": "module", "type": "module",
@@ -29,6 +29,7 @@
], ],
"license": "MIT", "license": "MIT",
"dependencies": { "dependencies": {
"@push.rocks/smartnftables": "1.1.0",
"@push.rocks/smartpath": "^6.0.0", "@push.rocks/smartpath": "^6.0.0",
"@push.rocks/smartrust": "^1.3.2" "@push.rocks/smartrust": "^1.3.2"
}, },

11
pnpm-lock.yaml generated
View File

@@ -8,6 +8,9 @@ importers:
.: .:
dependencies: dependencies:
'@push.rocks/smartnftables':
specifier: 1.1.0
version: 1.1.0
'@push.rocks/smartpath': '@push.rocks/smartpath':
specifier: ^6.0.0 specifier: ^6.0.0
version: 6.0.0 version: 6.0.0
@@ -1132,6 +1135,9 @@ packages:
'@push.rocks/smartnetwork@4.5.2': '@push.rocks/smartnetwork@4.5.2':
resolution: {integrity: sha512-lbMMyc2f/WWd5+qzZyF1ynXndjCtasxPWmj/d8GUuis9rDrW7sLIT1PlAPC2F6Qsy4H/K32JrYU+01d/6sWObg==} resolution: {integrity: sha512-lbMMyc2f/WWd5+qzZyF1ynXndjCtasxPWmj/d8GUuis9rDrW7sLIT1PlAPC2F6Qsy4H/K32JrYU+01d/6sWObg==}
'@push.rocks/smartnftables@1.1.0':
resolution: {integrity: sha512-7JNzerlW20HEl2wKMBIHltwneCQRpXiD2lJkXZZc02ctnfjgFejXVDIeWomhPx6PZ0Z6zmqdF6rrFDtDHyqqfA==}
'@push.rocks/smartnpm@2.0.6': '@push.rocks/smartnpm@2.0.6':
resolution: {integrity: sha512-7anKDOjX6gXWs1IAc+YWz9ZZ8gDsTwaLh+CxRnGHjAawOmK788NrrgVCg2Fb3qojrPnoxecc46F8Ivp1BT7Izw==} resolution: {integrity: sha512-7anKDOjX6gXWs1IAc+YWz9ZZ8gDsTwaLh+CxRnGHjAawOmK788NrrgVCg2Fb3qojrPnoxecc46F8Ivp1BT7Izw==}
@@ -5335,6 +5341,11 @@ snapshots:
transitivePeerDependencies: transitivePeerDependencies:
- supports-color - supports-color
'@push.rocks/smartnftables@1.1.0':
dependencies:
'@push.rocks/smartlog': 3.2.1
'@push.rocks/smartpromise': 4.2.3
'@push.rocks/smartnpm@2.0.6': '@push.rocks/smartnpm@2.0.6':
dependencies: dependencies:
'@push.rocks/consolecolor': 2.0.3 '@push.rocks/consolecolor': 2.0.3

204
readme.md
View File

@@ -9,7 +9,8 @@ A high-performance VPN solution with a **TypeScript control plane** and a **Rust
📊 **Adaptive QoS**: per-client rate limiting, priority queues, connection quality tracking
🔄 **Hub API**: one `createClient()` call generates keys, assigns IP, returns both SmartVPN + WireGuard configs
📡 **Real-time telemetry**: RTT, jitter, loss ratio, link health — all via typed APIs
🌐 **Flexible forwarding**: TUN device (kernel), userspace NAT (no root), or testing mode 🌐 **Unified forwarding pipeline**: all transports share the same engine — TUN (kernel), userspace NAT (no root), or testing mode
🎯 **Destination routing policy**: force-target, block, or allow traffic per destination with nftables integration
## Issue Reporting and Security
@@ -36,11 +37,38 @@ The package ships with pre-compiled Rust binaries for **linux/amd64** and **linu
│ Config validation │ │ WS + QUIC + WireGuard │ │ Config validation │ │ WS + QUIC + WireGuard │
│ Hub: client management │ │ TUN device, IP pool, NAT │ │ Hub: client management │ │ TUN device, IP pool, NAT │
│ WireGuard .conf generation │ │ Rate limiting, ACLs, QoS │ │ WireGuard .conf generation │ │ Rate limiting, ACLs, QoS │
│ nftables destination policy │ │ Destination routing, nftables│
└──────────────────────────────┘ └───────────────────────────────┘
```
**Split-plane design** — TypeScript handles orchestration, config, and DX; Rust handles every hot-path byte with zero-copy async I/O (tokio, mimalloc).
### IPC Transport Modes
The bridge between TypeScript and Rust supports two transport modes:
| Mode | Use Case | How It Works |
|------|----------|-------------|
| **stdio** | Development, testing | Spawns the Rust daemon as a child process, communicates over stdin/stdout |
| **socket** | Production | Connects to an already-running daemon via Unix domain socket, with optional auto-reconnect |
```typescript
// Development: spawn the daemon
const server = new VpnServer({ transport: { transport: 'stdio' } });
// Production: connect to running daemon
const server = new VpnServer({
transport: {
transport: 'socket',
socketPath: '/var/run/smartvpn.sock',
autoReconnect: true,
reconnectBaseDelayMs: 100,
reconnectMaxDelayMs: 5000,
maxReconnectAttempts: 10,
},
});
```
## Quick Start 🚀
### 1. Start a VPN Server (Hub)
@@ -54,8 +82,9 @@ await server.start({
privateKey: '<server-noise-private-key-base64>', privateKey: '<server-noise-private-key-base64>',
publicKey: '<server-noise-public-key-base64>', publicKey: '<server-noise-public-key-base64>',
subnet: '10.8.0.0/24', subnet: '10.8.0.0/24',
transportMode: 'both', // WebSocket + QUIC simultaneously transportMode: 'all', // WebSocket + QUIC + WireGuard simultaneously (default)
forwardingMode: 'tun', // 'tun' (kernel), 'socket' (userspace NAT), or 'testing' forwardingMode: 'tun', // 'tun' (kernel), 'socket' (userspace NAT), or 'testing'
wgPrivateKey: '<server-wg-private-key-base64>', // required for WireGuard transport
enableNat: true, enableNat: true,
dns: ['1.1.1.1', '8.8.8.8'], dns: ['1.1.1.1', '8.8.8.8'],
}); });
@@ -66,7 +95,7 @@ await server.start({
```typescript ```typescript
const bundle = await server.createClient({ const bundle = await server.createClient({
clientId: 'alice-laptop', clientId: 'alice-laptop',
tags: ['engineering'], serverDefinedClientTags: ['engineering'], // trusted tags for access control
security: { security: {
destinationAllowList: ['10.0.0.0/8'], // can only reach internal network destinationAllowList: ['10.0.0.0/8'], // can only reach internal network
destinationBlockList: ['10.0.0.99'], // except this host destinationBlockList: ['10.0.0.99'], // except this host
@@ -109,7 +138,7 @@ Every client authenticates with a **Noise IK handshake** (`Noise_IK_25519_ChaCha
| **QUIC** | UDP (via quinn) | Low latency, datagram support for IP packets | | **QUIC** | UDP (via quinn) | Low latency, datagram support for IP packets |
| **WireGuard** | UDP (via boringtun) | Standard WG clients (iOS, Android, wg-quick) | | **WireGuard** | UDP (via boringtun) | Standard WG clients (iOS, Android, wg-quick) |
The server can run **all three simultaneously** with `transportMode: 'both'` (WS + QUIC) or `'wireguard'`. Clients auto-negotiate with `transport: 'auto'` (tries QUIC first, falls back to WS). The server runs **all three simultaneously** by default with `transportMode: 'all'`. All transports share the same unified forwarding pipeline (`ForwardingEngine`), IP pool, client registry, and stats — so WireGuard peers get the same userspace NAT, rate limiting, and monitoring as WS/QUIC clients. Clients auto-negotiate with `transport: 'auto'` (tries QUIC first, falls back to WS).
### 🛡️ ACL Engine (SmartProxy-Aligned)
@@ -154,6 +183,47 @@ await server.start({
- `remoteAddr` field on `IVpnClientInfo` exposes the real client IP for monitoring
- **Security**: must be `false` (default) when accepting direct connections — only enable behind a trusted proxy
### 🎯 Destination Routing Policy
Control where decrypted VPN client traffic goes — force it to a specific target, block it, or allow it through. Evaluated per-packet before per-client ACLs.
```typescript
await server.start({
// ...
forwardingMode: 'socket', // userspace NAT mode
destinationPolicy: {
default: 'forceTarget', // redirect all traffic to a target
target: '127.0.0.1', // target IP for 'forceTarget' mode
allowList: ['10.0.0.0/8'], // these destinations pass through directly
blockList: ['10.0.0.99'], // always blocked (deny overrides allow)
},
});
```
**Policy modes:**
| Mode | Behavior |
|------|----------|
| `'forceTarget'` | Rewrites destination IP to `target` — funnels all traffic through a single endpoint |
| `'block'` | Drops all traffic not explicitly in `allowList` |
| `'allow'` | Passes all traffic through (default, backward compatible) |
In **TUN mode**, destination policies are enforced via **nftables** rules (using `@push.rocks/smartnftables`). A 60-second health check automatically re-applies rules if they're removed externally.
In **socket mode**, the policy is evaluated in the userspace NAT engine before per-client ACLs.
### 🔗 Socket Forward Proxy Protocol
When using `forwardingMode: 'socket'` (userspace NAT), you can prepend **PROXY protocol v2 headers** on outbound TCP connections. This conveys the VPN client's tunnel IP as the source address to downstream services (e.g., SmartProxy):
```typescript
await server.start({
// ...
forwardingMode: 'socket',
socketForwardProxyProtocol: true, // downstream sees VPN client IP, not 127.0.0.1
});
```
### 📦 Packet Forwarding Modes
SmartVPN supports three forwarding modes, configurable per-server and per-client:
@@ -189,6 +259,30 @@ The userspace NAT mode extracts destination IP/port from IP packets, opens a rea
- **Dead-peer detection**: 180s inactivity timeout
- **MTU management**: Automatic overhead calculation (IP+TCP+WS+Noise = 79 bytes)
### 🏷️ Client Tags (Trusted vs Informational)
SmartVPN separates server-managed tags from client-reported tags:
| Field | Set By | Trust Level | Use For |
|-------|--------|-------------|---------|
| `serverDefinedClientTags` | Server admin (via `createClient` / `updateClient`) | ✅ Trusted | Access control, routing, billing |
| `clientDefinedClientTags` | Client (reported after connection) | ⚠️ Informational | Diagnostics, client self-identification |
| `tags` | *(deprecated)* | — | Legacy alias for `serverDefinedClientTags` |
```typescript
// Server-side: trusted tags
await server.createClient({
clientId: 'alice-laptop',
serverDefinedClientTags: ['engineering', 'office-berlin'],
});
// Client-side: informational tags (reported to server)
await client.connect({
// ...
clientDefinedClientTags: ['macOS', 'v2.1.0'],
});
```
### 🔄 Hub Client Management
The server acts as a **hub** — one API to manage all clients:
@@ -204,7 +298,7 @@ const all = await server.listRegisteredClients();
// Update (ACLs, tags, description, rate limits...) // Update (ACLs, tags, description, rate limits...)
await server.updateClient('bob-phone', { await server.updateClient('bob-phone', {
security: { destinationAllowList: ['0.0.0.0/0'] }, security: { destinationAllowList: ['0.0.0.0/0'] },
tags: ['mobile', 'field-ops'], serverDefinedClientTags: ['mobile', 'field-ops'],
}); });
// Enable / Disable // Enable / Disable
@@ -242,46 +336,100 @@ const conf = WgConfigGenerator.generateClientConfig({
// → standard WireGuard .conf compatible with wg-quick, iOS, Android // → standard WireGuard .conf compatible with wg-quick, iOS, Android
``` ```
Server configs too:
```typescript
const serverConf = WgConfigGenerator.generateServerConfig({
privateKey: '<server-wg-private-key>',
address: '10.8.0.1/24',
listenPort: 51820,
enableNat: true,
natInterface: 'eth0',
peers: [
{ publicKey: '<client-wg-public-key>', allowedIps: ['10.8.0.2/32'] },
],
});
```
### 🖥️ System Service Installation ### 🖥️ System Service Installation
Generate systemd (Linux) or launchd (macOS) service units:
```typescript ```typescript
import { VpnInstaller } from '@push.rocks/smartvpn'; import { VpnInstaller } from '@push.rocks/smartvpn';
const unit = VpnInstaller.generateServiceUnit({ const unit = VpnInstaller.generateServiceUnit({
binaryPath: '/usr/local/bin/smartvpn_daemon',
socketPath: '/var/run/smartvpn.sock',
mode: 'server', mode: 'server',
configPath: '/etc/smartvpn/server.json',
}); });
// unit.platform → 'linux' | 'macos' // unit.platform → 'linux' | 'macos'
// unit.content → systemd unit file or launchd plist // unit.content → systemd unit file or launchd plist
// unit.installPath → /etc/systemd/system/smartvpn-server.service // unit.installPath → /etc/systemd/system/smartvpn-server.service
``` ```
You can also call `generateSystemdUnit()` or `generateLaunchdPlist()` directly for platform-specific options like custom descriptions.
### 📢 Events
Both `VpnServer` and `VpnClient` extend `EventEmitter` and emit typed events:
```typescript
server.on('client-connected', (info: IVpnClientInfo) => {
console.log(`${info.registeredClientId} connected from ${info.remoteAddr} via ${info.transportType}`);
});
server.on('client-disconnected', ({ clientId, reason }) => {
console.log(`${clientId} disconnected: ${reason}`);
});
client.on('status', (status: IVpnStatus) => {
console.log(`State: ${status.state}, IP: ${status.assignedIp}`);
});
// Both server and client emit:
server.on('exit', ({ code, signal }) => { /* daemon process exited */ });
server.on('reconnected', () => { /* socket transport reconnected */ });
```
| Event | Emitted By | Payload |
|-------|-----------|---------|
| `status` | Both | `IVpnStatus` — connection state changes |
| `error` | Both | `{ message, code? }` |
| `client-connected` | Server | `IVpnClientInfo` — full client info including transport type |
| `client-disconnected` | Server | `{ clientId, reason? }` |
| `exit` | Both | `{ code, signal }` — daemon process exited |
| `reconnected` | Both | `void` — socket transport reconnected |
## API Reference 📖
### Classes
| Class | Description |
|-------|-------------|
| `VpnServer` | Manages the Rust daemon in server mode. Hub methods for client CRUD. | | `VpnServer` | Manages the Rust daemon in server mode. Hub methods for client CRUD, telemetry, rate limits, WireGuard peer management. |
| `VpnClient` | Manages the Rust daemon in client mode. Connect, disconnect, telemetry. | | `VpnClient` | Manages the Rust daemon in client mode. Connect, disconnect, status, telemetry. |
| `VpnBridge<T>` | Low-level typed IPC bridge (stdio or Unix socket). | | `VpnBridge<T>` | Low-level typed IPC bridge (stdio or Unix socket). Handles spawn, connect, reconnect, and typed command dispatch. |
| `VpnConfig` | Static config validation and file I/O. | | `VpnConfig` | Static config validation and JSON file I/O. Validates keys, addresses, CIDRs, MTU, etc. |
| `VpnInstaller` | Generates systemd/launchd service files. | | `VpnInstaller` | Generates systemd/launchd service files for daemon deployment. |
| `WgConfigGenerator` | Generates standard WireGuard `.conf` files. | | `WgConfigGenerator` | Generates standard WireGuard `.conf` files (client and server). |
### Key Interfaces
| Interface | Purpose |
|-----------|---------|
| `IVpnServerConfig` | Server configuration (listen addr, keys, subnet, transport mode, forwarding mode, clients, proxy protocol) | | `IVpnServerConfig` | Server configuration (listen addr, keys, subnet, transport mode, forwarding mode, clients, proxy protocol, destination policy) |
| `IVpnClientConfig` | Client configuration (server URL, keys, transport, forwarding mode, WG options) | | `IVpnClientConfig` | Client configuration (server URL, keys, transport, forwarding mode, WG options, client-defined tags) |
| `IClientEntry` | Server-side client definition (ID, keys, security, priority, tags, expiry) | | `IClientEntry` | Server-side client definition (ID, keys, security, priority, server/client tags, expiry) |
| `IClientSecurity` | Per-client ACLs and rate limits (SmartProxy-aligned naming) | | `IClientSecurity` | Per-client ACLs and rate limits (SmartProxy-aligned naming) |
| `IClientRateLimit` | Rate limiting config (bytesPerSec, burstBytes) | | `IClientRateLimit` | Rate limiting config (bytesPerSec, burstBytes) |
| `IClientConfigBundle` | Full config bundle returned by `createClient()` | | `IClientConfigBundle` | Full config bundle returned by `createClient()` — includes SmartVPN config, WireGuard .conf, and secrets |
| `IVpnClientInfo` | Connected client info (IP, stats, authenticated key, remote addr) | | `IVpnClientInfo` | Connected client info (IP, stats, authenticated key, remote addr, transport type) |
| `IVpnConnectionQuality` | RTT, jitter, loss ratio, link health | | `IVpnConnectionQuality` | RTT, jitter, loss ratio, link health |
| `IVpnMtuInfo` | TUN MTU, effective MTU, overhead bytes, oversized packet stats |
| `IVpnKeypair` | Base64-encoded public/private key pair | | `IVpnKeypair` | Base64-encoded public/private key pair |
| `IDestinationPolicy` | Destination routing policy (forceTarget / block / allow with allow/block lists) |
| `IVpnEventMap` | Typed event map for server and client EventEmitter |
### Server IPC Commands
@@ -313,19 +461,24 @@ const unit = VpnInstaller.generateServiceUnit({
### Server Configuration
```typescript ```typescript
// All transports simultaneously (default) — WS + QUIC + WireGuard
{ transportMode: 'all', listenAddr: '0.0.0.0:443', wgPrivateKey: '...', wgListenPort: 51820 }
// WS + QUIC only
{ transportMode: 'both', listenAddr: '0.0.0.0:443', quicListenAddr: '0.0.0.0:4433' }
// WebSocket only // WebSocket only
{ transportMode: 'websocket', listenAddr: '0.0.0.0:443' } { transportMode: 'websocket', listenAddr: '0.0.0.0:443' }
// QUIC only // QUIC only
{ transportMode: 'quic', listenAddr: '0.0.0.0:443' } { transportMode: 'quic', listenAddr: '0.0.0.0:443' }
// Both (WS + QUIC on same or different ports) // WireGuard only
{ transportMode: 'both', listenAddr: '0.0.0.0:443', quicListenAddr: '0.0.0.0:4433' } { transportMode: 'wireguard', wgPrivateKey: '...', wgListenPort: 51820, wgPeers: [...] }
// WireGuard
{ transportMode: 'wireguard', wgListenPort: 51820, wgPeers: [...] }
``` ```
All transport modes share the same `forwardingMode` — WireGuard peers can use `'socket'` (userspace NAT) just like WS/QUIC clients.
### Client Configuration
```typescript ```typescript
@@ -370,7 +523,7 @@ pnpm install
# Build (TypeScript + Rust cross-compile) # Build (TypeScript + Rust cross-compile)
pnpm build pnpm build
# Run all tests (79 TS + 132 Rust = 211 tests) # Run all tests
pnpm test pnpm test
# Run Rust tests directly # Run Rust tests directly
@@ -387,6 +540,7 @@ smartvpn/
├── ts/ # TypeScript control plane ├── ts/ # TypeScript control plane
│ ├── index.ts # All exports │ ├── index.ts # All exports
│ ├── smartvpn.interfaces.ts # Interfaces, types, IPC command maps │ ├── smartvpn.interfaces.ts # Interfaces, types, IPC command maps
│ ├── smartvpn.plugins.ts # Dependency imports
│ ├── smartvpn.classes.vpnserver.ts │ ├── smartvpn.classes.vpnserver.ts
│ ├── smartvpn.classes.vpnclient.ts │ ├── smartvpn.classes.vpnclient.ts
│ ├── smartvpn.classes.vpnbridge.ts │ ├── smartvpn.classes.vpnbridge.ts
@@ -411,14 +565,14 @@ smartvpn/
│ ├── ratelimit.rs # Token bucket │ ├── ratelimit.rs # Token bucket
│ ├── userspace_nat.rs # Userspace TCP/UDP NAT proxy │ ├── userspace_nat.rs # Userspace TCP/UDP NAT proxy
│ └── ... # tunnel, network, telemetry, qos, mtu, reconnect │ └── ... # tunnel, network, telemetry, qos, mtu, reconnect
├── test/ # 9 test files (79 tests) ├── test/ # Test files
├── dist_ts/ # Compiled TypeScript ├── dist_ts/ # Compiled TypeScript
└── dist_rust/ # Cross-compiled binaries (linux amd64 + arm64) └── dist_rust/ # Cross-compiled binaries (linux amd64 + arm64)
``` ```
## License and Legal Information
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file. This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [license](./license.md) file.
**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.

View File

@@ -78,7 +78,7 @@ pub fn check_acl(security: &ClientSecurity, src_ip: Ipv4Addr, dst_ip: Ipv4Addr)
/// Check if `ip` matches any pattern in the list. /// Check if `ip` matches any pattern in the list.
/// Supports: exact IP, CIDR notation, wildcard patterns (192.168.1.*), /// Supports: exact IP, CIDR notation, wildcard patterns (192.168.1.*),
/// and IP ranges (192.168.1.1-192.168.1.100). /// and IP ranges (192.168.1.1-192.168.1.100).
fn ip_matches_any(ip: Ipv4Addr, patterns: &[String]) -> bool { pub fn ip_matches_any(ip: Ipv4Addr, patterns: &[String]) -> bool {
for pattern in patterns { for pattern in patterns {
if ip_matches(ip, pattern) { if ip_matches(ip, pattern) {
return true; return true;

View File

@@ -44,7 +44,12 @@ pub struct ClientEntry {
pub priority: Option<u32>, pub priority: Option<u32>,
/// Whether this client is enabled (default: true). /// Whether this client is enabled (default: true).
pub enabled: Option<bool>, pub enabled: Option<bool>,
/// Tags for grouping. /// Tags assigned by the server admin — trusted, used for access control.
pub server_defined_client_tags: Option<Vec<String>>,
/// Tags reported by the connecting client — informational only.
pub client_defined_client_tags: Option<Vec<String>>,
/// Legacy tags field — treated as serverDefinedClientTags during deserialization.
#[serde(default)]
pub tags: Option<Vec<String>>, pub tags: Option<Vec<String>>,
/// Optional description. /// Optional description.
pub description: Option<String>, pub description: Option<String>,
@@ -90,7 +95,11 @@ impl ClientRegistry {
/// Build a registry from a list of client entries. /// Build a registry from a list of client entries.
pub fn from_entries(entries: Vec<ClientEntry>) -> Result<Self> { pub fn from_entries(entries: Vec<ClientEntry>) -> Result<Self> {
let mut registry = Self::new(); let mut registry = Self::new();
for entry in entries { for mut entry in entries {
// Migrate legacy `tags` → `serverDefinedClientTags`
if entry.server_defined_client_tags.is_none() && entry.tags.is_some() {
entry.server_defined_client_tags = entry.tags.take();
}
registry.add(entry)?; registry.add(entry)?;
} }
Ok(registry) Ok(registry)
@@ -193,6 +202,8 @@ mod tests {
security: None, security: None,
priority: None, priority: None,
enabled: None, enabled: None,
server_defined_client_tags: None,
client_defined_client_tags: None,
tags: None, tags: None,
description: None, description: None,
expires_at: None, expires_at: None,

View File

@@ -7,7 +7,7 @@ use tracing::{info, error, warn};
use crate::client::{ClientConfig, VpnClient}; use crate::client::{ClientConfig, VpnClient};
use crate::crypto; use crate::crypto;
use crate::server::{ServerConfig, VpnServer}; use crate::server::{ServerConfig, VpnServer};
use crate::wireguard::{self, WgClient, WgClientConfig, WgPeerConfig, WgServer, WgServerConfig}; use crate::wireguard::{self, WgClient, WgClientConfig, WgPeerConfig};
// ============================================================================ // ============================================================================
// IPC protocol types // IPC protocol types
@@ -95,7 +95,6 @@ pub async fn management_loop_stdio(mode: &str) -> Result<()> {
let mut vpn_client = VpnClient::new(); let mut vpn_client = VpnClient::new();
let mut vpn_server = VpnServer::new(); let mut vpn_server = VpnServer::new();
let mut wg_client = WgClient::new(); let mut wg_client = WgClient::new();
let mut wg_server = WgServer::new();
send_event_stdout("ready", serde_json::json!({ "mode": mode })); send_event_stdout("ready", serde_json::json!({ "mode": mode }));
@@ -131,7 +130,7 @@ pub async fn management_loop_stdio(mode: &str) -> Result<()> {
let response = match mode { let response = match mode {
"client" => handle_client_request(&request, &mut vpn_client, &mut wg_client).await, "client" => handle_client_request(&request, &mut vpn_client, &mut wg_client).await,
"server" => handle_server_request(&request, &mut vpn_server, &mut wg_server).await, "server" => handle_server_request(&request, &mut vpn_server).await,
_ => ManagementResponse::err(request.id.clone(), format!("Unknown mode: {}", mode)), _ => ManagementResponse::err(request.id.clone(), format!("Unknown mode: {}", mode)),
}; };
send_response_stdout(&response); send_response_stdout(&response);
@@ -154,7 +153,6 @@ pub async fn management_loop_socket(socket_path: &str, mode: &str) -> Result<()>
let vpn_client = std::sync::Arc::new(Mutex::new(VpnClient::new())); let vpn_client = std::sync::Arc::new(Mutex::new(VpnClient::new()));
let vpn_server = std::sync::Arc::new(Mutex::new(VpnServer::new())); let vpn_server = std::sync::Arc::new(Mutex::new(VpnServer::new()));
let wg_client = std::sync::Arc::new(Mutex::new(WgClient::new())); let wg_client = std::sync::Arc::new(Mutex::new(WgClient::new()));
let wg_server = std::sync::Arc::new(Mutex::new(WgServer::new()));
loop { loop {
match listener.accept().await { match listener.accept().await {
@@ -163,10 +161,9 @@ pub async fn management_loop_socket(socket_path: &str, mode: &str) -> Result<()>
let client = vpn_client.clone(); let client = vpn_client.clone();
let server = vpn_server.clone(); let server = vpn_server.clone();
let wg_c = wg_client.clone(); let wg_c = wg_client.clone();
let wg_s = wg_server.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = if let Err(e) =
handle_socket_connection(stream, &mode, client, server, wg_c, wg_s).await handle_socket_connection(stream, &mode, client, server, wg_c).await
{ {
warn!("Socket connection error: {}", e); warn!("Socket connection error: {}", e);
} }
@@ -185,7 +182,6 @@ async fn handle_socket_connection(
vpn_client: std::sync::Arc<Mutex<VpnClient>>, vpn_client: std::sync::Arc<Mutex<VpnClient>>,
vpn_server: std::sync::Arc<Mutex<VpnServer>>, vpn_server: std::sync::Arc<Mutex<VpnServer>>,
wg_client: std::sync::Arc<Mutex<WgClient>>, wg_client: std::sync::Arc<Mutex<WgClient>>,
wg_server: std::sync::Arc<Mutex<WgServer>>,
) -> Result<()> { ) -> Result<()> {
let (reader, mut writer) = stream.into_split(); let (reader, mut writer) = stream.into_split();
let buf_reader = BufReader::new(reader); let buf_reader = BufReader::new(reader);
@@ -241,8 +237,7 @@ async fn handle_socket_connection(
} }
"server" => { "server" => {
let mut server = vpn_server.lock().await; let mut server = vpn_server.lock().await;
let mut wg_s = wg_server.lock().await; handle_server_request(&request, &mut server).await
handle_server_request(&request, &mut server, &mut wg_s).await
} }
_ => ManagementResponse::err(request.id.clone(), format!("Unknown mode: {}", mode)), _ => ManagementResponse::err(request.id.clone(), format!("Unknown mode: {}", mode)),
}; };
@@ -381,33 +376,11 @@ async fn handle_client_request(
async fn handle_server_request( async fn handle_server_request(
request: &ManagementRequest, request: &ManagementRequest,
vpn_server: &mut VpnServer, vpn_server: &mut VpnServer,
wg_server: &mut WgServer,
) -> ManagementResponse { ) -> ManagementResponse {
let id = request.id.clone(); let id = request.id.clone();
match request.method.as_str() { match request.method.as_str() {
"start" => { "start" => {
// Check if transportMode is "wireguard"
let transport_mode = request.params
.get("config")
.and_then(|c| c.get("transportMode"))
.and_then(|t| t.as_str())
.unwrap_or("");
if transport_mode == "wireguard" {
let config: WgServerConfig = match serde_json::from_value(
request.params.get("config").cloned().unwrap_or_default(),
) {
Ok(c) => c,
Err(e) => {
return ManagementResponse::err(id, format!("Invalid WG config: {}", e));
}
};
match wg_server.start(config).await {
Ok(()) => ManagementResponse::ok(id, serde_json::json!({})),
Err(e) => ManagementResponse::err(id, format!("WG start failed: {}", e)),
}
} else {
let config: ServerConfig = match serde_json::from_value( let config: ServerConfig = match serde_json::from_value(
request.params.get("config").cloned().unwrap_or_default(), request.params.get("config").cloned().unwrap_or_default(),
) { ) {
@@ -421,54 +394,30 @@ async fn handle_server_request(
Err(e) => ManagementResponse::err(id, format!("Start failed: {}", e)), Err(e) => ManagementResponse::err(id, format!("Start failed: {}", e)),
} }
} }
}
"stop" => { "stop" => {
if wg_server.is_running() {
match wg_server.stop().await {
Ok(()) => ManagementResponse::ok(id, serde_json::json!({})),
Err(e) => ManagementResponse::err(id, format!("WG stop failed: {}", e)),
}
} else {
match vpn_server.stop().await { match vpn_server.stop().await {
Ok(()) => ManagementResponse::ok(id, serde_json::json!({})), Ok(()) => ManagementResponse::ok(id, serde_json::json!({})),
Err(e) => ManagementResponse::err(id, format!("Stop failed: {}", e)), Err(e) => ManagementResponse::err(id, format!("Stop failed: {}", e)),
} }
} }
}
"getStatus" => { "getStatus" => {
if wg_server.is_running() {
ManagementResponse::ok(id, wg_server.get_status())
} else {
let status = vpn_server.get_status(); let status = vpn_server.get_status();
ManagementResponse::ok(id, status) ManagementResponse::ok(id, status)
} }
}
"getStatistics" => { "getStatistics" => {
if wg_server.is_running() {
ManagementResponse::ok(id, wg_server.get_statistics().await)
} else {
let stats = vpn_server.get_statistics().await; let stats = vpn_server.get_statistics().await;
match serde_json::to_value(&stats) { match serde_json::to_value(&stats) {
Ok(v) => ManagementResponse::ok(id, v), Ok(v) => ManagementResponse::ok(id, v),
Err(e) => ManagementResponse::err(id, format!("Serialize error: {}", e)), Err(e) => ManagementResponse::err(id, format!("Serialize error: {}", e)),
} }
} }
}
"listClients" => { "listClients" => {
if wg_server.is_running() {
let peers = wg_server.list_peers().await;
match serde_json::to_value(&peers) {
Ok(v) => ManagementResponse::ok(id, serde_json::json!({ "clients": v })),
Err(e) => ManagementResponse::err(id, format!("Serialize error: {}", e)),
}
} else {
let clients = vpn_server.list_clients().await; let clients = vpn_server.list_clients().await;
match serde_json::to_value(&clients) { match serde_json::to_value(&clients) {
Ok(v) => ManagementResponse::ok(id, serde_json::json!({ "clients": v })), Ok(v) => ManagementResponse::ok(id, serde_json::json!({ "clients": v })),
Err(e) => ManagementResponse::err(id, format!("Serialize error: {}", e)), Err(e) => ManagementResponse::err(id, format!("Serialize error: {}", e)),
} }
} }
}
"disconnectClient" => { "disconnectClient" => {
let client_id = match request.params.get("clientId").and_then(|v| v.as_str()) { let client_id = match request.params.get("clientId").and_then(|v| v.as_str()) {
Some(id) => id.to_string(), Some(id) => id.to_string(),
@@ -546,9 +495,6 @@ async fn handle_server_request(
) )
} }
"addWgPeer" => { "addWgPeer" => {
if !wg_server.is_running() {
return ManagementResponse::err(id, "WireGuard server not running".to_string());
}
let config: WgPeerConfig = match serde_json::from_value( let config: WgPeerConfig = match serde_json::from_value(
request.params.get("peer").cloned().unwrap_or_default(), request.params.get("peer").cloned().unwrap_or_default(),
) { ) {
@@ -557,29 +503,23 @@ async fn handle_server_request(
return ManagementResponse::err(id, format!("Invalid peer config: {}", e)); return ManagementResponse::err(id, format!("Invalid peer config: {}", e));
} }
}; };
match wg_server.add_peer(config).await { match vpn_server.add_wg_peer(config).await {
Ok(()) => ManagementResponse::ok(id, serde_json::json!({})), Ok(()) => ManagementResponse::ok(id, serde_json::json!({})),
Err(e) => ManagementResponse::err(id, format!("Add peer failed: {}", e)), Err(e) => ManagementResponse::err(id, format!("Add peer failed: {}", e)),
} }
} }
"removeWgPeer" => { "removeWgPeer" => {
if !wg_server.is_running() {
return ManagementResponse::err(id, "WireGuard server not running".to_string());
}
let public_key = match request.params.get("publicKey").and_then(|v| v.as_str()) { let public_key = match request.params.get("publicKey").and_then(|v| v.as_str()) {
Some(k) => k.to_string(), Some(k) => k.to_string(),
None => return ManagementResponse::err(id, "Missing publicKey".to_string()), None => return ManagementResponse::err(id, "Missing publicKey".to_string()),
}; };
match wg_server.remove_peer(&public_key).await { match vpn_server.remove_wg_peer(&public_key).await {
Ok(()) => ManagementResponse::ok(id, serde_json::json!({})), Ok(()) => ManagementResponse::ok(id, serde_json::json!({})),
Err(e) => ManagementResponse::err(id, format!("Remove peer failed: {}", e)), Err(e) => ManagementResponse::err(id, format!("Remove peer failed: {}", e)),
} }
} }
"listWgPeers" => { "listWgPeers" => {
if !wg_server.is_running() { let peers = vpn_server.list_wg_peers().await;
return ManagementResponse::err(id, "WireGuard server not running".to_string());
}
let peers = wg_server.list_peers().await;
match serde_json::to_value(&peers) { match serde_json::to_value(&peers) {
Ok(v) => ManagementResponse::ok(id, serde_json::json!({ "peers": v })), Ok(v) => ManagementResponse::ok(id, serde_json::json!({ "peers": v })),
Err(e) => ManagementResponse::err(id, format!("Serialize error: {}", e)), Err(e) => ManagementResponse::err(id, format!("Serialize error: {}", e)),

View File

@@ -86,6 +86,16 @@ impl IpPool {
client_id client_id
} }
/// Reserve a specific IP for a client (e.g., WireGuard static IP from allowed_ips).
pub fn reserve(&mut self, ip: Ipv4Addr, client_id: &str) -> Result<()> {
if self.allocated.contains_key(&ip) {
anyhow::bail!("IP {} is already allocated", ip);
}
self.allocated.insert(ip, client_id.to_string());
info!("Reserved IP {} for client {}", ip, client_id);
Ok(())
}
/// Number of currently allocated IPs. /// Number of currently allocated IPs.
pub fn allocated_count(&self) -> usize { pub fn allocated_count(&self) -> usize {
self.allocated.len() self.allocated.len()

View File

@@ -24,6 +24,20 @@ use crate::tunnel::{self, TunConfig};
/// Dead-peer timeout: 3x max keepalive interval (Healthy=60s). /// Dead-peer timeout: 3x max keepalive interval (Healthy=60s).
const DEAD_PEER_TIMEOUT: Duration = Duration::from_secs(180); const DEAD_PEER_TIMEOUT: Duration = Duration::from_secs(180);
/// Destination routing policy for VPN client traffic.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DestinationPolicyConfig {
/// Default action: "forceTarget", "block", or "allow".
pub default: String,
/// Target IP for "forceTarget" mode (e.g. "127.0.0.1").
pub target: Option<String>,
/// Destinations that pass through directly (not rewritten, not blocked).
pub allow_list: Option<Vec<String>>,
/// Destinations always blocked (overrides allowList, deny wins).
pub block_list: Option<Vec<String>>,
}
/// Server configuration (matches TS IVpnServerConfig). /// Server configuration (matches TS IVpnServerConfig).
#[derive(Debug, Clone, Deserialize)] #[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
@@ -58,6 +72,18 @@ pub struct ServerConfig {
pub proxy_protocol: Option<bool>, pub proxy_protocol: Option<bool>,
/// Server-level IP block list — applied at TCP accept, before Noise handshake. /// Server-level IP block list — applied at TCP accept, before Noise handshake.
pub connection_ip_block_list: Option<Vec<String>>, pub connection_ip_block_list: Option<Vec<String>>,
/// When true and forwarding_mode is "socket", the userspace NAT engine prepends
/// PROXY protocol v2 headers on outbound TCP connections, conveying the VPN client's
/// tunnel IP as the source address.
pub socket_forward_proxy_protocol: Option<bool>,
/// Destination routing policy for VPN client traffic (socket mode).
pub destination_policy: Option<DestinationPolicyConfig>,
/// WireGuard: server X25519 private key (base64). Required when transport includes WG.
pub wg_private_key: Option<String>,
/// WireGuard: UDP listen port (default: 51820).
pub wg_listen_port: Option<u16>,
/// WireGuard: pre-configured peers.
pub wg_peers: Option<Vec<crate::wireguard::WgPeerConfig>>,
} }
/// Information about a connected client. /// Information about a connected client.
@@ -81,6 +107,8 @@ pub struct ClientInfo {
pub registered_client_id: String, pub registered_client_id: String,
/// Real client IP:port (from PROXY protocol header or direct TCP connection). /// Real client IP:port (from PROXY protocol header or direct TCP connection).
pub remote_addr: Option<String>, pub remote_addr: Option<String>,
/// Transport used for this connection: "websocket", "quic", or "wireguard".
pub transport_type: String,
} }
/// Server statistics. /// Server statistics.
@@ -130,6 +158,7 @@ pub struct ServerState {
pub struct VpnServer { pub struct VpnServer {
state: Option<Arc<ServerState>>, state: Option<Arc<ServerState>>,
shutdown_tx: Option<mpsc::Sender<()>>, shutdown_tx: Option<mpsc::Sender<()>>,
wg_command_tx: Option<mpsc::Sender<crate::wireguard::WgCommand>>,
} }
impl VpnServer { impl VpnServer {
@@ -137,6 +166,7 @@ impl VpnServer {
Self { Self {
state: None, state: None,
shutdown_tx: None, shutdown_tx: None,
wg_command_tx: None,
} }
} }
@@ -241,10 +271,13 @@ impl VpnServer {
} }
ForwardingSetup::Socket { packet_tx, packet_rx, shutdown_rx } => { ForwardingSetup::Socket { packet_tx, packet_rx, shutdown_rx } => {
*state.forwarding_engine.lock().await = ForwardingEngine::Socket(packet_tx); *state.forwarding_engine.lock().await = ForwardingEngine::Socket(packet_tx);
let proxy_protocol = config.socket_forward_proxy_protocol.unwrap_or(false);
let nat_engine = crate::userspace_nat::NatEngine::new( let nat_engine = crate::userspace_nat::NatEngine::new(
gateway_ip, gateway_ip,
link_mtu as usize, link_mtu as usize,
state.clone(), state.clone(),
proxy_protocol,
config.destination_policy.clone(),
); );
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = nat_engine.run(packet_rx, shutdown_rx).await { if let Err(e) = nat_engine.run(packet_rx, shutdown_rx).await {
@@ -255,59 +288,79 @@ impl VpnServer {
ForwardingSetup::Testing => {} ForwardingSetup::Testing => {}
} }
let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
self.state = Some(state.clone()); self.state = Some(state.clone());
self.shutdown_tx = Some(shutdown_tx);
let transport_mode = config.transport_mode.as_deref().unwrap_or("both"); let transport_mode = config.transport_mode.as_deref().unwrap_or("all");
let listen_addr = config.listen_addr.clone(); let listen_addr = config.listen_addr.clone();
match transport_mode { // Determine if WG should be included
"quic" => { let include_wg = config.wg_private_key.is_some()
let quic_addr = config.quic_listen_addr.clone().unwrap_or_else(|| listen_addr.clone()); && matches!(transport_mode, "all" | "wireguard");
let idle_timeout = config.quic_idle_timeout_secs.unwrap_or(30);
tokio::spawn(async move {
if let Err(e) = run_quic_listener(state, quic_addr, idle_timeout, &mut shutdown_rx).await {
error!("QUIC listener error: {}", e);
}
});
}
"both" => {
let quic_addr = config.quic_listen_addr.clone().unwrap_or_else(|| listen_addr.clone());
let idle_timeout = config.quic_idle_timeout_secs.unwrap_or(30);
let state2 = state.clone();
let (shutdown_tx2, mut shutdown_rx2) = mpsc::channel::<()>(1);
// Store second shutdown sender so both listeners stop
let shutdown_tx_orig = self.shutdown_tx.take().unwrap();
let (combined_tx, mut combined_rx) = mpsc::channel::<()>(1);
self.shutdown_tx = Some(combined_tx);
// Forward combined shutdown to both listeners // Collect shutdown senders for all listeners
tokio::spawn(async move { let mut listener_shutdown_txs: Vec<mpsc::Sender<()>> = Vec::new();
combined_rx.recv().await;
let _ = shutdown_tx_orig.send(()).await;
let _ = shutdown_tx2.send(()).await;
});
// Spawn transport listeners based on mode
let spawn_ws = matches!(transport_mode, "all" | "both" | "websocket");
let spawn_quic = matches!(transport_mode, "all" | "both" | "quic");
if spawn_ws {
let (tx, mut rx) = mpsc::channel::<()>(1);
listener_shutdown_txs.push(tx);
let ws_state = state.clone();
let ws_addr = listen_addr.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = run_ws_listener(state, listen_addr, &mut shutdown_rx).await { if let Err(e) = run_ws_listener(ws_state, ws_addr, &mut rx).await {
error!("WebSocket listener error: {}", e); error!("WebSocket listener error: {}", e);
} }
}); });
}
if spawn_quic {
let quic_addr = config.quic_listen_addr.clone().unwrap_or_else(|| listen_addr.clone());
let idle_timeout = config.quic_idle_timeout_secs.unwrap_or(30);
let (tx, mut rx) = mpsc::channel::<()>(1);
listener_shutdown_txs.push(tx);
let quic_state = state.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = run_quic_listener(state2, quic_addr, idle_timeout, &mut shutdown_rx2).await { if let Err(e) = run_quic_listener(quic_state, quic_addr, idle_timeout, &mut rx).await {
error!("QUIC listener error: {}", e); error!("QUIC listener error: {}", e);
} }
}); });
} }
_ => {
// "websocket" (default) if include_wg {
let wg_config = crate::wireguard::WgListenerConfig {
private_key: config.wg_private_key.clone().unwrap(),
listen_port: config.wg_listen_port.unwrap_or(51820),
peers: config.wg_peers.clone().unwrap_or_default(),
};
let (tx, rx) = mpsc::channel::<()>(1);
listener_shutdown_txs.push(tx);
let (cmd_tx, cmd_rx) = mpsc::channel::<crate::wireguard::WgCommand>(32);
self.wg_command_tx = Some(cmd_tx);
let wg_state = state.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = run_ws_listener(state, listen_addr, &mut shutdown_rx).await { if let Err(e) = crate::wireguard::run_wg_listener(wg_state, wg_config, rx, cmd_rx).await {
error!("Server listener error: {}", e); error!("WireGuard listener error: {}", e);
} }
}); });
} }
// Replace self.shutdown_tx with a combined sender that fans out to all listeners
if listener_shutdown_txs.len() > 1 {
let (combined_tx, mut combined_rx) = mpsc::channel::<()>(1);
// Take the original shutdown_tx (from line above)
let _ = self.shutdown_tx.take();
self.shutdown_tx = Some(combined_tx);
tokio::spawn(async move {
combined_rx.recv().await;
for tx in listener_shutdown_txs {
let _ = tx.send(()).await;
}
});
} else if let Some(single_tx) = listener_shutdown_txs.into_iter().next() {
self.shutdown_tx = Some(single_tx);
} }
info!("VPN server started (transport: {})", transport_mode); info!("VPN server started (transport: {})", transport_mode);
@@ -346,6 +399,7 @@ impl VpnServer {
if let Some(tx) = self.shutdown_tx.take() { if let Some(tx) = self.shutdown_tx.take() {
let _ = tx.send(()).await; let _ = tx.send(()).await;
} }
self.wg_command_tx = None;
self.state = None; self.state = None;
info!("VPN server stopped"); info!("VPN server stopped");
Ok(()) Ok(())
@@ -434,6 +488,54 @@ impl VpnServer {
Ok(()) Ok(())
} }
// ── WireGuard Peer Management ────────────────────────────────────────
/// Add a WireGuard peer dynamically (delegates to the WG event loop).
pub async fn add_wg_peer(&self, config: crate::wireguard::WgPeerConfig) -> Result<()> {
let tx = self.wg_command_tx.as_ref()
.ok_or_else(|| anyhow::anyhow!("WireGuard listener not running"))?;
let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();
tx.send(crate::wireguard::WgCommand::AddPeer(config, resp_tx))
.await
.map_err(|_| anyhow::anyhow!("WG event loop closed"))?;
resp_rx.await.map_err(|_| anyhow::anyhow!("No response from WG loop"))?
}
/// Remove a WireGuard peer dynamically (delegates to the WG event loop).
pub async fn remove_wg_peer(&self, public_key: &str) -> Result<()> {
let tx = self.wg_command_tx.as_ref()
.ok_or_else(|| anyhow::anyhow!("WireGuard listener not running"))?;
let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();
tx.send(crate::wireguard::WgCommand::RemovePeer(public_key.to_string(), resp_tx))
.await
.map_err(|_| anyhow::anyhow!("WG event loop closed"))?;
resp_rx.await.map_err(|_| anyhow::anyhow!("No response from WG loop"))?
}
/// List WireGuard peers from the unified client list.
pub async fn list_wg_peers(&self) -> Vec<crate::wireguard::WgPeerInfo> {
if let Some(ref state) = self.state {
state.clients.read().await.values()
.filter(|c| c.transport_type == "wireguard")
.map(|c| crate::wireguard::WgPeerInfo {
public_key: c.authenticated_key.clone(),
allowed_ips: vec![format!("{}/32", c.assigned_ip)],
endpoint: c.remote_addr.clone(),
persistent_keepalive: None,
stats: crate::wireguard::WgPeerStats {
bytes_sent: c.bytes_sent,
bytes_received: c.bytes_received,
packets_sent: 0,
packets_received: 0,
last_handshake_time: None,
},
})
.collect()
} else {
Vec::new()
}
}
// ── Client Registry (Hub) Methods ─────────────────────────────────── // ── Client Registry (Hub) Methods ───────────────────────────────────
/// Create a new client entry. Generates keypairs and assigns an IP. /// Create a new client entry. Generates keypairs and assigns an IP.
@@ -466,9 +568,16 @@ impl VpnServer {
).ok(), ).ok(),
priority: partial.get("priority").and_then(|v| v.as_u64()).map(|v| v as u32), priority: partial.get("priority").and_then(|v| v.as_u64()).map(|v| v as u32),
enabled: partial.get("enabled").and_then(|v| v.as_bool()).or(Some(true)), enabled: partial.get("enabled").and_then(|v| v.as_bool()).or(Some(true)),
tags: partial.get("tags").and_then(|v| { server_defined_client_tags: partial.get("serverDefinedClientTags").and_then(|v| {
v.as_array().map(|a| a.iter().filter_map(|s| s.as_str().map(String::from)).collect()) v.as_array().map(|a| a.iter().filter_map(|s| s.as_str().map(String::from)).collect())
}).or_else(|| {
// Legacy: accept "tags" as serverDefinedClientTags
partial.get("tags").and_then(|v| {
v.as_array().map(|a| a.iter().filter_map(|s| s.as_str().map(String::from)).collect())
})
}), }),
client_defined_client_tags: None, // Only set by connecting client
tags: None, // Legacy field — not used for new entries
description: partial.get("description").and_then(|v| v.as_str()).map(String::from), description: partial.get("description").and_then(|v| v.as_str()).map(String::from),
expires_at: partial.get("expiresAt").and_then(|v| v.as_str()).map(String::from), expires_at: partial.get("expiresAt").and_then(|v| v.as_str()).map(String::from),
assigned_ip: Some(assigned_ip.to_string()), assigned_ip: Some(assigned_ip.to_string()),
@@ -563,8 +672,11 @@ impl VpnServer {
if let Some(enabled) = update.get("enabled").and_then(|v| v.as_bool()) { if let Some(enabled) = update.get("enabled").and_then(|v| v.as_bool()) {
entry.enabled = Some(enabled); entry.enabled = Some(enabled);
} }
if let Some(tags) = update.get("tags").and_then(|v| v.as_array()) { if let Some(tags) = update.get("serverDefinedClientTags").and_then(|v| v.as_array()) {
entry.tags = Some(tags.iter().filter_map(|s| s.as_str().map(String::from)).collect()); entry.server_defined_client_tags = Some(tags.iter().filter_map(|s| s.as_str().map(String::from)).collect());
} else if let Some(tags) = update.get("tags").and_then(|v| v.as_array()) {
// Legacy: accept "tags" as serverDefinedClientTags
entry.server_defined_client_tags = Some(tags.iter().filter_map(|s| s.as_str().map(String::from)).collect());
} }
if let Some(desc) = update.get("description").and_then(|v| v.as_str()) { if let Some(desc) = update.get("description").and_then(|v| v.as_str()) {
entry.description = Some(desc.to_string()); entry.description = Some(desc.to_string());
@@ -751,6 +863,7 @@ async fn run_ws_listener(
Box::new(sink), Box::new(sink),
Box::new(stream), Box::new(stream),
remote_addr, remote_addr,
"websocket",
).await { ).await {
warn!("Client connection error: {}", e); warn!("Client connection error: {}", e);
} }
@@ -827,6 +940,7 @@ async fn run_quic_listener(
Box::new(sink), Box::new(sink),
Box::new(stream), Box::new(stream),
Some(remote), Some(remote),
"quic",
).await { ).await {
warn!("QUIC client error: {}", e); warn!("QUIC client error: {}", e);
} }
@@ -916,6 +1030,7 @@ async fn handle_client_connection(
mut sink: Box<dyn TransportSink>, mut sink: Box<dyn TransportSink>,
mut stream: Box<dyn TransportStream>, mut stream: Box<dyn TransportStream>,
remote_addr: Option<std::net::SocketAddr>, remote_addr: Option<std::net::SocketAddr>,
transport_type: &str,
) -> Result<()> { ) -> Result<()> {
let server_private_key = base64::Engine::decode( let server_private_key = base64::Engine::decode(
&base64::engine::general_purpose::STANDARD, &base64::engine::general_purpose::STANDARD,
@@ -1054,6 +1169,7 @@ async fn handle_client_connection(
authenticated_key: client_pub_key_b64.clone(), authenticated_key: client_pub_key_b64.clone(),
registered_client_id: registered_client_id.clone(), registered_client_id: registered_client_id.clone(),
remote_addr: remote_addr.map(|a| a.to_string()), remote_addr: remote_addr.map(|a| a.to_string()),
transport_type: transport_type.to_string(),
}; };
state.clients.write().await.insert(client_id.clone(), client_info); state.clients.write().await.insert(client_id.clone(), client_info);

View File

@@ -13,7 +13,8 @@ use tokio::net::{TcpStream, UdpSocket};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
use crate::server::ServerState; use crate::acl;
use crate::server::{DestinationPolicyConfig, ServerState};
use crate::tunnel; use crate::tunnel;
// ============================================================================ // ============================================================================
@@ -191,10 +192,25 @@ pub struct NatEngine {
bridge_rx: mpsc::Receiver<BridgeMessage>, bridge_rx: mpsc::Receiver<BridgeMessage>,
bridge_tx: mpsc::Sender<BridgeMessage>, bridge_tx: mpsc::Sender<BridgeMessage>,
start_time: std::time::Instant, start_time: std::time::Instant,
/// When true, outbound TCP connections prepend PROXY protocol v2 headers
/// with the VPN client's tunnel IP as source address.
proxy_protocol: bool,
/// Destination routing policy: forceTarget, block, or allow.
destination_policy: Option<DestinationPolicyConfig>,
}
/// Result of destination policy evaluation.
enum DestinationAction {
/// Connect to the original destination.
PassThrough(SocketAddr),
/// Redirect to a target IP, preserving original port.
ForceTarget(SocketAddr),
/// Drop the packet silently.
Drop,
} }
impl NatEngine { impl NatEngine {
pub fn new(gateway_ip: Ipv4Addr, mtu: usize, state: Arc<ServerState>) -> Self { pub fn new(gateway_ip: Ipv4Addr, mtu: usize, state: Arc<ServerState>, proxy_protocol: bool, destination_policy: Option<DestinationPolicyConfig>) -> Self {
let mut device = VirtualIpDevice::new(mtu); let mut device = VirtualIpDevice::new(mtu);
let config = Config::new(HardwareAddress::Ip); let config = Config::new(HardwareAddress::Ip);
let now = smoltcp::time::Instant::from_millis(0); let now = smoltcp::time::Instant::from_millis(0);
@@ -226,6 +242,8 @@ impl NatEngine {
bridge_rx, bridge_rx,
bridge_tx, bridge_tx,
start_time: std::time::Instant::now(), start_time: std::time::Instant::now(),
proxy_protocol,
destination_policy,
} }
} }
@@ -233,6 +251,40 @@ impl NatEngine {
smoltcp::time::Instant::from_millis(self.start_time.elapsed().as_millis() as i64) smoltcp::time::Instant::from_millis(self.start_time.elapsed().as_millis() as i64)
} }
/// Evaluate destination policy for a packet's destination IP.
fn evaluate_destination(&self, dst_ip: Ipv4Addr, dst_port: u16) -> DestinationAction {
let policy = match &self.destination_policy {
Some(p) => p,
None => return DestinationAction::PassThrough(SocketAddr::new(dst_ip.into(), dst_port)),
};
// 1. Block list wins (deny overrides allow)
if let Some(ref block_list) = policy.block_list {
if !block_list.is_empty() && acl::ip_matches_any(dst_ip, block_list) {
return DestinationAction::Drop;
}
}
// 2. Allow list — pass through directly
if let Some(ref allow_list) = policy.allow_list {
if !allow_list.is_empty() && acl::ip_matches_any(dst_ip, allow_list) {
return DestinationAction::PassThrough(SocketAddr::new(dst_ip.into(), dst_port));
}
}
// 3. Default action
match policy.default.as_str() {
"forceTarget" => {
let target_ip = policy.target.as_deref()
.and_then(|t| t.parse::<Ipv4Addr>().ok())
.unwrap_or(Ipv4Addr::LOCALHOST);
DestinationAction::ForceTarget(SocketAddr::new(target_ip.into(), dst_port))
}
"block" => DestinationAction::Drop,
_ => DestinationAction::PassThrough(SocketAddr::new(dst_ip.into(), dst_port)),
}
}
/// Inject a raw IP packet from a VPN client and handle new session creation. /// Inject a raw IP packet from a VPN client and handle new session creation.
fn inject_packet(&mut self, packet: Vec<u8>) { fn inject_packet(&mut self, packet: Vec<u8>) {
let Some((ihl, src_ip, dst_ip, protocol)) = parse_ipv4_header(&packet) else { let Some((ihl, src_ip, dst_ip, protocol)) = parse_ipv4_header(&packet) else {
@@ -257,7 +309,14 @@ impl NatEngine {
// SYN without ACK = new connection // SYN without ACK = new connection
let is_syn = (flags & 0x02) != 0 && (flags & 0x10) == 0; let is_syn = (flags & 0x02) != 0 && (flags & 0x10) == 0;
if is_syn && !self.tcp_sessions.contains_key(&key) { if is_syn && !self.tcp_sessions.contains_key(&key) {
self.create_tcp_session(&key); match self.evaluate_destination(dst_ip, dst_port) {
DestinationAction::Drop => {
debug!("NAT: destination policy blocked TCP {}:{} -> {}:{}", src_ip, src_port, dst_ip, dst_port);
return;
}
DestinationAction::PassThrough(addr) => self.create_tcp_session(&key, addr),
DestinationAction::ForceTarget(addr) => self.create_tcp_session(&key, addr),
}
} }
} }
17 => { 17 => {
@@ -274,7 +333,14 @@ impl NatEngine {
}; };
if !self.udp_sessions.contains_key(&key) { if !self.udp_sessions.contains_key(&key) {
self.create_udp_session(&key); match self.evaluate_destination(dst_ip, dst_port) {
DestinationAction::Drop => {
debug!("NAT: destination policy blocked UDP {}:{} -> {}:{}", src_ip, src_port, dst_ip, dst_port);
return;
}
DestinationAction::PassThrough(addr) => self.create_udp_session(&key, addr),
DestinationAction::ForceTarget(addr) => self.create_udp_session(&key, addr),
}
} }
// Update last_activity for existing sessions // Update last_activity for existing sessions
@@ -291,7 +357,7 @@ impl NatEngine {
self.device.inject_packet(packet); self.device.inject_packet(packet);
} }
fn create_tcp_session(&mut self, key: &SessionKey) { fn create_tcp_session(&mut self, key: &SessionKey, connect_addr: SocketAddr) {
// Create smoltcp TCP socket // Create smoltcp TCP socket
let tcp_rx_buf = tcp::SocketBuffer::new(vec![0u8; 65535]); let tcp_rx_buf = tcp::SocketBuffer::new(vec![0u8; 65535]);
let tcp_tx_buf = tcp::SocketBuffer::new(vec![0u8; 65535]); let tcp_tx_buf = tcp::SocketBuffer::new(vec![0u8; 65535]);
@@ -319,11 +385,12 @@ impl NatEngine {
}; };
self.tcp_sessions.insert(key.clone(), session); self.tcp_sessions.insert(key.clone(), session);
// Spawn bridge task that connects to the real destination // Spawn bridge task that connects to the resolved destination
let bridge_tx = self.bridge_tx.clone(); let bridge_tx = self.bridge_tx.clone();
let key_clone = key.clone(); let key_clone = key.clone();
let proxy_protocol = self.proxy_protocol;
tokio::spawn(async move { tokio::spawn(async move {
tcp_bridge_task(key_clone, data_rx, bridge_tx).await; tcp_bridge_task(key_clone, data_rx, bridge_tx, proxy_protocol, connect_addr).await;
}); });
debug!( debug!(
@@ -332,7 +399,7 @@ impl NatEngine {
); );
} }
fn create_udp_session(&mut self, key: &SessionKey) { fn create_udp_session(&mut self, key: &SessionKey, connect_addr: SocketAddr) {
// Create smoltcp UDP socket // Create smoltcp UDP socket
let udp_rx_buf = udp::PacketBuffer::new( let udp_rx_buf = udp::PacketBuffer::new(
vec![udp::PacketMetadata::EMPTY; 32], vec![udp::PacketMetadata::EMPTY; 32],
@@ -368,7 +435,7 @@ impl NatEngine {
let bridge_tx = self.bridge_tx.clone(); let bridge_tx = self.bridge_tx.clone();
let key_clone = key.clone(); let key_clone = key.clone();
tokio::spawn(async move { tokio::spawn(async move {
udp_bridge_task(key_clone, data_rx, bridge_tx).await; udp_bridge_task(key_clone, data_rx, bridge_tx, connect_addr).await;
}); });
debug!( debug!(
@@ -531,20 +598,20 @@ async fn tcp_bridge_task(
key: SessionKey, key: SessionKey,
mut data_rx: mpsc::Receiver<Vec<u8>>, mut data_rx: mpsc::Receiver<Vec<u8>>,
bridge_tx: mpsc::Sender<BridgeMessage>, bridge_tx: mpsc::Sender<BridgeMessage>,
proxy_protocol: bool,
connect_addr: SocketAddr,
) { ) {
let addr = SocketAddr::new(key.dst_ip.into(), key.dst_port); // Connect to resolved destination (may differ from key.dst_ip if policy rewrote it)
let stream = match tokio::time::timeout(Duration::from_secs(30), TcpStream::connect(connect_addr)).await
// Connect to real destination with timeout
let stream = match tokio::time::timeout(Duration::from_secs(30), TcpStream::connect(addr)).await
{ {
Ok(Ok(s)) => s, Ok(Ok(s)) => s,
Ok(Err(e)) => { Ok(Err(e)) => {
debug!("NAT TCP connect to {} failed: {}", addr, e); debug!("NAT TCP connect to {} failed: {}", connect_addr, e);
let _ = bridge_tx.send(BridgeMessage::TcpClosed { key }).await; let _ = bridge_tx.send(BridgeMessage::TcpClosed { key }).await;
return; return;
} }
Err(_) => { Err(_) => {
debug!("NAT TCP connect to {} timed out", addr); debug!("NAT TCP connect to {} timed out", connect_addr);
let _ = bridge_tx.send(BridgeMessage::TcpClosed { key }).await; let _ = bridge_tx.send(BridgeMessage::TcpClosed { key }).await;
return; return;
} }
@@ -552,6 +619,18 @@ async fn tcp_bridge_task(
let (mut reader, mut writer) = stream.into_split(); let (mut reader, mut writer) = stream.into_split();
// Send PROXY protocol v2 header with VPN client's tunnel IP as source
if proxy_protocol {
let src = SocketAddr::new(key.src_ip.into(), key.src_port);
let dst = SocketAddr::new(key.dst_ip.into(), key.dst_port);
let pp_header = crate::proxy_protocol::build_pp_v2_header(src, dst);
if let Err(e) = writer.write_all(&pp_header).await {
debug!("NAT: failed to send PP v2 header to {}: {}", connect_addr, e);
let _ = bridge_tx.send(BridgeMessage::TcpClosed { key }).await;
return;
}
}
// Read from real socket → send to NAT engine // Read from real socket → send to NAT engine
let bridge_tx2 = bridge_tx.clone(); let bridge_tx2 = bridge_tx.clone();
let key2 = key.clone(); let key2 = key.clone();
@@ -594,6 +673,7 @@ async fn udp_bridge_task(
key: SessionKey, key: SessionKey,
mut data_rx: mpsc::Receiver<Vec<u8>>, mut data_rx: mpsc::Receiver<Vec<u8>>,
bridge_tx: mpsc::Sender<BridgeMessage>, bridge_tx: mpsc::Sender<BridgeMessage>,
connect_addr: SocketAddr,
) { ) {
let socket = match UdpSocket::bind("0.0.0.0:0").await { let socket = match UdpSocket::bind("0.0.0.0:0").await {
Ok(s) => s, Ok(s) => s,
@@ -602,7 +682,7 @@ async fn udp_bridge_task(
return; return;
} }
}; };
let dest = SocketAddr::new(key.dst_ip.into(), key.dst_port); let dest = connect_addr;
let socket = Arc::new(socket); let socket = Arc::new(socket);
let socket2 = socket.clone(); let socket2 = socket.clone();

View File

@@ -2,8 +2,6 @@ use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant;
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use base64::engine::general_purpose::STANDARD as BASE64; use base64::engine::general_purpose::STANDARD as BASE64;
use base64::Engine; use base64::Engine;
@@ -17,8 +15,7 @@ use tokio::net::UdpSocket;
use tokio::sync::{mpsc, oneshot, RwLock}; use tokio::sync::{mpsc, oneshot, RwLock};
use tracing::{debug, error, info, warn}; use tracing::{debug, error, info, warn};
use crate::network; use crate::server::{ClientInfo, ForwardingEngine, ServerState};
use crate::tunnel::extract_dst_ip;
use crate::tunnel::{self, TunConfig}; use crate::tunnel::{self, TunConfig};
// ============================================================================ // ============================================================================
@@ -30,9 +27,6 @@ const WG_BUFFER_SIZE: usize = MAX_UDP_PACKET;
/// Minimum dst buffer size for boringtun encapsulate/decapsulate /// Minimum dst buffer size for boringtun encapsulate/decapsulate
const _MIN_DST_BUF: usize = 148; const _MIN_DST_BUF: usize = 148;
const TIMER_TICK_MS: u64 = 100; const TIMER_TICK_MS: u64 = 100;
const DEFAULT_WG_PORT: u16 = 51820;
const DEFAULT_TUN_ADDRESS: &str = "10.8.0.1";
const DEFAULT_TUN_NETMASK: &str = "255.255.255.0";
const DEFAULT_MTU: u16 = 1420; const DEFAULT_MTU: u16 = 1420;
// ============================================================================ // ============================================================================
@@ -52,27 +46,6 @@ pub struct WgPeerConfig {
pub persistent_keepalive: Option<u16>, pub persistent_keepalive: Option<u16>,
} }
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct WgServerConfig {
pub private_key: String,
#[serde(default)]
pub listen_port: Option<u16>,
#[serde(default)]
pub tun_address: Option<String>,
#[serde(default)]
pub tun_netmask: Option<String>,
#[serde(default)]
pub mtu: Option<u16>,
pub peers: Vec<WgPeerConfig>,
#[serde(default)]
pub dns: Option<Vec<String>>,
#[serde(default)]
pub enable_nat: Option<bool>,
#[serde(default)]
pub subnet: Option<String>,
}
#[derive(Debug, Clone, Deserialize)] #[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct WgClientConfig { pub struct WgClientConfig {
@@ -112,17 +85,6 @@ pub struct WgPeerInfo {
pub stats: WgPeerStats, pub stats: WgPeerStats,
} }
#[derive(Debug, Clone, Default, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct WgServerStats {
pub total_bytes_sent: u64,
pub total_bytes_received: u64,
pub total_packets_sent: u64,
pub total_packets_received: u64,
pub active_peers: usize,
pub uptime_seconds: f64,
}
// ============================================================================ // ============================================================================
// Key generation and parsing // Key generation and parsing
// ============================================================================ // ============================================================================
@@ -233,7 +195,7 @@ impl AllowedIp {
// Dynamic peer management commands // Dynamic peer management commands
// ============================================================================ // ============================================================================
enum WgCommand { pub enum WgCommand {
AddPeer(WgPeerConfig, oneshot::Sender<Result<()>>), AddPeer(WgPeerConfig, oneshot::Sender<Result<()>>),
RemovePeer(String, oneshot::Sender<Result<()>>), RemovePeer(String, oneshot::Sender<Result<()>>),
} }
@@ -258,451 +220,6 @@ impl PeerState {
} }
} }
// ============================================================================
// WgServer
// ============================================================================
pub struct WgServer {
shutdown_tx: Option<oneshot::Sender<()>>,
command_tx: Option<mpsc::Sender<WgCommand>>,
shared_stats: Arc<RwLock<HashMap<String, WgPeerStats>>>,
server_stats: Arc<RwLock<WgServerStats>>,
started_at: Option<Instant>,
listen_port: Option<u16>,
}
impl WgServer {
pub fn new() -> Self {
Self {
shutdown_tx: None,
command_tx: None,
shared_stats: Arc::new(RwLock::new(HashMap::new())),
server_stats: Arc::new(RwLock::new(WgServerStats::default())),
started_at: None,
listen_port: None,
}
}
pub fn is_running(&self) -> bool {
self.shutdown_tx.is_some()
}
pub async fn start(&mut self, config: WgServerConfig) -> Result<()> {
if self.is_running() {
return Err(anyhow!("WireGuard server is already running"));
}
let listen_port = config.listen_port.unwrap_or(DEFAULT_WG_PORT);
let tun_address = config
.tun_address
.as_deref()
.unwrap_or(DEFAULT_TUN_ADDRESS);
let tun_netmask = config
.tun_netmask
.as_deref()
.unwrap_or(DEFAULT_TUN_NETMASK);
let mtu = config.mtu.unwrap_or(DEFAULT_MTU);
// Parse server private key
let server_private = parse_private_key(&config.private_key)?;
let server_public = PublicKey::from(&server_private);
// Create rate limiter for DDoS protection
let rate_limiter = Arc::new(RateLimiter::new(&server_public, TIMER_TICK_MS as u64));
// Build peer state
let peer_index = AtomicU32::new(0);
let mut peers: Vec<PeerState> = Vec::with_capacity(config.peers.len());
for peer_config in &config.peers {
let peer_public = parse_public_key(&peer_config.public_key)?;
let psk = match &peer_config.preshared_key {
Some(k) => Some(parse_preshared_key(k)?),
None => None,
};
let idx = peer_index.fetch_add(1, Ordering::Relaxed);
// Clone the private key for each Tunn (StaticSecret doesn't implement Clone,
// so re-parse from config)
let priv_copy = parse_private_key(&config.private_key)?;
let tunn = Tunn::new(
priv_copy,
peer_public,
psk,
peer_config.persistent_keepalive,
idx,
Some(rate_limiter.clone()),
);
let allowed_ips: Vec<AllowedIp> = peer_config
.allowed_ips
.iter()
.map(|cidr| AllowedIp::parse(cidr))
.collect::<Result<Vec<_>>>()?;
let endpoint = match &peer_config.endpoint {
Some(ep) => Some(ep.parse::<SocketAddr>()?),
None => None,
};
peers.push(PeerState {
tunn,
public_key_b64: peer_config.public_key.clone(),
allowed_ips,
endpoint,
persistent_keepalive: peer_config.persistent_keepalive,
stats: WgPeerStats::default(),
});
}
// Create TUN device
let tun_config = TunConfig {
name: "wg0".to_string(),
address: tun_address.parse()?,
netmask: tun_netmask.parse()?,
mtu,
};
let tun_device = tunnel::create_tun(&tun_config)?;
info!("WireGuard TUN device created: {}", tun_config.name);
// Bind UDP socket
let udp_socket = UdpSocket::bind(format!("0.0.0.0:{}", listen_port)).await?;
info!("WireGuard server listening on UDP port {}", listen_port);
// Enable IP forwarding and NAT if requested
if config.enable_nat.unwrap_or(false) {
network::enable_ip_forwarding()?;
let subnet = config
.subnet
.as_deref()
.unwrap_or("10.8.0.0/24");
let iface = network::get_default_interface()?;
network::setup_nat(subnet, &iface).await?;
info!("NAT enabled for subnet {} via {}", subnet, iface);
}
// Channels
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let (command_tx, command_rx) = mpsc::channel::<WgCommand>(32);
let shared_stats = self.shared_stats.clone();
let server_stats = self.server_stats.clone();
let started_at = Instant::now();
// Initialize shared stats
{
let mut stats = shared_stats.write().await;
for peer in &peers {
stats.insert(peer.public_key_b64.clone(), WgPeerStats::default());
}
}
// Spawn the event loop
tokio::spawn(async move {
if let Err(e) = wg_server_loop(
udp_socket,
tun_device,
peers,
peer_index,
rate_limiter,
config.private_key.clone(),
shared_stats,
server_stats,
started_at,
shutdown_rx,
command_rx,
)
.await
{
error!("WireGuard server loop error: {}", e);
}
info!("WireGuard server loop exited");
});
self.shutdown_tx = Some(shutdown_tx);
self.command_tx = Some(command_tx);
self.started_at = Some(started_at);
self.listen_port = Some(listen_port);
Ok(())
}
pub async fn stop(&mut self) -> Result<()> {
if let Some(tx) = self.shutdown_tx.take() {
let _ = tx.send(());
}
self.command_tx = None;
self.started_at = None;
self.listen_port = None;
info!("WireGuard server stopped");
Ok(())
}
pub fn get_status(&self) -> serde_json::Value {
if self.is_running() {
serde_json::json!({
"state": "running",
"listenPort": self.listen_port,
"uptimeSeconds": self.started_at.map(|t| t.elapsed().as_secs_f64()).unwrap_or(0.0),
})
} else {
serde_json::json!({ "state": "stopped" })
}
}
pub async fn get_statistics(&self) -> serde_json::Value {
let mut stats = self.server_stats.write().await;
if let Some(started) = self.started_at {
stats.uptime_seconds = started.elapsed().as_secs_f64();
}
// Aggregate from peer stats
let peer_stats = self.shared_stats.read().await;
stats.active_peers = peer_stats.len();
stats.total_bytes_sent = peer_stats.values().map(|s| s.bytes_sent).sum();
stats.total_bytes_received = peer_stats.values().map(|s| s.bytes_received).sum();
stats.total_packets_sent = peer_stats.values().map(|s| s.packets_sent).sum();
stats.total_packets_received = peer_stats.values().map(|s| s.packets_received).sum();
serde_json::to_value(&*stats).unwrap_or_default()
}
pub async fn list_peers(&self) -> Vec<WgPeerInfo> {
let stats = self.shared_stats.read().await;
stats
.iter()
.map(|(key, s)| WgPeerInfo {
public_key: key.clone(),
allowed_ips: vec![], // populated from event loop snapshots
endpoint: None,
persistent_keepalive: None,
stats: s.clone(),
})
.collect()
}
pub async fn add_peer(&self, config: WgPeerConfig) -> Result<()> {
let tx = self
.command_tx
.as_ref()
.ok_or_else(|| anyhow!("Server not running"))?;
let (resp_tx, resp_rx) = oneshot::channel();
tx.send(WgCommand::AddPeer(config, resp_tx))
.await
.map_err(|_| anyhow!("Server event loop closed"))?;
resp_rx.await.map_err(|_| anyhow!("No response"))?
}
pub async fn remove_peer(&self, public_key: &str) -> Result<()> {
let tx = self
.command_tx
.as_ref()
.ok_or_else(|| anyhow!("Server not running"))?;
let (resp_tx, resp_rx) = oneshot::channel();
tx.send(WgCommand::RemovePeer(public_key.to_string(), resp_tx))
.await
.map_err(|_| anyhow!("Server event loop closed"))?;
resp_rx.await.map_err(|_| anyhow!("No response"))?
}
}
// ============================================================================
// Server event loop
// ============================================================================
async fn wg_server_loop(
udp_socket: UdpSocket,
tun_device: tun::AsyncDevice,
mut peers: Vec<PeerState>,
peer_index: AtomicU32,
rate_limiter: Arc<RateLimiter>,
server_private_key_b64: String,
shared_stats: Arc<RwLock<HashMap<String, WgPeerStats>>>,
_server_stats: Arc<RwLock<WgServerStats>>,
_started_at: Instant,
mut shutdown_rx: oneshot::Receiver<()>,
mut command_rx: mpsc::Receiver<WgCommand>,
) -> Result<()> {
let mut udp_buf = vec![0u8; MAX_UDP_PACKET];
let mut tun_buf = vec![0u8; MAX_UDP_PACKET];
let mut dst_buf = vec![0u8; WG_BUFFER_SIZE];
let mut timer = tokio::time::interval(std::time::Duration::from_millis(TIMER_TICK_MS));
// Split TUN for concurrent read/write in select
let (mut tun_reader, mut tun_writer) = tokio::io::split(tun_device);
// Stats sync interval
let mut stats_timer =
tokio::time::interval(std::time::Duration::from_secs(1));
loop {
tokio::select! {
// --- UDP receive ---
result = udp_socket.recv_from(&mut udp_buf) => {
let (n, src_addr) = result?;
if n == 0 { continue; }
// Find which peer this packet belongs to by trying decapsulate
let mut handled = false;
for peer in peers.iter_mut() {
match peer.tunn.decapsulate(Some(src_addr.ip()), &udp_buf[..n], &mut dst_buf) {
TunnResult::WriteToNetwork(packet) => {
udp_socket.send_to(packet, src_addr).await?;
// Drain loop
loop {
match peer.tunn.decapsulate(None, &[], &mut dst_buf) {
TunnResult::WriteToNetwork(pkt) => {
let ep = peer.endpoint.unwrap_or(src_addr);
udp_socket.send_to(pkt, ep).await?;
}
_ => break,
}
}
peer.endpoint = Some(src_addr);
handled = true;
break;
}
TunnResult::WriteToTunnelV4(packet, addr) => {
if peer.matches_dst(IpAddr::V4(addr)) {
let pkt_len = packet.len() as u64;
tun_writer.write_all(packet).await?;
peer.stats.bytes_received += pkt_len;
peer.stats.packets_received += 1;
}
peer.endpoint = Some(src_addr);
handled = true;
break;
}
TunnResult::WriteToTunnelV6(packet, addr) => {
if peer.matches_dst(IpAddr::V6(addr)) {
let pkt_len = packet.len() as u64;
tun_writer.write_all(packet).await?;
peer.stats.bytes_received += pkt_len;
peer.stats.packets_received += 1;
}
peer.endpoint = Some(src_addr);
handled = true;
break;
}
TunnResult::Done => {
// This peer didn't recognize the packet, try next
continue;
}
TunnResult::Err(e) => {
debug!("decapsulate error from {}: {:?}", src_addr, e);
continue;
}
}
}
if !handled {
debug!("No peer matched UDP packet from {}", src_addr);
}
}
// --- TUN read ---
result = tun_reader.read(&mut tun_buf) => {
let n = result?;
if n == 0 { continue; }
let dst_ip = match extract_dst_ip(&tun_buf[..n]) {
Some(ip) => ip,
None => { continue; }
};
// Find peer whose AllowedIPs match the destination
for peer in peers.iter_mut() {
if !peer.matches_dst(dst_ip) {
continue;
}
match peer.tunn.encapsulate(&tun_buf[..n], &mut dst_buf) {
TunnResult::WriteToNetwork(packet) => {
if let Some(endpoint) = peer.endpoint {
let pkt_len = n as u64;
udp_socket.send_to(packet, endpoint).await?;
peer.stats.bytes_sent += pkt_len;
peer.stats.packets_sent += 1;
} else {
debug!("No endpoint for peer {}, dropping packet", peer.public_key_b64);
}
}
TunnResult::Err(e) => {
debug!("encapsulate error for peer {}: {:?}", peer.public_key_b64, e);
}
_ => {}
}
break;
}
}
// --- Timer tick (100ms) for WireGuard timers ---
_ = timer.tick() => {
for peer in peers.iter_mut() {
match peer.tunn.update_timers(&mut dst_buf) {
TunnResult::WriteToNetwork(packet) => {
if let Some(endpoint) = peer.endpoint {
udp_socket.send_to(packet, endpoint).await?;
}
}
TunnResult::Err(e) => {
debug!("Timer error for peer {}: {:?}", peer.public_key_b64, e);
}
_ => {}
}
}
}
// --- Sync stats to shared state ---
_ = stats_timer.tick() => {
let mut shared = shared_stats.write().await;
for peer in peers.iter() {
shared.insert(peer.public_key_b64.clone(), peer.stats.clone());
}
}
// --- Dynamic peer commands ---
cmd = command_rx.recv() => {
match cmd {
Some(WgCommand::AddPeer(config, resp_tx)) => {
let result = add_peer_to_loop(
&mut peers,
&config,
&peer_index,
&rate_limiter,
&server_private_key_b64,
);
if result.is_ok() {
let mut shared = shared_stats.write().await;
shared.insert(config.public_key.clone(), WgPeerStats::default());
}
let _ = resp_tx.send(result);
}
Some(WgCommand::RemovePeer(pubkey, resp_tx)) => {
let prev_len = peers.len();
peers.retain(|p| p.public_key_b64 != pubkey);
if peers.len() < prev_len {
let mut shared = shared_stats.write().await;
shared.remove(&pubkey);
let _ = resp_tx.send(Ok(()));
} else {
let _ = resp_tx.send(Err(anyhow!("Peer not found: {}", pubkey)));
}
}
None => {
info!("Command channel closed");
break;
}
}
}
// --- Shutdown ---
_ = &mut shutdown_rx => {
info!("WireGuard server shutdown signal received");
break;
}
}
}
Ok(())
}
fn add_peer_to_loop( fn add_peer_to_loop(
peers: &mut Vec<PeerState>, peers: &mut Vec<PeerState>,
@@ -757,6 +274,410 @@ fn add_peer_to_loop(
Ok(()) Ok(())
} }
// ============================================================================
// Integrated WG listener (shares ServerState with WS/QUIC)
// ============================================================================
/// Configuration for the integrated WireGuard listener.
#[derive(Debug, Clone)]
pub struct WgListenerConfig {
    /// Server X25519 private key, base64-encoded (parsed with `parse_private_key`).
    pub private_key: String,
    /// UDP port the listener binds on (all interfaces, 0.0.0.0).
    pub listen_port: u16,
    /// Initial peer set; more peers can be added/removed at runtime via `WgCommand`.
    pub peers: Vec<WgPeerConfig>,
}
/// Extract the first /32 IPv4 address from a list of AllowedIp entries.
/// This is the peer's VPN IP used for return-packet routing.
fn extract_peer_vpn_ip(allowed_ips: &[AllowedIp]) -> Option<Ipv4Addr> {
    allowed_ips.iter().find_map(|entry| match entry.addr {
        // Only a host route (/32) identifies the peer's own tunnel address.
        IpAddr::V4(v4) if entry.prefix_len == 32 => Some(v4),
        _ => None,
    })
}
/// Timestamp helper (mirrors server.rs timestamp_now).
/// Returns the current Unix time in whole seconds as a decimal string;
/// falls back to "0" if the clock is before the epoch.
fn wg_timestamp_now() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0);
    secs.to_string()
}
/// Register a WG peer in ServerState (tun_routes, clients, ip_pool).
///
/// Returns `Ok(Some(vpn_ip))` on success, or `Ok(None)` when registration is
/// skipped — either the peer has no /32 IPv4 in its allowed_ips, or its VPN IP
/// could not be reserved in the pool. Both skip cases are logged, not errors.
///
/// When forwarding is active ("tun" or "socket" mode), a per-peer return
/// channel is inserted into `tun_routes` and a relay task forwards packets
/// from it into the merged `wg_return_tx` channel, tagged with the peer's
/// public key so the listener loop can encapsulate them for the right peer.
async fn register_wg_peer(
    state: &Arc<ServerState>,
    peer: &PeerState,
    wg_return_tx: &mpsc::Sender<(String, Vec<u8>)>,
) -> Result<Option<Ipv4Addr>> {
    let vpn_ip = match extract_peer_vpn_ip(&peer.allowed_ips) {
        Some(ip) => ip,
        None => {
            warn!("WG peer {} has no /32 IPv4 in allowed_ips, skipping registration",
                peer.public_key_b64);
            return Ok(None);
        }
    };
    // Client ID is derived from the first 8 chars of the base64 public key
    // (must match the ID scheme used in unregister_wg_peer and the stats sync).
    let client_id = format!("wg-{}", &peer.public_key_b64[..8.min(peer.public_key_b64.len())]);
    // Reserve IP in the pool; a reservation conflict is non-fatal — the peer
    // simply stays unregistered.
    if let Err(e) = state.ip_pool.lock().await.reserve(vpn_ip, &client_id) {
        warn!("Failed to reserve IP {} for WG peer {}: {}", vpn_ip, client_id, e);
        return Ok(None);
    }
    // Create per-peer return channel and register in tun_routes
    let fwd_mode = state.config.forwarding_mode.as_deref().unwrap_or("testing");
    let forwarding_active = fwd_mode == "tun" || fwd_mode == "socket";
    if forwarding_active {
        let (peer_return_tx, mut peer_return_rx) = mpsc::channel::<Vec<u8>>(256);
        state.tun_routes.write().await.insert(vpn_ip, peer_return_tx);
        // Spawn relay task: per-peer channel → merged channel tagged with pubkey.
        // The task exits when either end of the pipe is dropped.
        let relay_tx = wg_return_tx.clone();
        let pubkey = peer.public_key_b64.clone();
        tokio::spawn(async move {
            while let Some(packet) = peer_return_rx.recv().await {
                if relay_tx.send((pubkey.clone(), packet)).await.is_err() {
                    break;
                }
            }
        });
    }
    // Insert ClientInfo so the peer appears in the shared client registry
    // alongside WS/QUIC clients.
    let client_info = ClientInfo {
        client_id: client_id.clone(),
        assigned_ip: vpn_ip.to_string(),
        connected_since: wg_timestamp_now(),
        bytes_sent: 0,
        bytes_received: 0,
        packets_dropped: 0,
        bytes_dropped: 0,
        last_keepalive_at: None,
        keepalives_received: 0,
        rate_limit_bytes_per_sec: None,
        burst_bytes: None,
        authenticated_key: peer.public_key_b64.clone(),
        registered_client_id: client_id,
        remote_addr: peer.endpoint.map(|e| e.to_string()),
        transport_type: "wireguard".to_string(),
    };
    state.clients.write().await.insert(client_info.client_id.clone(), client_info);
    Ok(Some(vpn_ip))
}
/// Unregister a WG peer from ServerState.
/// Removes the peer's tun route and IP-pool reservation (when its VPN IP is
/// known), then drops its client-registry entry and rate limiter.
async fn unregister_wg_peer(
    state: &Arc<ServerState>,
    pubkey: &str,
    vpn_ip: Option<Ipv4Addr>,
) {
    // Same "wg-<first 8 chars of pubkey>" ID scheme used at registration time.
    let prefix_len = pubkey.len().min(8);
    let derived_client_id = format!("wg-{}", &pubkey[..prefix_len]);
    match vpn_ip {
        Some(ip) => {
            state.tun_routes.write().await.remove(&ip);
            state.ip_pool.lock().await.release(&ip);
        }
        None => {}
    }
    state.clients.write().await.remove(&derived_client_id);
    state.rate_limiters.lock().await.remove(&derived_client_id);
}
/// Integrated WireGuard listener that shares ServerState with WS/QUIC listeners.
/// Uses the shared ForwardingEngine for packet routing instead of its own TUN device.
///
/// Lifecycle: parse keys → build peer state → bind UDP → register peers in
/// ServerState → single-task select loop (UDP rx, return packets, WG protocol
/// timers, stats sync, dynamic peer commands, shutdown) → unregister peers.
///
/// NOTE(review): any UDP send/recv error propagates via `?` and returns early,
/// which skips the peer-unregistration cleanup at the bottom of this function,
/// leaving stale ClientInfo entries and IP-pool reservations in ServerState —
/// confirm whether that is acceptable or whether errors should break the loop.
pub async fn run_wg_listener(
    state: Arc<ServerState>,
    config: WgListenerConfig,
    mut shutdown_rx: mpsc::Receiver<()>,
    mut command_rx: mpsc::Receiver<WgCommand>,
) -> Result<()> {
    // Parse server private key
    let server_private = parse_private_key(&config.private_key)?;
    let server_public = PublicKey::from(&server_private);
    // Create rate limiter for DDoS protection
    let rate_limiter = Arc::new(RateLimiter::new(&server_public, TIMER_TICK_MS as u64));
    // Build initial peer state
    let peer_index = AtomicU32::new(0);
    let mut peers: Vec<PeerState> = Vec::with_capacity(config.peers.len());
    for peer_config in &config.peers {
        let peer_public = parse_public_key(&peer_config.public_key)?;
        let psk = match &peer_config.preshared_key {
            Some(k) => Some(parse_preshared_key(k)?),
            None => None,
        };
        let idx = peer_index.fetch_add(1, Ordering::Relaxed);
        // The private key type is not Clone, so re-parse it for each peer's Tunn.
        let priv_copy = parse_private_key(&config.private_key)?;
        let tunn = Tunn::new(
            priv_copy,
            peer_public,
            psk,
            peer_config.persistent_keepalive,
            idx,
            Some(rate_limiter.clone()),
        );
        let allowed_ips: Vec<AllowedIp> = peer_config
            .allowed_ips
            .iter()
            .map(|cidr| AllowedIp::parse(cidr))
            .collect::<Result<Vec<_>>>()?;
        let endpoint = match &peer_config.endpoint {
            Some(ep) => Some(ep.parse::<SocketAddr>()?),
            None => None,
        };
        peers.push(PeerState {
            tunn,
            public_key_b64: peer_config.public_key.clone(),
            allowed_ips,
            endpoint,
            persistent_keepalive: peer_config.persistent_keepalive,
            stats: WgPeerStats::default(),
        });
    }
    // Bind UDP socket
    let udp_socket = UdpSocket::bind(format!("0.0.0.0:{}", config.listen_port)).await?;
    info!("WireGuard listener started on UDP port {}", config.listen_port);
    // Merged return-packet channel: all per-peer channels feed into this
    let (wg_return_tx, mut wg_return_rx) = mpsc::channel::<(String, Vec<u8>)>(1024);
    // Register initial peers in ServerState and track their VPN IPs
    let mut peer_vpn_ips: HashMap<String, Ipv4Addr> = HashMap::new();
    for peer in &peers {
        if let Ok(Some(ip)) = register_wg_peer(&state, peer, &wg_return_tx).await {
            peer_vpn_ips.insert(peer.public_key_b64.clone(), ip);
        }
    }
    // Buffers
    let mut udp_buf = vec![0u8; MAX_UDP_PACKET];
    let mut dst_buf = vec![0u8; WG_BUFFER_SIZE];
    let mut timer = tokio::time::interval(std::time::Duration::from_millis(TIMER_TICK_MS));
    let mut stats_timer = tokio::time::interval(std::time::Duration::from_secs(1));
    loop {
        tokio::select! {
            // --- UDP receive → decapsulate → ForwardingEngine ---
            result = udp_socket.recv_from(&mut udp_buf) => {
                let (n, src_addr) = result?;
                if n == 0 { continue; }
                // Peer lookup is by trial decapsulation: the first Tunn that
                // accepts the datagram owns it.
                let mut handled = false;
                for peer in peers.iter_mut() {
                    match peer.tunn.decapsulate(Some(src_addr.ip()), &udp_buf[..n], &mut dst_buf) {
                        TunnResult::WriteToNetwork(packet) => {
                            // Handshake/keepalive reply; then drain any packets
                            // boringtun queued behind it.
                            udp_socket.send_to(packet, src_addr).await?;
                            loop {
                                match peer.tunn.decapsulate(None, &[], &mut dst_buf) {
                                    TunnResult::WriteToNetwork(pkt) => {
                                        let ep = peer.endpoint.unwrap_or(src_addr);
                                        udp_socket.send_to(pkt, ep).await?;
                                    }
                                    _ => break,
                                }
                            }
                            // Roaming: remember the latest source address.
                            peer.endpoint = Some(src_addr);
                            handled = true;
                            break;
                        }
                        TunnResult::WriteToTunnelV4(packet, addr) => {
                            // Cryptokey routing: only forward if the inner src
                            // matches the peer's allowed IPs.
                            if peer.matches_dst(IpAddr::V4(addr)) {
                                let pkt_len = packet.len() as u64;
                                // Forward via shared forwarding engine
                                let mut engine = state.forwarding_engine.lock().await;
                                match &mut *engine {
                                    ForwardingEngine::Tun(writer) => {
                                        use tokio::io::AsyncWriteExt;
                                        if let Err(e) = writer.write_all(packet).await {
                                            warn!("TUN write error for WG peer: {}", e);
                                        }
                                    }
                                    ForwardingEngine::Socket(sender) => {
                                        // try_send: drop rather than block the listener
                                        let _ = sender.try_send(packet.to_vec());
                                    }
                                    ForwardingEngine::Testing => {}
                                }
                                peer.stats.bytes_received += pkt_len;
                                peer.stats.packets_received += 1;
                            }
                            peer.endpoint = Some(src_addr);
                            handled = true;
                            break;
                        }
                        TunnResult::WriteToTunnelV6(packet, addr) => {
                            if peer.matches_dst(IpAddr::V6(addr)) {
                                let pkt_len = packet.len() as u64;
                                let mut engine = state.forwarding_engine.lock().await;
                                match &mut *engine {
                                    ForwardingEngine::Tun(writer) => {
                                        use tokio::io::AsyncWriteExt;
                                        if let Err(e) = writer.write_all(packet).await {
                                            warn!("TUN write error for WG peer: {}", e);
                                        }
                                    }
                                    ForwardingEngine::Socket(sender) => {
                                        let _ = sender.try_send(packet.to_vec());
                                    }
                                    ForwardingEngine::Testing => {}
                                }
                                peer.stats.bytes_received += pkt_len;
                                peer.stats.packets_received += 1;
                            }
                            peer.endpoint = Some(src_addr);
                            handled = true;
                            break;
                        }
                        // This peer didn't recognize the packet; try the next one.
                        TunnResult::Done => { continue; }
                        TunnResult::Err(e) => {
                            debug!("decapsulate error from {}: {:?}", src_addr, e);
                            continue;
                        }
                    }
                }
                if !handled {
                    debug!("No WG peer matched UDP packet from {}", src_addr);
                }
            }
            // --- Return packets from tun_routes → encapsulate → UDP ---
            Some((pubkey, packet)) = wg_return_rx.recv() => {
                if let Some(peer) = peers.iter_mut().find(|p| p.public_key_b64 == pubkey) {
                    match peer.tunn.encapsulate(&packet, &mut dst_buf) {
                        TunnResult::WriteToNetwork(out) => {
                            if let Some(endpoint) = peer.endpoint {
                                let pkt_len = packet.len() as u64;
                                udp_socket.send_to(out, endpoint).await?;
                                peer.stats.bytes_sent += pkt_len;
                                peer.stats.packets_sent += 1;
                            } else {
                                // Peer hasn't completed a handshake yet, so we
                                // don't know where to send — drop the packet.
                                debug!("No endpoint for WG peer {}, dropping return packet",
                                    peer.public_key_b64);
                            }
                        }
                        TunnResult::Err(e) => {
                            debug!("encapsulate error for WG peer {}: {:?}",
                                peer.public_key_b64, e);
                        }
                        _ => {}
                    }
                }
            }
            // --- WireGuard protocol timers (100ms) ---
            _ = timer.tick() => {
                for peer in peers.iter_mut() {
                    match peer.tunn.update_timers(&mut dst_buf) {
                        TunnResult::WriteToNetwork(packet) => {
                            // Handshake retries / keepalives generated by boringtun
                            if let Some(endpoint) = peer.endpoint {
                                udp_socket.send_to(packet, endpoint).await?;
                            }
                        }
                        TunnResult::Err(e) => {
                            debug!("Timer error for WG peer {}: {:?}",
                                peer.public_key_b64, e);
                        }
                        _ => {}
                    }
                }
            }
            // --- Sync stats to ServerState (every 1s) ---
            _ = stats_timer.tick() => {
                let mut clients = state.clients.write().await;
                let mut stats = state.stats.write().await;
                for peer in peers.iter() {
                    let client_id = format!("wg-{}", &peer.public_key_b64[..8.min(peer.public_key_b64.len())]);
                    if let Some(info) = clients.get_mut(&client_id) {
                        // Update stats delta
                        let prev_sent = info.bytes_sent;
                        let prev_recv = info.bytes_received;
                        info.bytes_sent = peer.stats.bytes_sent;
                        info.bytes_received = peer.stats.bytes_received;
                        info.remote_addr = peer.endpoint.map(|e| e.to_string());
                        // Update aggregate stats (delta only, so totals don't double-count)
                        stats.bytes_sent += peer.stats.bytes_sent.saturating_sub(prev_sent);
                        stats.bytes_received += peer.stats.bytes_received.saturating_sub(prev_recv);
                    }
                }
            }
            // --- Dynamic peer commands ---
            cmd = command_rx.recv() => {
                match cmd {
                    Some(WgCommand::AddPeer(peer_config, resp_tx)) => {
                        let result = add_peer_to_loop(
                            &mut peers,
                            &peer_config,
                            &peer_index,
                            &rate_limiter,
                            &config.private_key,
                        );
                        if result.is_ok() {
                            // Register new peer in ServerState.
                            // add_peer_to_loop pushes onto `peers`, so last() is
                            // the freshly added peer.
                            let peer = peers.last().unwrap();
                            match register_wg_peer(&state, peer, &wg_return_tx).await {
                                Ok(Some(ip)) => {
                                    peer_vpn_ips.insert(peer_config.public_key.clone(), ip);
                                }
                                Ok(None) => {}
                                Err(e) => {
                                    warn!("Failed to register WG peer: {}", e);
                                }
                            }
                        }
                        let _ = resp_tx.send(result);
                    }
                    Some(WgCommand::RemovePeer(pubkey, resp_tx)) => {
                        let prev_len = peers.len();
                        peers.retain(|p| p.public_key_b64 != pubkey);
                        if peers.len() < prev_len {
                            let vpn_ip = peer_vpn_ips.remove(&pubkey);
                            unregister_wg_peer(&state, &pubkey, vpn_ip).await;
                            let _ = resp_tx.send(Ok(()));
                        } else {
                            let _ = resp_tx.send(Err(anyhow!("Peer not found: {}", pubkey)));
                        }
                    }
                    None => {
                        info!("WG command channel closed");
                        break;
                    }
                }
            }
            // --- Shutdown ---
            _ = shutdown_rx.recv() => {
                info!("WireGuard listener shutdown signal received");
                break;
            }
        }
    }
    // Cleanup: unregister all peers from ServerState
    for peer in &peers {
        let vpn_ip = peer_vpn_ips.get(&peer.public_key_b64).copied();
        unregister_wg_peer(&state, &peer.public_key_b64, vpn_ip).await;
    }
    info!("WireGuard listener stopped");
    Ok(())
}
// ============================================================================ // ============================================================================
// WgClient // WgClient
// ============================================================================ // ============================================================================
@@ -1077,6 +998,7 @@ fn chrono_now() -> String {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::tunnel::extract_dst_ip;
use std::net::Ipv6Addr; use std::net::Ipv6Addr;
#[test] #[test]

View File

@@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartvpn', name: '@push.rocks/smartvpn',
version: '1.10.2', version: '1.15.0',
description: 'A VPN solution with TypeScript control plane and Rust data plane daemon' description: 'A VPN solution with TypeScript control plane and Rust data plane daemon'
} }

View File

@@ -12,6 +12,7 @@ import type {
IWgPeerInfo, IWgPeerInfo,
IClientEntry, IClientEntry,
IClientConfigBundle, IClientConfigBundle,
IDestinationPolicy,
TVpnServerCommands, TVpnServerCommands,
} from './smartvpn.interfaces.js'; } from './smartvpn.interfaces.js';
@@ -21,6 +22,10 @@ import type {
export class VpnServer extends plugins.events.EventEmitter { export class VpnServer extends plugins.events.EventEmitter {
private bridge: VpnBridge<TVpnServerCommands>; private bridge: VpnBridge<TVpnServerCommands>;
private options: IVpnServerOptions; private options: IVpnServerOptions;
private nft?: plugins.smartnftables.SmartNftables;
private nftHealthInterval?: ReturnType<typeof setInterval>;
private nftSubnet?: string;
private nftPolicy?: IDestinationPolicy;
constructor(options: IVpnServerOptions) { constructor(options: IVpnServerOptions) {
super(); super();
@@ -50,6 +55,11 @@ export class VpnServer extends plugins.events.EventEmitter {
const cfg = config || this.options.config; const cfg = config || this.options.config;
if (cfg) { if (cfg) {
await this.bridge.sendCommand('start', { config: cfg }); await this.bridge.sendCommand('start', { config: cfg });
// For TUN mode with a destination policy, set up nftables rules
if (cfg.forwardingMode === 'tun' && cfg.destinationPolicy) {
await this.setupTunDestinationPolicy(cfg.subnet, cfg.destinationPolicy);
}
} }
} }
@@ -229,10 +239,110 @@ export class VpnServer extends plugins.events.EventEmitter {
return this.bridge.sendCommand('generateClientKeypair', {} as Record<string, never>); return this.bridge.sendCommand('generateClientKeypair', {} as Record<string, never>);
} }
// ── TUN Destination Policy via nftables ──────────────────────────────

/**
 * Set up nftables rules for TUN mode destination policy.
 *
 * Stores the subnet/policy for later re-application, creates the
 * SmartNftables wrapper (dry-run when not running as root, so rule
 * application is a no-op instead of failing), applies the rules, and starts
 * a 60-second health check that re-applies them if the table is removed
 * externally (e.g. by a firewall reload).
 *
 * @param subnet VPN client subnet (CIDR) used as the source-address match.
 * @param policy Destination policy to enforce.
 */
private async setupTunDestinationPolicy(subnet: string, policy: IDestinationPolicy): Promise<void> {
  this.nftSubnet = subnet;
  this.nftPolicy = policy;
  // Compute once so the health-check re-creation uses the same setting.
  const dryRun = process.getuid?.() !== 0;
  this.nft = new plugins.smartnftables.SmartNftables({
    tableName: 'smartvpn_tun',
    dryRun,
  });
  await this.nft.initialize();
  await this.applyDestinationPolicyRules();
  // Health check: re-apply rules if they disappear.
  let checking = false; // guard against overlapping async runs if a check stalls
  this.nftHealthInterval = setInterval(async () => {
    if (!this.nft || checking) return;
    checking = true;
    try {
      const exists = await this.nft.tableExists();
      if (!exists) {
        console.warn('[smartvpn] nftables rules missing, re-applying destination policy');
        // Bug fix: the re-created instance previously omitted dryRun, so a
        // non-root process would issue real nft calls here even though the
        // initial setup was dry-run. Keep the settings consistent.
        this.nft = new plugins.smartnftables.SmartNftables({
          tableName: 'smartvpn_tun',
          dryRun,
        });
        await this.nft.initialize();
        await this.applyDestinationPolicyRules();
      }
    } catch (err) {
      console.warn(`[smartvpn] nftables health check failed: ${err}`);
    } finally {
      checking = false;
    }
  }, 60_000);
}
/**
 * Apply the stored destination policy as nftables rules.
 * Rule order matters: blockList (drop) → allowList (accept) → default action.
 * No-op unless nft, subnet and policy have all been set up.
 */
private async applyDestinationPolicyRules(): Promise<void> {
  if (!this.nft || !this.nftSubnet || !this.nftPolicy) return;
  const subnet = this.nftSubnet;
  const policy = this.nftPolicy;
  const family = 'ip';
  const table = 'smartvpn_tun';
  // Builds one daddr-match rule with the given verdict.
  const ruleFor = (dest: string, verdict: string) =>
    `nft add rule ${family} ${table} prerouting ip saddr ${subnet} ip daddr ${dest} ${verdict}`;
  const commands: string[] = [
    // 1. Block list (deny wins — evaluated first)
    ...(policy.blockList ?? []).map((dest) => ruleFor(dest, 'drop')),
    // 2. Allow list (pass through directly — skip DNAT)
    ...(policy.allowList ?? []).map((dest) => ruleFor(dest, 'accept')),
  ];
  // 3. Default action for everything that matched neither list
  if (policy.default === 'forceTarget') {
    const target = policy.target || '127.0.0.1';
    commands.push(
      `nft add rule ${family} ${table} prerouting ip saddr ${subnet} dnat to ${target}`
    );
  } else if (policy.default === 'block') {
    commands.push(
      `nft add rule ${family} ${table} prerouting ip saddr ${subnet} drop`
    );
  }
  // 'allow' needs no rule — kernel default allows
  if (commands.length > 0) {
    await this.nft.applyRuleGroup('vpn-destination-policy', commands);
  }
}
/** /**
* Stop the daemon bridge. * Stop the daemon bridge.
*/ */
public stop(): void { public stop(): void {
// Clean up nftables rules
if (this.nftHealthInterval) {
clearInterval(this.nftHealthInterval);
this.nftHealthInterval = undefined;
}
if (this.nft) {
this.nft.cleanup().catch(() => {}); // best-effort cleanup
this.nft = undefined;
}
this.bridge.stop(); this.bridge.stop();
} }

View File

@@ -57,6 +57,8 @@ export interface IVpnClientConfig {
wgEndpoint?: string; wgEndpoint?: string;
/** WireGuard: allowed IPs (CIDR strings, e.g. ['0.0.0.0/0']) */ /** WireGuard: allowed IPs (CIDR strings, e.g. ['0.0.0.0/0']) */
wgAllowedIps?: string[]; wgAllowedIps?: string[];
/** Client-defined tags reported to the server after connection (informational, not for access control) */
clientDefinedClientTags?: string[];
} }
export interface IVpnClientOptions { export interface IVpnClientOptions {
@@ -96,12 +98,15 @@ export interface IVpnServerConfig {
defaultRateLimitBytesPerSec?: number; defaultRateLimitBytesPerSec?: number;
/** Default burst size for new clients (bytes). Omit for unlimited. */ /** Default burst size for new clients (bytes). Omit for unlimited. */
defaultBurstBytes?: number; defaultBurstBytes?: number;
/** Transport mode: 'both' (default, WS+QUIC), 'websocket', 'quic', or 'wireguard' */ /** Transport mode: 'all' (default, WS+QUIC+WG if configured), 'both' (WS+QUIC),
transportMode?: 'websocket' | 'quic' | 'both' | 'wireguard'; * 'websocket', 'quic', or 'wireguard' */
transportMode?: 'websocket' | 'quic' | 'both' | 'all' | 'wireguard';
/** QUIC listen address (host:port). Defaults to listenAddr. */ /** QUIC listen address (host:port). Defaults to listenAddr. */
quicListenAddr?: string; quicListenAddr?: string;
/** QUIC idle timeout in seconds (default: 30) */ /** QUIC idle timeout in seconds (default: 30) */
quicIdleTimeoutSecs?: number; quicIdleTimeoutSecs?: number;
/** WireGuard: server X25519 private key (base64). Required when transport includes WG. */
wgPrivateKey?: string;
/** WireGuard: UDP listen port (default: 51820) */ /** WireGuard: UDP listen port (default: 51820) */
wgListenPort?: number; wgListenPort?: number;
/** WireGuard: configured peers */ /** WireGuard: configured peers */
@@ -115,6 +120,32 @@ export interface IVpnServerConfig {
/** Server-level IP block list — applied at TCP accept, before Noise handshake. /** Server-level IP block list — applied at TCP accept, before Noise handshake.
* Supports exact IPs, CIDR, wildcards, ranges. */ * Supports exact IPs, CIDR, wildcards, ranges. */
connectionIpBlockList?: string[]; connectionIpBlockList?: string[];
/** When true and forwardingMode is 'socket', the userspace NAT engine prepends
* PROXY protocol v2 headers on outbound TCP connections, conveying the VPN client's
* tunnel IP as the source address. This allows downstream services (e.g. SmartProxy)
* to see the real VPN client identity instead of 127.0.0.1. */
socketForwardProxyProtocol?: boolean;
/** Destination routing policy for VPN client traffic (socket mode).
* Controls where decrypted traffic goes: allow through, block, or redirect to a target.
* Default: all traffic passes through (backward compatible). */
destinationPolicy?: IDestinationPolicy;
}
/**
* Destination routing policy for VPN client traffic.
* Evaluated per-packet in the NAT engine before per-client ACLs.
*/
export interface IDestinationPolicy {
/** Default action for traffic not matching allow/block lists */
default: 'forceTarget' | 'block' | 'allow';
/** Target IP address for 'forceTarget' mode (e.g. '127.0.0.1'). Required when default is 'forceTarget'. */
target?: string;
/** Destinations that pass through directly — not rewritten, not blocked.
* Supports: exact IP, CIDR, wildcards (192.168.190.*), ranges. */
allowList?: string[];
/** Destinations that are always blocked. Overrides allowList (deny wins).
* Supports: exact IP, CIDR, wildcards, ranges. */
blockList?: string[];
} }
export interface IVpnServerOptions { export interface IVpnServerOptions {
@@ -171,6 +202,8 @@ export interface IVpnClientInfo {
registeredClientId: string; registeredClientId: string;
/** Real client IP:port (from PROXY protocol or direct TCP connection) */ /** Real client IP:port (from PROXY protocol or direct TCP connection) */
remoteAddr?: string; remoteAddr?: string;
/** Transport used: "websocket", "quic", or "wireguard" */
transportType: string;
} }
export interface IVpnServerStatistics extends IVpnStatistics { export interface IVpnServerStatistics extends IVpnStatistics {
@@ -280,7 +313,11 @@ export interface IClientEntry {
priority?: number; priority?: number;
/** Whether this client is enabled (default: true) */ /** Whether this client is enabled (default: true) */
enabled?: boolean; enabled?: boolean;
/** Tags for grouping (e.g. ["engineering", "office"]) */ /** Tags assigned by the server admin — trusted, used for access control (e.g. ["engineering", "office"]) */
serverDefinedClientTags?: string[];
/** Tags reported by the connecting client — informational only, never used for access control */
clientDefinedClientTags?: string[];
/** @deprecated Use serverDefinedClientTags instead. Legacy field kept for backward compatibility. */
tags?: string[]; tags?: string[];
/** Optional description */ /** Optional description */
description?: string; description?: string;

View File

@@ -8,7 +8,8 @@ import * as events from 'events';
export { path, fs, os, url, events }; export { path, fs, os, url, events };
// @push.rocks // @push.rocks
import * as smartnftables from '@push.rocks/smartnftables';
import * as smartpath from '@push.rocks/smartpath'; import * as smartpath from '@push.rocks/smartpath';
import * as smartrust from '@push.rocks/smartrust'; import * as smartrust from '@push.rocks/smartrust';
export { smartpath, smartrust }; export { smartnftables, smartpath, smartrust };