feat(udp,http3): add UDP datagram handler relay support and stream HTTP/3 request bodies to backends

This commit is contained in:
2026-03-19 16:09:51 +00:00
parent bbe8b729ea
commit e890bda8fc
12 changed files with 660 additions and 61 deletions

View File

@@ -20,7 +20,6 @@ use rustproxy_metrics::MetricsCollector;
use rustproxy_routing::{MatchContext, RouteManager};
use crate::connection_tracker::ConnectionTracker;
use crate::forwarder::ForwardMetricsCtx;
/// Create a QUIC server endpoint on the given port with the provided TLS config.
///
@@ -116,7 +115,7 @@ pub async fn quic_accept_loop(
let cancel = cancel.child_token();
tokio::spawn(async move {
match handle_quic_connection(incoming, route, port, &metrics, &cancel).await {
match handle_quic_connection(incoming, route, port, Arc::clone(&metrics), &cancel).await {
Ok(()) => debug!("QUIC connection from {} completed", remote_addr),
Err(e) => debug!("QUIC connection from {} error: {}", remote_addr, e),
}
@@ -138,7 +137,7 @@ async fn handle_quic_connection(
incoming: quinn::Incoming,
route: RouteConfig,
port: u16,
metrics: &MetricsCollector,
metrics: Arc<MetricsCollector>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let connection = incoming.await?;
@@ -155,7 +154,7 @@ async fn handle_quic_connection(
// Phase 5: dispatch to H3ProxyService
// For now, log and accept streams for basic handling
debug!("HTTP/3 enabled for route {:?}, dispatching to H3 handler", route.name);
handle_h3_connection(connection, route, port, metrics, cancel).await
handle_h3_connection(connection, route, port, &metrics, cancel).await
} else {
// Non-HTTP3 QUIC: bidirectional stream forwarding to TCP backend
handle_quic_stream_forwarding(connection, route, port, metrics, cancel).await
@@ -171,11 +170,12 @@ async fn handle_quic_stream_forwarding(
connection: quinn::Connection,
route: RouteConfig,
port: u16,
_metrics: &MetricsCollector,
metrics: Arc<MetricsCollector>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let remote_addr = connection.remote_address();
let route_id = route.name.as_deref().or(route.id.as_deref());
let metrics_arc = metrics;
// Resolve backend target
let target = route.action.targets.as_ref()
@@ -203,11 +203,8 @@ async fn handle_quic_stream_forwarding(
let backend_addr = backend_addr.clone();
let ip_str = remote_addr.ip().to_string();
let _fwd_ctx = ForwardMetricsCtx {
collector: Arc::new(MetricsCollector::new()), // TODO: share real metrics
route_id: route_id.map(|s| s.to_string()),
source_ip: Some(ip_str),
};
let stream_metrics = Arc::clone(&metrics_arc);
let stream_route_id = route_id.map(|s| s.to_string());
// Spawn a task for each QUIC stream → TCP bidirectional forwarding
tokio::spawn(async move {
@@ -217,6 +214,11 @@ async fn handle_quic_stream_forwarding(
&backend_addr,
).await {
Ok((bytes_in, bytes_out)) => {
stream_metrics.record_bytes(
bytes_in, bytes_out,
stream_route_id.as_deref(),
Some(&ip_str),
);
debug!("QUIC stream forwarded: {}B in, {}B out", bytes_in, bytes_out);
}
Err(e) => {

View File

@@ -8,12 +8,12 @@ use std::net::SocketAddr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use arc_swap::ArcSwap;
use tokio::net::UdpSocket;
use tokio::task::JoinHandle;
use tokio::sync::RwLock;
use tokio::sync::{Mutex, RwLock};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
@@ -40,6 +40,19 @@ pub struct UdpListenerManager {
cancel_token: CancellationToken,
/// Unix socket path for datagram handler relay
datagram_handler_relay: Arc<RwLock<Option<String>>>,
/// Persistent write half of the relay connection
relay_writer: Arc<Mutex<Option<tokio::net::unix::OwnedWriteHalf>>>,
/// Cancel token for the current relay reply reader task
relay_reader_cancel: Option<CancellationToken>,
}
impl Drop for UdpListenerManager {
fn drop(&mut self) {
self.cancel_token.cancel();
for (_, handle) in self.listeners.drain() {
handle.abort();
}
}
}
impl UdpListenerManager {
@@ -57,6 +70,8 @@ impl UdpListenerManager {
session_table: Arc::new(UdpSessionTable::new()),
cancel_token,
datagram_handler_relay: Arc::new(RwLock::new(None)),
relay_writer: Arc::new(Mutex::new(None)),
relay_reader_cancel: None,
}
}
@@ -126,6 +141,7 @@ impl UdpListenerManager {
Arc::clone(&self.conn_tracker),
Arc::clone(&self.session_table),
Arc::clone(&self.datagram_handler_relay),
Arc::clone(&self.relay_writer),
self.cancel_token.child_token(),
));
@@ -165,16 +181,49 @@ impl UdpListenerManager {
self.session_table.session_count());
}
/// Set the datagram handler relay socket path.
pub async fn set_datagram_handler_relay(&self, path: String) {
let mut relay = self.datagram_handler_relay.write().await;
*relay = Some(path);
/// Set the datagram handler relay socket path and establish connection.
pub async fn set_datagram_handler_relay(&mut self, path: String) {
// Cancel previous relay reader task if any
if let Some(old_cancel) = self.relay_reader_cancel.take() {
old_cancel.cancel();
}
// Store the path
{
let mut relay = self.datagram_handler_relay.write().await;
*relay = Some(path.clone());
}
// Connect to the Unix socket
match tokio::net::UnixStream::connect(&path).await {
Ok(stream) => {
let (read_half, write_half) = stream.into_split();
// Store write half for sending datagrams
{
let mut writer = self.relay_writer.lock().await;
*writer = Some(write_half);
}
// Spawn reply reader — reads length-prefixed JSON replies from TS
// and sends them back to clients via the listener sockets
let cancel = self.cancel_token.child_token();
self.relay_reader_cancel = Some(cancel.clone());
tokio::spawn(Self::relay_reply_reader(read_half, cancel));
info!("Datagram handler relay connected to {}", path);
}
Err(e) => {
error!("Failed to connect datagram handler relay to {}: {}", path, e);
}
}
}
/// Start periodic session cleanup task.
fn start_cleanup_task(&self) {
let session_table = Arc::clone(&self.session_table);
let metrics = Arc::clone(&self.metrics);
let conn_tracker = Arc::clone(&self.conn_tracker);
let cancel = self.cancel_token.child_token();
let route_manager = Arc::clone(&self.route_manager);
@@ -188,7 +237,7 @@ impl UdpListenerManager {
// or default 60s if none configured)
let rm = route_manager.load();
let timeout_ms = Self::get_min_session_timeout(&rm);
let removed = session_table.cleanup_idle(timeout_ms, &metrics);
let removed = session_table.cleanup_idle(timeout_ms, &metrics, &conn_tracker);
if removed > 0 {
debug!("UDP session cleanup: removed {} idle sessions, {} remaining",
removed, session_table.session_count());
@@ -213,7 +262,8 @@ impl UdpListenerManager {
metrics: Arc<MetricsCollector>,
conn_tracker: Arc<ConnectionTracker>,
session_table: Arc<UdpSessionTable>,
datagram_handler_relay: Arc<RwLock<Option<String>>>,
_datagram_handler_relay: Arc<RwLock<Option<String>>>,
relay_writer: Arc<Mutex<Option<tokio::net::unix::OwnedWriteHalf>>>,
cancel: CancellationToken,
) {
// Use a reasonably large buffer; actual max is per-route but we need a single buffer
@@ -264,21 +314,16 @@ impl UdpListenerManager {
let route = route_match.route;
let route_id = route.name.as_deref().or(route.id.as_deref());
// Socket handler routes → relay datagram to TS via Unix socket
// Socket handler routes → relay datagram to TS via persistent Unix socket
if route.action.action_type == RouteActionType::SocketHandler {
let relay_path = datagram_handler_relay.read().await;
if let Some(ref path) = *relay_path {
if let Err(e) = Self::relay_datagram_to_ts(
path,
route_id.unwrap_or("unknown"),
&client_addr,
port,
datagram,
).await {
debug!("Failed to relay UDP datagram to TS: {}", e);
}
} else {
debug!("UDP datagram handler relay not configured for route {:?}", route_id);
if let Err(e) = Self::relay_datagram_via_writer(
&relay_writer,
route_id.unwrap_or("unknown"),
&client_addr,
port,
datagram,
).await {
debug!("Failed to relay UDP datagram to TS: {}", e);
}
continue;
}
@@ -441,10 +486,9 @@ impl UdpListenerManager {
}
}
/// Relay a UDP datagram to the TypeScript handler via Unix socket.
/// Uses length-prefixed JSON framing: [4-byte BE length][JSON payload]
async fn relay_datagram_to_ts(
relay_path: &str,
/// Send a datagram to TS via the persistent relay writer.
async fn relay_datagram_via_writer(
writer: &Mutex<Option<tokio::net::unix::OwnedWriteHalf>>,
route_key: &str,
client_addr: &SocketAddr,
dest_port: u16,
@@ -463,8 +507,9 @@ impl UdpListenerManager {
});
let json = serde_json::to_vec(&msg)?;
// Connect to relay (one-shot for now; persistent connection optimization deferred)
let mut stream = tokio::net::UnixStream::connect(relay_path).await?;
let mut guard = writer.lock().await;
let stream = guard.as_mut()
.ok_or_else(|| anyhow::anyhow!("Datagram relay not connected"))?;
// Length-prefixed frame
let len_bytes = (json.len() as u32).to_be_bytes();
@@ -474,4 +519,101 @@ impl UdpListenerManager {
Ok(())
}
/// Background task reading reply frames from the TS datagram handler.
/// Parses replies and sends them back to the original client via UDP.
async fn relay_reply_reader(
mut reader: tokio::net::unix::OwnedReadHalf,
cancel: CancellationToken,
) {
use base64::Engine;
let mut len_buf = [0u8; 4];
loop {
// Read length prefix
let read_result = tokio::select! {
_ = cancel.cancelled() => break,
result = reader.read_exact(&mut len_buf) => result,
};
match read_result {
Ok(_) => {}
Err(e) => {
debug!("Datagram relay reader closed: {}", e);
break;
}
}
let frame_len = u32::from_be_bytes(len_buf) as usize;
if frame_len > 10 * 1024 * 1024 {
error!("Datagram relay frame too large: {} bytes", frame_len);
break;
}
let mut frame_buf = vec![0u8; frame_len];
match reader.read_exact(&mut frame_buf).await {
Ok(_) => {}
Err(e) => {
debug!("Datagram relay reader frame error: {}", e);
break;
}
}
// Parse the reply JSON
let reply: serde_json::Value = match serde_json::from_slice(&frame_buf) {
Ok(v) => v,
Err(e) => {
debug!("Datagram relay reply parse error: {}", e);
continue;
}
};
if reply.get("type").and_then(|v| v.as_str()) != Some("reply") {
continue;
}
let source_ip = reply.get("sourceIp").and_then(|v| v.as_str()).unwrap_or("");
let source_port = reply.get("sourcePort").and_then(|v| v.as_u64()).unwrap_or(0) as u16;
let dest_port = reply.get("destPort").and_then(|v| v.as_u64()).unwrap_or(0) as u16;
let payload_b64 = reply.get("payloadBase64").and_then(|v| v.as_str()).unwrap_or("");
let payload = match base64::engine::general_purpose::STANDARD.decode(payload_b64) {
Ok(p) => p,
Err(e) => {
debug!("Datagram relay reply base64 decode error: {}", e);
continue;
}
};
let client_addr: SocketAddr = match format!("{}:{}", source_ip, source_port).parse() {
Ok(a) => a,
Err(e) => {
debug!("Datagram relay reply address parse error: {}", e);
continue;
}
};
// Send the reply back to the client via a temporary UDP socket bound to the dest_port
// We need the listener socket for this port. For simplicity, use a fresh socket.
let reply_socket = match UdpSocket::bind(format!("0.0.0.0:{}", dest_port)).await {
Ok(s) => s,
Err(_) => {
// Port already bound by the listener — use unbound socket
match UdpSocket::bind("0.0.0.0:0").await {
Ok(s) => s,
Err(e) => {
debug!("Failed to create reply socket: {}", e);
continue;
}
}
}
};
if let Err(e) = reply_socket.send_to(&payload, client_addr).await {
debug!("Failed to send datagram reply to {}: {}", client_addr, e);
}
}
debug!("Datagram relay reply reader stopped");
}
}

View File

@@ -18,6 +18,8 @@ use tracing::debug;
use rustproxy_metrics::MetricsCollector;
use crate::connection_tracker::ConnectionTracker;
/// A single UDP session (flow).
pub struct UdpSession {
/// Socket bound to ephemeral port, connected to backend
@@ -165,6 +167,7 @@ impl UdpSessionTable {
&self,
timeout_ms: u64,
metrics: &MetricsCollector,
conn_tracker: &ConnectionTracker,
) -> usize {
let now_ms = self.elapsed_ms();
let mut removed = 0;
@@ -185,6 +188,7 @@ impl UdpSessionTable {
session.client_addr, key.1,
now_ms.saturating_sub(session.last_activity.load(Ordering::Relaxed))
);
conn_tracker.connection_closed(&session.source_ip);
metrics.connection_closed(
session.route_id.as_deref(),
Some(&session.source_ip.to_string()),