From 2087567f15350acb0ff33db8702d44b9e12399b9 Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Thu, 19 Mar 2026 12:19:58 +0000 Subject: [PATCH] feat(remoteingress-core): add UDP tunneling over QUIC datagrams and expand transport-specific test coverage --- changelog.md | 7 + rust/crates/remoteingress-core/src/edge.rs | 145 +++++++++++++++++- rust/crates/remoteingress-core/src/hub.rs | 117 ++++++++++++++ .../remoteingress-core/src/transport/quic.rs | 3 + test/test.flowcontrol.node.ts | 18 +-- test/test.loadtest.node.ts | 14 +- test/test.quic.node.ts | 18 +-- test/test.udp.node.ts | 104 ++++++++++++- ts/00_commitinfo_data.ts | 2 +- 9 files changed, 393 insertions(+), 35 deletions(-) diff --git a/changelog.md b/changelog.md index 5afb6aa..fcaf515 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-03-19 - 4.12.0 - feat(remoteingress-core) +add UDP tunneling over QUIC datagrams and expand transport-specific test coverage + +- Implement QUIC datagram-based UDP forwarding on both edge and hub, including session setup, payload routing, and listener cleanup +- Enable QUIC datagram receive buffers in client and server transport configuration +- Add UDP-over-QUIC tests and clarify existing test names to distinguish TCP/TLS, UDP/TLS, and QUIC scenarios + ## 2026-03-19 - 4.11.0 - feat(remoteingress-core) add UDP tunneling support between edge and hub diff --git a/rust/crates/remoteingress-core/src/edge.rs b/rust/crates/remoteingress-core/src/edge.rs index 344ebbe..cb9eccf 100644 --- a/rust/crates/remoteingress-core/src/edge.rs +++ b/rust/crates/remoteingress-core/src/edge.rs @@ -1328,8 +1328,24 @@ async fn connect_to_hub_and_run_quic_with_connection( bind_address, ); - // Monitor control stream for config updates, and connection health. - // Also handle shutdown signals. + // UDP listeners for QUIC transport — uses QUIC datagrams for low-latency forwarding. + let udp_sessions_quic: Arc> = + Arc::new(Mutex::new(UdpSessionManager::new(Duration::from_secs(60)))); + let udp_sockets_quic: Arc>>> = + Arc::new(Mutex::new(HashMap::new())); + let mut udp_listeners_quic: HashMap> = HashMap::new(); + apply_udp_port_config_quic( + &handshake.listen_ports_udp, + &mut udp_listeners_quic, + &quic_conn, + &udp_sessions_quic, + &udp_sockets_quic, + next_stream_id, + connection_token, + bind_address, + ); + + // Monitor control stream for config updates, connection health, and QUIC datagrams. let result = 'quic_loop: loop { tokio::select! { // Read control messages from hub @@ -1384,6 +1400,30 @@ async fn connect_to_hub_and_run_quic_with_connection( } } } + // Receive QUIC datagrams (UDP return traffic from hub) + datagram = quic_conn.read_datagram() => { + match datagram { + Ok(data) => { + // Format: [session_id:4][payload:N] + if data.len() >= 4 { + let session_id = u32::from_be_bytes([data[0], data[1], data[2], data[3]]); + let payload = &data[4..]; + let mut sessions = udp_sessions_quic.lock().await; + if let Some(session) = sessions.get_by_stream_id(session_id) { + let client_addr = session.client_addr; + let dest_port = session.dest_port; + let sockets = udp_sockets_quic.lock().await; + if let Some(socket) = sockets.get(&dest_port) { + let _ = socket.send_to(payload, client_addr).await; + } + } + } + } + Err(e) => { + log::debug!("QUIC datagram recv error: {}", e); + } + } + } // QUIC connection closed reason = quic_conn.closed() => { log::info!("QUIC connection closed: {}", reason); @@ -1405,6 +1445,9 @@ async fn connect_to_hub_and_run_quic_with_connection( for (_, h) in port_listeners.drain() { h.abort(); } + for (_, h) in udp_listeners_quic.drain() { + h.abort(); + } // Graceful QUIC close quic_conn.close(quinn::VarInt::from_u32(0), b"shutdown"); @@ -1513,6 +1556,104 @@ fn apply_port_config_quic( /// Handle a single client connection via QUIC transport. /// Opens a new QUIC bidirectional stream, sends the PROXY header, /// then bidirectionally copies data between the client TCP socket and the QUIC stream. +/// Apply UDP port config for QUIC transport: bind UdpSockets that send via QUIC datagrams. +fn apply_udp_port_config_quic( + new_ports: &[u16], + udp_listeners: &mut HashMap>, + quic_conn: &quinn::Connection, + udp_sessions: &Arc>, + udp_sockets: &Arc>>>, + next_stream_id: &Arc, + connection_token: &CancellationToken, + bind_address: &str, +) { + let new_set: std::collections::HashSet = new_ports.iter().copied().collect(); + let old_set: std::collections::HashSet = udp_listeners.keys().copied().collect(); + + for &port in old_set.difference(&new_set) { + if let Some(handle) = udp_listeners.remove(&port) { + log::info!("Stopping QUIC UDP listener on port {}", port); + handle.abort(); + } + let sockets = udp_sockets.clone(); + tokio::spawn(async move { sockets.lock().await.remove(&port); }); + } + + for &port in new_set.difference(&old_set) { + let quic_conn = quic_conn.clone(); + let udp_sessions = udp_sessions.clone(); + let udp_sockets = udp_sockets.clone(); + let next_stream_id = next_stream_id.clone(); + let port_token = connection_token.child_token(); + let bind_addr = bind_address.to_string(); + + let handle = tokio::spawn(async move { + let socket = match UdpSocket::bind((bind_addr.as_str(), port)).await { + Ok(s) => Arc::new(s), + Err(e) => { + log::error!("Failed to bind QUIC UDP port {}: {}", port, e); + return; + } + }; + log::info!("Listening on UDP port {} (QUIC datagram transport)", port); + udp_sockets.lock().await.insert(port, socket.clone()); + + let mut buf = vec![0u8; 65536]; + loop { + tokio::select! { + recv_result = socket.recv_from(&mut buf) => { + match recv_result { + Ok((len, client_addr)) => { + let key = UdpSessionKey { client_addr, dest_port: port }; + let mut sessions = udp_sessions.lock().await; + + let stream_id = if let Some(session) = sessions.get_mut(&key) { + session.stream_id + } else { + // New session — send PROXY v2 header via control-style datagram + let sid = next_stream_id.fetch_add(1, Ordering::Relaxed); + sessions.insert(key, sid); + + let client_ip = client_addr.ip().to_string(); + let client_port = client_addr.port(); + let proxy_header = build_proxy_v2_header_from_str( + &client_ip, "0.0.0.0", client_port, port, + ProxyV2Transport::Udp, + ); + // Send OPEN as a QUIC datagram: [session_id:4][0xFF magic:1][proxy_header:28] + let mut open_buf = Vec::with_capacity(4 + 1 + proxy_header.len()); + open_buf.extend_from_slice(&sid.to_be_bytes()); + open_buf.push(0xFF); // magic byte to distinguish OPEN from DATA + open_buf.extend_from_slice(&proxy_header); + let _ = quic_conn.send_datagram(open_buf.into()); + + log::debug!("New QUIC UDP session {} from {} -> port {}", sid, client_addr, port); + sid + }; + drop(sessions); + + // Send datagram: [session_id:4][payload:N] + let mut dgram = Vec::with_capacity(4 + len); + dgram.extend_from_slice(&stream_id.to_be_bytes()); + dgram.extend_from_slice(&buf[..len]); + let _ = quic_conn.send_datagram(dgram.into()); + } + Err(e) => { + log::error!("QUIC UDP recv error on port {}: {}", port, e); + } + } + } + _ = port_token.cancelled() => { + log::info!("QUIC UDP port {} listener cancelled", port); + break; + } + } + } + }); + udp_listeners.insert(port, handle); + } +} + async fn handle_client_connection_quic( client_stream: TcpStream, client_addr: std::net::SocketAddr, diff --git a/rust/crates/remoteingress-core/src/hub.rs b/rust/crates/remoteingress-core/src/hub.rs index 664063d..2d3f0b1 100644 --- a/rust/crates/remoteingress-core/src/hub.rs +++ b/rust/crates/remoteingress-core/src/hub.rs @@ -1331,6 +1331,122 @@ async fn handle_edge_connection_quic( } }); + // UDP sessions for QUIC datagram transport + let quic_udp_sessions: Arc>>> = + Arc::new(Mutex::new(HashMap::new())); + + // Spawn QUIC datagram receiver task + let dgram_conn = quic_conn.clone(); + let dgram_sessions = quic_udp_sessions.clone(); + let dgram_target = target_host.clone(); + let dgram_edge_id = edge_id.clone(); + let dgram_token = edge_token.clone(); + let dgram_handle = tokio::spawn(async move { + loop { + tokio::select! { + datagram = dgram_conn.read_datagram() => { + match datagram { + Ok(data) => { + if data.len() < 4 { continue; } + let session_id = u32::from_be_bytes([data[0], data[1], data[2], data[3]]); + let payload = &data[4..]; + + // Check for OPEN magic byte (0xFF) + if !payload.is_empty() && payload[0] == 0xFF { + // This is a session OPEN: [0xFF][proxy_v2_header:28] + let proxy_data = &payload[1..]; + let dest_port = if proxy_data.len() >= 28 { + u16::from_be_bytes([proxy_data[26], proxy_data[27]]) + } else { + 53 // fallback + }; + + // Create upstream UDP socket + let target = dgram_target.clone(); + let conn = dgram_conn.clone(); + let sessions = dgram_sessions.clone(); + let session_token = dgram_token.child_token(); + let (tx, mut rx) = mpsc::channel::(256); + + { + let mut s = sessions.lock().await; + s.insert(session_id, tx); + } + + tokio::spawn(async move { + let upstream = match UdpSocket::bind("0.0.0.0:0").await { + Ok(s) => Arc::new(s), + Err(e) => { + log::error!("QUIC UDP session {} bind failed: {}", session_id, e); + return; + } + }; + if let Err(e) = upstream.connect((target.as_str(), dest_port)).await { + log::error!("QUIC UDP session {} connect failed: {}", session_id, e); + return; + } + + // Upstream recv → QUIC datagram back to edge + let upstream_recv = upstream.clone(); + let recv_conn = conn.clone(); + let recv_token = session_token.clone(); + let recv_handle = tokio::spawn(async move { + let mut buf = vec![0u8; 65536]; + loop { + tokio::select! { + result = upstream_recv.recv(&mut buf) => { + match result { + Ok(len) => { + let mut dgram = Vec::with_capacity(4 + len); + dgram.extend_from_slice(&session_id.to_be_bytes()); + dgram.extend_from_slice(&buf[..len]); + let _ = recv_conn.send_datagram(dgram.into()); + } + Err(_) => break, + } + } + _ = recv_token.cancelled() => break, + } + } + }); + + // Edge datagrams → upstream + loop { + tokio::select! { + data = rx.recv() => { + match data { + Some(datagram) => { + let _ = upstream.send(&datagram).await; + } + None => break, + } + } + _ = session_token.cancelled() => break, + } + } + recv_handle.abort(); + }); + + continue; + } + + // Regular data datagram — forward to upstream + let sessions = dgram_sessions.lock().await; + if let Some(tx) = sessions.get(&session_id) { + let _ = tx.try_send(Bytes::copy_from_slice(payload)); + } + } + Err(e) => { + log::debug!("QUIC datagram recv error from edge {}: {}", dgram_edge_id, e); + break; + } + } + } + _ = dgram_token.cancelled() => break, + } + } + }); + // Control stream loop: forward config updates and handle PONG let disconnect_reason; loop { @@ -1399,6 +1515,7 @@ async fn handle_edge_connection_quic( // Cleanup edge_token.cancel(); data_handle.abort(); + dgram_handle.abort(); quic_conn.close(quinn::VarInt::from_u32(0), b"hub_shutdown"); { diff --git a/rust/crates/remoteingress-core/src/transport/quic.rs b/rust/crates/remoteingress-core/src/transport/quic.rs index 0cf46ac..902ff7c 100644 --- a/rust/crates/remoteingress-core/src/transport/quic.rs +++ b/rust/crates/remoteingress-core/src/transport/quic.rs @@ -31,6 +31,8 @@ pub fn build_quic_client_config() -> quinn::ClientConfig { // Match MAX_STREAMS_PER_EDGE (1024) from hub.rs. // Default is 100 which is too low for high-concurrency tunneling. transport.max_concurrent_bidi_streams(1024u32.into()); + // Enable QUIC datagrams (RFC 9221) for low-latency UDP tunneling. + transport.datagram_receive_buffer_size(Some(65536)); let mut client_config = quinn::ClientConfig::new(Arc::new(quic_config)); client_config.transport_config(Arc::new(transport)); @@ -49,6 +51,7 @@ pub fn build_quic_server_config( quinn::IdleTimeout::try_from(std::time::Duration::from_secs(45)).unwrap(), )); transport.max_concurrent_bidi_streams(1024u32.into()); + transport.datagram_receive_buffer_size(Some(65536)); let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(quic_config)); server_config.transport_config(Arc::new(transport)); diff --git a/test/test.flowcontrol.node.ts b/test/test.flowcontrol.node.ts index f33b1b2..efbd4f9 100644 --- a/test/test.flowcontrol.node.ts +++ b/test/test.flowcontrol.node.ts @@ -315,7 +315,7 @@ let echoServer: TrackingServer; let hubPort: number; let edgePort: number; -tap.test('setup: start echo server and tunnel', async () => { +tap.test('TCP/TLS setup: start TCP echo server and TCP+TLS tunnel', async () => { [hubPort, edgePort] = await findFreePorts(2); echoServer = await startEchoServer(edgePort, '127.0.0.2'); @@ -324,7 +324,7 @@ tap.test('setup: start echo server and tunnel', async () => { expect(tunnel.hub.running).toBeTrue(); }); -tap.test('single stream: 32MB transfer exceeding initial 4MB window (multiple refills)', async () => { +tap.test('TCP/TLS: single TCP stream — 32MB transfer exceeding initial 4MB window', async () => { const size = 32 * 1024 * 1024; const data = crypto.randomBytes(size); const expectedHash = sha256(data); @@ -335,7 +335,7 @@ tap.test('single stream: 32MB transfer exceeding initial 4MB window (multiple re expect(sha256(received)).toEqual(expectedHash); }); -tap.test('200 concurrent streams with 64KB each', async () => { +tap.test('TCP/TLS: 200 concurrent TCP streams x 64KB each', async () => { const streamCount = 200; const payloadSize = 64 * 1024; @@ -355,7 +355,7 @@ tap.test('200 concurrent streams with 64KB each', async () => { expect(failures.length).toEqual(0); }); -tap.test('512 concurrent streams at minimum window boundary (16KB each)', async () => { +tap.test('TCP/TLS: 512 concurrent TCP streams at minimum window boundary (16KB each)', async () => { const streamCount = 512; const payloadSize = 16 * 1024; @@ -375,7 +375,7 @@ tap.test('512 concurrent streams at minimum window boundary (16KB each)', async expect(failures.length).toEqual(0); }); -tap.test('asymmetric transfer: 4KB request -> 4MB response', async () => { +tap.test('TCP/TLS: asymmetric TCP transfer — 4KB request -> 4MB response', async () => { // Swap to large-response server await forceCloseServer(echoServer); const responseSize = 4 * 1024 * 1024; // 4 MB @@ -392,7 +392,7 @@ tap.test('asymmetric transfer: 4KB request -> 4MB response', async () => { } }); -tap.test('100 streams x 1MB each (100MB total exceeding 200MB budget)', async () => { +tap.test('TCP/TLS: 100 TCP streams x 1MB each (100MB total exceeding 200MB budget)', async () => { const streamCount = 100; const payloadSize = 1 * 1024 * 1024; @@ -412,7 +412,7 @@ tap.test('100 streams x 1MB each (100MB total exceeding 200MB budget)', async () expect(failures.length).toEqual(0); }); -tap.test('active stream counter tracks concurrent connections', async () => { +tap.test('TCP/TLS: active TCP stream counter tracks concurrent connections', async () => { const N = 50; // Open N connections and keep them alive (send data but don't close) @@ -445,7 +445,7 @@ tap.test('active stream counter tracks concurrent connections', async () => { } }); -tap.test('50 streams x 2MB each (forces multiple window refills per stream)', async () => { +tap.test('TCP/TLS: 50 TCP streams x 2MB each (forces multiple window refills)', async () => { // At 50 concurrent streams: adaptive window = 200MB/50 = 4MB per stream // Each stream sends 2MB → needs ~3 WINDOW_UPDATE refill cycles per stream const streamCount = 50; @@ -467,7 +467,7 @@ tap.test('50 streams x 2MB each (forces multiple window refills per stream)', as expect(failures.length).toEqual(0); }); -tap.test('teardown: stop tunnel and echo server', async () => { +tap.test('TCP/TLS teardown: stop tunnel and TCP echo server', async () => { await tunnel.cleanup(); await forceCloseServer(echoServer); }); diff --git a/test/test.loadtest.node.ts b/test/test.loadtest.node.ts index e10d3cd..fe49da0 100644 --- a/test/test.loadtest.node.ts +++ b/test/test.loadtest.node.ts @@ -231,7 +231,7 @@ let edgePort: number; // Tests // --------------------------------------------------------------------------- -tap.test('setup: start throttled tunnel (100 Mbit/s)', async () => { +tap.test('TCP/TLS setup: start throttled TCP+TLS tunnel (100 Mbit/s)', async () => { [hubPort, proxyPort, edgePort] = await findFreePorts(3); echoServer = await startEchoServer(edgePort, '127.0.0.2'); @@ -271,7 +271,7 @@ tap.test('setup: start throttled tunnel (100 Mbit/s)', async () => { expect(status.connected).toBeTrue(); }); -tap.test('throttled: 5 streams x 20MB each through 100Mbit tunnel', async () => { +tap.test('TCP/TLS throttled: 5 TCP streams x 20MB each through 100Mbit tunnel', async () => { const streamCount = 5; const payloadSize = 20 * 1024 * 1024; // 20MB per stream = 100MB total round-trip @@ -293,7 +293,7 @@ tap.test('throttled: 5 streams x 20MB each through 100Mbit tunnel', async () => expect(status.connected).toBeTrue(); }); -tap.test('throttled: slow consumer with 20MB does not kill other streams', async () => { +tap.test('TCP/TLS throttled: slow TCP consumer with 20MB does not kill other streams', async () => { // Open a connection that creates download-direction backpressure: // send 20MB but DON'T read the response — client TCP receive buffer fills const slowSock = net.createConnection({ host: '127.0.0.1', port: edgePort }); @@ -326,7 +326,7 @@ tap.test('throttled: slow consumer with 20MB does not kill other streams', async slowSock.destroy(); }); -tap.test('throttled: rapid churn — 3 x 20MB long + 50 x 1MB short streams', async () => { +tap.test('TCP/TLS throttled: rapid churn — 3 x 20MB long + 50 x 1MB short TCP streams', async () => { // 3 long streams (20MB each) running alongside 50 short streams (1MB each) const longPayload = crypto.randomBytes(20 * 1024 * 1024); const longHash = sha256(longPayload); @@ -360,7 +360,7 @@ tap.test('throttled: rapid churn — 3 x 20MB long + 50 x 1MB short streams', as expect(status.connected).toBeTrue(); }); -tap.test('throttled: 3 burst waves of 5 streams x 20MB each', async () => { +tap.test('TCP/TLS throttled: 3 burst waves of 5 TCP streams x 20MB each', async () => { for (let wave = 0; wave < 3; wave++) { const streamCount = 5; const payloadSize = 20 * 1024 * 1024; // 20MB per stream = 100MB per wave @@ -382,7 +382,7 @@ tap.test('throttled: 3 burst waves of 5 streams x 20MB each', async () => { } }); -tap.test('throttled: tunnel still works after all load tests', async () => { +tap.test('TCP/TLS throttled: TCP tunnel still works after all load tests', async () => { const data = crypto.randomBytes(1024); const hash = sha256(data); const received = await sendAndReceive(edgePort, data, 30000); @@ -392,7 +392,7 @@ tap.test('throttled: tunnel still works after all load tests', async () => { expect(status.connected).toBeTrue(); }); -tap.test('teardown: stop tunnel', async () => { +tap.test('TCP/TLS teardown: stop throttled tunnel', async () => { await edge.stop(); await hub.stop(); if (throttle) await throttle.close(); diff --git a/test/test.quic.node.ts b/test/test.quic.node.ts index 260f1b0..16f553b 100644 --- a/test/test.quic.node.ts +++ b/test/test.quic.node.ts @@ -176,7 +176,7 @@ let echoServer: TrackingServer; let hubPort: number; let edgePort: number; -tap.test('QUIC setup: start echo server and QUIC tunnel', async () => { +tap.test('QUIC setup: start TCP echo server and QUIC tunnel', async () => { [hubPort, edgePort] = await findFreePorts(2); echoServer = await startEchoServer(edgePort, '127.0.0.2'); @@ -187,7 +187,7 @@ tap.test('QUIC setup: start echo server and QUIC tunnel', async () => { expect(status.connected).toBeTrue(); }); -tap.test('QUIC: single stream echo — 1KB', async () => { +tap.test('QUIC: single TCP stream echo — 1KB', async () => { const data = crypto.randomBytes(1024); const hash = sha256(data); const received = await sendAndReceive(edgePort, data, 10000); @@ -195,7 +195,7 @@ tap.test('QUIC: single stream echo — 1KB', async () => { expect(sha256(received)).toEqual(hash); }); -tap.test('QUIC: single stream echo — 1MB', async () => { +tap.test('QUIC: single TCP stream echo — 1MB', async () => { const size = 1024 * 1024; const data = crypto.randomBytes(size); const hash = sha256(data); @@ -204,7 +204,7 @@ tap.test('QUIC: single stream echo — 1MB', async () => { expect(sha256(received)).toEqual(hash); }); -tap.test('QUIC: single stream echo — 16MB', async () => { +tap.test('QUIC: single TCP stream echo — 16MB', async () => { const size = 16 * 1024 * 1024; const data = crypto.randomBytes(size); const hash = sha256(data); @@ -213,7 +213,7 @@ tap.test('QUIC: single stream echo — 16MB', async () => { expect(sha256(received)).toEqual(hash); }); -tap.test('QUIC: 10 concurrent streams x 1MB each', async () => { +tap.test('QUIC: 10 concurrent TCP streams x 1MB each', async () => { const streamCount = 10; const payloadSize = 1024 * 1024; @@ -232,7 +232,7 @@ tap.test('QUIC: 10 concurrent streams x 1MB each', async () => { expect(failures.length).toEqual(0); }); -tap.test('QUIC: 50 concurrent streams x 64KB each', async () => { +tap.test('QUIC: 50 concurrent TCP streams x 64KB each', async () => { const streamCount = 50; const payloadSize = 64 * 1024; @@ -251,7 +251,7 @@ tap.test('QUIC: 50 concurrent streams x 64KB each', async () => { expect(failures.length).toEqual(0); }); -tap.test('QUIC: 200 concurrent streams x 16KB each', async () => { +tap.test('QUIC: 200 concurrent TCP streams x 16KB each', async () => { const streamCount = 200; const payloadSize = 16 * 1024; @@ -270,12 +270,12 @@ tap.test('QUIC: 200 concurrent streams x 16KB each', async () => { expect(failures.length).toEqual(0); }); -tap.test('QUIC: tunnel still connected after all tests', async () => { +tap.test('QUIC: TCP tunnel still connected after all tests', async () => { const status = await tunnel.edge.getStatus(); expect(status.connected).toBeTrue(); }); -tap.test('QUIC teardown: stop tunnel and echo server', async () => { +tap.test('QUIC teardown: stop TCP tunnel and echo server', async () => { await tunnel.cleanup(); await forceCloseServer(echoServer); }); diff --git a/test/test.udp.node.ts b/test/test.udp.node.ts index b85c56c..152d827 100644 --- a/test/test.udp.node.ts +++ b/test/test.udp.node.ts @@ -104,7 +104,7 @@ let edgeUdpPort: number; // Tests // --------------------------------------------------------------------------- -tap.test('UDP setup: start echo server and tunnel with UDP ports', async () => { +tap.test('UDP/TLS setup: start UDP echo server and TCP+TLS tunnel with UDP ports', async () => { [hubPort, edgeUdpPort] = await findFreePorts(2); // Start UDP echo server on upstream (127.0.0.2) @@ -142,21 +142,21 @@ tap.test('UDP setup: start echo server and tunnel with UDP ports', async () => { expect(status.connected).toBeTrue(); }); -tap.test('UDP: single datagram echo — 64 bytes', async () => { +tap.test('UDP/TLS: single UDP datagram echo — 64 bytes', async () => { const data = crypto.randomBytes(64); const received = await udpSendAndReceive(edgeUdpPort, data, 5000); expect(received.length).toEqual(64); expect(Buffer.compare(received, data)).toEqual(0); }); -tap.test('UDP: single datagram echo — 1KB', async () => { +tap.test('UDP/TLS: single UDP datagram echo — 1KB', async () => { const data = crypto.randomBytes(1024); const received = await udpSendAndReceive(edgeUdpPort, data, 5000); expect(received.length).toEqual(1024); expect(Buffer.compare(received, data)).toEqual(0); }); -tap.test('UDP: 10 sequential datagrams', async () => { +tap.test('UDP/TLS: 10 sequential UDP datagrams', async () => { for (let i = 0; i < 10; i++) { const data = crypto.randomBytes(128); const received = await udpSendAndReceive(edgeUdpPort, data, 5000); @@ -165,7 +165,7 @@ tap.test('UDP: 10 sequential datagrams', async () => { } }); -tap.test('UDP: 10 concurrent datagrams from different source ports', async () => { +tap.test('UDP/TLS: 10 concurrent UDP datagrams from different source ports', async () => { const promises = Array.from({ length: 10 }, () => { const data = crypto.randomBytes(256); return udpSendAndReceive(edgeUdpPort, data, 5000).then((received) => ({ @@ -179,15 +179,105 @@ tap.test('UDP: 10 concurrent datagrams from different source ports', async () => expect(failures.length).toEqual(0); }); -tap.test('UDP: tunnel still connected after tests', async () => { +tap.test('UDP/TLS: tunnel still connected after UDP tests', async () => { const status = await edge.getStatus(); expect(status.connected).toBeTrue(); }); -tap.test('UDP teardown: stop tunnel and echo server', async () => { +tap.test('UDP/TLS teardown: stop tunnel and UDP echo server', async () => { await edge.stop(); await hub.stop(); await new Promise((resolve) => echoServer.close(() => resolve())); }); +// --------------------------------------------------------------------------- +// QUIC transport UDP tests +// --------------------------------------------------------------------------- + +let quicHub: RemoteIngressHub; +let quicEdge: RemoteIngressEdge; +let quicEchoServer: dgram.Socket; +let quicHubPort: number; +let quicEdgeUdpPort: number; + +tap.test('UDP/QUIC setup: start UDP echo server and QUIC tunnel with UDP ports', async () => { + [quicHubPort, quicEdgeUdpPort] = await findFreePorts(2); + + quicEchoServer = await startUdpEchoServer(quicEdgeUdpPort, '127.0.0.2'); + + quicHub = new RemoteIngressHub(); + quicEdge = new RemoteIngressEdge(); + + await quicHub.start({ tunnelPort: quicHubPort, targetHost: '127.0.0.2' }); + await quicHub.updateAllowedEdges([ + { id: 'test-edge', secret: 'test-secret', listenPorts: [], listenPortsUdp: [quicEdgeUdpPort] }, + ]); + + const connectedPromise = new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error('QUIC edge did not connect within 10s')), 10000); + quicEdge.once('tunnelConnected', () => { + clearTimeout(timeout); + resolve(); + }); + }); + + await quicEdge.start({ + hubHost: '127.0.0.1', + hubPort: quicHubPort, + edgeId: 'test-edge', + secret: 'test-secret', + bindAddress: '127.0.0.1', + transportMode: 'quic', + }); + + await connectedPromise; + await new Promise((resolve) => setTimeout(resolve, 500)); + + const status = await quicEdge.getStatus(); + expect(status.connected).toBeTrue(); +}); + +tap.test('UDP/QUIC: single UDP datagram echo — 64 bytes', async () => { + const data = crypto.randomBytes(64); + const received = await udpSendAndReceive(quicEdgeUdpPort, data, 5000); + expect(received.length).toEqual(64); + expect(Buffer.compare(received, data)).toEqual(0); +}); + +tap.test('UDP/QUIC: single UDP datagram echo — 1KB', async () => { + const data = crypto.randomBytes(1024); + const received = await udpSendAndReceive(quicEdgeUdpPort, data, 5000); + expect(received.length).toEqual(1024); + expect(Buffer.compare(received, data)).toEqual(0); +}); + +tap.test('UDP/QUIC: 10 sequential UDP datagrams', async () => { + for (let i = 0; i < 10; i++) { + const data = crypto.randomBytes(128); + const received = await udpSendAndReceive(quicEdgeUdpPort, data, 5000); + expect(received.length).toEqual(128); + expect(Buffer.compare(received, data)).toEqual(0); + } +}); + +tap.test('UDP/QUIC: 10 concurrent UDP datagrams', async () => { + const promises = Array.from({ length: 10 }, () => { + const data = crypto.randomBytes(256); + return udpSendAndReceive(quicEdgeUdpPort, data, 5000).then((received) => ({ + sizeOk: received.length === 256, + dataOk: Buffer.compare(received, data) === 0, + })); + }); + + const results = await Promise.all(promises); + const failures = results.filter((r) => !r.sizeOk || !r.dataOk); + expect(failures.length).toEqual(0); +}); + +tap.test('UDP/QUIC teardown: stop QUIC tunnel and UDP echo server', async () => { + await quicEdge.stop(); + await quicHub.stop(); + await new Promise((resolve) => quicEchoServer.close(() => resolve())); +}); + export default tap.start(); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 84b3086..8a13e3c 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@serve.zone/remoteingress', - version: '4.11.0', + version: '4.12.0', description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.' }