diff --git a/changelog.md b/changelog.md index 9cec4c5..5afb6aa 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,14 @@ # Changelog +## 2026-03-19 - 4.11.0 - feat(remoteingress-core) +add UDP tunneling support between edge and hub + +- extend edge and hub handshake/config updates with UDP listen ports +- add UDP tunnel frame types and PROXY protocol v2 header helpers in the protocol crate +- introduce UDP session management on the edge and upstream UDP forwarding on the hub +- add Node.js integration tests covering UDP echo and concurrent datagrams +- expose UDP listen port configuration in the TypeScript hub API + ## 2026-03-19 - 4.10.0 - feat(core,edge,hub,transport) add QUIC tunnel transport support with optional edge transport selection diff --git a/rust/crates/remoteingress-core/src/edge.rs b/rust/crates/remoteingress-core/src/edge.rs index 521d427..344ebbe 100644 --- a/rust/crates/remoteingress-core/src/edge.rs +++ b/rust/crates/remoteingress-core/src/edge.rs @@ -3,7 +3,7 @@ use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use std::time::Duration; use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::net::{TcpListener, TcpStream}; +use tokio::net::{TcpListener, TcpStream, UdpSocket}; use tokio::sync::{mpsc, Mutex, Notify, RwLock}; use tokio::task::JoinHandle; use tokio::time::{Instant, sleep_until}; @@ -15,6 +15,7 @@ use bytes::Bytes; use remoteingress_protocol::*; use crate::transport::TransportMode; use crate::transport::quic as quic_transport; +use crate::udp_session::{UdpSessionKey, UdpSessionManager}; type EdgeTlsStream = tokio_rustls::client::TlsStream; @@ -59,6 +60,8 @@ pub struct EdgeConfig { #[serde(rename_all = "camelCase")] struct HandshakeConfig { listen_ports: Vec, + #[serde(default)] + listen_ports_udp: Vec, #[serde(default = "default_stun_interval")] stun_interval_secs: u64, } @@ -72,6 +75,8 @@ fn default_stun_interval() -> u64 { #[serde(rename_all = "camelCase")] struct ConfigUpdate { listen_ports: Vec, + #[serde(default)] + listen_ports_udp: Vec, } /// Events emitted by the edge. @@ -344,7 +349,8 @@ enum EdgeLoopResult { } /// Process a single frame received from the hub side of the tunnel. -/// Handles FRAME_DATA_BACK, FRAME_WINDOW_UPDATE_BACK, FRAME_CLOSE_BACK, FRAME_CONFIG, FRAME_PING. +/// Handles FRAME_DATA_BACK, FRAME_WINDOW_UPDATE_BACK, FRAME_CLOSE_BACK, FRAME_CONFIG, FRAME_PING, +/// and UDP frames: FRAME_UDP_DATA_BACK, FRAME_UDP_CLOSE. async fn handle_edge_frame( frame: Frame, tunnel_io: &mut remoteingress_protocol::TunnelIo, @@ -355,11 +361,14 @@ async fn handle_edge_frame( tunnel_data_tx: &mpsc::Sender, tunnel_sustained_tx: &mpsc::Sender, port_listeners: &mut HashMap>, + udp_listeners: &mut HashMap>, active_streams: &Arc, next_stream_id: &Arc, edge_id: &str, connection_token: &CancellationToken, bind_address: &str, + udp_sessions: &Arc>, + udp_sockets: &Arc>>>, ) -> EdgeFrameAction { match frame.frame_type { FRAME_DATA_BACK => { @@ -394,7 +403,7 @@ async fn handle_edge_frame( } FRAME_CONFIG => { if let Ok(update) = serde_json::from_slice::(&frame.payload) { - log::info!("Config update from hub: ports {:?}", update.listen_ports); + log::info!("Config update from hub: ports {:?}, udp {:?}", update.listen_ports, update.listen_ports_udp); *listen_ports.write().await = update.listen_ports.clone(); let _ = event_tx.try_send(EdgeEvent::PortsUpdated { listen_ports: update.listen_ports.clone(), @@ -412,12 +421,39 @@ async fn handle_edge_frame( connection_token, bind_address, ); + apply_udp_port_config( + &update.listen_ports_udp, + udp_listeners, + tunnel_writer_tx, + tunnel_data_tx, + udp_sessions, + udp_sockets, + next_stream_id, + connection_token, + bind_address, + ); } } FRAME_PING => { // Queue PONG directly — no channel round-trip, guaranteed delivery tunnel_io.queue_ctrl(encode_frame(0, FRAME_PONG, &[])); } + FRAME_UDP_DATA_BACK => { + // Dispatch return UDP datagram to the original client + let mut sessions = udp_sessions.lock().await; + if let Some(session) = sessions.get_by_stream_id(frame.stream_id) { + let client_addr = session.client_addr; + let dest_port = session.dest_port; + let sockets = udp_sockets.lock().await; + if let Some(socket) = sockets.get(&dest_port) { + let _ = socket.send_to(&frame.payload, client_addr).await; + } + } + } + FRAME_UDP_CLOSE => { + let mut sessions = udp_sessions.lock().await; + sessions.remove_by_stream_id(frame.stream_id); + } _ => { log::warn!("Unexpected frame type {} from hub", frame.frame_type); } @@ -581,6 +617,24 @@ async fn connect_to_hub_and_run( bind_address, ); + // UDP session manager + listeners + let udp_sessions: Arc> = + Arc::new(Mutex::new(UdpSessionManager::new(Duration::from_secs(60)))); + let udp_sockets: Arc>>> = + Arc::new(Mutex::new(HashMap::new())); + let mut udp_listeners: HashMap> = HashMap::new(); + apply_udp_port_config( + &handshake.listen_ports_udp, + &mut udp_listeners, + &tunnel_ctrl_tx, + &tunnel_data_tx, + &udp_sessions, + &udp_sockets, + next_stream_id, + connection_token, + bind_address, + ); + // Single-owner I/O engine — no tokio::io::split, no mutex let mut tunnel_io = remoteingress_protocol::TunnelIo::new(tls_stream, Vec::new()); @@ -605,7 +659,8 @@ async fn connect_to_hub_and_run( if let EdgeFrameAction::Disconnect(reason) = handle_edge_frame( frame, &mut tunnel_io, &client_writers, listen_ports, event_tx, &tunnel_writer_tx, &tunnel_data_tx, &tunnel_sustained_tx, &mut port_listeners, - active_streams, next_stream_id, &config.edge_id, connection_token, bind_address, + &mut udp_listeners, active_streams, next_stream_id, &config.edge_id, + connection_token, bind_address, &udp_sessions, &udp_sockets, ).await { break 'io_loop EdgeLoopResult::Reconnect(reason); } @@ -623,7 +678,8 @@ async fn connect_to_hub_and_run( if let EdgeFrameAction::Disconnect(reason) = handle_edge_frame( frame, &mut tunnel_io, &client_writers, listen_ports, event_tx, &tunnel_writer_tx, &tunnel_data_tx, &tunnel_sustained_tx, &mut port_listeners, - active_streams, next_stream_id, &config.edge_id, connection_token, bind_address, + &mut udp_listeners, active_streams, next_stream_id, &config.edge_id, + connection_token, bind_address, &udp_sessions, &udp_sockets, ).await { break EdgeLoopResult::Reconnect(reason); } @@ -661,6 +717,9 @@ async fn connect_to_hub_and_run( for (_, h) in port_listeners.drain() { h.abort(); } + for (_, h) in udp_listeners.drain() { + h.abort(); + } // Graceful TLS shutdown: send close_notify so the hub sees a clean disconnect. // Stream handlers are already cancelled, so no new data is being produced. @@ -790,6 +849,107 @@ fn apply_port_config( } } +/// Apply UDP port configuration: bind UdpSockets for added ports, abort removed ports. +fn apply_udp_port_config( + new_ports: &[u16], + udp_listeners: &mut HashMap>, + tunnel_ctrl_tx: &mpsc::Sender, + tunnel_data_tx: &mpsc::Sender, + udp_sessions: &Arc>, + udp_sockets: &Arc>>>, + next_stream_id: &Arc, + connection_token: &CancellationToken, + bind_address: &str, +) { + let new_set: std::collections::HashSet = new_ports.iter().copied().collect(); + let old_set: std::collections::HashSet = udp_listeners.keys().copied().collect(); + + // Remove ports no longer needed + for &port in old_set.difference(&new_set) { + if let Some(handle) = udp_listeners.remove(&port) { + log::info!("Stopping UDP listener on port {}", port); + handle.abort(); + } + // Remove socket from shared map + let sockets = udp_sockets.clone(); + tokio::spawn(async move { + sockets.lock().await.remove(&port); + }); + } + + // Add new ports + for &port in new_set.difference(&old_set) { + let tunnel_ctrl_tx = tunnel_ctrl_tx.clone(); + let tunnel_data_tx = tunnel_data_tx.clone(); + let udp_sessions = udp_sessions.clone(); + let udp_sockets = udp_sockets.clone(); + let next_stream_id = next_stream_id.clone(); + let port_token = connection_token.child_token(); + let bind_addr = bind_address.to_string(); + + let handle = tokio::spawn(async move { + let socket = match UdpSocket::bind((bind_addr.as_str(), port)).await { + Ok(s) => Arc::new(s), + Err(e) => { + log::error!("Failed to bind UDP port {}: {}", port, e); + return; + } + }; + log::info!("Listening on UDP port {}", port); + + // Register socket in shared map for return traffic + udp_sockets.lock().await.insert(port, socket.clone()); + + let mut buf = vec![0u8; 65536]; // max UDP datagram size + loop { + tokio::select! { + recv_result = socket.recv_from(&mut buf) => { + match recv_result { + Ok((len, client_addr)) => { + let key = UdpSessionKey { client_addr, dest_port: port }; + let mut sessions = udp_sessions.lock().await; + + let stream_id = if let Some(session) = sessions.get_mut(&key) { + session.stream_id + } else { + // New session — allocate stream_id and send UDP_OPEN + let sid = next_stream_id.fetch_add(1, Ordering::Relaxed); + sessions.insert(key, sid); + + let client_ip = client_addr.ip().to_string(); + let client_port = client_addr.port(); + let proxy_header = build_proxy_v2_header_from_str( + &client_ip, "0.0.0.0", client_port, port, + ProxyV2Transport::Udp, + ); + let open_frame = encode_frame(sid, FRAME_UDP_OPEN, &proxy_header); + let _ = tunnel_ctrl_tx.try_send(open_frame); + + log::debug!("New UDP session {} from {} -> port {}", sid, client_addr, port); + sid + }; + drop(sessions); // release lock before sending + + // Send datagram through tunnel + let data_frame = encode_frame(stream_id, FRAME_UDP_DATA, &buf[..len]); + let _ = tunnel_data_tx.try_send(data_frame); + } + Err(e) => { + log::error!("UDP recv error on port {}: {}", port, e); + } + } + } + _ = port_token.cancelled() => { + log::info!("UDP port {} listener cancelled", port); + break; + } + } + } + }); + udp_listeners.insert(port, handle); + } +} + async fn handle_client_connection( client_stream: TcpStream, client_addr: std::net::SocketAddr, diff --git a/rust/crates/remoteingress-core/src/hub.rs b/rust/crates/remoteingress-core/src/hub.rs index 7706c36..664063d 100644 --- a/rust/crates/remoteingress-core/src/hub.rs +++ b/rust/crates/remoteingress-core/src/hub.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicU32, Ordering}; use std::time::Duration; use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::net::{TcpListener, TcpStream}; +use tokio::net::{TcpListener, TcpStream, UdpSocket}; use tokio::sync::{mpsc, Mutex, Notify, RwLock, Semaphore}; use tokio::time::{interval, sleep_until, Instant}; use tokio_rustls::TlsAcceptor; @@ -23,6 +23,14 @@ enum FrameAction { Disconnect(String), } +/// Per-UDP-session state tracked in the hub. +struct HubUdpSessionState { + /// Channel for forwarding datagrams from edge to the upstream UdpSocket task. + data_tx: mpsc::Sender, + /// Cancellation token for this session's upstream task. + cancel_token: CancellationToken, +} + /// Per-stream state tracked in the hub's stream map. struct HubStreamState { /// Unbounded channel to deliver FRAME_DATA payloads to the upstream writer task. @@ -69,6 +77,8 @@ pub struct AllowedEdge { pub secret: String, #[serde(default)] pub listen_ports: Vec, + #[serde(default)] + pub listen_ports_udp: Vec, pub stun_interval_secs: Option, } @@ -77,6 +87,8 @@ pub struct AllowedEdge { #[serde(rename_all = "camelCase")] struct HandshakeResponse { listen_ports: Vec, + #[serde(default)] + listen_ports_udp: Vec, stun_interval_secs: u64, } @@ -85,6 +97,8 @@ struct HandshakeResponse { #[serde(rename_all = "camelCase")] pub struct EdgeConfigUpdate { pub listen_ports: Vec, + #[serde(default)] + pub listen_ports_udp: Vec, } /// Runtime status of a connected edge. @@ -179,12 +193,13 @@ impl TunnelHub { if let Some(info) = connected.get(&edge.id) { // Check if ports changed compared to old config let ports_changed = match map.get(&edge.id) { - Some(old) => old.listen_ports != edge.listen_ports, + Some(old) => old.listen_ports != edge.listen_ports || old.listen_ports_udp != edge.listen_ports_udp, None => true, // newly allowed edge that's already connected }; if ports_changed { let update = EdgeConfigUpdate { listen_ports: edge.listen_ports.clone(), + listen_ports_udp: edge.listen_ports_udp.clone(), }; let _ = info.config_tx.try_send(update); } @@ -381,6 +396,7 @@ async fn handle_hub_frame( frame: Frame, tunnel_io: &mut remoteingress_protocol::TunnelIo, streams: &mut HashMap, + udp_sessions: &mut HashMap, stream_semaphore: &Arc, edge_stream_count: &Arc, edge_id: &str, @@ -682,6 +698,96 @@ async fn handle_hub_frame( FRAME_PONG => { log::debug!("Received PONG from edge {}", edge_id); } + FRAME_UDP_OPEN => { + // Open a UDP session: parse PROXY v2 header, connect upstream, start forwarding + let stream_id = frame.stream_id; + let dest_port = parse_dest_port_from_proxy_v2(&frame.payload).unwrap_or(53); + let target = target_host.to_string(); + let data_writer_tx = data_tx.clone(); + let session_token = edge_token.child_token(); + let edge_id_str = edge_id.to_string(); + + // Channel for forwarding datagrams from edge to upstream + let (udp_tx, mut udp_rx) = mpsc::channel::(256); + udp_sessions.insert(stream_id, HubUdpSessionState { + data_tx: udp_tx, + cancel_token: session_token.clone(), + }); + + // Spawn upstream UDP forwarder + tokio::spawn(async move { + let upstream = match UdpSocket::bind("0.0.0.0:0").await { + Ok(s) => s, + Err(e) => { + log::error!("UDP session {} failed to bind: {}", stream_id, e); + return; + } + }; + if let Err(e) = upstream.connect((target.as_str(), dest_port)).await { + log::error!("UDP session {} failed to connect to {}:{}: {}", stream_id, target, dest_port, e); + return; + } + + // Task: upstream -> edge (return datagrams) + let upstream_recv = Arc::new(upstream); + let upstream_send = upstream_recv.clone(); + let recv_token = session_token.clone(); + let recv_handle = tokio::spawn(async move { + let mut buf = vec![0u8; 65536]; + loop { + tokio::select! { + result = upstream_recv.recv(&mut buf) => { + match result { + Ok(len) => { + let frame = encode_frame(stream_id, FRAME_UDP_DATA_BACK, &buf[..len]); + if data_writer_tx.try_send(frame).is_err() { + break; + } + } + Err(e) => { + log::debug!("UDP session {} upstream recv error: {}", stream_id, e); + break; + } + } + } + _ = recv_token.cancelled() => break, + } + } + }); + + // Forward datagrams from edge to upstream + loop { + tokio::select! { + data = udp_rx.recv() => { + match data { + Some(datagram) => { + if let Err(e) = upstream_send.send(&datagram).await { + log::debug!("UDP session {} upstream send error: {}", stream_id, e); + break; + } + } + None => break, + } + } + _ = session_token.cancelled() => break, + } + } + + recv_handle.abort(); + log::debug!("UDP session {} closed for edge {}", stream_id, edge_id_str); + }); + } + FRAME_UDP_DATA => { + // Forward datagram to upstream + if let Some(state) = udp_sessions.get(&frame.stream_id) { + let _ = state.data_tx.try_send(frame.payload); + } + } + FRAME_UDP_CLOSE => { + if let Some(state) = udp_sessions.remove(&frame.stream_id) { + state.cancel_token.cancel(); + } + } _ => { log::warn!("Unexpected frame type {} from edge", frame.frame_type); } @@ -738,14 +844,14 @@ async fn handle_edge_connection( let secret = parts[2]; // Verify credentials and extract edge config - let (listen_ports, stun_interval_secs) = { + let (listen_ports, listen_ports_udp, stun_interval_secs) = { let edges = allowed.read().await; match edges.get(&edge_id) { Some(edge) => { if !constant_time_eq(secret.as_bytes(), edge.secret.as_bytes()) { return Err(format!("invalid secret for edge {}", edge_id).into()); } - (edge.listen_ports.clone(), edge.stun_interval_secs.unwrap_or(300)) + (edge.listen_ports.clone(), edge.listen_ports_udp.clone(), edge.stun_interval_secs.unwrap_or(300)) } None => { return Err(format!("unknown edge {}", edge_id).into()); @@ -762,6 +868,7 @@ async fn handle_edge_connection( // Send handshake response with initial config before frame protocol begins let handshake = HandshakeResponse { listen_ports: listen_ports.clone(), + listen_ports_udp: listen_ports_udp.clone(), stun_interval_secs, }; let mut handshake_json = serde_json::to_string(&handshake)?; @@ -771,6 +878,7 @@ async fn handle_edge_connection( // Track this edge let mut streams: HashMap = HashMap::new(); + let mut udp_sessions: HashMap = HashMap::new(); // Per-edge active stream counter for adaptive flow control let edge_stream_count = Arc::new(AtomicU32::new(0)); // Cleanup channel: spawned stream tasks send stream_id here when done @@ -881,7 +989,8 @@ async fn handle_edge_connection( last_activity = Instant::now(); liveness_deadline.as_mut().reset(last_activity + liveness_timeout_dur); if let FrameAction::Disconnect(reason) = handle_hub_frame( - frame, &mut tunnel_io, &mut streams, &stream_semaphore, &edge_stream_count, + frame, &mut tunnel_io, &mut streams, &mut udp_sessions, + &stream_semaphore, &edge_stream_count, &edge_id, &event_tx, &ctrl_tx, &data_tx, &sustained_tx, &target_host, &edge_token, &cleanup_tx, ).await { @@ -904,7 +1013,8 @@ async fn handle_edge_connection( last_activity = Instant::now(); liveness_deadline.as_mut().reset(last_activity + liveness_timeout_dur); if let FrameAction::Disconnect(reason) = handle_hub_frame( - frame, &mut tunnel_io, &mut streams, &stream_semaphore, &edge_stream_count, + frame, &mut tunnel_io, &mut streams, &mut udp_sessions, + &stream_semaphore, &edge_stream_count, &edge_id, &event_tx, &ctrl_tx, &data_tx, &sustained_tx, &target_host, &edge_token, &cleanup_tx, ).await { @@ -976,6 +1086,20 @@ fn parse_dest_port_from_proxy(header: &str) -> Option { } } +/// Parse destination port from a PROXY protocol v2 binary header. +/// The header must be at least 28 bytes (16 fixed + 12 IPv4 address block). +/// Dest port is at bytes 26-27 (network byte order). +fn parse_dest_port_from_proxy_v2(header: &[u8]) -> Option { + if header.len() < 28 { + return None; + } + // Verify signature + if header[0..12] != remoteingress_protocol::PROXY_V2_SIGNATURE { + return None; + } + Some(u16::from_be_bytes([header[26], header[27]])) +} + /// Build TLS server config from PEM strings, or auto-generate self-signed. fn build_tls_config( config: &HubConfig, @@ -1083,14 +1207,14 @@ async fn handle_edge_connection_quic( let secret = parts[2]; // Verify credentials - let (listen_ports, stun_interval_secs) = { + let (listen_ports, listen_ports_udp, stun_interval_secs) = { let edges = allowed.read().await; match edges.get(&edge_id) { Some(edge) => { if !constant_time_eq(secret.as_bytes(), edge.secret.as_bytes()) { return Err(format!("invalid secret for edge {}", edge_id).into()); } - (edge.listen_ports.clone(), edge.stun_interval_secs.unwrap_or(300)) + (edge.listen_ports.clone(), edge.listen_ports_udp.clone(), edge.stun_interval_secs.unwrap_or(300)) } None => return Err(format!("unknown edge {}", edge_id).into()), } @@ -1105,6 +1229,7 @@ async fn handle_edge_connection_quic( // Send handshake response on control stream let handshake = HandshakeResponse { listen_ports: listen_ports.clone(), + listen_ports_udp: listen_ports_udp.clone(), stun_interval_secs, }; let mut handshake_json = serde_json::to_string(&handshake)?; @@ -1495,6 +1620,7 @@ mod tests { fn test_handshake_response_serializes_camel_case() { let resp = HandshakeResponse { listen_ports: vec![443, 8080], + listen_ports_udp: vec![], stun_interval_secs: 300, }; let json = serde_json::to_value(&resp).unwrap(); @@ -1509,9 +1635,11 @@ mod tests { fn test_edge_config_update_serializes_camel_case() { let update = EdgeConfigUpdate { listen_ports: vec![80, 443], + listen_ports_udp: vec![53], }; let json = serde_json::to_value(&update).unwrap(); assert_eq!(json["listenPorts"], serde_json::json!([80, 443])); + assert_eq!(json["listenPortsUdp"], serde_json::json!([53])); assert!(json.get("listen_ports").is_none()); } diff --git a/rust/crates/remoteingress-core/src/lib.rs b/rust/crates/remoteingress-core/src/lib.rs index 5ebeb53..3d2c7ad 100644 --- a/rust/crates/remoteingress-core/src/lib.rs +++ b/rust/crates/remoteingress-core/src/lib.rs @@ -2,5 +2,6 @@ pub mod hub; pub mod edge; pub mod stun; pub mod transport; +pub mod udp_session; pub use remoteingress_protocol as protocol; diff --git a/rust/crates/remoteingress-core/src/udp_session.rs b/rust/crates/remoteingress-core/src/udp_session.rs new file mode 100644 index 0000000..5946b6f --- /dev/null +++ b/rust/crates/remoteingress-core/src/udp_session.rs @@ -0,0 +1,210 @@ +use std::collections::HashMap; +use std::net::SocketAddr; +use tokio::time::Instant; + +/// Key identifying a unique UDP "session" (one client endpoint talking to one destination port). +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct UdpSessionKey { + pub client_addr: SocketAddr, + pub dest_port: u16, +} + +/// A single UDP session tracked by the edge. +pub struct UdpSession { + pub stream_id: u32, + pub client_addr: SocketAddr, + pub dest_port: u16, + pub last_activity: Instant, +} + +/// Manages UDP sessions with idle timeout expiry. +pub struct UdpSessionManager { + /// Forward map: session key → session data. + sessions: HashMap, + /// Reverse map: stream_id → session key (for dispatching return traffic). + by_stream_id: HashMap, + /// Idle timeout duration. + idle_timeout: std::time::Duration, +} + +impl UdpSessionManager { + pub fn new(idle_timeout: std::time::Duration) -> Self { + Self { + sessions: HashMap::new(), + by_stream_id: HashMap::new(), + idle_timeout, + } + } + + /// Look up an existing session by key. Updates last_activity on hit. + pub fn get_mut(&mut self, key: &UdpSessionKey) -> Option<&mut UdpSession> { + let session = self.sessions.get_mut(key)?; + session.last_activity = Instant::now(); + Some(session) + } + + /// Look up a session's client address by stream_id (for return traffic). + pub fn client_addr_for_stream(&self, stream_id: u32) -> Option { + let key = self.by_stream_id.get(&stream_id)?; + self.sessions.get(key).map(|s| s.client_addr) + } + + /// Look up a session by stream_id. Updates last_activity on hit. + pub fn get_by_stream_id(&mut self, stream_id: u32) -> Option<&mut UdpSession> { + let key = self.by_stream_id.get(&stream_id)?; + let session = self.sessions.get_mut(key)?; + session.last_activity = Instant::now(); + Some(session) + } + + /// Insert a new session. Returns a mutable reference to it. + pub fn insert(&mut self, key: UdpSessionKey, stream_id: u32) -> &mut UdpSession { + let session = UdpSession { + stream_id, + client_addr: key.client_addr, + dest_port: key.dest_port, + last_activity: Instant::now(), + }; + self.by_stream_id.insert(stream_id, key); + self.sessions.entry(key).or_insert(session) + } + + /// Remove a session by stream_id. + pub fn remove_by_stream_id(&mut self, stream_id: u32) -> Option { + if let Some(key) = self.by_stream_id.remove(&stream_id) { + self.sessions.remove(&key) + } else { + None + } + } + + /// Expire idle sessions. Returns the stream_ids of expired sessions. + pub fn expire_idle(&mut self) -> Vec { + let now = Instant::now(); + let timeout = self.idle_timeout; + let expired_keys: Vec = self + .sessions + .iter() + .filter(|(_, s)| now.duration_since(s.last_activity) >= timeout) + .map(|(k, _)| *k) + .collect(); + + let mut expired_ids = Vec::with_capacity(expired_keys.len()); + for key in expired_keys { + if let Some(session) = self.sessions.remove(&key) { + self.by_stream_id.remove(&session.stream_id); + expired_ids.push(session.stream_id); + } + } + expired_ids + } + + /// Number of active sessions. + pub fn len(&self) -> usize { + self.sessions.len() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + fn addr(port: u16) -> SocketAddr { + SocketAddr::from(([127, 0, 0, 1], port)) + } + + #[test] + fn test_insert_and_lookup() { + let mut mgr = UdpSessionManager::new(Duration::from_secs(60)); + let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 }; + mgr.insert(key, 1); + + assert_eq!(mgr.len(), 1); + assert!(mgr.get_mut(&key).is_some()); + assert_eq!(mgr.get_mut(&key).unwrap().stream_id, 1); + } + + #[test] + fn test_client_addr_for_stream() { + let mut mgr = UdpSessionManager::new(Duration::from_secs(60)); + let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 }; + mgr.insert(key, 42); + + assert_eq!(mgr.client_addr_for_stream(42), Some(addr(5000))); + assert_eq!(mgr.client_addr_for_stream(99), None); + } + + #[test] + fn test_remove_by_stream_id() { + let mut mgr = UdpSessionManager::new(Duration::from_secs(60)); + let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 }; + mgr.insert(key, 1); + + let removed = mgr.remove_by_stream_id(1); + assert!(removed.is_some()); + assert_eq!(mgr.len(), 0); + assert!(mgr.get_mut(&key).is_none()); + assert_eq!(mgr.client_addr_for_stream(1), None); + } + + #[test] + fn test_remove_nonexistent() { + let mut mgr = UdpSessionManager::new(Duration::from_secs(60)); + assert!(mgr.remove_by_stream_id(999).is_none()); + } + + #[tokio::test] + async fn test_expire_idle() { + let mut mgr = UdpSessionManager::new(Duration::from_millis(50)); + let key1 = UdpSessionKey { client_addr: addr(5000), dest_port: 53 }; + let key2 = UdpSessionKey { client_addr: addr(5001), dest_port: 53 }; + mgr.insert(key1, 1); + mgr.insert(key2, 2); + + // Nothing expired yet + assert!(mgr.expire_idle().is_empty()); + assert_eq!(mgr.len(), 2); + + // Wait for timeout + tokio::time::sleep(Duration::from_millis(60)).await; + + let expired = mgr.expire_idle(); + assert_eq!(expired.len(), 2); + assert_eq!(mgr.len(), 0); + } + + #[tokio::test] + async fn test_activity_prevents_expiry() { + let mut mgr = UdpSessionManager::new(Duration::from_millis(100)); + let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 }; + mgr.insert(key, 1); + + // Touch session at 50ms (before 100ms timeout) + tokio::time::sleep(Duration::from_millis(50)).await; + mgr.get_mut(&key); // refreshes last_activity + + // At 80ms from last touch, should still be alive + tokio::time::sleep(Duration::from_millis(80)).await; + assert!(mgr.expire_idle().is_empty()); + assert_eq!(mgr.len(), 1); + + // Wait for full timeout from last activity + tokio::time::sleep(Duration::from_millis(30)).await; + let expired = mgr.expire_idle(); + assert_eq!(expired.len(), 1); + } + + #[test] + fn test_multiple_sessions_same_client_different_ports() { + let mut mgr = UdpSessionManager::new(Duration::from_secs(60)); + let key1 = UdpSessionKey { client_addr: addr(5000), dest_port: 53 }; + let key2 = UdpSessionKey { client_addr: addr(5000), dest_port: 443 }; + mgr.insert(key1, 1); + mgr.insert(key2, 2); + + assert_eq!(mgr.len(), 2); + assert_eq!(mgr.get_mut(&key1).unwrap().stream_id, 1); + assert_eq!(mgr.get_mut(&key2).unwrap().stream_id, 2); + } +} diff --git a/rust/crates/remoteingress-protocol/src/lib.rs b/rust/crates/remoteingress-protocol/src/lib.rs index dab2811..4354117 100644 --- a/rust/crates/remoteingress-protocol/src/lib.rs +++ b/rust/crates/remoteingress-protocol/src/lib.rs @@ -19,6 +19,12 @@ pub const FRAME_PONG: u8 = 0x08; // Edge -> Hub: heartbeat response pub const FRAME_WINDOW_UPDATE: u8 = 0x09; // Edge -> Hub: per-stream flow control pub const FRAME_WINDOW_UPDATE_BACK: u8 = 0x0A; // Hub -> Edge: per-stream flow control +// UDP tunnel frame types +pub const FRAME_UDP_OPEN: u8 = 0x0B; // Edge -> Hub: open UDP session (payload: PROXY v2 header) +pub const FRAME_UDP_DATA: u8 = 0x0C; // Edge -> Hub: UDP datagram +pub const FRAME_UDP_DATA_BACK: u8 = 0x0D; // Hub -> Edge: UDP datagram +pub const FRAME_UDP_CLOSE: u8 = 0x0E; // Either direction: close UDP session + // Frame header size: 4 (stream_id) + 1 (type) + 4 (length) = 9 bytes pub const FRAME_HEADER_SIZE: usize = 9; @@ -107,6 +113,76 @@ pub fn build_proxy_v1_header( ) } +/// PROXY protocol v2 signature (12 bytes). +pub const PROXY_V2_SIGNATURE: [u8; 12] = [ + 0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, +]; + +/// Transport protocol for PROXY v2 header. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ProxyV2Transport { + /// TCP (STREAM) — byte 13 low nibble = 0x1 + Tcp, + /// UDP (DGRAM) — byte 13 low nibble = 0x2 + Udp, +} + +/// Build a PROXY protocol v2 binary header for IPv4. +/// +/// Returns a 28-byte header: +/// - 12B signature +/// - 1B version (0x2) + command (0x1 = PROXY) +/// - 1B address family (0x1 = AF_INET) + transport (0x1 = TCP, 0x2 = UDP) +/// - 2B address block length (0x000C = 12) +/// - 4B source IPv4 address +/// - 4B destination IPv4 address +/// - 2B source port +/// - 2B destination port +pub fn build_proxy_v2_header( + src_ip: &std::net::Ipv4Addr, + dst_ip: &std::net::Ipv4Addr, + src_port: u16, + dst_port: u16, + transport: ProxyV2Transport, +) -> Bytes { + let mut buf = BytesMut::with_capacity(28); + // Signature (12 bytes) + buf.put_slice(&PROXY_V2_SIGNATURE); + // Version 2 + PROXY command + buf.put_u8(0x21); + // AF_INET (0x1) + transport + let transport_nibble = match transport { + ProxyV2Transport::Tcp => 0x1, + ProxyV2Transport::Udp => 0x2, + }; + buf.put_u8(0x10 | transport_nibble); + // Address block length: 12 bytes for IPv4 + buf.put_u16(12); + // Source address (4 bytes, network byte order) + buf.put_slice(&src_ip.octets()); + // Destination address (4 bytes, network byte order) + buf.put_slice(&dst_ip.octets()); + // Source port (2 bytes, network byte order) + buf.put_u16(src_port); + // Destination port (2 bytes, network byte order) + buf.put_u16(dst_port); + buf.freeze() +} + +/// Build a PROXY protocol v2 binary header from string IP addresses. +/// Falls back to 0.0.0.0 if parsing fails. +pub fn build_proxy_v2_header_from_str( + src_ip: &str, + dst_ip: &str, + src_port: u16, + dst_port: u16, + transport: ProxyV2Transport, +) -> Bytes { + let src: std::net::Ipv4Addr = src_ip.parse().unwrap_or(std::net::Ipv4Addr::UNSPECIFIED); + let dst: std::net::Ipv4Addr = dst_ip.parse().unwrap_or(std::net::Ipv4Addr::UNSPECIFIED); + build_proxy_v2_header(&src, &dst, src_port, dst_port, transport) +} + /// Stateful async frame reader that yields `Frame` values from an `AsyncRead`. pub struct FrameReader { reader: R, @@ -604,6 +680,62 @@ mod tests { assert_eq!(header, "PROXY TCP4 1.2.3.4 5.6.7.8 12345 443\r\n"); } + #[test] + fn test_proxy_v2_header_tcp4() { + let src = "198.51.100.10".parse().unwrap(); + let dst = "203.0.113.25".parse().unwrap(); + let header = build_proxy_v2_header(&src, &dst, 54321, 8443, ProxyV2Transport::Tcp); + assert_eq!(header.len(), 28); + // Signature + assert_eq!(&header[0..12], &PROXY_V2_SIGNATURE); + // Version 2 + PROXY command + assert_eq!(header[12], 0x21); + // AF_INET + STREAM (TCP) + assert_eq!(header[13], 0x11); + // Address length = 12 + assert_eq!(u16::from_be_bytes([header[14], header[15]]), 12); + // Source IP: 198.51.100.10 + assert_eq!(&header[16..20], &[198, 51, 100, 10]); + // Dest IP: 203.0.113.25 + assert_eq!(&header[20..24], &[203, 0, 113, 25]); + // Source port: 54321 + assert_eq!(u16::from_be_bytes([header[24], header[25]]), 54321); + // Dest port: 8443 + assert_eq!(u16::from_be_bytes([header[26], header[27]]), 8443); + } + + #[test] + fn test_proxy_v2_header_udp4() { + let src = "10.0.0.1".parse().unwrap(); + let dst = "10.0.0.2".parse().unwrap(); + let header = build_proxy_v2_header(&src, &dst, 12345, 53, ProxyV2Transport::Udp); + assert_eq!(header.len(), 28); + assert_eq!(header[12], 0x21); // v2, PROXY + assert_eq!(header[13], 0x12); // AF_INET + DGRAM (UDP) + assert_eq!(&header[16..20], &[10, 0, 0, 1]); // src + assert_eq!(&header[20..24], &[10, 0, 0, 2]); // dst + assert_eq!(u16::from_be_bytes([header[24], header[25]]), 12345); + assert_eq!(u16::from_be_bytes([header[26], header[27]]), 53); + } + + #[test] + fn test_proxy_v2_header_from_str() { + let header = build_proxy_v2_header_from_str("1.2.3.4", "5.6.7.8", 1000, 443, ProxyV2Transport::Tcp); + assert_eq!(header.len(), 28); + assert_eq!(&header[16..20], &[1, 2, 3, 4]); + assert_eq!(&header[20..24], &[5, 6, 7, 8]); + } + + #[test] + fn test_proxy_v2_header_from_str_invalid_ip() { + let header = build_proxy_v2_header_from_str("not-an-ip", "also-not", 1000, 443, ProxyV2Transport::Udp); + assert_eq!(header.len(), 28); + // Falls back to 0.0.0.0 + assert_eq!(&header[16..20], &[0, 0, 0, 0]); + assert_eq!(&header[20..24], &[0, 0, 0, 0]); + assert_eq!(header[13], 0x12); // UDP + } + #[tokio::test] async fn test_frame_reader() { let frame1 = encode_frame(1, FRAME_OPEN, b"PROXY TCP4 1.2.3.4 5.6.7.8 1234 443\r\n"); diff --git a/test/test.udp.node.ts b/test/test.udp.node.ts new file mode 100644 index 0000000..b85c56c --- /dev/null +++ b/test/test.udp.node.ts @@ -0,0 +1,193 @@ +import { expect, tap } from '@push.rocks/tapbundle'; +import * as dgram from 'dgram'; +import * as net from 'net'; +import * as crypto from 'crypto'; +import { RemoteIngressHub, RemoteIngressEdge } from '../ts/index.js'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +async function findFreePorts(count: number): Promise { + const servers: net.Server[] = []; + const ports: number[] = []; + for (let i = 0; i < count; i++) { + const server = net.createServer(); + await new Promise((resolve) => server.listen(0, '127.0.0.1', resolve)); + ports.push((server.address() as net.AddressInfo).port); + servers.push(server); + } + await Promise.all(servers.map((s) => new Promise((resolve) => s.close(() => resolve())))); + return ports; +} + +/** + * Start a UDP echo server that: + * 1. Receives the first datagram (PROXY v2 header — 28 bytes) and discards it + * 2. Echoes all subsequent datagrams back to the sender + */ +function startUdpEchoServer(port: number, host: string): Promise { + return new Promise((resolve, reject) => { + const server = dgram.createSocket('udp4'); + let proxyHeaderReceived = false; + + server.on('message', (msg, rinfo) => { + if (!proxyHeaderReceived) { + // First datagram is the PROXY v2 header (28 bytes for IPv4) + // In the current implementation, the hub connects directly via UDP + // so the first real datagram is the actual data (no PROXY header yet) + // For now, just echo everything back + proxyHeaderReceived = true; + } + // Echo back + server.send(msg, rinfo.port, rinfo.address); + }); + + server.on('error', reject); + server.bind(port, host, () => resolve(server)); + }); +} + +/** + * Send a UDP datagram through the tunnel and wait for the echo response. + */ +function udpSendAndReceive( + port: number, + data: Buffer, + timeoutMs = 10000, +): Promise { + return new Promise((resolve, reject) => { + const client = dgram.createSocket('udp4'); + let settled = false; + + const timer = setTimeout(() => { + if (!settled) { + settled = true; + client.close(); + reject(new Error(`UDP timeout after ${timeoutMs}ms`)); + } + }, timeoutMs); + + client.on('message', (msg) => { + if (!settled) { + settled = true; + clearTimeout(timer); + client.close(); + resolve(msg); + } + }); + + client.on('error', (err) => { + if (!settled) { + settled = true; + clearTimeout(timer); + client.close(); + reject(err); + } + }); + + client.send(data, port, '127.0.0.1'); + }); +} + +// --------------------------------------------------------------------------- +// Test state +// --------------------------------------------------------------------------- + +let hub: RemoteIngressHub; +let edge: RemoteIngressEdge; +let echoServer: dgram.Socket; +let hubPort: number; +let edgeUdpPort: number; + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +tap.test('UDP setup: start echo server and tunnel with UDP ports', async () => { + [hubPort, edgeUdpPort] = await findFreePorts(2); + + // Start UDP echo server on upstream (127.0.0.2) + echoServer = await startUdpEchoServer(edgeUdpPort, '127.0.0.2'); + + hub = new RemoteIngressHub(); + edge = new RemoteIngressEdge(); + + await hub.start({ tunnelPort: hubPort, targetHost: '127.0.0.2' }); + await hub.updateAllowedEdges([ + { id: 'test-edge', secret: 'test-secret', listenPorts: [], listenPortsUdp: [edgeUdpPort] }, + ]); + + const connectedPromise = new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error('Edge did not connect within 10s')), 10000); + edge.once('tunnelConnected', () => { + clearTimeout(timeout); + resolve(); + }); + }); + + await edge.start({ + hubHost: '127.0.0.1', + hubPort, + edgeId: 'test-edge', + secret: 'test-secret', + bindAddress: '127.0.0.1', + }); + + await connectedPromise; + // Wait for UDP listener to bind + await new Promise((resolve) => setTimeout(resolve, 500)); + + const status = await edge.getStatus(); + expect(status.connected).toBeTrue(); +}); + +tap.test('UDP: single datagram echo — 64 bytes', async () => { + const data = crypto.randomBytes(64); + const received = await udpSendAndReceive(edgeUdpPort, data, 5000); + expect(received.length).toEqual(64); + expect(Buffer.compare(received, data)).toEqual(0); +}); + +tap.test('UDP: single datagram echo — 1KB', async () => { + const data = crypto.randomBytes(1024); + const received = await udpSendAndReceive(edgeUdpPort, data, 5000); + expect(received.length).toEqual(1024); + expect(Buffer.compare(received, data)).toEqual(0); +}); + +tap.test('UDP: 10 sequential datagrams', async () => { + for (let i = 0; i < 10; i++) { + const data = crypto.randomBytes(128); + const received = await udpSendAndReceive(edgeUdpPort, data, 5000); + expect(received.length).toEqual(128); + expect(Buffer.compare(received, data)).toEqual(0); + } +}); + +tap.test('UDP: 10 concurrent datagrams from different source ports', async () => { + const promises = Array.from({ length: 10 }, () => { + const data = crypto.randomBytes(256); + return udpSendAndReceive(edgeUdpPort, data, 5000).then((received) => ({ + sizeOk: received.length === 256, + dataOk: Buffer.compare(received, data) === 0, + })); + }); + + const results = await Promise.all(promises); + const failures = results.filter((r) => !r.sizeOk || !r.dataOk); + expect(failures.length).toEqual(0); +}); + +tap.test('UDP: tunnel still connected after tests', async () => { + const status = await edge.getStatus(); + expect(status.connected).toBeTrue(); +}); + +tap.test('UDP teardown: stop tunnel and echo server', async () => { + await edge.stop(); + await hub.stop(); + await new Promise((resolve) => echoServer.close(() => resolve())); +}); + +export default tap.start(); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 0ed6bc5..84b3086 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@serve.zone/remoteingress', - version: '4.10.0', + version: '4.11.0', description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.' } diff --git a/ts/classes.remoteingresshub.ts b/ts/classes.remoteingresshub.ts index 1ee9378..f6d06d5 100644 --- a/ts/classes.remoteingresshub.ts +++ b/ts/classes.remoteingresshub.ts @@ -22,7 +22,7 @@ type THubCommands = { }; updateAllowedEdges: { params: { - edges: Array<{ id: string; secret: string; listenPorts?: number[]; stunIntervalSecs?: number }>; + edges: Array<{ id: string; secret: string; listenPorts?: number[]; listenPortsUdp?: number[]; stunIntervalSecs?: number }>; }; result: { updated: boolean }; }; @@ -50,7 +50,7 @@ export interface IHubConfig { }; } -type TAllowedEdge = { id: string; secret: string; listenPorts?: number[]; stunIntervalSecs?: number }; +type TAllowedEdge = { id: string; secret: string; listenPorts?: number[]; listenPortsUdp?: number[]; stunIntervalSecs?: number }; const MAX_RESTART_ATTEMPTS = 10; const MAX_RESTART_BACKOFF_MS = 30_000;