From e890bda8fc2d4a806a2b7113825173622fa576cb Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Thu, 19 Mar 2026 16:09:51 +0000 Subject: [PATCH] feat(udp,http3): add UDP datagram handler relay support and stream HTTP/3 request bodies to backends --- changelog.md | 8 + rust/crates/rustproxy-http/src/h3_service.rs | 72 +++++-- .../rustproxy-passthrough/src/quic_handler.rs | 22 +- .../rustproxy-passthrough/src/udp_listener.rs | 198 +++++++++++++++--- .../rustproxy-passthrough/src/udp_session.rs | 4 + rust/crates/rustproxy/src/lib.rs | 2 +- test/test.datagram-handler.ts | 125 +++++++++++ test/test.udp-forwarding.ts | 142 +++++++++++++ test/test.udp-metrics.ts | 114 ++++++++++ ts/00_commitinfo_data.ts | 2 +- ts/proxies/smart-proxy/smart-proxy.ts | 15 ++ .../smart-proxy/utils/route-validator.ts | 17 +- 12 files changed, 660 insertions(+), 61 deletions(-) create mode 100644 test/test.datagram-handler.ts create mode 100644 test/test.udp-forwarding.ts create mode 100644 test/test.udp-metrics.ts diff --git a/changelog.md b/changelog.md index 472f871..2fa7d59 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,13 @@ # Changelog +## 2026-03-19 - 25.14.0 - feat(udp,http3) +add UDP datagram handler relay support and stream HTTP/3 request bodies to backends + +- establish a persistent Unix socket relay for UDP datagram handlers and process handler replies back to clients +- update route validation and smart proxy route reload logic to support datagramHandler routes +- record UDP, QUIC, and HTTP/3 byte metrics more accurately, including request bytes in and UDP session cleanup connection tracking +- add integration tests for UDP forwarding, datagram handlers, and UDP metrics + ## 2026-03-19 - 25.13.0 - feat(smart-proxy) add UDP transport support with QUIC/HTTP3 routing and datagram handler relay diff --git a/rust/crates/rustproxy-http/src/h3_service.rs b/rust/crates/rustproxy-http/src/h3_service.rs index 0738b5f..86beec5 100644 --- 
a/rust/crates/rustproxy-http/src/h3_service.rs +++ b/rust/crates/rustproxy-http/src/h3_service.rs @@ -4,11 +4,14 @@ //! and forwards them to backends using the same routing and pool infrastructure //! as the HTTP/1+2 proxy. +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; use std::time::Duration; use arc_swap::ArcSwap; use bytes::{Buf, Bytes}; +use http_body::Frame; use tracing::{debug, warn}; use rustproxy_config::{RouteConfig, TransportProtocol}; @@ -165,15 +168,6 @@ async fn handle_h3_request( let backend_port = target.port.resolve(port); let backend_addr = format!("{}:{}", backend_host, backend_port); - // Read request body - let mut body_data = Vec::new(); - while let Some(mut chunk) = stream.recv_data().await - .map_err(|e| anyhow::anyhow!("Failed to read H3 request body: {}", e))? - { - body_data.extend_from_slice(chunk.chunk()); - chunk.advance(chunk.remaining()); - } - // Connect to backend via TCP HTTP/1.1 with timeout let tcp_stream = tokio::time::timeout( connect_timeout, @@ -194,11 +188,37 @@ async fn handle_h3_request( } }); - let body = http_body_util::Full::new(Bytes::from(body_data)); + // Stream request body from H3 client to backend via an mpsc channel. + // This avoids buffering the entire request body in memory. 
+ let (body_tx, body_rx) = tokio::sync::mpsc::channel::<Bytes>(4); + let total_bytes_in = Arc::new(std::sync::atomic::AtomicU64::new(0)); + let total_bytes_in_writer = Arc::clone(&total_bytes_in); + + // Spawn the H3 body reader task + let body_reader = tokio::spawn(async move { + while let Ok(Some(mut chunk)) = stream.recv_data().await { + let data = Bytes::copy_from_slice(chunk.chunk()); + total_bytes_in_writer.fetch_add(data.len() as u64, std::sync::atomic::Ordering::Relaxed); + chunk.advance(chunk.remaining()); + if body_tx.send(data).await.is_err() { + break; + } + } + stream + }); + + // Create a body that polls from the mpsc receiver + let body = H3RequestBody { receiver: body_rx }; let backend_req = build_backend_request(&method, &backend_addr, &path, &host, &request, body)?; + let response = sender.send_request(backend_req).await .map_err(|e| anyhow::anyhow!("Backend request failed: {}", e))?; + // Await the body reader to get the stream back + let mut stream = body_reader.await + .map_err(|e| anyhow::anyhow!("Body reader task failed: {}", e))?; + let total_bytes_in = total_bytes_in.load(std::sync::atomic::Ordering::Relaxed); + // Build H3 response let status = response.status(); let mut h3_response = hyper::Response::builder().status(status); @@ -252,7 +272,7 @@ async fn handle_h3_request( // Record metrics let route_id = route.name.as_deref().or(route.id.as_deref()); - metrics.record_bytes(0, total_bytes_out, route_id, Some(client_ip)); + metrics.record_bytes(total_bytes_in, total_bytes_out, route_id, Some(client_ip)); // Finish the stream stream.finish().await @@ -262,14 +282,14 @@ } /// Build an HTTP/1.1 backend request from the H3 frontend request. 
-fn build_backend_request( +fn build_backend_request( method: &hyper::Method, backend_addr: &str, path: &str, host: &str, original_request: &hyper::Request<()>, - body: http_body_util::Full, -) -> anyhow::Result>> { + body: B, +) -> anyhow::Result> { let mut req = hyper::Request::builder() .method(method) .uri(format!("http://{}{}", backend_addr, path)) @@ -286,3 +306,27 @@ fn build_backend_request( req.body(body) .map_err(|e| anyhow::anyhow!("Failed to build backend request: {}", e)) } + +/// A streaming request body backed by an mpsc channel receiver. +/// +/// Implements `http_body::Body` so hyper can poll chunks as they arrive +/// from the H3 client, avoiding buffering the entire request body in memory. +struct H3RequestBody { + receiver: tokio::sync::mpsc::Receiver, +} + +impl http_body::Body for H3RequestBody { + type Data = Bytes; + type Error = hyper::Error; + + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + match self.receiver.poll_recv(cx) { + Poll::Ready(Some(data)) => Poll::Ready(Some(Ok(Frame::data(data)))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} diff --git a/rust/crates/rustproxy-passthrough/src/quic_handler.rs b/rust/crates/rustproxy-passthrough/src/quic_handler.rs index 17e7076..31ba614 100644 --- a/rust/crates/rustproxy-passthrough/src/quic_handler.rs +++ b/rust/crates/rustproxy-passthrough/src/quic_handler.rs @@ -20,7 +20,6 @@ use rustproxy_metrics::MetricsCollector; use rustproxy_routing::{MatchContext, RouteManager}; use crate::connection_tracker::ConnectionTracker; -use crate::forwarder::ForwardMetricsCtx; /// Create a QUIC server endpoint on the given port with the provided TLS config. 
/// @@ -116,7 +115,7 @@ pub async fn quic_accept_loop( let cancel = cancel.child_token(); tokio::spawn(async move { - match handle_quic_connection(incoming, route, port, &metrics, &cancel).await { + match handle_quic_connection(incoming, route, port, Arc::clone(&metrics), &cancel).await { Ok(()) => debug!("QUIC connection from {} completed", remote_addr), Err(e) => debug!("QUIC connection from {} error: {}", remote_addr, e), } @@ -138,7 +137,7 @@ async fn handle_quic_connection( incoming: quinn::Incoming, route: RouteConfig, port: u16, - metrics: &MetricsCollector, + metrics: Arc<MetricsCollector>, cancel: &CancellationToken, ) -> anyhow::Result<()> { let connection = incoming.await?; @@ -155,7 +154,7 @@ // Phase 5: dispatch to H3ProxyService // For now, log and accept streams for basic handling debug!("HTTP/3 enabled for route {:?}, dispatching to H3 handler", route.name); - handle_h3_connection(connection, route, port, metrics, cancel).await + handle_h3_connection(connection, route, port, &metrics, cancel).await } else { // Non-HTTP3 QUIC: bidirectional stream forwarding to TCP backend handle_quic_stream_forwarding(connection, route, port, metrics, cancel).await } @@ -171,11 +170,12 @@ async fn handle_quic_stream_forwarding( connection: quinn::Connection, route: RouteConfig, port: u16, - _metrics: &MetricsCollector, + metrics: Arc<MetricsCollector>, cancel: &CancellationToken, ) -> anyhow::Result<()> { let remote_addr = connection.remote_address(); let route_id = route.name.as_deref().or(route.id.as_deref()); + let metrics_arc = metrics; // Resolve backend target let target = route.action.targets.as_ref() @@ -203,11 +203,8 @@ let backend_addr = backend_addr.clone(); let ip_str = remote_addr.ip().to_string(); - let _fwd_ctx = ForwardMetricsCtx { - collector: Arc::new(MetricsCollector::new()), // TODO: share real metrics - route_id: route_id.map(|s| s.to_string()), - source_ip: Some(ip_str), - }; + let stream_metrics = 
Arc::clone(&metrics_arc); + let stream_route_id = route_id.map(|s| s.to_string()); // Spawn a task for each QUIC stream → TCP bidirectional forwarding tokio::spawn(async move { @@ -217,6 +214,11 @@ &backend_addr, ).await { Ok((bytes_in, bytes_out)) => { + stream_metrics.record_bytes( + bytes_in, bytes_out, + stream_route_id.as_deref(), + Some(&ip_str), + ); debug!("QUIC stream forwarded: {}B in, {}B out", bytes_in, bytes_out); } Err(e) => { diff --git a/rust/crates/rustproxy-passthrough/src/udp_listener.rs b/rust/crates/rustproxy-passthrough/src/udp_listener.rs index ecf6291..d0e1ca5 100644 --- a/rust/crates/rustproxy-passthrough/src/udp_listener.rs +++ b/rust/crates/rustproxy-passthrough/src/udp_listener.rs @@ -8,12 +8,12 @@ use std::net::SocketAddr; use std::sync::atomic::Ordering; use std::sync::Arc; -use tokio::io::AsyncWriteExt; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use arc_swap::ArcSwap; use tokio::net::UdpSocket; use tokio::task::JoinHandle; -use tokio::sync::RwLock; +use tokio::sync::{Mutex, RwLock}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; @@ -40,6 +40,19 @@ pub struct UdpListenerManager { cancel_token: CancellationToken, /// Unix socket path for datagram handler relay datagram_handler_relay: Arc<RwLock<Option<String>>>, + /// Persistent write half of the relay connection + relay_writer: Arc<Mutex<Option<tokio::net::unix::OwnedWriteHalf>>>, + /// Cancel token for the current relay reply reader task + relay_reader_cancel: Option<CancellationToken>, +} + +impl Drop for UdpListenerManager { + fn drop(&mut self) { + self.cancel_token.cancel(); + for (_, handle) in self.listeners.drain() { + handle.abort(); + } + } } impl UdpListenerManager { @@ -57,6 +70,8 @@ impl UdpListenerManager { session_table: Arc::new(UdpSessionTable::new()), cancel_token, datagram_handler_relay: Arc::new(RwLock::new(None)), + relay_writer: Arc::new(Mutex::new(None)), + relay_reader_cancel: None, } } @@ -126,6 +141,7 @@ impl UdpListenerManager { Arc::clone(&self.conn_tracker), 
Arc::clone(&self.session_table), Arc::clone(&self.datagram_handler_relay), + Arc::clone(&self.relay_writer), self.cancel_token.child_token(), )); @@ -165,16 +181,49 @@ impl UdpListenerManager { self.session_table.session_count()); } - /// Set the datagram handler relay socket path. - pub async fn set_datagram_handler_relay(&self, path: String) { - let mut relay = self.datagram_handler_relay.write().await; - *relay = Some(path); + /// Set the datagram handler relay socket path and establish connection. + pub async fn set_datagram_handler_relay(&mut self, path: String) { + // Cancel previous relay reader task if any + if let Some(old_cancel) = self.relay_reader_cancel.take() { + old_cancel.cancel(); + } + + // Store the path + { + let mut relay = self.datagram_handler_relay.write().await; + *relay = Some(path.clone()); + } + + // Connect to the Unix socket + match tokio::net::UnixStream::connect(&path).await { + Ok(stream) => { + let (read_half, write_half) = stream.into_split(); + + // Store write half for sending datagrams + { + let mut writer = self.relay_writer.lock().await; + *writer = Some(write_half); + } + + // Spawn reply reader — reads length-prefixed JSON replies from TS + // and sends them back to clients via the listener sockets + let cancel = self.cancel_token.child_token(); + self.relay_reader_cancel = Some(cancel.clone()); + tokio::spawn(Self::relay_reply_reader(read_half, cancel)); + + info!("Datagram handler relay connected to {}", path); + } + Err(e) => { + error!("Failed to connect datagram handler relay to {}: {}", path, e); + } + } } /// Start periodic session cleanup task. 
fn start_cleanup_task(&self) { let session_table = Arc::clone(&self.session_table); let metrics = Arc::clone(&self.metrics); + let conn_tracker = Arc::clone(&self.conn_tracker); let cancel = self.cancel_token.child_token(); let route_manager = Arc::clone(&self.route_manager); @@ -188,7 +237,7 @@ impl UdpListenerManager { // or default 60s if none configured) let rm = route_manager.load(); let timeout_ms = Self::get_min_session_timeout(&rm); - let removed = session_table.cleanup_idle(timeout_ms, &metrics); + let removed = session_table.cleanup_idle(timeout_ms, &metrics, &conn_tracker); if removed > 0 { debug!("UDP session cleanup: removed {} idle sessions, {} remaining", removed, session_table.session_count()); @@ -213,7 +262,8 @@ impl UdpListenerManager { metrics: Arc, conn_tracker: Arc, session_table: Arc, - datagram_handler_relay: Arc>>, + _datagram_handler_relay: Arc>>, + relay_writer: Arc>>, cancel: CancellationToken, ) { // Use a reasonably large buffer; actual max is per-route but we need a single buffer @@ -264,21 +314,16 @@ impl UdpListenerManager { let route = route_match.route; let route_id = route.name.as_deref().or(route.id.as_deref()); - // Socket handler routes → relay datagram to TS via Unix socket + // Socket handler routes → relay datagram to TS via persistent Unix socket if route.action.action_type == RouteActionType::SocketHandler { - let relay_path = datagram_handler_relay.read().await; - if let Some(ref path) = *relay_path { - if let Err(e) = Self::relay_datagram_to_ts( - path, - route_id.unwrap_or("unknown"), - &client_addr, - port, - datagram, - ).await { - debug!("Failed to relay UDP datagram to TS: {}", e); - } - } else { - debug!("UDP datagram handler relay not configured for route {:?}", route_id); + if let Err(e) = Self::relay_datagram_via_writer( + &relay_writer, + route_id.unwrap_or("unknown"), + &client_addr, + port, + datagram, + ).await { + debug!("Failed to relay UDP datagram to TS: {}", e); } continue; } @@ -441,10 +486,9 @@ impl 
UdpListenerManager { } } - /// Relay a UDP datagram to the TypeScript handler via Unix socket. - /// Uses length-prefixed JSON framing: [4-byte BE length][JSON payload] - async fn relay_datagram_to_ts( - relay_path: &str, + /// Send a datagram to TS via the persistent relay writer. + async fn relay_datagram_via_writer( + writer: &Mutex>, route_key: &str, client_addr: &SocketAddr, dest_port: u16, @@ -463,8 +507,9 @@ impl UdpListenerManager { }); let json = serde_json::to_vec(&msg)?; - // Connect to relay (one-shot for now; persistent connection optimization deferred) - let mut stream = tokio::net::UnixStream::connect(relay_path).await?; + let mut guard = writer.lock().await; + let stream = guard.as_mut() + .ok_or_else(|| anyhow::anyhow!("Datagram relay not connected"))?; // Length-prefixed frame let len_bytes = (json.len() as u32).to_be_bytes(); @@ -474,4 +519,101 @@ impl UdpListenerManager { Ok(()) } + + /// Background task reading reply frames from the TS datagram handler. + /// Parses replies and sends them back to the original client via UDP. + async fn relay_reply_reader( + mut reader: tokio::net::unix::OwnedReadHalf, + cancel: CancellationToken, + ) { + use base64::Engine; + + let mut len_buf = [0u8; 4]; + loop { + // Read length prefix + let read_result = tokio::select! 
{ + _ = cancel.cancelled() => break, + result = reader.read_exact(&mut len_buf) => result, + }; + + match read_result { + Ok(_) => {} + Err(e) => { + debug!("Datagram relay reader closed: {}", e); + break; + } + } + + let frame_len = u32::from_be_bytes(len_buf) as usize; + if frame_len > 10 * 1024 * 1024 { + error!("Datagram relay frame too large: {} bytes", frame_len); + break; + } + + let mut frame_buf = vec![0u8; frame_len]; + match reader.read_exact(&mut frame_buf).await { + Ok(_) => {} + Err(e) => { + debug!("Datagram relay reader frame error: {}", e); + break; + } + } + + // Parse the reply JSON + let reply: serde_json::Value = match serde_json::from_slice(&frame_buf) { + Ok(v) => v, + Err(e) => { + debug!("Datagram relay reply parse error: {}", e); + continue; + } + }; + + if reply.get("type").and_then(|v| v.as_str()) != Some("reply") { + continue; + } + + let source_ip = reply.get("sourceIp").and_then(|v| v.as_str()).unwrap_or(""); + let source_port = reply.get("sourcePort").and_then(|v| v.as_u64()).unwrap_or(0) as u16; + let dest_port = reply.get("destPort").and_then(|v| v.as_u64()).unwrap_or(0) as u16; + let payload_b64 = reply.get("payloadBase64").and_then(|v| v.as_str()).unwrap_or(""); + + let payload = match base64::engine::general_purpose::STANDARD.decode(payload_b64) { + Ok(p) => p, + Err(e) => { + debug!("Datagram relay reply base64 decode error: {}", e); + continue; + } + }; + + let client_addr: SocketAddr = match format!("{}:{}", source_ip, source_port).parse() { + Ok(a) => a, + Err(e) => { + debug!("Datagram relay reply address parse error: {}", e); + continue; + } + }; + + // Send the reply back to the client via a temporary UDP socket bound to the dest_port + // We need the listener socket for this port. For simplicity, use a fresh socket. 
+ let reply_socket = match UdpSocket::bind(format!("0.0.0.0:{}", dest_port)).await { + Ok(s) => s, + Err(_) => { + // Port already bound by the listener — use unbound socket + match UdpSocket::bind("0.0.0.0:0").await { + Ok(s) => s, + Err(e) => { + debug!("Failed to create reply socket: {}", e); + continue; + } + } + } + }; + + if let Err(e) = reply_socket.send_to(&payload, client_addr).await { + debug!("Failed to send datagram reply to {}: {}", client_addr, e); + } + } + + debug!("Datagram relay reply reader stopped"); + } } diff --git a/rust/crates/rustproxy-passthrough/src/udp_session.rs b/rust/crates/rustproxy-passthrough/src/udp_session.rs index ca3d8a0..322403c 100644 --- a/rust/crates/rustproxy-passthrough/src/udp_session.rs +++ b/rust/crates/rustproxy-passthrough/src/udp_session.rs @@ -18,6 +18,8 @@ use tracing::debug; use rustproxy_metrics::MetricsCollector; +use crate::connection_tracker::ConnectionTracker; + /// A single UDP session (flow). pub struct UdpSession { /// Socket bound to ephemeral port, connected to backend @@ -165,6 +167,7 @@ impl UdpSessionTable { &self, timeout_ms: u64, metrics: &MetricsCollector, + conn_tracker: &ConnectionTracker, ) -> usize { let now_ms = self.elapsed_ms(); let mut removed = 0; @@ -185,6 +188,7 @@ impl UdpSessionTable { session.client_addr, key.1, now_ms.saturating_sub(session.last_activity.load(Ordering::Relaxed)) ); + conn_tracker.connection_closed(&session.source_ip); metrics.connection_closed( session.route_id.as_deref(), Some(&session.source_ip.to_string()), diff --git a/rust/crates/rustproxy/src/lib.rs b/rust/crates/rustproxy/src/lib.rs index 27c4526..2a205ed 100644 --- a/rust/crates/rustproxy/src/lib.rs +++ b/rust/crates/rustproxy/src/lib.rs @@ -1008,7 +1008,7 @@ impl RustProxy { /// Set the Unix domain socket path for relaying UDP datagrams to TypeScript datagramHandler callbacks. 
pub async fn set_datagram_handler_relay_path(&mut self, path: Option) { info!("Datagram handler relay path set to: {:?}", path); - if let Some(ref udp_mgr) = self.udp_listener_manager { + if let Some(ref mut udp_mgr) = self.udp_listener_manager { if let Some(ref p) = path { udp_mgr.set_datagram_handler_relay(p.clone()).await; } diff --git a/test/test.datagram-handler.ts b/test/test.datagram-handler.ts new file mode 100644 index 0000000..26f0c42 --- /dev/null +++ b/test/test.datagram-handler.ts @@ -0,0 +1,125 @@ +import { expect, tap } from '@git.zone/tstest/tapbundle'; +import * as dgram from 'dgram'; +import { SmartProxy } from '../ts/index.js'; +import type { TDatagramHandler, IDatagramInfo } from '../ts/index.js'; +import { findFreePorts, assertPortsFree } from './helpers/port-allocator.js'; + +let smartProxy: SmartProxy; +let PROXY_PORT: number; + +// Helper: send a single UDP datagram and wait for a response +function sendDatagram(port: number, msg: string, timeoutMs = 5000): Promise { + return new Promise((resolve, reject) => { + const client = dgram.createSocket('udp4'); + const timeout = setTimeout(() => { + client.close(); + reject(new Error(`UDP response timeout after ${timeoutMs}ms`)); + }, timeoutMs); + client.send(Buffer.from(msg), port, '127.0.0.1'); + client.on('message', (data) => { + clearTimeout(timeout); + client.close(); + resolve(data.toString()); + }); + client.on('error', (err) => { + clearTimeout(timeout); + client.close(); + reject(err); + }); + }); +} + +tap.test('setup: start SmartProxy with datagramHandler', async () => { + [PROXY_PORT] = await findFreePorts(1); + + const handler: TDatagramHandler = (datagram, info, reply) => { + reply(Buffer.from(`Handled: ${datagram.toString()}`)); + }; + + smartProxy = new SmartProxy({ + routes: [ + { + name: 'dgram-handler-test', + match: { + ports: PROXY_PORT, + transport: 'udp' as const, + }, + action: { + type: 'socket-handler', + datagramHandler: handler, + }, + }, + ], + defaults: { + security: 
{ + ipAllowList: ['127.0.0.1', '::1', '::ffff:127.0.0.1'], + }, + }, + }); + + await smartProxy.start(); +}); + +tap.test('datagram handler: receives and replies to datagram', async () => { + const response = await sendDatagram(PROXY_PORT, 'Hello Handler'); + expect(response).toEqual('Handled: Hello Handler'); +}); + +tap.test('datagram handler: async handler works', async () => { + // Stop and restart with async handler + await smartProxy.stop(); + + [PROXY_PORT] = await findFreePorts(1); + + const asyncHandler: TDatagramHandler = async (datagram, info, reply) => { + // Simulate async work + await new Promise((resolve) => setTimeout(resolve, 10)); + reply(Buffer.from(`Async: ${datagram.toString()}`)); + }; + + smartProxy = new SmartProxy({ + routes: [ + { + name: 'dgram-async-handler', + match: { + ports: PROXY_PORT, + transport: 'udp' as const, + }, + action: { + type: 'socket-handler', + datagramHandler: asyncHandler, + }, + }, + ], + defaults: { + security: { + ipAllowList: ['127.0.0.1', '::1', '::ffff:127.0.0.1'], + }, + }, + }); + + await smartProxy.start(); + + const response = await sendDatagram(PROXY_PORT, 'Test Async'); + expect(response).toEqual('Async: Test Async'); +}); + +tap.test('datagram handler: multiple rapid datagrams', async () => { + const promises: Promise[] = []; + for (let i = 0; i < 5; i++) { + promises.push(sendDatagram(PROXY_PORT, `msg-${i}`)); + } + + const responses = await Promise.all(promises); + + for (let i = 0; i < 5; i++) { + expect(responses).toContain(`Async: msg-${i}`); + } +}); + +tap.test('cleanup: stop SmartProxy', async () => { + await smartProxy.stop(); + await assertPortsFree([PROXY_PORT]); +}); + +export default tap.start(); diff --git a/test/test.udp-forwarding.ts b/test/test.udp-forwarding.ts new file mode 100644 index 0000000..2c0d303 --- /dev/null +++ b/test/test.udp-forwarding.ts @@ -0,0 +1,142 @@ +import { expect, tap } from '@git.zone/tstest/tapbundle'; +import * as dgram from 'dgram'; +import { SmartProxy } from 
'../ts/index.js'; +import { findFreePorts, assertPortsFree } from './helpers/port-allocator.js'; + +let smartProxy: SmartProxy; +let backendServer: dgram.Socket; +let PROXY_PORT: number; +let BACKEND_PORT: number; + +// Helper: send a single UDP datagram and wait for a response +function sendDatagram(port: number, msg: string, timeoutMs = 5000): Promise { + return new Promise((resolve, reject) => { + const client = dgram.createSocket('udp4'); + const timeout = setTimeout(() => { + client.close(); + reject(new Error(`UDP response timeout after ${timeoutMs}ms`)); + }, timeoutMs); + client.send(Buffer.from(msg), port, '127.0.0.1'); + client.on('message', (data) => { + clearTimeout(timeout); + client.close(); + resolve(data.toString()); + }); + client.on('error', (err) => { + clearTimeout(timeout); + client.close(); + reject(err); + }); + }); +} + +// Helper: create a UDP echo server +function createUdpEchoServer(port: number): Promise { + return new Promise((resolve) => { + const server = dgram.createSocket('udp4'); + server.on('message', (msg, rinfo) => { + server.send(Buffer.from(`Echo: ${msg.toString()}`), rinfo.port, rinfo.address); + }); + server.bind(port, '127.0.0.1', () => resolve(server)); + }); +} + +tap.test('setup: start UDP echo server and SmartProxy', async () => { + [PROXY_PORT, BACKEND_PORT] = await findFreePorts(2); + + // Start backend UDP echo server + backendServer = await createUdpEchoServer(BACKEND_PORT); + + // Start SmartProxy with a UDP forwarding route + smartProxy = new SmartProxy({ + routes: [ + { + name: 'udp-forward-test', + match: { + ports: PROXY_PORT, + transport: 'udp' as const, + }, + action: { + type: 'forward', + targets: [{ host: '127.0.0.1', port: BACKEND_PORT }], + udp: { + sessionTimeout: 5000, + }, + }, + }, + ], + defaults: { + security: { + ipAllowList: ['127.0.0.1', '::1', '::ffff:127.0.0.1'], + }, + }, + }); + + await smartProxy.start(); +}); + +tap.test('UDP forwarding: basic datagram round-trip', async () => { + const 
response = await sendDatagram(PROXY_PORT, 'Hello UDP'); + expect(response).toEqual('Echo: Hello UDP'); +}); + +tap.test('UDP forwarding: multiple datagrams same session', async () => { + // Use a single client socket for session reuse + const client = dgram.createSocket('udp4'); + const responses: string[] = []; + + const done = new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + client.close(); + reject(new Error('Timeout waiting for 3 responses')); + }, 5000); + + client.on('message', (data) => { + responses.push(data.toString()); + if (responses.length === 3) { + clearTimeout(timeout); + client.close(); + resolve(); + } + }); + client.on('error', (err) => { + clearTimeout(timeout); + client.close(); + reject(err); + }); + }); + + client.send(Buffer.from('msg1'), PROXY_PORT, '127.0.0.1'); + client.send(Buffer.from('msg2'), PROXY_PORT, '127.0.0.1'); + client.send(Buffer.from('msg3'), PROXY_PORT, '127.0.0.1'); + + await done; + + expect(responses).toContain('Echo: msg1'); + expect(responses).toContain('Echo: msg2'); + expect(responses).toContain('Echo: msg3'); +}); + +tap.test('UDP forwarding: multiple clients', async () => { + const [resp1, resp2] = await Promise.all([ + sendDatagram(PROXY_PORT, 'client1'), + sendDatagram(PROXY_PORT, 'client2'), + ]); + + expect(resp1).toEqual('Echo: client1'); + expect(resp2).toEqual('Echo: client2'); +}); + +tap.test('UDP forwarding: large datagram (1400 bytes)', async () => { + const payload = 'X'.repeat(1400); + const response = await sendDatagram(PROXY_PORT, payload); + expect(response).toEqual(`Echo: ${payload}`); +}); + +tap.test('cleanup: stop SmartProxy and backend', async () => { + await smartProxy.stop(); + await new Promise((resolve) => backendServer.close(() => resolve())); + await assertPortsFree([PROXY_PORT, BACKEND_PORT]); +}); + +export default tap.start(); diff --git a/test/test.udp-metrics.ts b/test/test.udp-metrics.ts new file mode 100644 index 0000000..d73f35c --- /dev/null +++ 
b/test/test.udp-metrics.ts @@ -0,0 +1,114 @@ +import { expect, tap } from '@git.zone/tstest/tapbundle'; +import * as dgram from 'dgram'; +import { SmartProxy } from '../ts/index.js'; +import { findFreePorts, assertPortsFree } from './helpers/port-allocator.js'; + +let smartProxy: SmartProxy; +let backendServer: dgram.Socket; +let PROXY_PORT: number; +let BACKEND_PORT: number; + +// Helper: send a single UDP datagram and wait for a response +function sendDatagram(port: number, msg: string, timeoutMs = 5000): Promise { + return new Promise((resolve, reject) => { + const client = dgram.createSocket('udp4'); + const timeout = setTimeout(() => { + client.close(); + reject(new Error(`UDP response timeout after ${timeoutMs}ms`)); + }, timeoutMs); + client.send(Buffer.from(msg), port, '127.0.0.1'); + client.on('message', (data) => { + clearTimeout(timeout); + client.close(); + resolve(data.toString()); + }); + client.on('error', (err) => { + clearTimeout(timeout); + client.close(); + reject(err); + }); + }); +} + +// Helper: create a UDP echo server +function createUdpEchoServer(port: number): Promise { + return new Promise((resolve) => { + const server = dgram.createSocket('udp4'); + server.on('message', (msg, rinfo) => { + server.send(Buffer.from(`Echo: ${msg.toString()}`), rinfo.port, rinfo.address); + }); + server.bind(port, '127.0.0.1', () => resolve(server)); + }); +} + +tap.test('setup: start UDP echo server and SmartProxy with metrics', async () => { + [PROXY_PORT, BACKEND_PORT] = await findFreePorts(2); + + backendServer = await createUdpEchoServer(BACKEND_PORT); + + smartProxy = new SmartProxy({ + routes: [ + { + name: 'udp-metrics-test', + match: { + ports: PROXY_PORT, + transport: 'udp' as const, + }, + action: { + type: 'forward', + targets: [{ host: '127.0.0.1', port: BACKEND_PORT }], + udp: { + sessionTimeout: 10000, + }, + }, + }, + ], + defaults: { + security: { + ipAllowList: ['127.0.0.1', '::1', '::ffff:127.0.0.1'], + }, + }, + metrics: { + enabled: 
true, + sampleIntervalMs: 1000, + retentionSeconds: 60, + }, + }); + + await smartProxy.start(); +}); + +tap.test('UDP metrics: counters increase after traffic', async () => { + // Send a few datagrams + const resp1 = await sendDatagram(PROXY_PORT, 'metrics-test-1'); + expect(resp1).toEqual('Echo: metrics-test-1'); + + const resp2 = await sendDatagram(PROXY_PORT, 'metrics-test-2'); + expect(resp2).toEqual('Echo: metrics-test-2'); + + // Wait for metrics to propagate and cache to refresh + await new Promise((resolve) => setTimeout(resolve, 2000)); + + // Get metrics (returns the adapter, need to ensure cache is fresh) + const metrics = smartProxy.getMetrics(); + + // The udp property reads from the Rust JSON snapshot + expect(metrics.udp).toBeDefined(); + const totalSessions = metrics.udp.totalSessions(); + const datagramsIn = metrics.udp.datagramsIn(); + const datagramsOut = metrics.udp.datagramsOut(); + + console.log(`UDP metrics: sessions=${totalSessions}, in=${datagramsIn}, out=${datagramsOut}`); + + expect(totalSessions).toBeGreaterThan(0); + expect(datagramsIn).toBeGreaterThan(0); + expect(datagramsOut).toBeGreaterThan(0); +}); + +tap.test('cleanup: stop SmartProxy and backend', async () => { + await smartProxy.stop(); + await new Promise((resolve) => backendServer.close(() => resolve())); + await assertPortsFree([PROXY_PORT, BACKEND_PORT]); +}); + +export default tap.start(); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 2990963..99b07b9 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartproxy', - version: '25.13.0', + version: '25.14.0', description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.' 
} diff --git a/ts/proxies/smart-proxy/smart-proxy.ts b/ts/proxies/smart-proxy/smart-proxy.ts index cb71e8e..cfba498 100644 --- a/ts/proxies/smart-proxy/smart-proxy.ts +++ b/ts/proxies/smart-proxy/smart-proxy.ts @@ -303,6 +303,21 @@ export class SmartProxy extends plugins.EventEmitter { this.socketHandlerServer = null; } + // Update datagram handler relay if datagram handler routes changed + const hasDatagramHandlers = newRoutes.some( + (r) => r.action.type === 'socket-handler' && r.action.datagramHandler + ); + + if (hasDatagramHandlers && !this.datagramHandlerServer) { + const dgPath = `/tmp/smartproxy-dgram-relay-${process.pid}.sock`; + this.datagramHandlerServer = new DatagramHandlerServer(dgPath, this.preprocessor); + await this.datagramHandlerServer.start(); + await this.bridge.setDatagramHandlerRelay(this.datagramHandlerServer.getSocketPath()); + } else if (!hasDatagramHandlers && this.datagramHandlerServer) { + await this.datagramHandlerServer.stop(); + this.datagramHandlerServer = null; + } + // Update stored routes this.settings.routes = newRoutes; diff --git a/ts/proxies/smart-proxy/utils/route-validator.ts b/ts/proxies/smart-proxy/utils/route-validator.ts index 9d42c71..e9ee8d8 100644 --- a/ts/proxies/smart-proxy/utils/route-validator.ts +++ b/ts/proxies/smart-proxy/utils/route-validator.ts @@ -123,10 +123,10 @@ export class RouteValidator { errors.push(`Invalid action type: ${route.action.type}. 
Must be one of: ${this.VALID_ACTION_TYPES.join(', ')}`); } - // Validate socket-handler + // Validate socket-handler (TCP socketHandler or UDP datagramHandler) if (route.action.type === 'socket-handler') { - if (typeof route.action.socketHandler !== 'function') { - errors.push('socket-handler action requires a socketHandler function'); + if (typeof route.action.socketHandler !== 'function' && typeof route.action.datagramHandler !== 'function') { + errors.push('socket-handler action requires a socketHandler or datagramHandler function'); } } @@ -620,10 +620,12 @@ export function validateRouteAction(action: IRouteAction): { valid: boolean; err } if (action.type === 'socket-handler') { - if (!action.socketHandler) { - errors.push('Socket handler function is required for socket-handler action'); - } else if (typeof action.socketHandler !== 'function') { + if (!action.socketHandler && !action.datagramHandler) { + errors.push('Socket handler or datagram handler function is required for socket-handler action'); + } else if (action.socketHandler && typeof action.socketHandler !== 'function') { errors.push('Socket handler must be a function'); + } else if (action.datagramHandler && typeof action.datagramHandler !== 'function') { + errors.push('Datagram handler must be a function'); } } @@ -714,7 +716,8 @@ export function hasRequiredPropertiesForAction(route: IRouteConfig, actionType: route.action.targets.length > 0 && route.action.targets.every(t => t.host && t.port !== undefined); case 'socket-handler': - return !!route.action.socketHandler && typeof route.action.socketHandler === 'function'; + return (!!route.action.socketHandler && typeof route.action.socketHandler === 'function') || + (!!route.action.datagramHandler && typeof route.action.datagramHandler === 'function'); default: return false; }