use std::collections::{HashMap, VecDeque}; use std::net::{Ipv4Addr, SocketAddr}; use std::sync::Arc; use std::time::Duration; use anyhow::Result; use smoltcp::iface::{Config, Interface, SocketHandle, SocketSet}; use smoltcp::phy::{self, Device, DeviceCapabilities, Medium}; use smoltcp::socket::{tcp, udp}; use smoltcp::wire::{HardwareAddress, IpAddress, IpCidr, IpEndpoint}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpStream, UdpSocket}; use tokio::sync::mpsc; use tracing::{debug, info, warn}; use crate::acl; use crate::server::{DestinationPolicyConfig, ServerState}; use crate::tunnel; /// Maximum size of per-session pending send buffer (512KB = 8x socket buffer). /// Sessions exceeding this are aborted — the client cannot keep up. const TCP_PENDING_SEND_MAX: usize = 512 * 1024; // ============================================================================ // Virtual IP device for smoltcp // ============================================================================ pub struct VirtualIpDevice { rx_queue: VecDeque>, tx_queue: VecDeque>, mtu: usize, } impl VirtualIpDevice { pub fn new(mtu: usize) -> Self { Self { rx_queue: VecDeque::new(), tx_queue: VecDeque::new(), mtu, } } pub fn inject_packet(&mut self, packet: Vec) { self.rx_queue.push_back(packet); } pub fn drain_tx(&mut self) -> impl Iterator> + '_ { self.tx_queue.drain(..) } } pub struct VirtualRxToken { buffer: Vec, } impl phy::RxToken for VirtualRxToken { fn consume(self, f: F) -> R where F: FnOnce(&[u8]) -> R, { f(&self.buffer) } } pub struct VirtualTxToken<'a> { queue: &'a mut VecDeque>, } impl<'a> phy::TxToken for VirtualTxToken<'a> { fn consume(self, len: usize, f: F) -> R where F: FnOnce(&mut [u8]) -> R, { let mut buffer = vec![0u8; len]; let result = f(&mut buffer); self.queue.push_back(buffer); result } } impl Device for VirtualIpDevice { type RxToken<'a> = VirtualRxToken; type TxToken<'a> = VirtualTxToken<'a>; fn receive( &mut self, _timestamp: smoltcp::time::Instant, ) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { self.rx_queue.pop_front().map(|buffer| { let rx = VirtualRxToken { buffer }; let tx = VirtualTxToken { queue: &mut self.tx_queue, }; (rx, tx) }) } fn transmit(&mut self, _timestamp: smoltcp::time::Instant) -> Option> { Some(VirtualTxToken { queue: &mut self.tx_queue, }) } fn capabilities(&self) -> DeviceCapabilities { let mut caps = DeviceCapabilities::default(); caps.medium = Medium::Ip; caps.max_transmission_unit = self.mtu; caps.max_burst_size = None; caps } } // ============================================================================ // Session tracking // ============================================================================ #[derive(Debug, Clone, Hash, Eq, PartialEq)] struct SessionKey { src_ip: Ipv4Addr, src_port: u16, dst_ip: Ipv4Addr, dst_port: u16, protocol: u8, } struct TcpSession { smoltcp_handle: SocketHandle, /// Channel to send data to the bridge task. None until bridge starts. bridge_data_tx: Option>>, #[allow(dead_code)] client_ip: Ipv4Addr, /// Bridge task has been spawned (deferred until handshake completes) bridge_started: bool, /// Address to connect the bridge task to (may differ from dst if policy rewrote it) connect_addr: SocketAddr, /// Buffered data from bridge waiting to be written to smoltcp socket pending_send: Vec, /// Session is closing (FIN in progress), don't accept new SYNs closing: bool, /// Last time data flowed through this session (for idle timeout) last_activity: tokio::time::Instant, } struct UdpSession { smoltcp_handle: SocketHandle, bridge_data_tx: mpsc::Sender>, #[allow(dead_code)] client_ip: Ipv4Addr, last_activity: tokio::time::Instant, } enum BridgeMessage { TcpData { key: SessionKey, data: Vec }, TcpClosed { key: SessionKey }, UdpData { key: SessionKey, data: Vec }, } // ============================================================================ // IP packet parsing helpers // ============================================================================ fn parse_ipv4_header(packet: &[u8]) -> Option<(u8, Ipv4Addr, Ipv4Addr, u8)> { if packet.len() < 20 { return None; } let version = packet[0] >> 4; if version != 4 { return None; } let ihl = (packet[0] & 0x0F) as usize * 4; let protocol = packet[9]; let src = Ipv4Addr::new(packet[12], packet[13], packet[14], packet[15]); let dst = Ipv4Addr::new(packet[16], packet[17], packet[18], packet[19]); Some((ihl as u8, src, dst, protocol)) } fn parse_tcp_ports(packet: &[u8], ihl: usize) -> Option<(u16, u16, u8)> { if packet.len() < ihl + 14 { return None; } let src_port = u16::from_be_bytes([packet[ihl], packet[ihl + 1]]); let dst_port = u16::from_be_bytes([packet[ihl + 2], packet[ihl + 3]]); let flags = packet[ihl + 13]; Some((src_port, dst_port, flags)) } fn parse_udp_ports(packet: &[u8], ihl: usize) -> Option<(u16, u16)> { if packet.len() < ihl + 4 { return None; } let src_port = u16::from_be_bytes([packet[ihl], packet[ihl + 1]]); let dst_port = u16::from_be_bytes([packet[ihl + 2], packet[ihl + 3]]); Some((src_port, dst_port)) } // ============================================================================ // NAT Engine // ============================================================================ pub struct NatEngine { device: VirtualIpDevice, iface: Interface, sockets: SocketSet<'static>, tcp_sessions: HashMap, udp_sessions: HashMap, state: Arc, bridge_rx: mpsc::Receiver, bridge_tx: mpsc::Sender, start_time: std::time::Instant, /// When true, outbound TCP connections prepend PROXY protocol v2 headers /// with the VPN client's tunnel IP as source address. proxy_protocol: bool, /// Destination routing policy: forceTarget, block, or allow. destination_policy: Option, } /// Result of destination policy evaluation. enum DestinationAction { /// Connect to the original destination. PassThrough(SocketAddr), /// Redirect to a target IP, preserving original port. ForceTarget(SocketAddr), /// Drop the packet silently. Drop, } impl NatEngine { pub fn new(gateway_ip: Ipv4Addr, mtu: usize, state: Arc, proxy_protocol: bool, destination_policy: Option) -> Self { let mut device = VirtualIpDevice::new(mtu); let config = Config::new(HardwareAddress::Ip); let now = smoltcp::time::Instant::from_millis(0); let mut iface = Interface::new(config, &mut device, now); // Accept packets to ANY destination IP (essential for NAT) iface.set_any_ip(true); // Assign the gateway IP as the interface address iface.update_ip_addrs(|addrs| { addrs .push(IpCidr::new(IpAddress::Ipv4(gateway_ip.into()), 24)) .unwrap(); }); // Add a default route so smoltcp knows where to send packets iface.routes_mut().add_default_ipv4_route(gateway_ip.into()).unwrap(); let sockets = SocketSet::new(Vec::with_capacity(256)); let (bridge_tx, bridge_rx) = mpsc::channel(4096); Self { device, iface, sockets, tcp_sessions: HashMap::new(), udp_sessions: HashMap::new(), state, bridge_rx, bridge_tx, start_time: std::time::Instant::now(), proxy_protocol, destination_policy, } } fn smoltcp_now(&self) -> smoltcp::time::Instant { smoltcp::time::Instant::from_millis(self.start_time.elapsed().as_millis() as i64) } /// Evaluate destination policy for a packet's destination IP. fn evaluate_destination(&self, dst_ip: Ipv4Addr, dst_port: u16) -> DestinationAction { let policy = match &self.destination_policy { Some(p) => p, None => return DestinationAction::PassThrough(SocketAddr::new(dst_ip.into(), dst_port)), }; // 1. Block list wins (deny overrides allow) if let Some(ref block_list) = policy.block_list { if !block_list.is_empty() && acl::ip_matches_any(dst_ip, block_list) { return DestinationAction::Drop; } } // 2. Allow list — pass through directly if let Some(ref allow_list) = policy.allow_list { if !allow_list.is_empty() && acl::ip_matches_any(dst_ip, allow_list) { return DestinationAction::PassThrough(SocketAddr::new(dst_ip.into(), dst_port)); } } // 3. Default action match policy.default.as_str() { "forceTarget" => { let target_ip = policy.target.as_deref() .and_then(|t| t.parse::().ok()) .unwrap_or(Ipv4Addr::LOCALHOST); DestinationAction::ForceTarget(SocketAddr::new(target_ip.into(), dst_port)) } "block" => DestinationAction::Drop, _ => DestinationAction::PassThrough(SocketAddr::new(dst_ip.into(), dst_port)), } } /// Inject a raw IP packet from a VPN client and handle new session creation. fn inject_packet(&mut self, packet: Vec) { let Some((ihl, src_ip, dst_ip, protocol)) = parse_ipv4_header(&packet) else { return; }; let ihl = ihl as usize; match protocol { 6 => { // TCP let Some((src_port, dst_port, flags)) = parse_tcp_ports(&packet, ihl) else { return; }; let key = SessionKey { src_ip, src_port, dst_ip, dst_port, protocol: 6, }; // SYN without ACK = new connection let is_syn = (flags & 0x02) != 0 && (flags & 0x10) == 0; // Skip if session exists (including closing sessions — let FIN complete) let session_exists = self.tcp_sessions.contains_key(&key); if is_syn && !session_exists { match self.evaluate_destination(dst_ip, dst_port) { DestinationAction::Drop => { debug!("NAT: destination policy blocked TCP {}:{} -> {}:{}", src_ip, src_port, dst_ip, dst_port); return; } DestinationAction::PassThrough(addr) => self.create_tcp_session(&key, addr), DestinationAction::ForceTarget(addr) => self.create_tcp_session(&key, addr), } } } 17 => { // UDP let Some((src_port, dst_port)) = parse_udp_ports(&packet, ihl) else { return; }; let key = SessionKey { src_ip, src_port, dst_ip, dst_port, protocol: 17, }; if !self.udp_sessions.contains_key(&key) { match self.evaluate_destination(dst_ip, dst_port) { DestinationAction::Drop => { debug!("NAT: destination policy blocked UDP {}:{} -> {}:{}", src_ip, src_port, dst_ip, dst_port); return; } DestinationAction::PassThrough(addr) => self.create_udp_session(&key, addr), DestinationAction::ForceTarget(addr) => self.create_udp_session(&key, addr), } } // Update last_activity for existing sessions if let Some(session) = self.udp_sessions.get_mut(&key) { session.last_activity = tokio::time::Instant::now(); } } _ => { // ICMP and other protocols — not forwarded in socket mode return; } } self.device.inject_packet(packet); } fn create_tcp_session(&mut self, key: &SessionKey, connect_addr: SocketAddr) { // Create smoltcp TCP socket let tcp_rx_buf = tcp::SocketBuffer::new(vec![0u8; 65535]); let tcp_tx_buf = tcp::SocketBuffer::new(vec![0u8; 65535]); let mut socket = tcp::Socket::new(tcp_rx_buf, tcp_tx_buf); // Listen on the destination address so smoltcp accepts the SYN let endpoint = IpEndpoint::new( IpAddress::Ipv4(key.dst_ip.into()), key.dst_port, ); if socket.listen(endpoint).is_err() { warn!("NAT: failed to listen on {:?}", endpoint); return; } let handle = self.sockets.add(socket); let session = TcpSession { smoltcp_handle: handle, bridge_data_tx: None, client_ip: key.src_ip, bridge_started: false, connect_addr, pending_send: Vec::new(), closing: false, last_activity: tokio::time::Instant::now(), }; self.tcp_sessions.insert(key.clone(), session); // NOTE: Bridge task is NOT spawned here — it will be spawned in process() // once the smoltcp handshake completes (socket.is_active() == true). // This prevents data from the real server arriving before the VPN client // handshake is done, which would cause silent data loss. debug!( "NAT: new TCP session {}:{} -> {}:{}", key.src_ip, key.src_port, key.dst_ip, key.dst_port ); } fn create_udp_session(&mut self, key: &SessionKey, connect_addr: SocketAddr) { // Create smoltcp UDP socket let udp_rx_buf = udp::PacketBuffer::new( vec![udp::PacketMetadata::EMPTY; 32], vec![0u8; 65535], ); let udp_tx_buf = udp::PacketBuffer::new( vec![udp::PacketMetadata::EMPTY; 32], vec![0u8; 65535], ); let mut socket = udp::Socket::new(udp_rx_buf, udp_tx_buf); let endpoint = IpEndpoint::new( IpAddress::Ipv4(key.dst_ip.into()), key.dst_port, ); if socket.bind(endpoint).is_err() { warn!("NAT: failed to bind UDP on {:?}", endpoint); return; } let handle = self.sockets.add(socket); let (data_tx, data_rx) = mpsc::channel::>(256); let session = UdpSession { smoltcp_handle: handle, bridge_data_tx: data_tx, client_ip: key.src_ip, last_activity: tokio::time::Instant::now(), }; self.udp_sessions.insert(key.clone(), session); let bridge_tx = self.bridge_tx.clone(); let key_clone = key.clone(); tokio::spawn(async move { udp_bridge_task(key_clone, data_rx, bridge_tx, connect_addr).await; }); debug!( "NAT: new UDP session {}:{} -> {}:{}", key.src_ip, key.src_port, key.dst_ip, key.dst_port ); } /// Poll smoltcp, bridge data between smoltcp sockets and bridge tasks, /// and dispatch outgoing packets to VPN clients. async fn process(&mut self) { let now = self.smoltcp_now(); self.iface .poll(now, &mut self.device, &mut self.sockets); // Start bridge tasks for sessions whose handshake just completed let bridge_tx_clone = self.bridge_tx.clone(); let proxy_protocol = self.proxy_protocol; for (key, session) in self.tcp_sessions.iter_mut() { if !session.bridge_started && !session.closing { let socket = self.sockets.get_mut::(session.smoltcp_handle); if socket.is_active() { session.bridge_started = true; let (data_tx, data_rx) = mpsc::channel::>(256); session.bridge_data_tx = Some(data_tx); let btx = bridge_tx_clone.clone(); let k = key.clone(); let addr = session.connect_addr; let pp = proxy_protocol; tokio::spawn(async move { tcp_bridge_task(k, data_rx, btx, pp, addr).await; }); debug!("NAT: TCP handshake complete, starting bridge for {}:{} -> {}:{}", key.src_ip, key.src_port, key.dst_ip, key.dst_port); } } } // Flush pending send buffers to smoltcp sockets for (_key, session) in self.tcp_sessions.iter_mut() { if !session.pending_send.is_empty() { let socket = self.sockets.get_mut::(session.smoltcp_handle); if socket.can_send() { match socket.send_slice(&session.pending_send) { Ok(written) if written > 0 => { session.pending_send.drain(..written); } _ => {} } } } } // Bridge: read data from smoltcp TCP sockets → send to bridge tasks let mut closed_tcp: Vec = Vec::new(); let mut active_tcp: Vec = Vec::new(); for (key, session) in &self.tcp_sessions { let socket = self.sockets.get_mut::(session.smoltcp_handle); if session.bridge_started && socket.can_recv() { if let Some(ref sender) = session.bridge_data_tx { // Reserve channel slot BEFORE consuming from smoltcp. // If the channel is full, we don't consume — smoltcp's RX buffer // fills up, it stops advertising TCP window space, and the VPN // client's TCP stack backs off. Proper end-to-end backpressure. match sender.try_reserve() { Ok(permit) => { let _ = socket.recv(|data| { permit.send(data.to_vec()); (data.len(), ()) }); active_tcp.push(key.clone()); } Err(_) => { debug!("NAT: bridge channel full for {}:{} -> {}:{}, applying backpressure", key.src_ip, key.src_port, key.dst_ip, key.dst_port); } } } } // Detect closed connections if !socket.is_open() && !socket.is_listening() { closed_tcp.push(key.clone()); } } // Update last_activity for sessions that had data flow let now = tokio::time::Instant::now(); for key in active_tcp { if let Some(session) = self.tcp_sessions.get_mut(&key) { session.last_activity = now; } } // Clean up closed TCP sessions for key in closed_tcp { if let Some(session) = self.tcp_sessions.remove(&key) { self.sockets.remove(session.smoltcp_handle); debug!("NAT: TCP session closed {}:{} -> {}:{}", key.src_ip, key.src_port, key.dst_ip, key.dst_port); } } // Bridge: read data from smoltcp UDP sockets → send to bridge tasks for (_key, session) in &self.udp_sessions { let socket = self.sockets.get_mut::(session.smoltcp_handle); while let Ok((data, _meta)) = socket.recv() { if session.bridge_data_tx.try_send(data.to_vec()).is_err() { debug!("NAT: bridge channel full, UDP data dropped"); } } } // Dispatch outgoing packets from smoltcp to VPN clients let routes = self.state.tun_routes.read().await; for packet in self.device.drain_tx() { if let Some(std::net::IpAddr::V4(dst_ip)) = tunnel::extract_dst_ip(&packet) { if let Some(sender) = routes.get(&dst_ip) { if sender.try_send(packet).is_err() { debug!("NAT: tun_routes channel full for {}, packet dropped", dst_ip); } } } } } fn handle_bridge_message(&mut self, msg: BridgeMessage) { match msg { BridgeMessage::TcpData { key, data } => { if let Some(session) = self.tcp_sessions.get_mut(&key) { session.last_activity = tokio::time::Instant::now(); // Append to pending buffer, then flush as much as possible session.pending_send.extend_from_slice(&data); let socket = self.sockets.get_mut::(session.smoltcp_handle); if socket.can_send() && !session.pending_send.is_empty() { match socket.send_slice(&session.pending_send) { Ok(written) if written > 0 => { session.pending_send.drain(..written); } _ => {} } } // Cap check — abort session if client can't keep up if session.pending_send.len() > TCP_PENDING_SEND_MAX { warn!( "NAT: TCP session {}:{} -> {}:{} pending buffer exceeded {}KB, aborting", key.src_ip, key.src_port, key.dst_ip, key.dst_port, TCP_PENDING_SEND_MAX / 1024 ); let socket = self.sockets.get_mut::(session.smoltcp_handle); socket.abort(); session.pending_send.clear(); session.closing = true; } } } BridgeMessage::TcpClosed { key } => { if let Some(session) = self.tcp_sessions.get_mut(&key) { let socket = self.sockets.get_mut::(session.smoltcp_handle); socket.close(); session.closing = true; // Don't remove from SocketSet yet — let smoltcp send FIN // It will be cleaned up in process() when is_open() returns false } } BridgeMessage::UdpData { key, data } => { if let Some(session) = self.udp_sessions.get_mut(&key) { session.last_activity = tokio::time::Instant::now(); let socket = self.sockets.get_mut::(session.smoltcp_handle); let dst_endpoint = IpEndpoint::new( IpAddress::Ipv4(key.src_ip.into()), key.src_port, ); // Send response: from the "server" (dst) back to the "client" (src) let _ = socket.send_slice(&data, dst_endpoint); } } } } fn cleanup_idle_udp_sessions(&mut self) { let timeout = Duration::from_secs(60); let now = tokio::time::Instant::now(); let expired: Vec = self .udp_sessions .iter() .filter(|(_, s)| now.duration_since(s.last_activity) > timeout) .map(|(k, _)| k.clone()) .collect(); for key in expired { if let Some(session) = self.udp_sessions.remove(&key) { self.sockets.remove(session.smoltcp_handle); debug!( "NAT: UDP session timed out {}:{} -> {}:{}", key.src_ip, key.src_port, key.dst_ip, key.dst_port ); } } } fn cleanup_idle_tcp_sessions(&mut self) { let timeout = Duration::from_secs(300); // 5 minutes let now = tokio::time::Instant::now(); let expired: Vec = self .tcp_sessions .iter() .filter(|(_, s)| now.duration_since(s.last_activity) > timeout) .map(|(k, _)| k.clone()) .collect(); for key in expired { if let Some(session) = self.tcp_sessions.remove(&key) { let socket = self.sockets.get_mut::(session.smoltcp_handle); socket.abort(); self.sockets.remove(session.smoltcp_handle); warn!( "NAT: TCP session timed out {}:{} -> {}:{}", key.src_ip, key.src_port, key.dst_ip, key.dst_port ); } } } /// Main async event loop for the NAT engine. pub async fn run( mut self, mut packet_rx: mpsc::Receiver>, mut shutdown_rx: mpsc::Receiver<()>, ) -> Result<()> { info!("Userspace NAT engine started"); let default_poll_delay = Duration::from_millis(50); let mut cleanup_timer = tokio::time::interval(Duration::from_secs(10)); // Dynamic poll timer — reset after each event using smoltcp's poll_delay() let poll_sleep = tokio::time::sleep(default_poll_delay); tokio::pin!(poll_sleep); loop { tokio::select! { Some(packet) = packet_rx.recv() => { self.inject_packet(packet); self.process().await; } Some(msg) = self.bridge_rx.recv() => { self.handle_bridge_message(msg); self.process().await; } () = &mut poll_sleep => { // Periodic poll for smoltcp maintenance (TCP retransmit, etc.) self.process().await; } _ = cleanup_timer.tick() => { self.cleanup_idle_udp_sessions(); self.cleanup_idle_tcp_sessions(); } _ = shutdown_rx.recv() => { info!("Userspace NAT engine shutting down"); break; } } // Reset poll delay based on smoltcp's actual timer needs let now = self.smoltcp_now(); let delay = self.iface.poll_delay(now, &self.sockets) .map(|d| Duration::from_millis(d.total_millis())) .unwrap_or(default_poll_delay); poll_sleep.as_mut().reset(tokio::time::Instant::now() + delay); } Ok(()) } } // ============================================================================ // Bridge tasks // ============================================================================ async fn tcp_bridge_task( key: SessionKey, mut data_rx: mpsc::Receiver>, bridge_tx: mpsc::Sender, proxy_protocol: bool, connect_addr: SocketAddr, ) { // Connect to resolved destination (may differ from key.dst_ip if policy rewrote it) let stream = match tokio::time::timeout(Duration::from_secs(30), TcpStream::connect(connect_addr)).await { Ok(Ok(s)) => s, Ok(Err(e)) => { debug!("NAT TCP connect to {} failed: {}", connect_addr, e); let _ = bridge_tx.send(BridgeMessage::TcpClosed { key }).await; return; } Err(_) => { debug!("NAT TCP connect to {} timed out", connect_addr); let _ = bridge_tx.send(BridgeMessage::TcpClosed { key }).await; return; } }; let (mut reader, mut writer) = stream.into_split(); // Send PROXY protocol v2 header with VPN client's tunnel IP as source if proxy_protocol { let src = SocketAddr::new(key.src_ip.into(), key.src_port); let dst = SocketAddr::new(key.dst_ip.into(), key.dst_port); let pp_header = crate::proxy_protocol::build_pp_v2_header(src, dst); if let Err(e) = writer.write_all(&pp_header).await { debug!("NAT: failed to send PP v2 header to {}: {}", connect_addr, e); let _ = bridge_tx.send(BridgeMessage::TcpClosed { key }).await; return; } } // Read from real socket → send to NAT engine let bridge_tx2 = bridge_tx.clone(); let key2 = key.clone(); let read_task = tokio::spawn(async move { let mut buf = vec![0u8; 65536]; loop { match reader.read(&mut buf).await { Ok(0) => break, Ok(n) => { if bridge_tx2 .send(BridgeMessage::TcpData { key: key2.clone(), data: buf[..n].to_vec(), }) .await .is_err() { break; } } Err(_) => break, } } let _ = bridge_tx2 .send(BridgeMessage::TcpClosed { key: key2 }) .await; }); // Receive from NAT engine → write to real socket while let Some(data) = data_rx.recv().await { if writer.write_all(&data).await.is_err() { break; } } read_task.abort(); } async fn udp_bridge_task( key: SessionKey, mut data_rx: mpsc::Receiver>, bridge_tx: mpsc::Sender, connect_addr: SocketAddr, ) { let socket = match UdpSocket::bind("0.0.0.0:0").await { Ok(s) => s, Err(e) => { warn!("NAT UDP bind failed: {}", e); return; } }; let dest = connect_addr; let socket = Arc::new(socket); let socket2 = socket.clone(); let bridge_tx2 = bridge_tx.clone(); let key2 = key.clone(); // Read responses from real socket let read_task = tokio::spawn(async move { let mut buf = vec![0u8; 65536]; loop { match socket2.recv_from(&mut buf).await { Ok((n, _src)) => { if bridge_tx2 .send(BridgeMessage::UdpData { key: key2.clone(), data: buf[..n].to_vec(), }) .await .is_err() { break; } } Err(_) => break, } } }); // Forward data from NAT engine to real socket while let Some(data) = data_rx.recv().await { let _ = socket.send_to(&data, dest).await; } read_task.abort(); }