feat(rust-server, rust-client, ts-interfaces): add configurable packet forwarding with TUN and userspace NAT modes
This commit is contained in:
@@ -19,6 +19,7 @@ use crate::ratelimit::TokenBucket;
|
||||
use crate::transport;
|
||||
use crate::transport_trait::{self, TransportSink, TransportStream};
|
||||
use crate::quic_transport;
|
||||
use crate::tunnel::{self, TunConfig};
|
||||
|
||||
/// Dead-peer timeout: 3x max keepalive interval (Healthy=60s).
|
||||
const DEAD_PEER_TIMEOUT: Duration = Duration::from_secs(180);
|
||||
@@ -37,6 +38,9 @@ pub struct ServerConfig {
|
||||
pub mtu: Option<u16>,
|
||||
pub keepalive_interval_secs: Option<u64>,
|
||||
pub enable_nat: Option<bool>,
|
||||
/// Forwarding mode: "tun" (kernel TUN, requires root), "socket" (userspace NAT),
|
||||
/// or "testing" (monitoring only, no forwarding). Default: "testing".
|
||||
pub forwarding_mode: Option<String>,
|
||||
/// Default rate limit for new clients (bytes/sec). None = unlimited.
|
||||
pub default_rate_limit_bytes_per_sec: Option<u64>,
|
||||
/// Default burst size for new clients (bytes). None = unlimited.
|
||||
@@ -94,6 +98,16 @@ pub struct ServerStatistics {
|
||||
pub total_connections: u64,
|
||||
}
|
||||
|
||||
/// The forwarding engine determines how decrypted IP packets are routed.
|
||||
pub enum ForwardingEngine {
|
||||
/// Kernel TUN device — packets written to the TUN, kernel handles routing.
|
||||
Tun(tokio::io::WriteHalf<tun::AsyncDevice>),
|
||||
/// Userspace NAT — packets sent to smoltcp-based NAT engine via channel.
|
||||
Socket(mpsc::Sender<Vec<u8>>),
|
||||
/// Testing/monitoring — packets are counted but not forwarded.
|
||||
Testing,
|
||||
}
|
||||
|
||||
/// Shared server state.
|
||||
pub struct ServerState {
|
||||
pub config: ServerConfig,
|
||||
@@ -104,6 +118,12 @@ pub struct ServerState {
|
||||
pub mtu_config: MtuConfig,
|
||||
pub started_at: std::time::Instant,
|
||||
pub client_registry: RwLock<ClientRegistry>,
|
||||
/// The forwarding engine for decrypted IP packets.
|
||||
pub forwarding_engine: Mutex<ForwardingEngine>,
|
||||
/// Routing table: assigned VPN IP → channel sender for return packets.
|
||||
pub tun_routes: RwLock<HashMap<Ipv4Addr, mpsc::Sender<Vec<u8>>>>,
|
||||
/// Shutdown signal for the forwarding background task (TUN reader or NAT engine).
|
||||
pub tun_shutdown: mpsc::Sender<()>,
|
||||
}
|
||||
|
||||
/// The VPN server.
|
||||
@@ -139,6 +159,51 @@ impl VpnServer {
|
||||
}
|
||||
|
||||
let link_mtu = config.mtu.unwrap_or(1420);
|
||||
let mode = config.forwarding_mode.as_deref().unwrap_or("testing");
|
||||
let gateway_ip = ip_pool.gateway_addr();
|
||||
|
||||
// Create forwarding engine based on mode
|
||||
enum ForwardingSetup {
|
||||
Tun {
|
||||
writer: tokio::io::WriteHalf<tun::AsyncDevice>,
|
||||
reader: tokio::io::ReadHalf<tun::AsyncDevice>,
|
||||
shutdown_rx: mpsc::Receiver<()>,
|
||||
},
|
||||
Socket {
|
||||
packet_tx: mpsc::Sender<Vec<u8>>,
|
||||
packet_rx: mpsc::Receiver<Vec<u8>>,
|
||||
shutdown_rx: mpsc::Receiver<()>,
|
||||
},
|
||||
Testing,
|
||||
}
|
||||
|
||||
let (setup, fwd_shutdown_tx) = match mode {
|
||||
"tun" => {
|
||||
let tun_config = TunConfig {
|
||||
name: "svpn0".to_string(),
|
||||
address: gateway_ip,
|
||||
netmask: Ipv4Addr::new(255, 255, 255, 0),
|
||||
mtu: link_mtu,
|
||||
};
|
||||
let tun_device = tunnel::create_tun(&tun_config)?;
|
||||
tunnel::add_route(&config.subnet, &tun_config.name).await?;
|
||||
let (reader, writer) = tokio::io::split(tun_device);
|
||||
let (tx, rx) = mpsc::channel::<()>(1);
|
||||
(ForwardingSetup::Tun { writer, reader, shutdown_rx: rx }, tx)
|
||||
}
|
||||
"socket" => {
|
||||
info!("Starting userspace NAT forwarding (no root required)");
|
||||
let (packet_tx, packet_rx) = mpsc::channel::<Vec<u8>>(4096);
|
||||
let (tx, rx) = mpsc::channel::<()>(1);
|
||||
(ForwardingSetup::Socket { packet_tx, packet_rx, shutdown_rx: rx }, tx)
|
||||
}
|
||||
_ => {
|
||||
info!("Forwarding disabled (testing/monitoring mode)");
|
||||
let (tx, _rx) = mpsc::channel::<()>(1);
|
||||
(ForwardingSetup::Testing, tx)
|
||||
}
|
||||
};
|
||||
|
||||
// Compute effective MTU from overhead
|
||||
let overhead = TunnelOverhead::default_overhead();
|
||||
let mtu_config = MtuConfig::new(overhead.effective_tun_mtu(1500).max(link_mtu));
|
||||
@@ -158,8 +223,38 @@ impl VpnServer {
|
||||
mtu_config,
|
||||
started_at: std::time::Instant::now(),
|
||||
client_registry: RwLock::new(registry),
|
||||
forwarding_engine: Mutex::new(ForwardingEngine::Testing),
|
||||
tun_routes: RwLock::new(HashMap::new()),
|
||||
tun_shutdown: fwd_shutdown_tx,
|
||||
});
|
||||
|
||||
// Spawn the forwarding background task and set the engine
|
||||
match setup {
|
||||
ForwardingSetup::Tun { writer, reader, shutdown_rx } => {
|
||||
*state.forwarding_engine.lock().await = ForwardingEngine::Tun(writer);
|
||||
let tun_state = state.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = run_tun_reader(tun_state, reader, shutdown_rx).await {
|
||||
error!("TUN reader error: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
ForwardingSetup::Socket { packet_tx, packet_rx, shutdown_rx } => {
|
||||
*state.forwarding_engine.lock().await = ForwardingEngine::Socket(packet_tx);
|
||||
let nat_engine = crate::userspace_nat::NatEngine::new(
|
||||
gateway_ip,
|
||||
link_mtu as usize,
|
||||
state.clone(),
|
||||
);
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = nat_engine.run(packet_rx, shutdown_rx).await {
|
||||
error!("NAT engine error: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
ForwardingSetup::Testing => {}
|
||||
}
|
||||
|
||||
let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
|
||||
self.state = Some(state.clone());
|
||||
self.shutdown_tx = Some(shutdown_tx);
|
||||
@@ -220,6 +315,34 @@ impl VpnServer {
|
||||
}
|
||||
|
||||
pub async fn stop(&mut self) -> Result<()> {
|
||||
if let Some(ref state) = self.state {
|
||||
let mode = state.config.forwarding_mode.as_deref().unwrap_or("testing");
|
||||
|
||||
match mode {
|
||||
"tun" => {
|
||||
let _ = state.tun_shutdown.send(()).await;
|
||||
*state.forwarding_engine.lock().await = ForwardingEngine::Testing;
|
||||
if let Err(e) = tunnel::remove_route(&state.config.subnet, "svpn0").await {
|
||||
warn!("Failed to remove TUN route: {}", e);
|
||||
}
|
||||
}
|
||||
"socket" => {
|
||||
let _ = state.tun_shutdown.send(()).await;
|
||||
*state.forwarding_engine.lock().await = ForwardingEngine::Testing;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
// Clean up NAT rules
|
||||
if state.config.enable_nat.unwrap_or(false) {
|
||||
if let Ok(iface) = crate::network::get_default_interface() {
|
||||
if let Err(e) = crate::network::remove_nat(&state.config.subnet, &iface).await {
|
||||
warn!("Failed to remove NAT rules: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(tx) = self.shutdown_tx.take() {
|
||||
let _ = tx.send(()).await;
|
||||
}
|
||||
@@ -736,6 +859,56 @@ async fn run_quic_listener(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// TUN reader task: reads IP packets from the TUN device and dispatches them
|
||||
/// to the correct client via the routing table.
|
||||
async fn run_tun_reader(
|
||||
state: Arc<ServerState>,
|
||||
mut tun_reader: tokio::io::ReadHalf<tun::AsyncDevice>,
|
||||
mut shutdown_rx: mpsc::Receiver<()>,
|
||||
) -> Result<()> {
|
||||
use tokio::io::AsyncReadExt;
|
||||
|
||||
let mut buf = vec![0u8; 65536];
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
result = tun_reader.read(&mut buf) => {
|
||||
let n = match result {
|
||||
Ok(0) => {
|
||||
info!("TUN reader: device closed");
|
||||
break;
|
||||
}
|
||||
Ok(n) => n,
|
||||
Err(e) => {
|
||||
error!("TUN reader error: {}", e);
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
// Extract destination IP from the raw IP packet
|
||||
let dst_ip = match tunnel::extract_dst_ip(&buf[..n]) {
|
||||
Some(std::net::IpAddr::V4(v4)) => v4,
|
||||
_ => continue, // IPv6 or malformed — skip
|
||||
};
|
||||
|
||||
// Look up client by destination IP
|
||||
let routes = state.tun_routes.read().await;
|
||||
if let Some(sender) = routes.get(&dst_ip) {
|
||||
if sender.try_send(buf[..n].to_vec()).is_err() {
|
||||
// Channel full or closed — drop packet (correct for IP best-effort)
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = shutdown_rx.recv() => {
|
||||
info!("TUN reader shutting down");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Transport-agnostic client handler. Performs the Noise IK handshake, authenticates
|
||||
/// the client against the registry, and runs the main packet forwarding loop.
|
||||
async fn handle_client_connection(
|
||||
@@ -846,6 +1019,14 @@ async fn handle_client_connection(
|
||||
// Allocate IP
|
||||
let assigned_ip = state.ip_pool.lock().await.allocate(&client_id)?;
|
||||
|
||||
// Create return-packet channel for forwarding engine -> client
|
||||
let (tun_return_tx, mut tun_return_rx) = mpsc::channel::<Vec<u8>>(256);
|
||||
let fwd_mode = state.config.forwarding_mode.as_deref().unwrap_or("testing");
|
||||
let forwarding_active = fwd_mode == "tun" || fwd_mode == "socket";
|
||||
if forwarding_active {
|
||||
state.tun_routes.write().await.insert(assigned_ip, tun_return_tx);
|
||||
}
|
||||
|
||||
// Determine rate limits: per-client security overrides server defaults
|
||||
let (rate_limit, burst) = if let Some(ref sec) = client_security {
|
||||
if let Some(ref rl) = sec.rate_limit {
|
||||
@@ -973,6 +1154,24 @@ async fn handle_client_connection(
|
||||
if let Some(info) = clients.get_mut(&client_id) {
|
||||
info.bytes_received += len as u64;
|
||||
}
|
||||
drop(clients);
|
||||
|
||||
// Forward decrypted packet via the active engine
|
||||
{
|
||||
let mut engine = state.forwarding_engine.lock().await;
|
||||
match &mut *engine {
|
||||
ForwardingEngine::Tun(writer) => {
|
||||
use tokio::io::AsyncWriteExt;
|
||||
if let Err(e) = writer.write_all(&buf[..len]).await {
|
||||
warn!("TUN write error for client {}: {}", client_id, e);
|
||||
}
|
||||
}
|
||||
ForwardingEngine::Socket(sender) => {
|
||||
let _ = sender.try_send(buf[..len].to_vec());
|
||||
}
|
||||
ForwardingEngine::Testing => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Decrypt error from {}: {}", client_id, e);
|
||||
@@ -1029,6 +1228,37 @@ async fn handle_client_connection(
|
||||
}
|
||||
}
|
||||
}
|
||||
// Return packets from TUN device destined for this client
|
||||
Some(packet) = tun_return_rx.recv() => {
|
||||
let pkt_len = packet.len();
|
||||
match noise_transport.write_message(&packet, &mut buf) {
|
||||
Ok(len) => {
|
||||
let frame = Frame {
|
||||
packet_type: PacketType::IpPacket,
|
||||
payload: buf[..len].to_vec(),
|
||||
};
|
||||
let mut frame_bytes = BytesMut::new();
|
||||
<FrameCodec as tokio_util::codec::Encoder<Frame>>::encode(
|
||||
&mut FrameCodec, frame, &mut frame_bytes
|
||||
)?;
|
||||
sink.send_reliable(frame_bytes.to_vec()).await?;
|
||||
|
||||
// Update stats
|
||||
let mut stats = state.stats.write().await;
|
||||
stats.bytes_sent += pkt_len as u64;
|
||||
stats.packets_sent += 1;
|
||||
drop(stats);
|
||||
let mut clients = state.clients.write().await;
|
||||
if let Some(info) = clients.get_mut(&client_id) {
|
||||
info.bytes_sent += pkt_len as u64;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Noise encrypt error for return packet to {}: {}", client_id, e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = tokio::time::sleep_until(last_activity + DEAD_PEER_TIMEOUT) => {
|
||||
warn!("Client {} dead-peer timeout ({}s inactivity)", client_id, DEAD_PEER_TIMEOUT.as_secs());
|
||||
break;
|
||||
@@ -1037,6 +1267,9 @@ async fn handle_client_connection(
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
if forwarding_active {
|
||||
state.tun_routes.write().await.remove(&assigned_ip);
|
||||
}
|
||||
state.clients.write().await.remove(&client_id);
|
||||
state.ip_pool.lock().await.release(&assigned_ip);
|
||||
state.rate_limiters.lock().await.remove(&client_id);
|
||||
|
||||
Reference in New Issue
Block a user