Compare commits
18 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c49fcaf1ce | |||
| fdeba5eeb5 | |||
| 17af7ab289 | |||
| b98006e792 | |||
| fbfbe0db51 | |||
| 67542f0be7 | |||
| 13d0183e9d | |||
| 99a8a29ff1 | |||
| fe9c693ac8 | |||
| 20ef92599b | |||
| c3f180e264 | |||
| 667e5ff3de | |||
| ef5856bd3a | |||
| 6e4cafe3c5 | |||
| 42949b1233 | |||
| 7ae7d389dd | |||
| 414edf7038 | |||
| a1b62f6b62 |
66
changelog.md
66
changelog.md
@@ -1,5 +1,71 @@
|
||||
# Changelog
|
||||
|
||||
## 2026-03-31 - 1.18.0 - feat(server)
|
||||
add bridge forwarding mode and per-client destination policy overrides
|
||||
|
||||
- introduces Linux bridge-based forwarding so VPN clients can receive IPs from a LAN subnet via TAP/bridge integration
|
||||
- adds bridge server configuration options for LAN subnet, physical interface, and client IP allocation range
|
||||
- adds per-client destinationPolicy overrides in the client registry and applies them in the userspace NAT engine based on assigned tunnel IP
|
||||
- extends IP pool allocation to support constrained address ranges needed for bridge mode
|
||||
- updates TypeScript interfaces and documentation to cover bridge mode and per-client destination policy behavior
|
||||
|
||||
## 2026-03-31 - 1.17.1 - fix(readme)
|
||||
document per-transport metrics and handshake-driven WireGuard connection state
|
||||
|
||||
- Add README examples for getStatistics() per-transport active client and total connection counters
|
||||
- Clarify that WireGuard peers are marked connected only after a successful handshake and disconnect after idle timeout
|
||||
- Refresh API and project structure documentation to reflect newly documented stats fields and source files
|
||||
|
||||
## 2026-03-31 - 1.17.0 - feat(wireguard)
|
||||
track per-transport server statistics and make WireGuard clients active only after handshake
|
||||
|
||||
- add websocket, quic, and wireguard active-client and total-connection counters to server statistics
|
||||
- register WireGuard peers without marking them active until handshake/data is received, and remove them from active clients on expiration or idle timeout
|
||||
- sync WireGuard byte counters into aggregate server stats independently of active client presence and expose new statistics fields in TypeScript interfaces
|
||||
|
||||
## 2026-03-31 - 1.16.5 - fix(rust-userspace-nat)
|
||||
improve TCP session backpressure, buffering, and idle cleanup in userspace NAT
|
||||
|
||||
- apply proper bridge-channel backpressure by reserving channel capacity before consuming smoltcp TCP data
|
||||
- defer bridge sender initialization until the bridge task starts and track TCP session activity timestamps
|
||||
- cap per-session pending TCP send buffers at 512KB and abort stalled sessions when clients cannot keep up
|
||||
- add idle TCP session cleanup and switch NAT polling to a dynamic smoltcp-driven delay
|
||||
|
||||
## 2026-03-31 - 1.16.4 - fix(server)
|
||||
register preloaded WireGuard clients as peers on server startup
|
||||
|
||||
- Adds configured clients from the runtime registry to the WireGuard listener when the server starts.
|
||||
- Ensures clients loaded from config can complete WireGuard handshakes without requiring separate peer registration.
|
||||
- Logs a warning if automatic peer registration fails for an individual client.
|
||||
|
||||
## 2026-03-31 - 1.16.3 - fix(rust-nat)
|
||||
defer TCP bridge startup until handshake completion and buffer partial NAT socket writes
|
||||
|
||||
- Start TCP bridge tasks only after the smoltcp socket becomes active to prevent server data from arriving before the client handshake completes.
|
||||
- Buffer pending TCP payloads and flush partial writes so bridge-to-socket data is not silently lost under backpressure.
|
||||
- Keep closing TCP sessions alive until FIN processing completes and add logging for dropped packets when bridge or route channels are full.
|
||||
|
||||
## 2026-03-31 - 1.16.2 - fix(wireguard)
|
||||
sync runtime peer management with client registration and derive the correct server public key from the WireGuard private key
|
||||
|
||||
- Register, remove, and rotate WireGuard peers in the running listener when clients are added, deleted, or rekeyed.
|
||||
- Generate client WireGuard configs with the public key derived from the configured WireGuard private key instead of reusing the generic server public key.
|
||||
- Handle expired WireGuard sessions by re-initiating handshakes and mark client state as handshaking until the tunnel becomes active.
|
||||
- Improve allowed IP matching and peer VPN IP extraction for runtime packet routing.
|
||||
|
||||
## 2026-03-30 - 1.16.1 - fix(rust/server)
|
||||
add serde alias for clientAllowedIPs in server config
|
||||
|
||||
- Accepts the camelCase clientAllowedIPs field when deserializing server configuration.
|
||||
- Improves compatibility with existing or external configuration formats without changing runtime behavior.
|
||||
|
||||
## 2026-03-30 - 1.16.0 - feat(server)
|
||||
add configurable client endpoint and allowed IPs for generated VPN configs
|
||||
|
||||
- adds serverEndpoint to generated SmartVPN and WireGuard client configs so remote clients can use a public address instead of the listen address
|
||||
- adds clientAllowedIPs to generated WireGuard configs to support full-tunnel or split-tunnel routing
|
||||
- updates TypeScript interfaces to expose the new server configuration options
|
||||
|
||||
## 2026-03-30 - 1.15.0 - feat(vpnserver)
|
||||
add nftables-backed destination policy enforcement for TUN mode
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@push.rocks/smartvpn",
|
||||
"version": "1.15.0",
|
||||
"version": "1.18.0",
|
||||
"private": false,
|
||||
"description": "A VPN solution with TypeScript control plane and Rust data plane daemon",
|
||||
"type": "module",
|
||||
|
||||
87
readme.md
87
readme.md
@@ -6,11 +6,13 @@ A high-performance VPN solution with a **TypeScript control plane** and a **Rust
|
||||
🚀 **Triple transport**: WebSocket (Cloudflare-friendly), raw **QUIC** (datagrams), and **WireGuard** (standard protocol)
|
||||
🛡️ **ACL engine** — deny-overrides-allow IP filtering, aligned with SmartProxy conventions
|
||||
🔀 **PROXY protocol v2** — real client IPs behind reverse proxies (HAProxy, SmartProxy, Cloudflare Spectrum)
|
||||
📊 **Adaptive QoS**: per-client rate limiting, priority queues, connection quality tracking
|
||||
📊 **Per-transport metrics**: active clients and total connections broken down by websocket, QUIC, and WireGuard
|
||||
🔄 **Hub API**: one `createClient()` call generates keys, assigns IP, returns both SmartVPN + WireGuard configs
|
||||
📡 **Real-time telemetry**: RTT, jitter, loss ratio, link health — all via typed APIs
|
||||
🌐 **Unified forwarding pipeline**: all transports share the same engine — TUN (kernel), userspace NAT (no root), or testing mode
|
||||
🌐 **Unified forwarding pipeline**: all transports share the same engine — TUN (kernel), userspace NAT (no root), L2 bridge, or testing mode
|
||||
🏠 **Bridge mode**: VPN clients get IPs from your LAN subnet — seamlessly bridge remote clients onto a physical network
|
||||
🎯 **Destination routing policy**: force-target, block, or allow traffic per destination with nftables integration
|
||||
⚡ **Handshake-driven WireGuard state**: peers appear as "connected" only after a successful WireGuard handshake, and auto-disconnect on idle timeout
|
||||
|
||||
## Issue Reporting and Security
|
||||
|
||||
@@ -83,7 +85,7 @@ await server.start({
|
||||
publicKey: '<server-noise-public-key-base64>',
|
||||
subnet: '10.8.0.0/24',
|
||||
transportMode: 'all', // WebSocket + QUIC + WireGuard simultaneously (default)
|
||||
forwardingMode: 'tun', // 'tun' (kernel), 'socket' (userspace NAT), or 'testing'
|
||||
forwardingMode: 'tun', // 'tun' | 'socket' | 'bridge' | 'testing'
|
||||
wgPrivateKey: '<server-wg-private-key-base64>', // required for WireGuard transport
|
||||
enableNat: true,
|
||||
dns: ['1.1.1.1', '8.8.8.8'],
|
||||
@@ -140,6 +142,30 @@ Every client authenticates with a **Noise IK handshake** (`Noise_IK_25519_ChaCha
|
||||
|
||||
The server runs **all three simultaneously** by default with `transportMode: 'all'`. All transports share the same unified forwarding pipeline (`ForwardingEngine`), IP pool, client registry, and stats — so WireGuard peers get the same userspace NAT, rate limiting, and monitoring as WS/QUIC clients. Clients auto-negotiate with `transport: 'auto'` (tries QUIC first, falls back to WS).
|
||||
|
||||
### 📊 Per-Transport Metrics
|
||||
|
||||
Server statistics include per-transport breakdowns so you can see exactly how many clients use each protocol:
|
||||
|
||||
```typescript
|
||||
const stats = await server.getStatistics();
|
||||
|
||||
// Aggregate
|
||||
console.log(stats.activeClients); // total connected clients
|
||||
console.log(stats.totalConnections); // total connections since start
|
||||
|
||||
// Per-transport active clients
|
||||
console.log(stats.activeClientsWebsocket); // currently connected via WS
|
||||
console.log(stats.activeClientsQuic); // currently connected via QUIC
|
||||
console.log(stats.activeClientsWireguard); // currently connected via WireGuard
|
||||
|
||||
// Per-transport total connections
|
||||
console.log(stats.totalConnectionsWebsocket);
|
||||
console.log(stats.totalConnectionsQuic);
|
||||
console.log(stats.totalConnectionsWireguard);
|
||||
```
|
||||
|
||||
**WireGuard connection state is handshake-driven** — registered WireGuard peers do NOT appear as "connected" until their first successful WireGuard handshake completes. They automatically disconnect after 180 seconds of inactivity or when boringtun reports `ConnectionExpired`. This matches how WebSocket/QUIC clients behave: they appear on connection and disappear on disconnect.
|
||||
|
||||
### 🛡️ ACL Engine (SmartProxy-Aligned)
|
||||
|
||||
Security policies per client, using the same `ipAllowList` / `ipBlockList` naming convention as `@push.rocks/smartproxy`:
|
||||
@@ -212,6 +238,21 @@ In **TUN mode**, destination policies are enforced via **nftables** rules (using
|
||||
|
||||
In **socket mode**, the policy is evaluated in the userspace NAT engine before per-client ACLs.
|
||||
|
||||
**Per-client override** — individual clients can have their own destination policy that overrides the server-level default:
|
||||
|
||||
```typescript
|
||||
await server.createClient({
|
||||
clientId: 'restricted-client',
|
||||
security: {
|
||||
destinationPolicy: {
|
||||
default: 'block', // block everything by default
|
||||
allowList: ['10.0.0.0/8'], // except internal network
|
||||
},
|
||||
// ... other security settings
|
||||
},
|
||||
});
|
||||
```
|
||||
|
||||
### 🔗 Socket Forward Proxy Protocol
|
||||
|
||||
When using `forwardingMode: 'socket'` (userspace NAT), you can prepend **PROXY protocol v2 headers** on outbound TCP connections. This conveys the VPN client's tunnel IP as the source address to downstream services (e.g., SmartProxy):
|
||||
@@ -226,12 +267,13 @@ await server.start({
|
||||
|
||||
### 📦 Packet Forwarding Modes
|
||||
|
||||
SmartVPN supports three forwarding modes, configurable per-server and per-client:
|
||||
SmartVPN supports four forwarding modes, configurable per-server and per-client:
|
||||
|
||||
| Mode | Flag | Description | Root Required |
|
||||
|------|------|-------------|---------------|
|
||||
| **TUN** | `'tun'` | Kernel TUN device — real packet forwarding with system routing | ✅ Yes |
|
||||
| **Userspace NAT** | `'socket'` | Userspace TCP/UDP proxy via `connect(2)` — no TUN, no root needed | ❌ No |
|
||||
| **Bridge** | `'bridge'` | L2 bridge — VPN clients get IPs from a physical LAN subnet | ✅ Yes |
|
||||
| **Testing** | `'testing'` | Monitoring only — packets are counted but not forwarded | ❌ No |
|
||||
|
||||
```typescript
|
||||
@@ -242,6 +284,16 @@ await server.start({
|
||||
enableNat: true,
|
||||
});
|
||||
|
||||
// Server with bridge mode — VPN clients appear on the LAN
|
||||
await server.start({
|
||||
// ...
|
||||
forwardingMode: 'bridge',
|
||||
bridgeLanSubnet: '192.168.1.0/24', // LAN subnet to bridge into
|
||||
bridgePhysicalInterface: 'eth0', // auto-detected if omitted
|
||||
bridgeIpRangeStart: 200, // clients get .200–.250 (defaults)
|
||||
bridgeIpRangeEnd: 250,
|
||||
});
|
||||
|
||||
// Client with TUN device
|
||||
const { assignedIp } = await client.connect({
|
||||
// ...
|
||||
@@ -249,15 +301,18 @@ const { assignedIp } = await client.connect({
|
||||
});
|
||||
```
|
||||
|
||||
The userspace NAT mode extracts destination IP/port from IP packets, opens a real socket to the destination, and relays data — supporting both TCP streams and UDP datagrams without requiring `CAP_NET_ADMIN` or root privileges.
|
||||
The **userspace NAT** mode extracts destination IP/port from IP packets, opens a real socket to the destination, and relays data — supporting both TCP streams and UDP datagrams without requiring `CAP_NET_ADMIN` or root privileges.
|
||||
|
||||
The **bridge** mode assigns VPN clients IPs from a real LAN subnet instead of a virtual VPN subnet. Clients appear as if they're directly on the physical network — perfect for remote access to home labs, office networks, or IoT devices.
|
||||
|
||||
### 📊 Telemetry & QoS
|
||||
|
||||
- **Connection quality**: Smoothed RTT, jitter, min/max RTT, loss ratio, link health (`healthy` / `degraded` / `critical`)
|
||||
- **Adaptive keepalives**: Interval adjusts based on link health (60s → 30s → 10s)
|
||||
- **Per-client rate limiting**: Token bucket with configurable bytes/sec and burst
|
||||
- **Dead-peer detection**: 180s inactivity timeout
|
||||
- **Dead-peer detection**: 180s inactivity timeout (all transports)
|
||||
- **MTU management**: Automatic overhead calculation (IP+TCP+WS+Noise = 79 bytes)
|
||||
- **Per-transport stats**: Active client and total connection counts broken down by websocket, QUIC, and WireGuard
|
||||
|
||||
### 🏷️ Client Tags (Trusted vs Informational)
|
||||
|
||||
@@ -418,13 +473,14 @@ server.on('reconnected', () => { /* socket transport reconnected */ });
|
||||
|
||||
| Interface | Purpose |
|
||||
|-----------|---------|
|
||||
| `IVpnServerConfig` | Server configuration (listen addr, keys, subnet, transport mode, forwarding mode, clients, proxy protocol, destination policy) |
|
||||
| `IVpnServerConfig` | Server configuration (listen addr, keys, subnet, transport mode, forwarding mode incl. bridge, clients, proxy protocol, destination policy) |
|
||||
| `IVpnClientConfig` | Client configuration (server URL, keys, transport, forwarding mode, WG options, client-defined tags) |
|
||||
| `IClientEntry` | Server-side client definition (ID, keys, security, priority, server/client tags, expiry) |
|
||||
| `IClientSecurity` | Per-client ACLs and rate limits (SmartProxy-aligned naming) |
|
||||
| `IClientSecurity` | Per-client ACLs, rate limits, and destination policy override (SmartProxy-aligned naming) |
|
||||
| `IClientRateLimit` | Rate limiting config (bytesPerSec, burstBytes) |
|
||||
| `IClientConfigBundle` | Full config bundle returned by `createClient()` — includes SmartVPN config, WireGuard .conf, and secrets |
|
||||
| `IVpnClientInfo` | Connected client info (IP, stats, authenticated key, remote addr, transport type) |
|
||||
| `IVpnServerStatistics` | Server stats with per-transport breakdowns (activeClientsWebsocket/Quic/Wireguard, totalConnections*) |
|
||||
| `IVpnConnectionQuality` | RTT, jitter, loss ratio, link health |
|
||||
| `IVpnMtuInfo` | TUN MTU, effective MTU, overhead bytes, oversized packet stats |
|
||||
| `IVpnKeypair` | Base64-encoded public/private key pair |
|
||||
@@ -443,7 +499,7 @@ server.on('reconnected', () => { /* socket transport reconnected */ });
|
||||
| `exportClientConfig` | Re-export as SmartVPN config or WireGuard `.conf` |
|
||||
| `listClients` / `disconnectClient` | Manage live connections |
|
||||
| `setClientRateLimit` / `removeClientRateLimit` | Runtime rate limit adjustments |
|
||||
| `getStatus` / `getStatistics` / `getClientTelemetry` | Monitoring |
|
||||
| `getStatus` / `getStatistics` / `getClientTelemetry` | Monitoring (stats include per-transport breakdowns) |
|
||||
| `generateKeypair` / `generateWgKeypair` / `generateClientKeypair` | Key generation |
|
||||
| `addWgPeer` / `removeWgPeer` / `listWgPeers` | WireGuard peer management |
|
||||
|
||||
@@ -541,6 +597,7 @@ smartvpn/
|
||||
│ ├── index.ts # All exports
|
||||
│ ├── smartvpn.interfaces.ts # Interfaces, types, IPC command maps
|
||||
│ ├── smartvpn.plugins.ts # Dependency imports
|
||||
│ ├── smartvpn.paths.ts # Binary path resolution
|
||||
│ ├── smartvpn.classes.vpnserver.ts
|
||||
│ ├── smartvpn.classes.vpnclient.ts
|
||||
│ ├── smartvpn.classes.vpnbridge.ts
|
||||
@@ -558,13 +615,19 @@ smartvpn/
|
||||
│ ├── proxy_protocol.rs # PROXY protocol v2 parser
|
||||
│ ├── management.rs # JSON-lines IPC
|
||||
│ ├── transport.rs # WebSocket transport
|
||||
│ ├── transport_trait.rs # Transport abstraction (Sink/Stream)
|
||||
│ ├── quic_transport.rs # QUIC transport
|
||||
│ ├── wireguard.rs # WireGuard (boringtun)
|
||||
│ ├── codec.rs # Binary frame protocol
|
||||
│ ├── keepalive.rs # Adaptive keepalives
|
||||
│ ├── ratelimit.rs # Token bucket
|
||||
│ ├── userspace_nat.rs # Userspace TCP/UDP NAT proxy
|
||||
│ └── ... # tunnel, network, telemetry, qos, mtu, reconnect
|
||||
│ ├── tunnel.rs # TUN device management
|
||||
│ ├── network.rs # IP pool + networking
|
||||
│ ├── telemetry.rs # RTT/jitter/loss tracking
|
||||
│ ├── qos.rs # Priority queues + smart dropping
|
||||
│ ├── mtu.rs # MTU + ICMP too-big
|
||||
│ └── reconnect.rs # Exponential backoff + session tokens
|
||||
├── test/ # Test files
|
||||
├── dist_ts/ # Compiled TypeScript
|
||||
└── dist_rust/ # Cross-compiled binaries (linux amd64 + arm64)
|
||||
@@ -572,7 +635,7 @@ smartvpn/
|
||||
|
||||
## License and Legal Information
|
||||
|
||||
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [license](./license.md) file.
|
||||
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file.
|
||||
|
||||
**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.
|
||||
|
||||
@@ -584,7 +647,7 @@ Use of these trademarks must comply with Task Venture Capital GmbH's Trademark G
|
||||
|
||||
### Company Information
|
||||
|
||||
Task Venture Capital GmbH
|
||||
Task Venture Capital GmbH
|
||||
Registered at District Court Bremen HRB 35230 HB, Germany
|
||||
|
||||
For any legal inquiries or further information, please contact us via email at hello@task.vc.
|
||||
|
||||
@@ -164,6 +164,7 @@ mod tests {
|
||||
destination_block_list: dst_block.map(|v| v.into_iter().map(String::from).collect()),
|
||||
max_connections: None,
|
||||
rate_limit: None,
|
||||
destination_policy: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
352
rust/src/bridge.rs
Normal file
352
rust/src/bridge.rs
Normal file
@@ -0,0 +1,352 @@
|
||||
//! L2 Bridge forwarding engine.
|
||||
//!
|
||||
//! Provides server-side bridging: receives L3 IP packets from VPN clients,
|
||||
//! wraps them in Ethernet frames, and injects them into a Linux bridge
|
||||
//! connected to the host's physical network interface.
|
||||
//!
|
||||
//! Return traffic from the bridge is stripped of its Ethernet header and
|
||||
//! routed back to VPN clients via `tun_routes`.
|
||||
|
||||
use anyhow::Result;
|
||||
use std::collections::HashMap;
|
||||
use std::net::Ipv4Addr;
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::server::ServerState;
|
||||
|
||||
/// Configuration for the bridge forwarding engine.
|
||||
pub struct BridgeConfig {
|
||||
/// TAP device name (e.g., "svpn_tap0")
|
||||
pub tap_name: String,
|
||||
/// Linux bridge name (e.g., "svpn_br0")
|
||||
pub bridge_name: String,
|
||||
/// Physical interface to bridge (e.g., "eth0")
|
||||
pub physical_interface: String,
|
||||
/// Gateway IP on the bridge (host's LAN IP)
|
||||
pub gateway_ip: Ipv4Addr,
|
||||
/// Subnet prefix length (e.g., 24)
|
||||
pub prefix_len: u8,
|
||||
/// MTU for the TAP device
|
||||
pub mtu: u16,
|
||||
}
|
||||
|
||||
/// Ethernet frame constants
|
||||
const ETH_HEADER_LEN: usize = 14;
|
||||
const ETH_TYPE_IPV4: [u8; 2] = [0x08, 0x00];
|
||||
const ETH_TYPE_ARP: [u8; 2] = [0x08, 0x06];
|
||||
const BROADCAST_MAC: [u8; 6] = [0xff; 6];
|
||||
|
||||
/// Generate a deterministic locally-administered MAC from an IPv4 address.
|
||||
/// Uses prefix 02:53:56 (locally administered, "SVP" in hex-ish).
|
||||
fn mac_from_ip(ip: Ipv4Addr) -> [u8; 6] {
|
||||
let octets = ip.octets();
|
||||
[0x02, 0x53, 0x56, octets[1], octets[2], octets[3]]
|
||||
}
|
||||
|
||||
/// Wrap an IP packet in an Ethernet frame.
|
||||
fn wrap_in_ethernet(ip_packet: &[u8], src_mac: [u8; 6], dst_mac: [u8; 6]) -> Vec<u8> {
|
||||
let mut frame = Vec::with_capacity(ETH_HEADER_LEN + ip_packet.len());
|
||||
frame.extend_from_slice(&dst_mac);
|
||||
frame.extend_from_slice(&src_mac);
|
||||
frame.extend_from_slice(Ð_TYPE_IPV4);
|
||||
frame.extend_from_slice(ip_packet);
|
||||
frame
|
||||
}
|
||||
|
||||
/// Extract the EtherType and payload from an Ethernet frame.
|
||||
fn unwrap_ethernet(frame: &[u8]) -> Option<([u8; 2], &[u8])> {
|
||||
if frame.len() < ETH_HEADER_LEN {
|
||||
return None;
|
||||
}
|
||||
let ether_type = [frame[12], frame[13]];
|
||||
Some((ether_type, &frame[ETH_HEADER_LEN..]))
|
||||
}
|
||||
|
||||
/// Extract destination IPv4 from a raw IP packet header.
|
||||
fn dst_ip_from_packet(packet: &[u8]) -> Option<Ipv4Addr> {
|
||||
if packet.len() < 20 {
|
||||
return None;
|
||||
}
|
||||
// Version must be 4
|
||||
if (packet[0] >> 4) != 4 {
|
||||
return None;
|
||||
}
|
||||
Some(Ipv4Addr::new(packet[16], packet[17], packet[18], packet[19]))
|
||||
}
|
||||
|
||||
/// Extract source IPv4 from a raw IP packet header.
|
||||
fn src_ip_from_packet(packet: &[u8]) -> Option<Ipv4Addr> {
|
||||
if packet.len() < 20 {
|
||||
return None;
|
||||
}
|
||||
if (packet[0] >> 4) != 4 {
|
||||
return None;
|
||||
}
|
||||
Some(Ipv4Addr::new(packet[12], packet[13], packet[14], packet[15]))
|
||||
}
|
||||
|
||||
/// Build a gratuitous ARP announcement frame.
|
||||
fn build_garp(ip: Ipv4Addr, mac: [u8; 6]) -> Vec<u8> {
|
||||
let ip_bytes = ip.octets();
|
||||
let mut frame = Vec::with_capacity(42); // 14 eth + 28 ARP
|
||||
// Ethernet header
|
||||
frame.extend_from_slice(&BROADCAST_MAC); // dst: broadcast
|
||||
frame.extend_from_slice(&mac); // src: our MAC
|
||||
frame.extend_from_slice(Ð_TYPE_ARP); // EtherType: ARP
|
||||
// ARP payload
|
||||
frame.extend_from_slice(&[0x00, 0x01]); // Hardware type: Ethernet
|
||||
frame.extend_from_slice(&[0x08, 0x00]); // Protocol type: IPv4
|
||||
frame.push(6); // Hardware addr len
|
||||
frame.push(4); // Protocol addr len
|
||||
frame.extend_from_slice(&[0x00, 0x01]); // Operation: ARP Request (GARP uses request)
|
||||
frame.extend_from_slice(&mac); // Sender hardware addr
|
||||
frame.extend_from_slice(&ip_bytes); // Sender protocol addr
|
||||
frame.extend_from_slice(&[0x00; 6]); // Target hardware addr (ignored in GARP)
|
||||
frame.extend_from_slice(&ip_bytes); // Target protocol addr (same as sender for GARP)
|
||||
frame
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Linux bridge management (ip commands)
|
||||
// ============================================================================
|
||||
|
||||
async fn run_ip_cmd(args: &[&str]) -> Result<String> {
|
||||
let output = tokio::process::Command::new("ip")
|
||||
.args(args)
|
||||
.output()
|
||||
.await?;
|
||||
if !output.status.success() {
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
anyhow::bail!("ip {} failed: {}", args.join(" "), stderr.trim());
|
||||
}
|
||||
Ok(String::from_utf8_lossy(&output.stdout).to_string())
|
||||
}
|
||||
|
||||
/// Create a Linux bridge interface.
|
||||
pub async fn create_bridge(name: &str) -> Result<()> {
|
||||
run_ip_cmd(&["link", "add", name, "type", "bridge"]).await?;
|
||||
info!("Created bridge {}", name);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Add an interface to a bridge.
|
||||
pub async fn bridge_add_interface(bridge: &str, iface: &str) -> Result<()> {
|
||||
run_ip_cmd(&["link", "set", iface, "master", bridge]).await?;
|
||||
info!("Added {} to bridge {}", iface, bridge);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Bring an interface up.
|
||||
pub async fn set_interface_up(iface: &str) -> Result<()> {
|
||||
run_ip_cmd(&["link", "set", iface, "up"]).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Remove a bridge interface.
|
||||
pub async fn remove_bridge(name: &str) -> Result<()> {
|
||||
// First bring it down, ignore errors
|
||||
let _ = run_ip_cmd(&["link", "set", name, "down"]).await;
|
||||
run_ip_cmd(&["link", "del", name]).await?;
|
||||
info!("Removed bridge {}", name);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Detect the default network interface from the routing table.
|
||||
pub async fn detect_default_interface() -> Result<String> {
|
||||
let output = run_ip_cmd(&["route", "show", "default"]).await?;
|
||||
// Format: "default via X.X.X.X dev IFACE ..."
|
||||
let parts: Vec<&str> = output.split_whitespace().collect();
|
||||
if let Some(idx) = parts.iter().position(|&s| s == "dev") {
|
||||
if let Some(iface) = parts.get(idx + 1) {
|
||||
return Ok(iface.to_string());
|
||||
}
|
||||
}
|
||||
anyhow::bail!("Could not detect default network interface from route table");
|
||||
}
|
||||
|
||||
/// Get the IP address and prefix length of a network interface.
|
||||
pub async fn get_interface_ip(iface: &str) -> Result<(Ipv4Addr, u8)> {
|
||||
let output = run_ip_cmd(&["-4", "addr", "show", "dev", iface]).await?;
|
||||
// Parse "inet X.X.X.X/NN" from output
|
||||
for line in output.lines() {
|
||||
let trimmed = line.trim();
|
||||
if let Some(rest) = trimmed.strip_prefix("inet ") {
|
||||
let addr_cidr = rest.split_whitespace().next().unwrap_or("");
|
||||
let parts: Vec<&str> = addr_cidr.split('/').collect();
|
||||
if parts.len() == 2 {
|
||||
let ip: Ipv4Addr = parts[0].parse()?;
|
||||
let prefix: u8 = parts[1].parse()?;
|
||||
return Ok((ip, prefix));
|
||||
}
|
||||
}
|
||||
}
|
||||
anyhow::bail!("Could not find IPv4 address on interface {}", iface);
|
||||
}
|
||||
|
||||
/// Migrate the host's IP from a physical interface to a bridge.
|
||||
/// This is the most delicate operation — briefly interrupts connectivity.
|
||||
pub async fn migrate_host_ip_to_bridge(
|
||||
physical_iface: &str,
|
||||
bridge: &str,
|
||||
ip: Ipv4Addr,
|
||||
prefix: u8,
|
||||
) -> Result<()> {
|
||||
let cidr = format!("{}/{}", ip, prefix);
|
||||
// Remove IP from physical interface
|
||||
let _ = run_ip_cmd(&["addr", "del", &cidr, "dev", physical_iface]).await;
|
||||
// Add IP to bridge
|
||||
run_ip_cmd(&["addr", "add", &cidr, "dev", bridge]).await?;
|
||||
info!("Migrated IP {} from {} to {}", cidr, physical_iface, bridge);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Restore the host's IP from bridge back to the physical interface.
|
||||
pub async fn restore_host_ip(
|
||||
physical_iface: &str,
|
||||
bridge: &str,
|
||||
ip: Ipv4Addr,
|
||||
prefix: u8,
|
||||
) -> Result<()> {
|
||||
let cidr = format!("{}/{}", ip, prefix);
|
||||
let _ = run_ip_cmd(&["addr", "del", &cidr, "dev", bridge]).await;
|
||||
run_ip_cmd(&["addr", "add", &cidr, "dev", physical_iface]).await?;
|
||||
info!("Restored IP {} to {}", cidr, physical_iface);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Enable proxy ARP on an interface via sysctl.
|
||||
pub async fn enable_proxy_arp(iface: &str) -> Result<()> {
|
||||
let path = format!("/proc/sys/net/ipv4/conf/{}/proxy_arp", iface);
|
||||
tokio::fs::write(&path, "1").await?;
|
||||
info!("Enabled proxy_arp on {}", iface);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Create a TAP device (L2) using the tun crate.
|
||||
pub fn create_tap(name: &str, mtu: u16) -> Result<tun::AsyncDevice> {
|
||||
let mut config = tun::Configuration::default();
|
||||
config
|
||||
.tun_name(name)
|
||||
.layer(tun::Layer::L2)
|
||||
.mtu(mtu)
|
||||
.up();
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
config.platform_config(|p| {
|
||||
p.ensure_root_privileges(true);
|
||||
});
|
||||
|
||||
let device = tun::create_as_async(&config)?;
|
||||
info!("TAP device {} created (L2, mtu={})", name, mtu);
|
||||
Ok(device)
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// BridgeEngine — main event loop
|
||||
// ============================================================================
|
||||
|
||||
/// The BridgeEngine wraps/unwraps Ethernet frames and bridges VPN traffic
|
||||
/// to the host's physical LAN via a Linux bridge + TAP device.
|
||||
pub struct BridgeEngine {
|
||||
state: Arc<ServerState>,
|
||||
/// Learned MAC addresses for LAN peers (dst IP → MAC).
|
||||
/// Populated from ARP replies and Ethernet frame src MACs.
|
||||
arp_cache: HashMap<Ipv4Addr, [u8; 6]>,
|
||||
}
|
||||
|
||||
impl BridgeEngine {
|
||||
pub fn new(state: Arc<ServerState>) -> Self {
|
||||
Self {
|
||||
state,
|
||||
arp_cache: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Run the bridge engine event loop.
|
||||
/// Receives L3 IP packets from VPN clients, wraps in Ethernet, writes to TAP.
|
||||
/// Reads Ethernet frames from TAP, strips header, routes back to VPN clients.
|
||||
pub async fn run(
|
||||
mut self,
|
||||
mut tap_device: tun::AsyncDevice,
|
||||
mut packet_rx: mpsc::Receiver<Vec<u8>>,
|
||||
mut shutdown_rx: mpsc::Receiver<()>,
|
||||
) -> Result<()> {
|
||||
let mut buf = vec![0u8; 2048];
|
||||
|
||||
info!("BridgeEngine started");
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
// Packet from VPN client → wrap in Ethernet → write to TAP
|
||||
Some(ip_packet) = packet_rx.recv() => {
|
||||
if let Some(dst_ip) = dst_ip_from_packet(&ip_packet) {
|
||||
let src_ip = src_ip_from_packet(&ip_packet).unwrap_or(Ipv4Addr::UNSPECIFIED);
|
||||
let src_mac = mac_from_ip(src_ip);
|
||||
let dst_mac = self.arp_cache.get(&dst_ip)
|
||||
.copied()
|
||||
.unwrap_or(BROADCAST_MAC);
|
||||
let frame = wrap_in_ethernet(&ip_packet, src_mac, dst_mac);
|
||||
if let Err(e) = tap_device.write_all(&frame).await {
|
||||
warn!("TAP write error: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Frame from TAP (LAN) → strip Ethernet → route to VPN client
|
||||
result = tap_device.read(&mut buf) => {
|
||||
match result {
|
||||
Ok(len) if len >= ETH_HEADER_LEN => {
|
||||
let frame = &buf[..len];
|
||||
|
||||
// Learn src MAC from incoming frames
|
||||
if let Some((ether_type, payload)) = unwrap_ethernet(frame) {
|
||||
// Learn ARP cache from src MAC + src IP
|
||||
let src_mac: [u8; 6] = frame[6..12].try_into().unwrap_or([0; 6]);
|
||||
if ether_type == ETH_TYPE_IPV4 {
|
||||
if let Some(src_ip) = src_ip_from_packet(payload) {
|
||||
self.arp_cache.insert(src_ip, src_mac);
|
||||
}
|
||||
}
|
||||
|
||||
// Only forward IPv4 packets to VPN clients
|
||||
if ether_type == ETH_TYPE_IPV4 {
|
||||
if let Some(dst_ip) = dst_ip_from_packet(payload) {
|
||||
// Look up VPN client by dst IP in tun_routes
|
||||
let routes = self.state.tun_routes.read().await;
|
||||
if let Some(sender) = routes.get(&dst_ip) {
|
||||
let _ = sender.try_send(payload.to_vec());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(_) => {} // Frame too short, ignore
|
||||
Err(e) => {
|
||||
warn!("TAP read error: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_ = shutdown_rx.recv() => {
|
||||
info!("BridgeEngine shutting down");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Send a gratuitous ARP for a VPN client IP.
|
||||
pub async fn announce_client(tap: &mut tun::AsyncDevice, ip: Ipv4Addr) -> Result<()> {
|
||||
let mac = mac_from_ip(ip);
|
||||
let garp = build_garp(ip, mac);
|
||||
tap.write_all(&garp).await?;
|
||||
debug!("Sent GARP for {} (MAC {:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x})",
|
||||
ip, mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -26,6 +26,9 @@ pub struct ClientSecurity {
|
||||
pub max_connections: Option<u32>,
|
||||
/// Per-client rate limiting.
|
||||
pub rate_limit: Option<ClientRateLimit>,
|
||||
/// Per-client destination routing policy override.
|
||||
/// When set, overrides the server-level DestinationPolicy for this client's traffic.
|
||||
pub destination_policy: Option<crate::server::DestinationPolicyConfig>,
|
||||
}
|
||||
|
||||
/// A registered client entry — the server-side source of truth.
|
||||
@@ -76,12 +79,14 @@ impl ClientEntry {
|
||||
}
|
||||
}
|
||||
|
||||
/// In-memory client registry with dual-key indexing.
|
||||
/// In-memory client registry with triple-key indexing.
|
||||
pub struct ClientRegistry {
|
||||
/// Primary index: clientId → ClientEntry
|
||||
entries: HashMap<String, ClientEntry>,
|
||||
/// Secondary index: publicKey (base64) → clientId (fast lookup during handshake)
|
||||
key_index: HashMap<String, String>,
|
||||
/// Tertiary index: assignedIp → clientId (fast lookup during NAT destination policy)
|
||||
ip_index: HashMap<String, String>,
|
||||
}
|
||||
|
||||
impl ClientRegistry {
|
||||
@@ -89,6 +94,7 @@ impl ClientRegistry {
|
||||
Self {
|
||||
entries: HashMap::new(),
|
||||
key_index: HashMap::new(),
|
||||
ip_index: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -114,6 +120,9 @@ impl ClientRegistry {
|
||||
anyhow::bail!("Public key already registered to another client");
|
||||
}
|
||||
self.key_index.insert(entry.public_key.clone(), entry.client_id.clone());
|
||||
if let Some(ref ip) = entry.assigned_ip {
|
||||
self.ip_index.insert(ip.clone(), entry.client_id.clone());
|
||||
}
|
||||
self.entries.insert(entry.client_id.clone(), entry);
|
||||
Ok(())
|
||||
}
|
||||
@@ -123,6 +132,9 @@ impl ClientRegistry {
|
||||
let entry = self.entries.remove(client_id)
|
||||
.ok_or_else(|| anyhow::anyhow!("Client '{}' not found", client_id))?;
|
||||
self.key_index.remove(&entry.public_key);
|
||||
if let Some(ref ip) = entry.assigned_ip {
|
||||
self.ip_index.remove(ip);
|
||||
}
|
||||
Ok(entry)
|
||||
}
|
||||
|
||||
@@ -137,6 +149,12 @@ impl ClientRegistry {
|
||||
self.entries.get(client_id)
|
||||
}
|
||||
|
||||
/// Get a client by assigned IP (used for per-client destination policy in NAT engine).
|
||||
pub fn get_by_assigned_ip(&self, ip: &str) -> Option<&ClientEntry> {
|
||||
let client_id = self.ip_index.get(ip)?;
|
||||
self.entries.get(client_id)
|
||||
}
|
||||
|
||||
/// Check if a public key is authorized (exists, enabled, not expired).
|
||||
pub fn is_authorized(&self, public_key: &str) -> bool {
|
||||
match self.get_by_key(public_key) {
|
||||
@@ -153,12 +171,22 @@ impl ClientRegistry {
|
||||
let entry = self.entries.get_mut(client_id)
|
||||
.ok_or_else(|| anyhow::anyhow!("Client '{}' not found", client_id))?;
|
||||
let old_key = entry.public_key.clone();
|
||||
let old_ip = entry.assigned_ip.clone();
|
||||
updater(entry);
|
||||
// If public key changed, update the index
|
||||
// If public key changed, update the key index
|
||||
if entry.public_key != old_key {
|
||||
self.key_index.remove(&old_key);
|
||||
self.key_index.insert(entry.public_key.clone(), client_id.to_string());
|
||||
}
|
||||
// If assigned IP changed, update the IP index
|
||||
if entry.assigned_ip != old_ip {
|
||||
if let Some(ref old) = old_ip {
|
||||
self.ip_index.remove(old);
|
||||
}
|
||||
if let Some(ref new_ip) = entry.assigned_ip {
|
||||
self.ip_index.insert(new_ip.clone(), client_id.to_string());
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -362,6 +390,7 @@ mod tests {
|
||||
bytes_per_sec: 1_000_000,
|
||||
burst_bytes: 2_000_000,
|
||||
}),
|
||||
destination_policy: None,
|
||||
});
|
||||
let mut reg = ClientRegistry::new();
|
||||
reg.add(entry).unwrap();
|
||||
|
||||
@@ -22,3 +22,4 @@ pub mod client_registry;
|
||||
pub mod acl;
|
||||
pub mod proxy_protocol;
|
||||
pub mod userspace_nat;
|
||||
pub mod bridge;
|
||||
|
||||
@@ -13,6 +13,10 @@ pub struct IpPool {
|
||||
allocated: HashMap<Ipv4Addr, String>,
|
||||
/// Next candidate offset (skipping .0 network and .1 gateway)
|
||||
next_offset: u32,
|
||||
/// Minimum allocation offset (inclusive). Default: 2 (skip .0 network and .1 gateway).
|
||||
min_offset: u32,
|
||||
/// Maximum allocation offset (exclusive). Default: broadcast offset.
|
||||
max_offset: u32,
|
||||
}
|
||||
|
||||
impl IpPool {
|
||||
@@ -28,11 +32,47 @@ impl IpPool {
|
||||
anyhow::bail!("Prefix too long for VPN pool: /{}", prefix_len);
|
||||
}
|
||||
|
||||
let host_bits = 32 - prefix_len as u32;
|
||||
let max_offset = (1u32 << host_bits) - 1; // broadcast offset
|
||||
|
||||
Ok(Self {
|
||||
network,
|
||||
prefix_len,
|
||||
allocated: HashMap::new(),
|
||||
next_offset: 2, // Skip .0 (network) and .1 (server/gateway)
|
||||
min_offset: 2,
|
||||
max_offset,
|
||||
})
|
||||
}
|
||||
|
||||
/// Create a new IP pool with a restricted allocation range within the subnet.
|
||||
/// `range_start` and `range_end` are host offsets (e.g., 200 and 250 for .200-.250).
|
||||
pub fn new_with_range(subnet: &str, range_start: u32, range_end: u32) -> Result<Self> {
|
||||
let parts: Vec<&str> = subnet.split('/').collect();
|
||||
if parts.len() != 2 {
|
||||
anyhow::bail!("Invalid subnet format: {}", subnet);
|
||||
}
|
||||
let network: Ipv4Addr = parts[0].parse()?;
|
||||
let prefix_len: u8 = parts[1].parse()?;
|
||||
if prefix_len > 30 {
|
||||
anyhow::bail!("Prefix too long for VPN pool: /{}", prefix_len);
|
||||
}
|
||||
if range_start >= range_end {
|
||||
anyhow::bail!("Invalid IP range: start ({}) must be less than end ({})", range_start, range_end);
|
||||
}
|
||||
let host_bits = 32 - prefix_len as u32;
|
||||
let broadcast_offset = (1u32 << host_bits) - 1;
|
||||
if range_end > broadcast_offset {
|
||||
anyhow::bail!("IP range end ({}) exceeds subnet broadcast ({})", range_end, broadcast_offset);
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
network,
|
||||
prefix_len,
|
||||
allocated: HashMap::new(),
|
||||
next_offset: range_start,
|
||||
min_offset: range_start,
|
||||
max_offset: range_end + 1, // exclusive
|
||||
})
|
||||
}
|
||||
|
||||
@@ -44,22 +84,17 @@ impl IpPool {
|
||||
|
||||
/// Total number of usable client addresses in the pool.
|
||||
pub fn capacity(&self) -> u32 {
|
||||
let host_bits = 32 - self.prefix_len as u32;
|
||||
let total = 1u32 << host_bits;
|
||||
total.saturating_sub(3) // minus network, gateway, broadcast
|
||||
self.max_offset.saturating_sub(self.min_offset)
|
||||
}
|
||||
|
||||
/// Allocate an IP for a client. Returns the assigned IP.
|
||||
pub fn allocate(&mut self, client_id: &str) -> Result<Ipv4Addr> {
|
||||
let host_bits = 32 - self.prefix_len as u32;
|
||||
let max_offset = (1u32 << host_bits) - 1; // broadcast offset
|
||||
|
||||
// Try to find a free IP starting from next_offset
|
||||
let start = self.next_offset;
|
||||
let mut offset = start;
|
||||
loop {
|
||||
if offset >= max_offset {
|
||||
offset = 2; // wrap around
|
||||
if offset >= self.max_offset {
|
||||
offset = self.min_offset; // wrap around
|
||||
}
|
||||
|
||||
let ip = Ipv4Addr::from(u32::from(self.network) + offset);
|
||||
|
||||
@@ -7,7 +7,7 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::{mpsc, Mutex, RwLock};
|
||||
use tracing::{info, error, warn};
|
||||
use tracing::{debug, info, error, warn};
|
||||
|
||||
use crate::acl;
|
||||
use crate::client_registry::{ClientEntry, ClientRegistry};
|
||||
@@ -25,7 +25,7 @@ use crate::tunnel::{self, TunConfig};
|
||||
const DEAD_PEER_TIMEOUT: Duration = Duration::from_secs(180);
|
||||
|
||||
/// Destination routing policy for VPN client traffic.
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct DestinationPolicyConfig {
|
||||
/// Default action: "forceTarget", "block", or "allow".
|
||||
@@ -84,6 +84,25 @@ pub struct ServerConfig {
|
||||
pub wg_listen_port: Option<u16>,
|
||||
/// WireGuard: pre-configured peers.
|
||||
pub wg_peers: Option<Vec<crate::wireguard::WgPeerConfig>>,
|
||||
/// Public endpoint address for generated client configs (e.g. "vpn.example.com:51820").
|
||||
/// Used as WireGuard `Endpoint` and SmartVPN `serverUrl` host.
|
||||
/// Defaults to listen_addr.
|
||||
pub server_endpoint: Option<String>,
|
||||
/// AllowedIPs for generated WireGuard client configs.
|
||||
/// Defaults to ["0.0.0.0/0"] (full tunnel).
|
||||
#[serde(alias = "clientAllowedIPs")]
|
||||
pub client_allowed_ips: Option<Vec<String>>,
|
||||
|
||||
// Bridge mode configuration (forwarding_mode: "bridge")
|
||||
|
||||
/// LAN subnet CIDR for bridge mode (e.g. "192.168.1.0/24").
|
||||
pub bridge_lan_subnet: Option<String>,
|
||||
/// Physical network interface to bridge (e.g. "eth0"). Auto-detected if omitted.
|
||||
pub bridge_physical_interface: Option<String>,
|
||||
/// Start of VPN client IP range within the LAN subnet (host offset, e.g. 200).
|
||||
pub bridge_ip_range_start: Option<u32>,
|
||||
/// End of VPN client IP range within the LAN subnet (host offset, e.g. 250).
|
||||
pub bridge_ip_range_end: Option<u32>,
|
||||
}
|
||||
|
||||
/// Information about a connected client.
|
||||
@@ -124,6 +143,14 @@ pub struct ServerStatistics {
|
||||
pub uptime_seconds: u64,
|
||||
pub active_clients: u64,
|
||||
pub total_connections: u64,
|
||||
/// Per-transport active client counts.
|
||||
pub active_clients_websocket: u64,
|
||||
pub active_clients_quic: u64,
|
||||
pub active_clients_wireguard: u64,
|
||||
/// Per-transport total connection counts.
|
||||
pub total_connections_websocket: u64,
|
||||
pub total_connections_quic: u64,
|
||||
pub total_connections_wireguard: u64,
|
||||
}
|
||||
|
||||
/// The forwarding engine determines how decrypted IP packets are routed.
|
||||
@@ -132,6 +159,8 @@ pub enum ForwardingEngine {
|
||||
Tun(tokio::io::WriteHalf<tun::AsyncDevice>),
|
||||
/// Userspace NAT — packets sent to smoltcp-based NAT engine via channel.
|
||||
Socket(mpsc::Sender<Vec<u8>>),
|
||||
/// L2 Bridge — packets sent to BridgeEngine via channel, bridged to host LAN.
|
||||
Bridge(mpsc::Sender<Vec<u8>>),
|
||||
/// Testing/monitoring — packets are counted but not forwarded.
|
||||
Testing,
|
||||
}
|
||||
@@ -175,7 +204,15 @@ impl VpnServer {
|
||||
anyhow::bail!("Server is already running");
|
||||
}
|
||||
|
||||
let ip_pool = IpPool::new(&config.subnet)?;
|
||||
let mode = config.forwarding_mode.as_deref().unwrap_or("testing");
|
||||
let ip_pool = if mode == "bridge" {
|
||||
let lan_subnet = config.bridge_lan_subnet.as_deref().unwrap_or(&config.subnet);
|
||||
let range_start = config.bridge_ip_range_start.unwrap_or(200);
|
||||
let range_end = config.bridge_ip_range_end.unwrap_or(250);
|
||||
IpPool::new_with_range(lan_subnet, range_start, range_end)?
|
||||
} else {
|
||||
IpPool::new(&config.subnet)?
|
||||
};
|
||||
|
||||
if config.enable_nat.unwrap_or(false) {
|
||||
if let Err(e) = crate::network::enable_ip_forwarding() {
|
||||
@@ -189,7 +226,6 @@ impl VpnServer {
|
||||
}
|
||||
|
||||
let link_mtu = config.mtu.unwrap_or(1420);
|
||||
let mode = config.forwarding_mode.as_deref().unwrap_or("testing");
|
||||
let gateway_ip = ip_pool.gateway_addr();
|
||||
|
||||
// Create forwarding engine based on mode
|
||||
@@ -204,6 +240,12 @@ impl VpnServer {
|
||||
packet_rx: mpsc::Receiver<Vec<u8>>,
|
||||
shutdown_rx: mpsc::Receiver<()>,
|
||||
},
|
||||
Bridge {
|
||||
packet_tx: mpsc::Sender<Vec<u8>>,
|
||||
packet_rx: mpsc::Receiver<Vec<u8>>,
|
||||
tap_device: tun::AsyncDevice,
|
||||
shutdown_rx: mpsc::Receiver<()>,
|
||||
},
|
||||
Testing,
|
||||
}
|
||||
|
||||
@@ -227,6 +269,33 @@ impl VpnServer {
|
||||
let (tx, rx) = mpsc::channel::<()>(1);
|
||||
(ForwardingSetup::Socket { packet_tx, packet_rx, shutdown_rx: rx }, tx)
|
||||
}
|
||||
"bridge" => {
|
||||
info!("Starting L2 bridge forwarding (requires CAP_NET_ADMIN)");
|
||||
let phys_iface = match &config.bridge_physical_interface {
|
||||
Some(i) => i.clone(),
|
||||
None => crate::bridge::detect_default_interface().await?,
|
||||
};
|
||||
let (host_ip, host_prefix) = crate::bridge::get_interface_ip(&phys_iface).await?;
|
||||
|
||||
let bridge_name = "svpn_br0";
|
||||
let tap_name = "svpn_tap0";
|
||||
|
||||
// Create TAP + bridge infrastructure
|
||||
let tap_device = crate::bridge::create_tap(tap_name, link_mtu)?;
|
||||
crate::bridge::create_bridge(bridge_name).await?;
|
||||
crate::bridge::set_interface_up(bridge_name).await?;
|
||||
crate::bridge::bridge_add_interface(bridge_name, tap_name).await?;
|
||||
crate::bridge::set_interface_up(tap_name).await?;
|
||||
crate::bridge::bridge_add_interface(bridge_name, &phys_iface).await?;
|
||||
crate::bridge::migrate_host_ip_to_bridge(&phys_iface, bridge_name, host_ip, host_prefix).await?;
|
||||
crate::bridge::enable_proxy_arp(bridge_name).await?;
|
||||
|
||||
info!("Bridge {} created: TAP={}, physical={}, IP={}/{}", bridge_name, tap_name, phys_iface, host_ip, host_prefix);
|
||||
|
||||
let (packet_tx, packet_rx) = mpsc::channel::<Vec<u8>>(4096);
|
||||
let (tx, rx) = mpsc::channel::<()>(1);
|
||||
(ForwardingSetup::Bridge { packet_tx, packet_rx, tap_device, shutdown_rx: rx }, tx)
|
||||
}
|
||||
_ => {
|
||||
info!("Forwarding disabled (testing/monitoring mode)");
|
||||
let (tx, _rx) = mpsc::channel::<()>(1);
|
||||
@@ -285,6 +354,15 @@ impl VpnServer {
|
||||
}
|
||||
});
|
||||
}
|
||||
ForwardingSetup::Bridge { packet_tx, packet_rx, tap_device, shutdown_rx } => {
|
||||
*state.forwarding_engine.lock().await = ForwardingEngine::Bridge(packet_tx);
|
||||
let bridge_engine = crate::bridge::BridgeEngine::new(state.clone());
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = bridge_engine.run(tap_device, packet_rx, shutdown_rx).await {
|
||||
error!("Bridge engine error: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
ForwardingSetup::Testing => {}
|
||||
}
|
||||
|
||||
@@ -364,6 +442,28 @@ impl VpnServer {
|
||||
}
|
||||
|
||||
info!("VPN server started (transport: {})", transport_mode);
|
||||
|
||||
// Register pre-loaded clients (from config.clients) as WG peers.
|
||||
// The WG listener only starts with config.wg_peers; clients loaded into the
|
||||
// registry need to be dynamically added so WG handshakes work.
|
||||
if self.wg_command_tx.is_some() {
|
||||
let registry = state.client_registry.read().await;
|
||||
for entry in registry.list() {
|
||||
if let (Some(ref wg_key), Some(ref ip_str)) = (&entry.wg_public_key, &entry.assigned_ip) {
|
||||
let peer_config = crate::wireguard::WgPeerConfig {
|
||||
public_key: wg_key.clone(),
|
||||
preshared_key: None,
|
||||
allowed_ips: vec![format!("{}/32", ip_str)],
|
||||
endpoint: None,
|
||||
persistent_keepalive: Some(25),
|
||||
};
|
||||
if let Err(e) = self.add_wg_peer(peer_config).await {
|
||||
warn!("Failed to register pre-loaded WG peer for {}: {}", entry.client_id, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -420,7 +520,21 @@ impl VpnServer {
|
||||
if let Some(ref state) = self.state {
|
||||
let mut stats = state.stats.read().await.clone();
|
||||
stats.uptime_seconds = state.started_at.elapsed().as_secs();
|
||||
stats.active_clients = state.clients.read().await.len() as u64;
|
||||
let clients = state.clients.read().await;
|
||||
stats.active_clients = clients.len() as u64;
|
||||
// Compute per-transport active counts
|
||||
stats.active_clients_websocket = 0;
|
||||
stats.active_clients_quic = 0;
|
||||
stats.active_clients_wireguard = 0;
|
||||
for info in clients.values() {
|
||||
match info.transport_type.as_str() {
|
||||
"websocket" => stats.active_clients_websocket += 1,
|
||||
"quic" => stats.active_clients_quic += 1,
|
||||
"wireguard" => stats.active_clients_wireguard += 1,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
drop(clients);
|
||||
stats
|
||||
} else {
|
||||
ServerStatistics::default()
|
||||
@@ -586,10 +700,27 @@ impl VpnServer {
|
||||
// Add to registry
|
||||
state.client_registry.write().await.add(entry.clone())?;
|
||||
|
||||
// Register WG peer with the running WG listener (if active)
|
||||
if self.wg_command_tx.is_some() {
|
||||
let wg_peer_config = crate::wireguard::WgPeerConfig {
|
||||
public_key: wg_pub.clone(),
|
||||
preshared_key: None,
|
||||
allowed_ips: vec![format!("{}/32", assigned_ip)],
|
||||
endpoint: None,
|
||||
persistent_keepalive: Some(25),
|
||||
};
|
||||
if let Err(e) = self.add_wg_peer(wg_peer_config).await {
|
||||
warn!("Failed to register WG peer for client {}: {}", client_id, e);
|
||||
}
|
||||
}
|
||||
|
||||
// Build SmartVPN client config
|
||||
let smartvpn_server_url = format!("wss://{}",
|
||||
state.config.server_endpoint.as_deref()
|
||||
.unwrap_or(&state.config.listen_addr)
|
||||
.replace("0.0.0.0", "localhost"));
|
||||
let smartvpn_config = serde_json::json!({
|
||||
"serverUrl": format!("wss://{}",
|
||||
state.config.listen_addr.replace("0.0.0.0", "localhost")),
|
||||
"serverUrl": smartvpn_server_url,
|
||||
"serverPublicKey": state.config.public_key,
|
||||
"clientPrivateKey": noise_priv,
|
||||
"clientPublicKey": noise_pub,
|
||||
@@ -599,15 +730,25 @@ impl VpnServer {
|
||||
});
|
||||
|
||||
// Build WireGuard config string
|
||||
let wg_server_pubkey = match &state.config.wg_private_key {
|
||||
Some(wg_priv_key) => crate::wireguard::wg_public_key_from_private(wg_priv_key)?,
|
||||
None => state.config.public_key.clone(),
|
||||
};
|
||||
let wg_endpoint = state.config.server_endpoint.as_deref()
|
||||
.unwrap_or(&state.config.listen_addr);
|
||||
let wg_allowed_ips = state.config.client_allowed_ips.as_ref()
|
||||
.map(|ips| ips.join(", "))
|
||||
.unwrap_or_else(|| "0.0.0.0/0".to_string());
|
||||
let wg_config = format!(
|
||||
"[Interface]\nPrivateKey = {}\nAddress = {}/24\n{}\n[Peer]\nPublicKey = {}\nAllowedIPs = 0.0.0.0/0\nEndpoint = {}\nPersistentKeepalive = 25\n",
|
||||
"[Interface]\nPrivateKey = {}\nAddress = {}/24\n{}\n[Peer]\nPublicKey = {}\nAllowedIPs = {}\nEndpoint = {}\nPersistentKeepalive = 25\n",
|
||||
wg_priv,
|
||||
assigned_ip,
|
||||
state.config.dns.as_ref()
|
||||
.map(|d| format!("DNS = {}", d.join(", ")))
|
||||
.unwrap_or_default(),
|
||||
state.config.public_key,
|
||||
state.config.listen_addr,
|
||||
wg_server_pubkey,
|
||||
wg_allowed_ips,
|
||||
wg_endpoint,
|
||||
);
|
||||
|
||||
let entry_json = serde_json::to_value(&entry)?;
|
||||
@@ -628,6 +769,14 @@ impl VpnServer {
|
||||
let state = self.state.as_ref()
|
||||
.ok_or_else(|| anyhow::anyhow!("Server not running"))?;
|
||||
let entry = state.client_registry.write().await.remove(client_id)?;
|
||||
// Remove WG peer from running listener
|
||||
if self.wg_command_tx.is_some() {
|
||||
if let Some(ref wg_key) = entry.wg_public_key {
|
||||
if let Err(e) = self.remove_wg_peer(wg_key).await {
|
||||
debug!("Failed to remove WG peer for client {}: {}", client_id, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Release the IP if assigned
|
||||
if let Some(ref ip_str) = entry.assigned_ip {
|
||||
if let Ok(ip) = ip_str.parse::<Ipv4Addr>() {
|
||||
@@ -714,6 +863,14 @@ impl VpnServer {
|
||||
let state = self.state.as_ref()
|
||||
.ok_or_else(|| anyhow::anyhow!("Server not running"))?;
|
||||
|
||||
// Capture old WG key before rotation (needed to remove from WG listener)
|
||||
let old_wg_pub = {
|
||||
let registry = state.client_registry.read().await;
|
||||
let entry = registry.get_by_id(client_id)
|
||||
.ok_or_else(|| anyhow::anyhow!("Client '{}' not found", client_id))?;
|
||||
entry.wg_public_key.clone()
|
||||
};
|
||||
|
||||
let (noise_pub, noise_priv) = crypto::generate_keypair()?;
|
||||
let (wg_pub, wg_priv) = crate::wireguard::generate_wg_keypair();
|
||||
|
||||
@@ -732,9 +889,31 @@ impl VpnServer {
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("0.0.0.0");
|
||||
|
||||
// Update WG listener: remove old peer, add new peer
|
||||
if self.wg_command_tx.is_some() {
|
||||
if let Some(ref old_key) = old_wg_pub {
|
||||
if let Err(e) = self.remove_wg_peer(old_key).await {
|
||||
debug!("Failed to remove old WG peer during rotation: {}", e);
|
||||
}
|
||||
}
|
||||
let wg_peer_config = crate::wireguard::WgPeerConfig {
|
||||
public_key: wg_pub.clone(),
|
||||
preshared_key: None,
|
||||
allowed_ips: vec![format!("{}/32", assigned_ip)],
|
||||
endpoint: None,
|
||||
persistent_keepalive: Some(25),
|
||||
};
|
||||
if let Err(e) = self.add_wg_peer(wg_peer_config).await {
|
||||
warn!("Failed to register new WG peer during rotation: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
let smartvpn_server_url = format!("wss://{}",
|
||||
state.config.server_endpoint.as_deref()
|
||||
.unwrap_or(&state.config.listen_addr)
|
||||
.replace("0.0.0.0", "localhost"));
|
||||
let smartvpn_config = serde_json::json!({
|
||||
"serverUrl": format!("wss://{}",
|
||||
state.config.listen_addr.replace("0.0.0.0", "localhost")),
|
||||
"serverUrl": smartvpn_server_url,
|
||||
"serverPublicKey": state.config.public_key,
|
||||
"clientPrivateKey": noise_priv,
|
||||
"clientPublicKey": noise_pub,
|
||||
@@ -743,14 +922,24 @@ impl VpnServer {
|
||||
"keepaliveIntervalSecs": state.config.keepalive_interval_secs,
|
||||
});
|
||||
|
||||
let wg_server_pubkey = match &state.config.wg_private_key {
|
||||
Some(wg_priv_key) => crate::wireguard::wg_public_key_from_private(wg_priv_key)?,
|
||||
None => state.config.public_key.clone(),
|
||||
};
|
||||
let wg_endpoint = state.config.server_endpoint.as_deref()
|
||||
.unwrap_or(&state.config.listen_addr);
|
||||
let wg_allowed_ips = state.config.client_allowed_ips.as_ref()
|
||||
.map(|ips| ips.join(", "))
|
||||
.unwrap_or_else(|| "0.0.0.0/0".to_string());
|
||||
let wg_config = format!(
|
||||
"[Interface]\nPrivateKey = {}\nAddress = {}/24\n{}\n[Peer]\nPublicKey = {}\nAllowedIPs = 0.0.0.0/0\nEndpoint = {}\nPersistentKeepalive = 25\n",
|
||||
"[Interface]\nPrivateKey = {}\nAddress = {}/24\n{}\n[Peer]\nPublicKey = {}\nAllowedIPs = {}\nEndpoint = {}\nPersistentKeepalive = 25\n",
|
||||
wg_priv, assigned_ip,
|
||||
state.config.dns.as_ref()
|
||||
.map(|d| format!("DNS = {}", d.join(", ")))
|
||||
.unwrap_or_default(),
|
||||
state.config.public_key,
|
||||
state.config.listen_addr,
|
||||
wg_server_pubkey,
|
||||
wg_allowed_ips,
|
||||
wg_endpoint,
|
||||
);
|
||||
|
||||
Ok(serde_json::json!({
|
||||
@@ -774,10 +963,13 @@ impl VpnServer {
|
||||
|
||||
match format {
|
||||
"smartvpn" => {
|
||||
let smartvpn_server_url = format!("wss://{}",
|
||||
state.config.server_endpoint.as_deref()
|
||||
.unwrap_or(&state.config.listen_addr)
|
||||
.replace("0.0.0.0", "localhost"));
|
||||
Ok(serde_json::json!({
|
||||
"config": {
|
||||
"serverUrl": format!("wss://{}",
|
||||
state.config.listen_addr.replace("0.0.0.0", "localhost")),
|
||||
"serverUrl": smartvpn_server_url,
|
||||
"serverPublicKey": state.config.public_key,
|
||||
"clientPublicKey": entry.public_key,
|
||||
"dns": state.config.dns,
|
||||
@@ -787,15 +979,25 @@ impl VpnServer {
|
||||
}))
|
||||
}
|
||||
"wireguard" => {
|
||||
let wg_server_pubkey = match &state.config.wg_private_key {
|
||||
Some(wg_priv_key) => crate::wireguard::wg_public_key_from_private(wg_priv_key)?,
|
||||
None => state.config.public_key.clone(),
|
||||
};
|
||||
let assigned_ip = entry.assigned_ip.as_deref().unwrap_or("0.0.0.0");
|
||||
let wg_endpoint = state.config.server_endpoint.as_deref()
|
||||
.unwrap_or(&state.config.listen_addr);
|
||||
let wg_allowed_ips = state.config.client_allowed_ips.as_ref()
|
||||
.map(|ips| ips.join(", "))
|
||||
.unwrap_or_else(|| "0.0.0.0/0".to_string());
|
||||
let config = format!(
|
||||
"[Interface]\nAddress = {}/24\n{}\n[Peer]\nPublicKey = {}\nAllowedIPs = 0.0.0.0/0\nEndpoint = {}\nPersistentKeepalive = 25\n",
|
||||
"[Interface]\nAddress = {}/24\n{}\n[Peer]\nPublicKey = {}\nAllowedIPs = {}\nEndpoint = {}\nPersistentKeepalive = 25\n",
|
||||
assigned_ip,
|
||||
state.config.dns.as_ref()
|
||||
.map(|d| format!("DNS = {}", d.join(", ")))
|
||||
.unwrap_or_default(),
|
||||
state.config.public_key,
|
||||
state.config.listen_addr,
|
||||
wg_server_pubkey,
|
||||
wg_allowed_ips,
|
||||
wg_endpoint,
|
||||
);
|
||||
Ok(serde_json::json!({ "config": config }))
|
||||
}
|
||||
@@ -1185,6 +1387,11 @@ async fn handle_client_connection(
|
||||
{
|
||||
let mut stats = state.stats.write().await;
|
||||
stats.total_connections += 1;
|
||||
match transport_type {
|
||||
"websocket" => stats.total_connections_websocket += 1,
|
||||
"quic" => stats.total_connections_quic += 1,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
// Send assigned IP info (encrypted), include effective MTU
|
||||
@@ -1285,6 +1492,9 @@ async fn handle_client_connection(
|
||||
ForwardingEngine::Socket(sender) => {
|
||||
let _ = sender.try_send(buf[..len].to_vec());
|
||||
}
|
||||
ForwardingEngine::Bridge(sender) => {
|
||||
let _ = sender.try_send(buf[..len].to_vec());
|
||||
}
|
||||
ForwardingEngine::Testing => {}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,6 +17,10 @@ use crate::acl;
|
||||
use crate::server::{DestinationPolicyConfig, ServerState};
|
||||
use crate::tunnel;
|
||||
|
||||
/// Maximum size of per-session pending send buffer (512KB = 8x socket buffer).
|
||||
/// Sessions exceeding this are aborted — the client cannot keep up.
|
||||
const TCP_PENDING_SEND_MAX: usize = 512 * 1024;
|
||||
|
||||
// ============================================================================
|
||||
// Virtual IP device for smoltcp
|
||||
// ============================================================================
|
||||
@@ -101,7 +105,7 @@ impl Device for VirtualIpDevice {
|
||||
let mut caps = DeviceCapabilities::default();
|
||||
caps.medium = Medium::Ip;
|
||||
caps.max_transmission_unit = self.mtu;
|
||||
caps.max_burst_size = Some(1);
|
||||
caps.max_burst_size = None;
|
||||
caps
|
||||
}
|
||||
}
|
||||
@@ -121,9 +125,20 @@ struct SessionKey {
|
||||
|
||||
struct TcpSession {
|
||||
smoltcp_handle: SocketHandle,
|
||||
bridge_data_tx: mpsc::Sender<Vec<u8>>,
|
||||
/// Channel to send data to the bridge task. None until bridge starts.
|
||||
bridge_data_tx: Option<mpsc::Sender<Vec<u8>>>,
|
||||
#[allow(dead_code)]
|
||||
client_ip: Ipv4Addr,
|
||||
/// Bridge task has been spawned (deferred until handshake completes)
|
||||
bridge_started: bool,
|
||||
/// Address to connect the bridge task to (may differ from dst if policy rewrote it)
|
||||
connect_addr: SocketAddr,
|
||||
/// Buffered data from bridge waiting to be written to smoltcp socket
|
||||
pending_send: Vec<u8>,
|
||||
/// Session is closing (FIN in progress), don't accept new SYNs
|
||||
closing: bool,
|
||||
/// Last time data flowed through this session (for idle timeout)
|
||||
last_activity: tokio::time::Instant,
|
||||
}
|
||||
|
||||
struct UdpSession {
|
||||
@@ -252,8 +267,19 @@ impl NatEngine {
|
||||
}
|
||||
|
||||
/// Evaluate destination policy for a packet's destination IP.
|
||||
fn evaluate_destination(&self, dst_ip: Ipv4Addr, dst_port: u16) -> DestinationAction {
|
||||
let policy = match &self.destination_policy {
|
||||
/// Checks per-client policy first (via src_ip → client registry lookup),
|
||||
/// falls back to server-wide policy.
|
||||
fn evaluate_destination(&self, src_ip: Ipv4Addr, dst_ip: Ipv4Addr, dst_port: u16) -> DestinationAction {
|
||||
// Try per-client destination policy (lookup by tunnel IP)
|
||||
let client_policy = if let Ok(registry) = self.state.client_registry.try_read() {
|
||||
registry.get_by_assigned_ip(&src_ip.to_string())
|
||||
.and_then(|e| e.security.as_ref())
|
||||
.and_then(|s| s.destination_policy.clone())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let policy = match client_policy.as_ref().or(self.destination_policy.as_ref()) {
|
||||
Some(p) => p,
|
||||
None => return DestinationAction::PassThrough(SocketAddr::new(dst_ip.into(), dst_port)),
|
||||
};
|
||||
@@ -308,8 +334,10 @@ impl NatEngine {
|
||||
|
||||
// SYN without ACK = new connection
|
||||
let is_syn = (flags & 0x02) != 0 && (flags & 0x10) == 0;
|
||||
if is_syn && !self.tcp_sessions.contains_key(&key) {
|
||||
match self.evaluate_destination(dst_ip, dst_port) {
|
||||
// Skip if session exists (including closing sessions — let FIN complete)
|
||||
let session_exists = self.tcp_sessions.contains_key(&key);
|
||||
if is_syn && !session_exists {
|
||||
match self.evaluate_destination(src_ip, dst_ip, dst_port) {
|
||||
DestinationAction::Drop => {
|
||||
debug!("NAT: destination policy blocked TCP {}:{} -> {}:{}", src_ip, src_port, dst_ip, dst_port);
|
||||
return;
|
||||
@@ -333,7 +361,7 @@ impl NatEngine {
|
||||
};
|
||||
|
||||
if !self.udp_sessions.contains_key(&key) {
|
||||
match self.evaluate_destination(dst_ip, dst_port) {
|
||||
match self.evaluate_destination(src_ip, dst_ip, dst_port) {
|
||||
DestinationAction::Drop => {
|
||||
debug!("NAT: destination policy blocked UDP {}:{} -> {}:{}", src_ip, src_port, dst_ip, dst_port);
|
||||
return;
|
||||
@@ -375,23 +403,22 @@ impl NatEngine {
|
||||
|
||||
let handle = self.sockets.add(socket);
|
||||
|
||||
// Channel for sending data from NAT engine to bridge task
|
||||
let (data_tx, data_rx) = mpsc::channel::<Vec<u8>>(256);
|
||||
|
||||
let session = TcpSession {
|
||||
smoltcp_handle: handle,
|
||||
bridge_data_tx: data_tx,
|
||||
bridge_data_tx: None,
|
||||
client_ip: key.src_ip,
|
||||
bridge_started: false,
|
||||
connect_addr,
|
||||
pending_send: Vec::new(),
|
||||
closing: false,
|
||||
last_activity: tokio::time::Instant::now(),
|
||||
};
|
||||
self.tcp_sessions.insert(key.clone(), session);
|
||||
|
||||
// Spawn bridge task that connects to the resolved destination
|
||||
let bridge_tx = self.bridge_tx.clone();
|
||||
let key_clone = key.clone();
|
||||
let proxy_protocol = self.proxy_protocol;
|
||||
tokio::spawn(async move {
|
||||
tcp_bridge_task(key_clone, data_rx, bridge_tx, proxy_protocol, connect_addr).await;
|
||||
});
|
||||
// NOTE: Bridge task is NOT spawned here — it will be spawned in process()
|
||||
// once the smoltcp handshake completes (socket.is_active() == true).
|
||||
// This prevents data from the real server arriving before the VPN client
|
||||
// handshake is done, which would cause silent data loss.
|
||||
|
||||
debug!(
|
||||
"NAT: new TCP session {}:{} -> {}:{}",
|
||||
@@ -451,15 +478,69 @@ impl NatEngine {
|
||||
self.iface
|
||||
.poll(now, &mut self.device, &mut self.sockets);
|
||||
|
||||
// Start bridge tasks for sessions whose handshake just completed
|
||||
let bridge_tx_clone = self.bridge_tx.clone();
|
||||
let proxy_protocol = self.proxy_protocol;
|
||||
for (key, session) in self.tcp_sessions.iter_mut() {
|
||||
if !session.bridge_started && !session.closing {
|
||||
let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
|
||||
if socket.is_active() {
|
||||
session.bridge_started = true;
|
||||
let (data_tx, data_rx) = mpsc::channel::<Vec<u8>>(256);
|
||||
session.bridge_data_tx = Some(data_tx);
|
||||
let btx = bridge_tx_clone.clone();
|
||||
let k = key.clone();
|
||||
let addr = session.connect_addr;
|
||||
let pp = proxy_protocol;
|
||||
tokio::spawn(async move {
|
||||
tcp_bridge_task(k, data_rx, btx, pp, addr).await;
|
||||
});
|
||||
debug!("NAT: TCP handshake complete, starting bridge for {}:{} -> {}:{}",
|
||||
key.src_ip, key.src_port, key.dst_ip, key.dst_port);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Flush pending send buffers to smoltcp sockets
|
||||
for (_key, session) in self.tcp_sessions.iter_mut() {
|
||||
if !session.pending_send.is_empty() {
|
||||
let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
|
||||
if socket.can_send() {
|
||||
match socket.send_slice(&session.pending_send) {
|
||||
Ok(written) if written > 0 => {
|
||||
session.pending_send.drain(..written);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Bridge: read data from smoltcp TCP sockets → send to bridge tasks
|
||||
let mut closed_tcp: Vec<SessionKey> = Vec::new();
|
||||
let mut active_tcp: Vec<SessionKey> = Vec::new();
|
||||
for (key, session) in &self.tcp_sessions {
|
||||
let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
|
||||
if socket.can_recv() {
|
||||
let _ = socket.recv(|data| {
|
||||
let _ = session.bridge_data_tx.try_send(data.to_vec());
|
||||
(data.len(), ())
|
||||
});
|
||||
if session.bridge_started && socket.can_recv() {
|
||||
if let Some(ref sender) = session.bridge_data_tx {
|
||||
// Reserve channel slot BEFORE consuming from smoltcp.
|
||||
// If the channel is full, we don't consume — smoltcp's RX buffer
|
||||
// fills up, it stops advertising TCP window space, and the VPN
|
||||
// client's TCP stack backs off. Proper end-to-end backpressure.
|
||||
match sender.try_reserve() {
|
||||
Ok(permit) => {
|
||||
let _ = socket.recv(|data| {
|
||||
permit.send(data.to_vec());
|
||||
(data.len(), ())
|
||||
});
|
||||
active_tcp.push(key.clone());
|
||||
}
|
||||
Err(_) => {
|
||||
debug!("NAT: bridge channel full for {}:{} -> {}:{}, applying backpressure",
|
||||
key.src_ip, key.src_port, key.dst_ip, key.dst_port);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Detect closed connections
|
||||
if !socket.is_open() && !socket.is_listening() {
|
||||
@@ -467,6 +548,14 @@ impl NatEngine {
|
||||
}
|
||||
}
|
||||
|
||||
// Update last_activity for sessions that had data flow
|
||||
let now = tokio::time::Instant::now();
|
||||
for key in active_tcp {
|
||||
if let Some(session) = self.tcp_sessions.get_mut(&key) {
|
||||
session.last_activity = now;
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up closed TCP sessions
|
||||
for key in closed_tcp {
|
||||
if let Some(session) = self.tcp_sessions.remove(&key) {
|
||||
@@ -479,7 +568,9 @@ impl NatEngine {
|
||||
for (_key, session) in &self.udp_sessions {
|
||||
let socket = self.sockets.get_mut::<udp::Socket>(session.smoltcp_handle);
|
||||
while let Ok((data, _meta)) = socket.recv() {
|
||||
let _ = session.bridge_data_tx.try_send(data.to_vec());
|
||||
if session.bridge_data_tx.try_send(data.to_vec()).is_err() {
|
||||
debug!("NAT: bridge channel full, UDP data dropped");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -488,7 +579,9 @@ impl NatEngine {
|
||||
for packet in self.device.drain_tx() {
|
||||
if let Some(std::net::IpAddr::V4(dst_ip)) = tunnel::extract_dst_ip(&packet) {
|
||||
if let Some(sender) = routes.get(&dst_ip) {
|
||||
let _ = sender.try_send(packet);
|
||||
if sender.try_send(packet).is_err() {
|
||||
debug!("NAT: tun_routes channel full for {}, packet dropped", dst_ip);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -497,22 +590,43 @@ impl NatEngine {
|
||||
fn handle_bridge_message(&mut self, msg: BridgeMessage) {
|
||||
match msg {
|
||||
BridgeMessage::TcpData { key, data } => {
|
||||
if let Some(session) = self.tcp_sessions.get(&key) {
|
||||
if let Some(session) = self.tcp_sessions.get_mut(&key) {
|
||||
session.last_activity = tokio::time::Instant::now();
|
||||
// Append to pending buffer, then flush as much as possible
|
||||
session.pending_send.extend_from_slice(&data);
|
||||
let socket =
|
||||
self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
|
||||
if socket.can_send() {
|
||||
let _ = socket.send_slice(&data);
|
||||
if socket.can_send() && !session.pending_send.is_empty() {
|
||||
match socket.send_slice(&session.pending_send) {
|
||||
Ok(written) if written > 0 => {
|
||||
session.pending_send.drain(..written);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
// Cap check — abort session if client can't keep up
|
||||
if session.pending_send.len() > TCP_PENDING_SEND_MAX {
|
||||
warn!(
|
||||
"NAT: TCP session {}:{} -> {}:{} pending buffer exceeded {}KB, aborting",
|
||||
key.src_ip, key.src_port, key.dst_ip, key.dst_port,
|
||||
TCP_PENDING_SEND_MAX / 1024
|
||||
);
|
||||
let socket =
|
||||
self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
|
||||
socket.abort();
|
||||
session.pending_send.clear();
|
||||
session.closing = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
BridgeMessage::TcpClosed { key } => {
|
||||
if let Some(session) = self.tcp_sessions.remove(&key) {
|
||||
if let Some(session) = self.tcp_sessions.get_mut(&key) {
|
||||
let socket =
|
||||
self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
|
||||
socket.close();
|
||||
session.closing = true;
|
||||
// Don't remove from SocketSet yet — let smoltcp send FIN
|
||||
// It will be cleaned up in process() when is_open() returns false
|
||||
self.tcp_sessions.insert(key, session);
|
||||
}
|
||||
}
|
||||
BridgeMessage::UdpData { key, data } => {
|
||||
@@ -552,6 +666,29 @@ impl NatEngine {
|
||||
}
|
||||
}
|
||||
|
||||
fn cleanup_idle_tcp_sessions(&mut self) {
|
||||
let timeout = Duration::from_secs(300); // 5 minutes
|
||||
let now = tokio::time::Instant::now();
|
||||
let expired: Vec<SessionKey> = self
|
||||
.tcp_sessions
|
||||
.iter()
|
||||
.filter(|(_, s)| now.duration_since(s.last_activity) > timeout)
|
||||
.map(|(k, _)| k.clone())
|
||||
.collect();
|
||||
|
||||
for key in expired {
|
||||
if let Some(session) = self.tcp_sessions.remove(&key) {
|
||||
let socket = self.sockets.get_mut::<tcp::Socket>(session.smoltcp_handle);
|
||||
socket.abort();
|
||||
self.sockets.remove(session.smoltcp_handle);
|
||||
warn!(
|
||||
"NAT: TCP session timed out {}:{} -> {}:{}",
|
||||
key.src_ip, key.src_port, key.dst_ip, key.dst_port
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Main async event loop for the NAT engine.
|
||||
pub async fn run(
|
||||
mut self,
|
||||
@@ -559,9 +696,13 @@ impl NatEngine {
|
||||
mut shutdown_rx: mpsc::Receiver<()>,
|
||||
) -> Result<()> {
|
||||
info!("Userspace NAT engine started");
|
||||
let mut timer = tokio::time::interval(Duration::from_millis(50));
|
||||
let default_poll_delay = Duration::from_millis(50);
|
||||
let mut cleanup_timer = tokio::time::interval(Duration::from_secs(10));
|
||||
|
||||
// Dynamic poll timer — reset after each event using smoltcp's poll_delay()
|
||||
let poll_sleep = tokio::time::sleep(default_poll_delay);
|
||||
tokio::pin!(poll_sleep);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(packet) = packet_rx.recv() => {
|
||||
@@ -572,18 +713,26 @@ impl NatEngine {
|
||||
self.handle_bridge_message(msg);
|
||||
self.process().await;
|
||||
}
|
||||
_ = timer.tick() => {
|
||||
() = &mut poll_sleep => {
|
||||
// Periodic poll for smoltcp maintenance (TCP retransmit, etc.)
|
||||
self.process().await;
|
||||
}
|
||||
_ = cleanup_timer.tick() => {
|
||||
self.cleanup_idle_udp_sessions();
|
||||
self.cleanup_idle_tcp_sessions();
|
||||
}
|
||||
_ = shutdown_rx.recv() => {
|
||||
info!("Userspace NAT engine shutting down");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Reset poll delay based on smoltcp's actual timer needs
|
||||
let now = self.smoltcp_now();
|
||||
let delay = self.iface.poll_delay(now, &self.sockets)
|
||||
.map(|d| Duration::from_millis(d.total_millis()))
|
||||
.unwrap_or(default_poll_delay);
|
||||
poll_sleep.as_mut().reset(tokio::time::Instant::now() + delay);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -5,6 +5,7 @@ use std::sync::Arc;
|
||||
use anyhow::{anyhow, Result};
|
||||
use base64::engine::general_purpose::STANDARD as BASE64;
|
||||
use base64::Engine;
|
||||
use boringtun::noise::errors::WireGuardError;
|
||||
use boringtun::noise::rate_limiter::RateLimiter;
|
||||
use boringtun::noise::{Tunn, TunnResult};
|
||||
use boringtun::x25519::{PublicKey, StaticSecret};
|
||||
@@ -99,6 +100,13 @@ pub fn generate_wg_keypair() -> (String, String) {
|
||||
(pub_b64, priv_b64)
|
||||
}
|
||||
|
||||
/// Derive the WireGuard public key (base64) from a private key (base64).
|
||||
pub fn wg_public_key_from_private(private_key_b64: &str) -> Result<String> {
|
||||
let private = parse_private_key(private_key_b64)?;
|
||||
let public = PublicKey::from(&private);
|
||||
Ok(BASE64.encode(public.to_bytes()))
|
||||
}
|
||||
|
||||
fn parse_private_key(b64: &str) -> Result<StaticSecret> {
|
||||
let bytes = BASE64.decode(b64)?;
|
||||
if bytes.len() != 32 {
|
||||
@@ -212,11 +220,20 @@ struct PeerState {
|
||||
#[allow(dead_code)]
|
||||
persistent_keepalive: Option<u16>,
|
||||
stats: WgPeerStats,
|
||||
/// Whether this peer has completed a WireGuard handshake and is in state.clients.
|
||||
is_connected: bool,
|
||||
/// Last time we received data or handshake activity from this peer.
|
||||
last_activity_at: Option<tokio::time::Instant>,
|
||||
/// VPN IP assigned during registration (used for connect/disconnect).
|
||||
vpn_ip: Option<Ipv4Addr>,
|
||||
/// Previous synced byte counts for aggregate stats delta tracking.
|
||||
prev_synced_bytes_sent: u64,
|
||||
prev_synced_bytes_received: u64,
|
||||
}
|
||||
|
||||
impl PeerState {
|
||||
fn matches_dst(&self, dst_ip: IpAddr) -> bool {
|
||||
self.allowed_ips.iter().any(|aip| aip.matches(dst_ip))
|
||||
fn matches_allowed_ips(&self, ip: IpAddr) -> bool {
|
||||
self.allowed_ips.iter().any(|aip| aip.matches(ip))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -268,6 +285,11 @@ fn add_peer_to_loop(
|
||||
endpoint,
|
||||
persistent_keepalive: config.persistent_keepalive,
|
||||
stats: WgPeerStats::default(),
|
||||
is_connected: false,
|
||||
last_activity_at: None,
|
||||
vpn_ip: None,
|
||||
prev_synced_bytes_sent: 0,
|
||||
prev_synced_bytes_received: 0,
|
||||
});
|
||||
|
||||
info!("Added WireGuard peer: {}", config.public_key);
|
||||
@@ -286,9 +308,10 @@ pub struct WgListenerConfig {
|
||||
pub peers: Vec<WgPeerConfig>,
|
||||
}
|
||||
|
||||
/// Extract the first /32 IPv4 address from a list of AllowedIp entries.
|
||||
/// This is the peer's VPN IP used for return-packet routing.
|
||||
/// Extract the peer's VPN IP from AllowedIp entries.
|
||||
/// Prefers /32 entries (exact match); falls back to any IPv4 address.
|
||||
fn extract_peer_vpn_ip(allowed_ips: &[AllowedIp]) -> Option<Ipv4Addr> {
|
||||
// Prefer /32 entries (exact peer VPN IP)
|
||||
for aip in allowed_ips {
|
||||
if let IpAddr::V4(v4) = aip.addr {
|
||||
if aip.prefix_len == 32 {
|
||||
@@ -296,6 +319,12 @@ fn extract_peer_vpn_ip(allowed_ips: &[AllowedIp]) -> Option<Ipv4Addr> {
|
||||
}
|
||||
}
|
||||
}
|
||||
// Fallback: use the first IPv4 address from any prefix length
|
||||
for aip in allowed_ips {
|
||||
if let IpAddr::V4(v4) = aip.addr {
|
||||
return Some(v4);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
@@ -308,8 +337,9 @@ fn wg_timestamp_now() -> String {
|
||||
format!("{}", duration.as_secs())
|
||||
}
|
||||
|
||||
/// Register a WG peer in ServerState (tun_routes, clients, ip_pool).
|
||||
/// Returns the VPN IP and the per-peer return-packet receiver.
|
||||
/// Register a WG peer in ServerState (tun_routes + ip_pool only).
|
||||
/// Does NOT add to state.clients — peers appear there only after handshake.
|
||||
/// Returns the VPN IP.
|
||||
async fn register_wg_peer(
|
||||
state: &Arc<ServerState>,
|
||||
peer: &PeerState,
|
||||
@@ -351,13 +381,23 @@ async fn register_wg_peer(
|
||||
});
|
||||
}
|
||||
|
||||
// Insert ClientInfo
|
||||
info!("WG peer {} registered with IP {} (not yet connected)", peer.public_key_b64, vpn_ip);
|
||||
Ok(Some(vpn_ip))
|
||||
}
|
||||
|
||||
/// Add a WG peer to state.clients on first successful handshake (data received).
|
||||
async fn connect_wg_peer(
|
||||
state: &Arc<ServerState>,
|
||||
peer: &PeerState,
|
||||
vpn_ip: Ipv4Addr,
|
||||
) {
|
||||
let client_id = format!("wg-{}", &peer.public_key_b64[..8.min(peer.public_key_b64.len())]);
|
||||
let client_info = ClientInfo {
|
||||
client_id: client_id.clone(),
|
||||
assigned_ip: vpn_ip.to_string(),
|
||||
connected_since: wg_timestamp_now(),
|
||||
bytes_sent: 0,
|
||||
bytes_received: 0,
|
||||
bytes_sent: peer.stats.bytes_sent,
|
||||
bytes_received: peer.stats.bytes_received,
|
||||
packets_dropped: 0,
|
||||
bytes_dropped: 0,
|
||||
last_keepalive_at: None,
|
||||
@@ -365,13 +405,31 @@ async fn register_wg_peer(
|
||||
rate_limit_bytes_per_sec: None,
|
||||
burst_bytes: None,
|
||||
authenticated_key: peer.public_key_b64.clone(),
|
||||
registered_client_id: client_id,
|
||||
registered_client_id: client_id.clone(),
|
||||
remote_addr: peer.endpoint.map(|e| e.to_string()),
|
||||
transport_type: "wireguard".to_string(),
|
||||
};
|
||||
state.clients.write().await.insert(client_info.client_id.clone(), client_info);
|
||||
|
||||
Ok(Some(vpn_ip))
|
||||
// Increment total_connections
|
||||
{
|
||||
let mut stats = state.stats.write().await;
|
||||
stats.total_connections += 1;
|
||||
stats.total_connections_wireguard += 1;
|
||||
}
|
||||
|
||||
info!("WG peer {} connected (IP: {})", peer.public_key_b64, vpn_ip);
|
||||
}
|
||||
|
||||
/// Remove a WG peer from state.clients (disconnect without unregistering).
|
||||
async fn disconnect_wg_peer(
|
||||
state: &Arc<ServerState>,
|
||||
pubkey: &str,
|
||||
) {
|
||||
let client_id = format!("wg-{}", &pubkey[..8.min(pubkey.len())]);
|
||||
if state.clients.write().await.remove(&client_id).is_some() {
|
||||
info!("WG peer {} disconnected (removed from active clients)", pubkey);
|
||||
}
|
||||
}
|
||||
|
||||
/// Unregister a WG peer from ServerState.
|
||||
@@ -445,6 +503,11 @@ pub async fn run_wg_listener(
|
||||
endpoint,
|
||||
persistent_keepalive: peer_config.persistent_keepalive,
|
||||
stats: WgPeerStats::default(),
|
||||
is_connected: false,
|
||||
last_activity_at: None,
|
||||
vpn_ip: None,
|
||||
prev_synced_bytes_sent: 0,
|
||||
prev_synced_bytes_received: 0,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -455,11 +518,12 @@ pub async fn run_wg_listener(
|
||||
// Merged return-packet channel: all per-peer channels feed into this
|
||||
let (wg_return_tx, mut wg_return_rx) = mpsc::channel::<(String, Vec<u8>)>(1024);
|
||||
|
||||
// Register initial peers in ServerState and track their VPN IPs
|
||||
// Register initial peers in ServerState (IP reservation + tun_routes only, NOT state.clients)
|
||||
let mut peer_vpn_ips: HashMap<String, Ipv4Addr> = HashMap::new();
|
||||
for peer in &peers {
|
||||
for peer in peers.iter_mut() {
|
||||
if let Ok(Some(ip)) = register_wg_peer(&state, peer, &wg_return_tx).await {
|
||||
peer_vpn_ips.insert(peer.public_key_b64.clone(), ip);
|
||||
peer.vpn_ip = Some(ip);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -468,6 +532,7 @@ pub async fn run_wg_listener(
|
||||
let mut dst_buf = vec![0u8; WG_BUFFER_SIZE];
|
||||
let mut timer = tokio::time::interval(std::time::Duration::from_millis(TIMER_TICK_MS));
|
||||
let mut stats_timer = tokio::time::interval(std::time::Duration::from_secs(1));
|
||||
let mut idle_check_timer = tokio::time::interval(std::time::Duration::from_secs(10));
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
@@ -491,11 +556,13 @@ pub async fn run_wg_listener(
|
||||
}
|
||||
}
|
||||
peer.endpoint = Some(src_addr);
|
||||
// Handshake response counts as activity
|
||||
peer.last_activity_at = Some(tokio::time::Instant::now());
|
||||
handled = true;
|
||||
break;
|
||||
}
|
||||
TunnResult::WriteToTunnelV4(packet, addr) => {
|
||||
if peer.matches_dst(IpAddr::V4(addr)) {
|
||||
if peer.matches_allowed_ips(IpAddr::V4(addr)) {
|
||||
let pkt_len = packet.len() as u64;
|
||||
// Forward via shared forwarding engine
|
||||
let mut engine = state.forwarding_engine.lock().await;
|
||||
@@ -509,17 +576,29 @@ pub async fn run_wg_listener(
|
||||
ForwardingEngine::Socket(sender) => {
|
||||
let _ = sender.try_send(packet.to_vec());
|
||||
}
|
||||
ForwardingEngine::Bridge(sender) => {
|
||||
let _ = sender.try_send(packet.to_vec());
|
||||
}
|
||||
ForwardingEngine::Testing => {}
|
||||
}
|
||||
peer.stats.bytes_received += pkt_len;
|
||||
peer.stats.packets_received += 1;
|
||||
}
|
||||
peer.endpoint = Some(src_addr);
|
||||
// Track activity and detect handshake completion
|
||||
peer.last_activity_at = Some(tokio::time::Instant::now());
|
||||
if !peer.is_connected {
|
||||
peer.is_connected = true;
|
||||
peer.stats.last_handshake_time = Some(wg_timestamp_now());
|
||||
if let Some(vpn_ip) = peer.vpn_ip {
|
||||
connect_wg_peer(&state, peer, vpn_ip).await;
|
||||
}
|
||||
}
|
||||
handled = true;
|
||||
break;
|
||||
}
|
||||
TunnResult::WriteToTunnelV6(packet, addr) => {
|
||||
if peer.matches_dst(IpAddr::V6(addr)) {
|
||||
if peer.matches_allowed_ips(IpAddr::V6(addr)) {
|
||||
let pkt_len = packet.len() as u64;
|
||||
let mut engine = state.forwarding_engine.lock().await;
|
||||
match &mut *engine {
|
||||
@@ -532,12 +611,24 @@ pub async fn run_wg_listener(
|
||||
ForwardingEngine::Socket(sender) => {
|
||||
let _ = sender.try_send(packet.to_vec());
|
||||
}
|
||||
ForwardingEngine::Bridge(sender) => {
|
||||
let _ = sender.try_send(packet.to_vec());
|
||||
}
|
||||
ForwardingEngine::Testing => {}
|
||||
}
|
||||
peer.stats.bytes_received += pkt_len;
|
||||
peer.stats.packets_received += 1;
|
||||
}
|
||||
peer.endpoint = Some(src_addr);
|
||||
// Track activity and detect handshake completion
|
||||
peer.last_activity_at = Some(tokio::time::Instant::now());
|
||||
if !peer.is_connected {
|
||||
peer.is_connected = true;
|
||||
peer.stats.last_handshake_time = Some(wg_timestamp_now());
|
||||
if let Some(vpn_ip) = peer.vpn_ip {
|
||||
connect_wg_peer(&state, peer, vpn_ip).await;
|
||||
}
|
||||
}
|
||||
handled = true;
|
||||
break;
|
||||
}
|
||||
@@ -586,6 +677,13 @@ pub async fn run_wg_listener(
|
||||
udp_socket.send_to(packet, endpoint).await?;
|
||||
}
|
||||
}
|
||||
TunnResult::Err(WireGuardError::ConnectionExpired) => {
|
||||
warn!("WG peer {} connection expired", peer.public_key_b64);
|
||||
if peer.is_connected {
|
||||
peer.is_connected = false;
|
||||
disconnect_wg_peer(&state, &peer.public_key_b64).await;
|
||||
}
|
||||
}
|
||||
TunnResult::Err(e) => {
|
||||
debug!("Timer error for WG peer {}: {:?}",
|
||||
peer.public_key_b64, e);
|
||||
@@ -599,19 +697,39 @@ pub async fn run_wg_listener(
|
||||
_ = stats_timer.tick() => {
|
||||
let mut clients = state.clients.write().await;
|
||||
let mut stats = state.stats.write().await;
|
||||
for peer in peers.iter() {
|
||||
for peer in peers.iter_mut() {
|
||||
// Always update aggregate stats (regardless of connection state)
|
||||
let delta_sent = peer.stats.bytes_sent.saturating_sub(peer.prev_synced_bytes_sent);
|
||||
let delta_recv = peer.stats.bytes_received.saturating_sub(peer.prev_synced_bytes_received);
|
||||
if delta_sent > 0 || delta_recv > 0 {
|
||||
stats.bytes_sent += delta_sent;
|
||||
stats.bytes_received += delta_recv;
|
||||
peer.prev_synced_bytes_sent = peer.stats.bytes_sent;
|
||||
peer.prev_synced_bytes_received = peer.stats.bytes_received;
|
||||
}
|
||||
|
||||
// Only update ClientInfo if peer is connected (in state.clients)
|
||||
let client_id = format!("wg-{}", &peer.public_key_b64[..8.min(peer.public_key_b64.len())]);
|
||||
if let Some(info) = clients.get_mut(&client_id) {
|
||||
// Update stats delta
|
||||
let prev_sent = info.bytes_sent;
|
||||
let prev_recv = info.bytes_received;
|
||||
info.bytes_sent = peer.stats.bytes_sent;
|
||||
info.bytes_received = peer.stats.bytes_received;
|
||||
info.remote_addr = peer.endpoint.map(|e| e.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update aggregate stats
|
||||
stats.bytes_sent += peer.stats.bytes_sent.saturating_sub(prev_sent);
|
||||
stats.bytes_received += peer.stats.bytes_received.saturating_sub(prev_recv);
|
||||
// --- Idle timeout check (every 10s) ---
|
||||
_ = idle_check_timer.tick() => {
|
||||
let now = tokio::time::Instant::now();
|
||||
for peer in peers.iter_mut() {
|
||||
if peer.is_connected {
|
||||
if let Some(last) = peer.last_activity_at {
|
||||
if now.duration_since(last) > std::time::Duration::from_secs(180) {
|
||||
info!("WG peer {} idle timeout (180s), disconnecting", peer.public_key_b64);
|
||||
peer.is_connected = false;
|
||||
disconnect_wg_peer(&state, &peer.public_key_b64).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -628,11 +746,12 @@ pub async fn run_wg_listener(
|
||||
&config.private_key,
|
||||
);
|
||||
if result.is_ok() {
|
||||
// Register new peer in ServerState
|
||||
let peer = peers.last().unwrap();
|
||||
// Register new peer in ServerState (IP + tun_routes only)
|
||||
let peer = peers.last_mut().unwrap();
|
||||
match register_wg_peer(&state, peer, &wg_return_tx).await {
|
||||
Ok(Some(ip)) => {
|
||||
peer_vpn_ips.insert(peer_config.public_key.clone(), ip);
|
||||
peer.vpn_ip = Some(ip);
|
||||
}
|
||||
Ok(None) => {}
|
||||
Err(e) => {
|
||||
@@ -796,12 +915,12 @@ impl WgClient {
|
||||
let state = self.state.clone();
|
||||
let assigned_ip = config.address.clone();
|
||||
|
||||
// Update state
|
||||
// Update state — handshake hasn't completed yet
|
||||
{
|
||||
let mut s = state.write().await;
|
||||
s.state = "connected".to_string();
|
||||
s.state = "handshaking".to_string();
|
||||
s.assigned_ip = Some(assigned_ip.clone());
|
||||
s.connected_since = Some(chrono_now());
|
||||
s.connected_since = None;
|
||||
}
|
||||
|
||||
// Spawn client loop
|
||||
@@ -868,7 +987,7 @@ async fn wg_client_loop(
|
||||
endpoint: SocketAddr,
|
||||
_allowed_ips: Vec<AllowedIp>,
|
||||
shared_stats: Arc<RwLock<WgPeerStats>>,
|
||||
_state: Arc<RwLock<WgClientState>>,
|
||||
state: Arc<RwLock<WgClientState>>,
|
||||
mut shutdown_rx: oneshot::Receiver<()>,
|
||||
) -> Result<()> {
|
||||
let mut udp_buf = vec![0u8; MAX_UDP_PACKET];
|
||||
@@ -876,6 +995,7 @@ async fn wg_client_loop(
|
||||
let mut dst_buf = vec![0u8; WG_BUFFER_SIZE];
|
||||
let mut timer = tokio::time::interval(std::time::Duration::from_millis(TIMER_TICK_MS));
|
||||
let mut stats_timer = tokio::time::interval(std::time::Duration::from_secs(1));
|
||||
let mut handshake_complete = false;
|
||||
|
||||
let (mut tun_reader, mut tun_writer) = tokio::io::split(tun_device);
|
||||
|
||||
@@ -916,14 +1036,37 @@ async fn wg_client_loop(
|
||||
tun_writer.write_all(packet).await?;
|
||||
local_stats.bytes_received += pkt_len;
|
||||
local_stats.packets_received += 1;
|
||||
if !handshake_complete {
|
||||
handshake_complete = true;
|
||||
let mut s = state.write().await;
|
||||
s.state = "connected".to_string();
|
||||
s.connected_since = Some(chrono_now());
|
||||
info!("WireGuard handshake completed, tunnel active");
|
||||
}
|
||||
}
|
||||
TunnResult::WriteToTunnelV6(packet, _addr) => {
|
||||
let pkt_len = packet.len() as u64;
|
||||
tun_writer.write_all(packet).await?;
|
||||
local_stats.bytes_received += pkt_len;
|
||||
local_stats.packets_received += 1;
|
||||
if !handshake_complete {
|
||||
handshake_complete = true;
|
||||
let mut s = state.write().await;
|
||||
s.state = "connected".to_string();
|
||||
s.connected_since = Some(chrono_now());
|
||||
info!("WireGuard handshake completed, tunnel active");
|
||||
}
|
||||
}
|
||||
TunnResult::Done => {}
|
||||
TunnResult::Err(WireGuardError::ConnectionExpired) => {
|
||||
warn!("WireGuard session expired during decapsulate, re-initiating handshake");
|
||||
match tunn.format_handshake_initiation(&mut dst_buf, true) {
|
||||
TunnResult::WriteToNetwork(packet) => {
|
||||
udp_socket.send_to(packet, endpoint).await?;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
TunnResult::Err(e) => {
|
||||
debug!("Client decapsulate error: {:?}", e);
|
||||
}
|
||||
@@ -955,6 +1098,19 @@ async fn wg_client_loop(
|
||||
TunnResult::WriteToNetwork(packet) => {
|
||||
udp_socket.send_to(packet, endpoint).await?;
|
||||
}
|
||||
TunnResult::Err(WireGuardError::ConnectionExpired) => {
|
||||
warn!("WireGuard connection expired, re-initiating handshake");
|
||||
match tunn.format_handshake_initiation(&mut dst_buf, true) {
|
||||
TunnResult::WriteToNetwork(packet) => {
|
||||
udp_socket.send_to(packet, endpoint).await?;
|
||||
debug!("Sent handshake re-initiation after expiry");
|
||||
}
|
||||
TunnResult::Err(e) => {
|
||||
warn!("Failed to re-initiate handshake: {:?}", e);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
TunnResult::Err(e) => {
|
||||
debug!("Client timer error: {:?}", e);
|
||||
}
|
||||
@@ -1028,6 +1184,19 @@ mod tests {
|
||||
assert_eq!(public.to_bytes(), derived_public.to_bytes());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_wg_public_key_from_private() {
|
||||
let (pub_b64, priv_b64) = generate_wg_keypair();
|
||||
let derived = wg_public_key_from_private(&priv_b64).unwrap();
|
||||
assert_eq!(derived, pub_b64);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_wg_public_key_from_private_invalid() {
|
||||
assert!(wg_public_key_from_private("not-valid").is_err());
|
||||
assert!(wg_public_key_from_private("AAAA").is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_invalid_key() {
|
||||
assert!(parse_private_key("not-valid-base64!!!").is_err());
|
||||
@@ -1171,7 +1340,7 @@ mod tests {
|
||||
let _ = server_tunn.decapsulate(None, &pkt_copy, &mut buf_b);
|
||||
}
|
||||
TunnResult::Done => {}
|
||||
other => {
|
||||
_other => {
|
||||
// Drain
|
||||
loop {
|
||||
match client_tunn.decapsulate(None, &[], &mut buf_a) {
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartvpn',
|
||||
version: '1.15.0',
|
||||
version: '1.18.0',
|
||||
description: 'A VPN solution with TypeScript control plane and Rust data plane daemon'
|
||||
}
|
||||
|
||||
@@ -93,7 +93,7 @@ export interface IVpnServerConfig {
|
||||
enableNat?: boolean;
|
||||
/** Forwarding mode: 'tun' (kernel TUN, requires root), 'socket' (userspace NAT),
|
||||
* or 'testing' (monitoring only). Default: 'testing'. */
|
||||
forwardingMode?: 'tun' | 'socket' | 'testing';
|
||||
forwardingMode?: 'tun' | 'socket' | 'bridge' | 'testing';
|
||||
/** Default rate limit for new clients (bytes/sec). Omit for unlimited. */
|
||||
defaultRateLimitBytesPerSec?: number;
|
||||
/** Default burst size for new clients (bytes). Omit for unlimited. */
|
||||
@@ -129,6 +129,30 @@ export interface IVpnServerConfig {
|
||||
* Controls where decrypted traffic goes: allow through, block, or redirect to a target.
|
||||
* Default: all traffic passes through (backward compatible). */
|
||||
destinationPolicy?: IDestinationPolicy;
|
||||
/** Public endpoint address for generated client configs (e.g. 'vpn.example.com:51820').
|
||||
* Used as the WireGuard `Endpoint =` and SmartVPN `serverUrl` host.
|
||||
* Defaults to listenAddr (which is typically wrong for remote clients). */
|
||||
serverEndpoint?: string;
|
||||
/** AllowedIPs for generated WireGuard client configs.
|
||||
* Controls what traffic the client routes through the VPN tunnel.
|
||||
* Defaults to ['0.0.0.0/0'] (full tunnel). Set to e.g. ['10.8.0.0/24'] for split tunnel. */
|
||||
clientAllowedIPs?: string[];
|
||||
|
||||
// Bridge mode configuration (forwardingMode: 'bridge')
|
||||
|
||||
/** LAN subnet CIDR for bridge mode (e.g. '192.168.1.0/24').
|
||||
* VPN clients get IPs from this subnet instead of the VPN subnet.
|
||||
* Required when forwardingMode is 'bridge'. */
|
||||
bridgeLanSubnet?: string;
|
||||
/** Physical network interface to bridge (e.g. 'eth0').
|
||||
* Auto-detected from the default route if omitted. */
|
||||
bridgePhysicalInterface?: string;
|
||||
/** Start of VPN client IP range within the LAN subnet (host offset, e.g. 200 for .200).
|
||||
* Default: 200. */
|
||||
bridgeIpRangeStart?: number;
|
||||
/** End of VPN client IP range within the LAN subnet (host offset, e.g. 250 for .250).
|
||||
* Default: 250. */
|
||||
bridgeIpRangeEnd?: number;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -209,6 +233,14 @@ export interface IVpnClientInfo {
|
||||
export interface IVpnServerStatistics extends IVpnStatistics {
|
||||
activeClients: number;
|
||||
totalConnections: number;
|
||||
/** Per-transport active client counts. */
|
||||
activeClientsWebsocket: number;
|
||||
activeClientsQuic: number;
|
||||
activeClientsWireguard: number;
|
||||
/** Per-transport total connection counts. */
|
||||
totalConnectionsWebsocket: number;
|
||||
totalConnectionsQuic: number;
|
||||
totalConnectionsWireguard: number;
|
||||
}
|
||||
|
||||
export interface IVpnKeypair {
|
||||
@@ -294,6 +326,10 @@ export interface IClientSecurity {
|
||||
maxConnections?: number;
|
||||
/** Per-client rate limiting. */
|
||||
rateLimit?: IClientRateLimit;
|
||||
/** Per-client destination routing policy override.
|
||||
* When set, overrides the server-level destinationPolicy for this client's traffic.
|
||||
* Supports the same options: forceTarget, block, allow with allow/block lists. */
|
||||
destinationPolicy?: IDestinationPolicy;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user