diff --git a/changelog.md b/changelog.md index e8f94a0..84abfe2 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-03-31 - 1.16.3 - fix(rust-nat) +defer TCP bridge startup until handshake completion and buffer partial NAT socket writes + +- Start TCP bridge tasks only after the smoltcp socket becomes active to prevent server data from arriving before the client handshake completes. +- Buffer pending TCP payloads and flush partial writes so bridge-to-socket data is not silently lost under backpressure. +- Keep closing TCP sessions alive until FIN processing completes and add logging for dropped packets when bridge or route channels are full. + ## 2026-03-31 - 1.16.2 - fix(wireguard) sync runtime peer management with client registration and derive the correct server public key from the WireGuard private key diff --git a/rust/src/userspace_nat.rs b/rust/src/userspace_nat.rs index 34bd9c9..c3d961c 100644 --- a/rust/src/userspace_nat.rs +++ b/rust/src/userspace_nat.rs @@ -101,7 +101,7 @@ impl Device for VirtualIpDevice { let mut caps = DeviceCapabilities::default(); caps.medium = Medium::Ip; caps.max_transmission_unit = self.mtu; - caps.max_burst_size = Some(1); + caps.max_burst_size = None; caps } } @@ -124,6 +124,14 @@ struct TcpSession { bridge_data_tx: mpsc::Sender>, #[allow(dead_code)] client_ip: Ipv4Addr, + /// Bridge task has been spawned (deferred until handshake completes) + bridge_started: bool, + /// Address to connect the bridge task to (may differ from dst if policy rewrote it) + connect_addr: SocketAddr, + /// Buffered data from bridge waiting to be written to smoltcp socket + pending_send: Vec, + /// Session is closing (FIN in progress), don't accept new SYNs + closing: bool, } struct UdpSession { @@ -308,7 +316,9 @@ impl NatEngine { // SYN without ACK = new connection let is_syn = (flags & 0x02) != 0 && (flags & 0x10) == 0; - if is_syn && !self.tcp_sessions.contains_key(&key) { + // Skip if session exists (including closing sessions — let FIN complete) + let session_exists = self.tcp_sessions.contains_key(&key); + if is_syn && !session_exists { match self.evaluate_destination(dst_ip, dst_port) { DestinationAction::Drop => { debug!("NAT: destination policy blocked TCP {}:{} -> {}:{}", src_ip, src_port, dst_ip, dst_port); @@ -376,22 +386,23 @@ impl NatEngine { let handle = self.sockets.add(socket); // Channel for sending data from NAT engine to bridge task - let (data_tx, data_rx) = mpsc::channel::>(256); + let (data_tx, _data_rx) = mpsc::channel::>(256); let session = TcpSession { smoltcp_handle: handle, bridge_data_tx: data_tx, client_ip: key.src_ip, + bridge_started: false, + connect_addr, + pending_send: Vec::new(), + closing: false, }; self.tcp_sessions.insert(key.clone(), session); - // Spawn bridge task that connects to the resolved destination - let bridge_tx = self.bridge_tx.clone(); - let key_clone = key.clone(); - let proxy_protocol = self.proxy_protocol; - tokio::spawn(async move { - tcp_bridge_task(key_clone, data_rx, bridge_tx, proxy_protocol, connect_addr).await; - }); + // NOTE: Bridge task is NOT spawned here — it will be spawned in process() + // once the smoltcp handshake completes (socket.is_active() == true). + // This prevents data from the real server arriving before the VPN client + // handshake is done, which would cause silent data loss. debug!( "NAT: new TCP session {}:{} -> {}:{}", @@ -451,13 +462,54 @@ impl NatEngine { self.iface .poll(now, &mut self.device, &mut self.sockets); + // Start bridge tasks for sessions whose handshake just completed + let bridge_tx_clone = self.bridge_tx.clone(); + let proxy_protocol = self.proxy_protocol; + for (key, session) in self.tcp_sessions.iter_mut() { + if !session.bridge_started && !session.closing { + let socket = self.sockets.get_mut::(session.smoltcp_handle); + if socket.is_active() { + session.bridge_started = true; + // Recreate the data channel — the old receiver was dropped + let (data_tx, data_rx) = mpsc::channel::>(256); + session.bridge_data_tx = data_tx; + let btx = bridge_tx_clone.clone(); + let k = key.clone(); + let addr = session.connect_addr; + let pp = proxy_protocol; + tokio::spawn(async move { + tcp_bridge_task(k, data_rx, btx, pp, addr).await; + }); + debug!("NAT: TCP handshake complete, starting bridge for {}:{} -> {}:{}", + key.src_ip, key.src_port, key.dst_ip, key.dst_port); + } + } + } + + // Flush pending send buffers to smoltcp sockets + for (_key, session) in self.tcp_sessions.iter_mut() { + if !session.pending_send.is_empty() { + let socket = self.sockets.get_mut::(session.smoltcp_handle); + if socket.can_send() { + match socket.send_slice(&session.pending_send) { + Ok(written) if written > 0 => { + session.pending_send.drain(..written); + } + _ => {} + } + } + } + } + // Bridge: read data from smoltcp TCP sockets → send to bridge tasks let mut closed_tcp: Vec = Vec::new(); + let mut tcp_outbound: Vec<(mpsc::Sender>, Vec)> = Vec::new(); for (key, session) in &self.tcp_sessions { let socket = self.sockets.get_mut::(session.smoltcp_handle); - if socket.can_recv() { + if session.bridge_started && socket.can_recv() { + let sender = session.bridge_data_tx.clone(); let _ = socket.recv(|data| { - let _ = session.bridge_data_tx.try_send(data.to_vec()); + tcp_outbound.push((sender.clone(), data.to_vec())); (data.len(), ()) }); } @@ -467,6 +519,13 @@ impl NatEngine { } } + // Send TCP data to bridge tasks (outside borrow of self.tcp_sessions) + for (sender, data) in tcp_outbound { + if sender.try_send(data).is_err() { + debug!("NAT: bridge channel full, TCP data dropped"); + } + } + // Clean up closed TCP sessions for key in closed_tcp { if let Some(session) = self.tcp_sessions.remove(&key) { @@ -479,7 +538,9 @@ impl NatEngine { for (_key, session) in &self.udp_sessions { let socket = self.sockets.get_mut::(session.smoltcp_handle); while let Ok((data, _meta)) = socket.recv() { - let _ = session.bridge_data_tx.try_send(data.to_vec()); + if session.bridge_data_tx.try_send(data.to_vec()).is_err() { + debug!("NAT: bridge channel full, UDP data dropped"); + } } } @@ -488,7 +549,9 @@ impl NatEngine { for packet in self.device.drain_tx() { if let Some(std::net::IpAddr::V4(dst_ip)) = tunnel::extract_dst_ip(&packet) { if let Some(sender) = routes.get(&dst_ip) { - let _ = sender.try_send(packet); + if sender.try_send(packet).is_err() { + debug!("NAT: tun_routes channel full for {}, packet dropped", dst_ip); + } } } } @@ -497,22 +560,51 @@ impl NatEngine { fn handle_bridge_message(&mut self, msg: BridgeMessage) { match msg { BridgeMessage::TcpData { key, data } => { - if let Some(session) = self.tcp_sessions.get(&key) { + if let Some(session) = self.tcp_sessions.get_mut(&key) { let socket = self.sockets.get_mut::(session.smoltcp_handle); if socket.can_send() { - let _ = socket.send_slice(&data); + // Try to write directly first + let all_data = if session.pending_send.is_empty() { + &data + } else { + session.pending_send.extend_from_slice(&data); + &session.pending_send.clone() + }; + match socket.send_slice(all_data) { + Ok(written) if written < all_data.len() => { + // Partial write — buffer the rest + if session.pending_send.is_empty() { + session.pending_send = data[written..].to_vec(); + } else { + session.pending_send.drain(..written); + } + } + Ok(_) => { + // Full write — clear any pending data + session.pending_send.clear(); + } + Err(_) => { + // Write failed — buffer everything + if session.pending_send.is_empty() { + session.pending_send = data; + } + } + } + } else { + // Can't send yet — buffer for later + session.pending_send.extend_from_slice(&data); } } } BridgeMessage::TcpClosed { key } => { - if let Some(session) = self.tcp_sessions.remove(&key) { + if let Some(session) = self.tcp_sessions.get_mut(&key) { let socket = self.sockets.get_mut::(session.smoltcp_handle); socket.close(); + session.closing = true; // Don't remove from SocketSet yet — let smoltcp send FIN // It will be cleaned up in process() when is_open() returns false - self.tcp_sessions.insert(key, session); } } BridgeMessage::UdpData { key, data } => { diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 885d91e..afe8452 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartvpn', - version: '1.16.2', + version: '1.16.3', description: 'A VPN solution with TypeScript control plane and Rust data plane daemon' }