From a40094537146f117ed988d2bbd5a566e78d4db2e Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Thu, 19 Mar 2026 14:09:32 +0000 Subject: [PATCH] fix(remoteingress-core): send PROXY v2 headers for UDP upstream sessions and expire idle UDP sessions --- changelog.md | 7 +++++ rust/crates/remoteingress-core/src/edge.rs | 36 ++++++++++++++++++++++ rust/crates/remoteingress-core/src/hub.rs | 14 +++++++++ test/test.udp.node.ts | 15 ++++----- ts/00_commitinfo_data.ts | 2 +- 5 files changed, 66 insertions(+), 8 deletions(-) diff --git a/changelog.md b/changelog.md index fcaf515..cdb3f4f 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-03-19 - 4.12.1 - fix(remoteingress-core) +send PROXY v2 headers for UDP upstream sessions and expire idle UDP sessions + +- Adds periodic idle UDP session expiry in edge tunnel and QUIC loops, including UDP close signaling for expired tunnel sessions. +- Sends the PROXY v2 header as the first datagram for UDP upstream connections in both standard and QUIC hub paths. +- Updates the UDP node test server to ignore the initial PROXY v2 datagram per source before echoing payload traffic. + ## 2026-03-19 - 4.12.0 - feat(remoteingress-core) add UDP tunneling over QUIC datagrams and expand transport-specific test coverage diff --git a/rust/crates/remoteingress-core/src/edge.rs b/rust/crates/remoteingress-core/src/edge.rs index cb9eccf..59b0fad 100644 --- a/rust/crates/remoteingress-core/src/edge.rs +++ b/rust/crates/remoteingress-core/src/edge.rs @@ -642,8 +642,23 @@ async fn connect_to_hub_and_run( let liveness_timeout_dur = Duration::from_secs(45); let mut last_activity = Instant::now(); let mut liveness_deadline = Box::pin(sleep_until(last_activity + liveness_timeout_dur)); + let mut next_udp_expiry = Instant::now() + Duration::from_secs(30); let result = 'io_loop: loop { + // Expire idle UDP sessions periodically + if Instant::now() >= next_udp_expiry { + let mut sessions = udp_sessions.lock().await; + let expired = sessions.expire_idle(); + for sid in &expired { + let close_frame = encode_frame(*sid, FRAME_UDP_CLOSE, &[]); + let _ = tunnel_data_tx.try_send(close_frame); + } + if !expired.is_empty() { + log::debug!("Expired {} idle UDP sessions", expired.len()); + } + next_udp_expiry = Instant::now() + Duration::from_secs(30); + } + // Drain any buffered frames loop { let frame = match tunnel_io.try_parse_frame() { @@ -1346,7 +1361,18 @@ async fn connect_to_hub_and_run_quic_with_connection( ); // Monitor control stream for config updates, connection health, and QUIC datagrams. + let mut next_udp_expiry_quic = Instant::now() + Duration::from_secs(30); let result = 'quic_loop: loop { + // Expire idle UDP sessions periodically + if Instant::now() >= next_udp_expiry_quic { + let mut sessions = udp_sessions_quic.lock().await; + let expired = sessions.expire_idle(); + if !expired.is_empty() { + log::debug!("Expired {} idle QUIC UDP sessions", expired.len()); + } + next_udp_expiry_quic = Instant::now() + Duration::from_secs(30); + } + tokio::select! { // Read control messages from hub ctrl_msg = quic_transport::read_ctrl_message(&mut ctrl_recv) => { @@ -1370,6 +1396,16 @@ async fn connect_to_hub_and_run_quic_with_connection( connection_token, bind_address, ); + apply_udp_port_config_quic( + &update.listen_ports_udp, + &mut udp_listeners_quic, + &quic_conn, + &udp_sessions_quic, + &udp_sockets_quic, + next_stream_id, + connection_token, + bind_address, + ); } } quic_transport::CTRL_PING => { diff --git a/rust/crates/remoteingress-core/src/hub.rs b/rust/crates/remoteingress-core/src/hub.rs index 2d3f0b1..d2b5afa 100644 --- a/rust/crates/remoteingress-core/src/hub.rs +++ b/rust/crates/remoteingress-core/src/hub.rs @@ -706,6 +706,7 @@ async fn handle_hub_frame( let data_writer_tx = data_tx.clone(); let session_token = edge_token.child_token(); let edge_id_str = edge_id.to_string(); + let proxy_v2_header = frame.payload.clone(); // Channel for forwarding datagrams from edge to upstream let (udp_tx, mut udp_rx) = mpsc::channel::(256); @@ -728,6 +729,12 @@ async fn handle_hub_frame( return; } + // Send PROXY v2 header as first datagram so SmartProxy knows the original client + if let Err(e) = upstream.send(&proxy_v2_header).await { + log::error!("UDP session {} failed to send PROXY v2 header: {}", stream_id, e); + return; + } + // Task: upstream -> edge (return datagrams) let upstream_recv = Arc::new(upstream); let upstream_send = upstream_recv.clone(); @@ -1367,6 +1374,7 @@ async fn handle_edge_connection_quic( let sessions = dgram_sessions.clone(); let session_token = dgram_token.child_token(); let (tx, mut rx) = mpsc::channel::(256); + let proxy_v2_data: Vec = proxy_data.to_vec(); { let mut s = sessions.lock().await; @@ -1386,6 +1394,12 @@ async fn handle_edge_connection_quic( return; } + // Send PROXY v2 header as first datagram so SmartProxy knows the original client + if let Err(e) = upstream.send(&proxy_v2_data).await { + log::error!("QUIC UDP session {} failed to send PROXY v2 header: {}", session_id, e); + return; + } + // Upstream recv → QUIC datagram back to edge let upstream_recv = upstream.clone(); let recv_conn = conn.clone(); diff --git a/test/test.udp.node.ts b/test/test.udp.node.ts index 152d827..713a70b 100644 --- a/test/test.udp.node.ts +++ b/test/test.udp.node.ts @@ -29,15 +29,16 @@ async function findFreePorts(count: number): Promise { function startUdpEchoServer(port: number, host: string): Promise { return new Promise((resolve, reject) => { const server = dgram.createSocket('udp4'); - let proxyHeaderReceived = false; + // Track which source endpoints have sent their PROXY v2 header. + // The hub sends a 28-byte PROXY v2 header as the first datagram per session. + const seenSources = new Set(); server.on('message', (msg, rinfo) => { - if (!proxyHeaderReceived) { - // First datagram is the PROXY v2 header (28 bytes for IPv4) - // In the current implementation, the hub connects directly via UDP - // so the first real datagram is the actual data (no PROXY header yet) - // For now, just echo everything back - proxyHeaderReceived = true; + const sourceKey = `${rinfo.address}:${rinfo.port}`; + if (!seenSources.has(sourceKey)) { + seenSources.add(sourceKey); + // First datagram from this source is the PROXY v2 header — skip it + return; } // Echo back server.send(msg, rinfo.port, rinfo.address); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 8a13e3c..20c7be5 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@serve.zone/remoteingress', - version: '4.12.0', + version: '4.12.1', description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.' }