feat(rustproxy-passthrough): add PROXY protocol v2 client IP handling for UDP and QUIC listeners
This commit is contained in:
@@ -3,13 +3,21 @@
|
||||
//! Manages QUIC endpoints (via quinn), accepts connections, and either:
|
||||
//! - Forwards streams bidirectionally to TCP backends (QUIC termination)
|
||||
//! - Dispatches to H3ProxyService for HTTP/3 handling (Phase 5)
|
||||
//!
|
||||
//! When `proxy_ips` is configured, a UDP relay layer intercepts PROXY protocol v2
|
||||
//! headers before they reach quinn, extracting real client IPs for attribution.
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::net::UdpSocket;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use arc_swap::ArcSwap;
|
||||
use dashmap::DashMap;
|
||||
use quinn::{Endpoint, ServerConfig as QuinnServerConfig};
|
||||
use rustls::ServerConfig as RustlsServerConfig;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -47,9 +55,274 @@ pub fn create_quic_endpoint(
|
||||
Ok(endpoint)
|
||||
}
|
||||
|
||||
// ===== PROXY protocol relay for QUIC =====
|
||||
|
||||
/// Result of creating a QUIC endpoint with a PROXY protocol relay layer.
|
||||
pub struct QuicProxyRelay {
|
||||
/// The quinn endpoint (bound to 127.0.0.1:ephemeral).
|
||||
pub endpoint: Endpoint,
|
||||
/// The relay recv loop task handle.
|
||||
pub relay_task: JoinHandle<()>,
|
||||
/// Maps relay socket local addr → real client SocketAddr (from PROXY v2).
|
||||
/// Consulted by `quic_accept_loop` to resolve real client IPs.
|
||||
pub real_client_map: Arc<DashMap<SocketAddr, SocketAddr>>,
|
||||
}
|
||||
|
||||
/// A single relay session for forwarding datagrams between an external source
|
||||
/// and the internal quinn endpoint.
|
||||
struct RelaySession {
|
||||
socket: Arc<UdpSocket>,
|
||||
last_activity: AtomicU64,
|
||||
return_task: JoinHandle<()>,
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
/// Create a QUIC endpoint with a PROXY protocol v2 relay layer.
|
||||
///
|
||||
/// Instead of giving the external socket to quinn, we:
|
||||
/// 1. Bind a raw UDP socket on 0.0.0.0:port (external)
|
||||
/// 2. Bind quinn on 127.0.0.1:0 (internal, ephemeral)
|
||||
/// 3. Run a relay loop that filters PROXY v2 headers and forwards datagrams
|
||||
///
|
||||
/// Only used when `proxy_ips` is non-empty.
|
||||
pub fn create_quic_endpoint_with_proxy_relay(
|
||||
port: u16,
|
||||
tls_config: Arc<RustlsServerConfig>,
|
||||
proxy_ips: Arc<Vec<IpAddr>>,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<QuicProxyRelay> {
|
||||
// Bind external socket on the real port
|
||||
let external_socket = std::net::UdpSocket::bind(SocketAddr::from(([0, 0, 0, 0], port)))?;
|
||||
external_socket.set_nonblocking(true)?;
|
||||
let external_socket = Arc::new(
|
||||
UdpSocket::from_std(external_socket)
|
||||
.map_err(|e| anyhow::anyhow!("Failed to wrap external socket: {}", e))?,
|
||||
);
|
||||
|
||||
// Bind quinn on localhost ephemeral port
|
||||
let internal_socket = std::net::UdpSocket::bind("127.0.0.1:0")?;
|
||||
let quinn_internal_addr = internal_socket.local_addr()?;
|
||||
|
||||
let quic_crypto = quinn::crypto::rustls::QuicServerConfig::try_from(tls_config)
|
||||
.map_err(|e| anyhow::anyhow!("Failed to create QUIC crypto config: {}", e))?;
|
||||
let server_config = QuinnServerConfig::with_crypto(Arc::new(quic_crypto));
|
||||
|
||||
let endpoint = Endpoint::new(
|
||||
quinn::EndpointConfig::default(),
|
||||
Some(server_config),
|
||||
internal_socket,
|
||||
quinn::default_runtime()
|
||||
.ok_or_else(|| anyhow::anyhow!("No async runtime for quinn"))?,
|
||||
)?;
|
||||
|
||||
let real_client_map = Arc::new(DashMap::new());
|
||||
|
||||
let relay_task = tokio::spawn(quic_proxy_relay_loop(
|
||||
external_socket,
|
||||
quinn_internal_addr,
|
||||
proxy_ips,
|
||||
Arc::clone(&real_client_map),
|
||||
cancel,
|
||||
));
|
||||
|
||||
info!("QUIC endpoint with PROXY relay on port {} (quinn internal: {})", port, quinn_internal_addr);
|
||||
Ok(QuicProxyRelay { endpoint, relay_task, real_client_map })
|
||||
}
|
||||
|
||||
/// Main relay loop: reads datagrams from the external socket, filters PROXY v2
|
||||
/// headers from trusted proxy IPs, and forwards everything else to quinn via
|
||||
/// per-session relay sockets.
|
||||
async fn quic_proxy_relay_loop(
|
||||
external_socket: Arc<UdpSocket>,
|
||||
quinn_internal_addr: SocketAddr,
|
||||
proxy_ips: Arc<Vec<IpAddr>>,
|
||||
real_client_map: Arc<DashMap<SocketAddr, SocketAddr>>,
|
||||
cancel: CancellationToken,
|
||||
) {
|
||||
// Maps external source addr → real client addr (from PROXY v2 headers)
|
||||
let proxy_addr_map: DashMap<SocketAddr, SocketAddr> = DashMap::new();
|
||||
// Maps external source addr → relay session
|
||||
let relay_sessions: DashMap<SocketAddr, Arc<RelaySession>> = DashMap::new();
|
||||
let epoch = Instant::now();
|
||||
let mut buf = vec![0u8; 65535];
|
||||
|
||||
// Inline cleanup: periodically scan relay_sessions for stale entries
|
||||
let mut last_cleanup = Instant::now();
|
||||
let cleanup_interval = std::time::Duration::from_secs(30);
|
||||
let session_timeout_ms: u64 = 120_000;
|
||||
|
||||
loop {
|
||||
let (len, src_addr) = tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
debug!("QUIC proxy relay loop cancelled");
|
||||
break;
|
||||
}
|
||||
result = external_socket.recv_from(&mut buf) => {
|
||||
match result {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
warn!("QUIC proxy relay recv error: {}", e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let datagram = &buf[..len];
|
||||
|
||||
// PROXY v2 handling: only on first datagram from a trusted proxy IP
|
||||
// (before a relay session exists for this source)
|
||||
if proxy_ips.contains(&src_addr.ip()) && relay_sessions.get(&src_addr).is_none() {
|
||||
if crate::proxy_protocol::is_proxy_protocol_v2(datagram) {
|
||||
match crate::proxy_protocol::parse_v2(datagram) {
|
||||
Ok((header, _consumed)) => {
|
||||
debug!("QUIC PROXY v2 from {}: real client {}", src_addr, header.source_addr);
|
||||
proxy_addr_map.insert(src_addr, header.source_addr);
|
||||
continue; // consume the PROXY v2 datagram
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("QUIC proxy relay: failed to parse PROXY v2 from {}: {}", src_addr, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Determine real client address
|
||||
let real_client = proxy_addr_map.get(&src_addr)
|
||||
.map(|r| *r)
|
||||
.unwrap_or(src_addr);
|
||||
|
||||
// Get or create relay session for this external source
|
||||
let session = match relay_sessions.get(&src_addr) {
|
||||
Some(s) => {
|
||||
s.last_activity.store(epoch.elapsed().as_millis() as u64, Ordering::Relaxed);
|
||||
Arc::clone(s.value())
|
||||
}
|
||||
None => {
|
||||
// Create new relay socket connected to quinn's internal address
|
||||
let relay_socket = match UdpSocket::bind("127.0.0.1:0").await {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
warn!("QUIC relay: failed to bind relay socket: {}", e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if let Err(e) = relay_socket.connect(quinn_internal_addr).await {
|
||||
warn!("QUIC relay: failed to connect relay socket to {}: {}", quinn_internal_addr, e);
|
||||
continue;
|
||||
}
|
||||
let relay_local_addr = match relay_socket.local_addr() {
|
||||
Ok(a) => a,
|
||||
Err(e) => {
|
||||
warn!("QUIC relay: failed to get relay socket local addr: {}", e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let relay_socket = Arc::new(relay_socket);
|
||||
|
||||
// Store the real client mapping for the QUIC accept loop
|
||||
real_client_map.insert(relay_local_addr, real_client);
|
||||
|
||||
// Spawn return-path relay: quinn -> external socket -> original source
|
||||
let session_cancel = cancel.child_token();
|
||||
let return_task = tokio::spawn(relay_return_path(
|
||||
Arc::clone(&relay_socket),
|
||||
Arc::clone(&external_socket),
|
||||
src_addr,
|
||||
session_cancel.child_token(),
|
||||
));
|
||||
|
||||
let session = Arc::new(RelaySession {
|
||||
socket: relay_socket,
|
||||
last_activity: AtomicU64::new(epoch.elapsed().as_millis() as u64),
|
||||
return_task,
|
||||
cancel: session_cancel,
|
||||
});
|
||||
|
||||
relay_sessions.insert(src_addr, Arc::clone(&session));
|
||||
debug!("QUIC relay: new session for {} (relay {}), real client {}",
|
||||
src_addr, relay_local_addr, real_client);
|
||||
|
||||
session
|
||||
}
|
||||
};
|
||||
|
||||
// Forward datagram to quinn via the relay socket
|
||||
if let Err(e) = session.socket.send(datagram).await {
|
||||
debug!("QUIC relay: forward error to quinn for {}: {}", src_addr, e);
|
||||
}
|
||||
|
||||
// Periodic cleanup of stale relay sessions
|
||||
if last_cleanup.elapsed() >= cleanup_interval {
|
||||
last_cleanup = Instant::now();
|
||||
let now_ms = epoch.elapsed().as_millis() as u64;
|
||||
let stale_keys: Vec<SocketAddr> = relay_sessions.iter()
|
||||
.filter(|entry| {
|
||||
let age = now_ms.saturating_sub(entry.value().last_activity.load(Ordering::Relaxed));
|
||||
age > session_timeout_ms
|
||||
})
|
||||
.map(|entry| *entry.key())
|
||||
.collect();
|
||||
|
||||
for key in stale_keys {
|
||||
if let Some((_, session)) = relay_sessions.remove(&key) {
|
||||
session.cancel.cancel();
|
||||
session.return_task.abort();
|
||||
// Clean up real_client_map entry
|
||||
if let Ok(addr) = session.socket.local_addr() {
|
||||
real_client_map.remove(&addr);
|
||||
}
|
||||
proxy_addr_map.remove(&key);
|
||||
debug!("QUIC relay: cleaned up stale session for {}", key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown: cancel all relay sessions
|
||||
for entry in relay_sessions.iter() {
|
||||
entry.value().cancel.cancel();
|
||||
entry.value().return_task.abort();
|
||||
}
|
||||
}
|
||||
|
||||
/// Return-path relay: receives datagrams from quinn (via the relay socket)
|
||||
/// and forwards them back to the external client through the external socket.
|
||||
async fn relay_return_path(
|
||||
relay_socket: Arc<UdpSocket>,
|
||||
external_socket: Arc<UdpSocket>,
|
||||
external_src_addr: SocketAddr,
|
||||
cancel: CancellationToken,
|
||||
) {
|
||||
let mut buf = vec![0u8; 65535];
|
||||
loop {
|
||||
let len = tokio::select! {
|
||||
_ = cancel.cancelled() => break,
|
||||
result = relay_socket.recv(&mut buf) => {
|
||||
match result {
|
||||
Ok(len) => len,
|
||||
Err(e) => {
|
||||
debug!("QUIC relay return recv error for {}: {}", external_src_addr, e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = external_socket.send_to(&buf[..len], external_src_addr).await {
|
||||
debug!("QUIC relay return send error to {}: {}", external_src_addr, e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ===== QUIC accept loop =====
|
||||
|
||||
/// Run the QUIC accept loop for a single endpoint.
|
||||
///
|
||||
/// Accepts incoming QUIC connections and spawns a task per connection.
|
||||
/// When `real_client_map` is provided, it is consulted to resolve real client
|
||||
/// IPs from PROXY protocol v2 headers (relay socket addr → real client addr).
|
||||
pub async fn quic_accept_loop(
|
||||
endpoint: Endpoint,
|
||||
port: u16,
|
||||
@@ -58,6 +331,7 @@ pub async fn quic_accept_loop(
|
||||
conn_tracker: Arc<ConnectionTracker>,
|
||||
cancel: CancellationToken,
|
||||
h3_service: Option<Arc<H3ProxyService>>,
|
||||
real_client_map: Option<Arc<DashMap<SocketAddr, SocketAddr>>>,
|
||||
) {
|
||||
loop {
|
||||
let incoming = tokio::select! {
|
||||
@@ -77,11 +351,16 @@ pub async fn quic_accept_loop(
|
||||
};
|
||||
|
||||
let remote_addr = incoming.remote_address();
|
||||
let ip = remote_addr.ip();
|
||||
|
||||
// Resolve real client IP from PROXY protocol map if available
|
||||
let real_addr = real_client_map.as_ref()
|
||||
.and_then(|map| map.get(&remote_addr).map(|r| *r))
|
||||
.unwrap_or(remote_addr);
|
||||
let ip = real_addr.ip();
|
||||
|
||||
// Per-IP rate limiting
|
||||
if !conn_tracker.try_accept(&ip) {
|
||||
debug!("QUIC connection rejected from {} (rate limit)", remote_addr);
|
||||
debug!("QUIC connection rejected from {} (rate limit)", real_addr);
|
||||
// Drop `incoming` to refuse the connection
|
||||
continue;
|
||||
}
|
||||
@@ -104,7 +383,7 @@ pub async fn quic_accept_loop(
|
||||
let route = match rm.find_route(&ctx) {
|
||||
Some(m) => m.route.clone(),
|
||||
None => {
|
||||
debug!("No QUIC route matched for port {} from {}", port, remote_addr);
|
||||
debug!("No QUIC route matched for port {} from {}", port, real_addr);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
@@ -117,11 +396,12 @@ pub async fn quic_accept_loop(
|
||||
let conn_tracker = Arc::clone(&conn_tracker);
|
||||
let cancel = cancel.child_token();
|
||||
let h3_svc = h3_service.clone();
|
||||
let real_client_addr = if real_addr != remote_addr { Some(real_addr) } else { None };
|
||||
|
||||
tokio::spawn(async move {
|
||||
match handle_quic_connection(incoming, route, port, Arc::clone(&metrics), &cancel, h3_svc).await {
|
||||
Ok(()) => debug!("QUIC connection from {} completed", remote_addr),
|
||||
Err(e) => debug!("QUIC connection from {} error: {}", remote_addr, e),
|
||||
match handle_quic_connection(incoming, route, port, Arc::clone(&metrics), &cancel, h3_svc, real_client_addr).await {
|
||||
Ok(()) => debug!("QUIC connection from {} completed", real_addr),
|
||||
Err(e) => debug!("QUIC connection from {} error: {}", real_addr, e),
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
@@ -144,10 +424,11 @@ async fn handle_quic_connection(
|
||||
metrics: Arc<MetricsCollector>,
|
||||
cancel: &CancellationToken,
|
||||
h3_service: Option<Arc<H3ProxyService>>,
|
||||
real_client_addr: Option<SocketAddr>,
|
||||
) -> anyhow::Result<()> {
|
||||
let connection = incoming.await?;
|
||||
let remote_addr = connection.remote_address();
|
||||
debug!("QUIC connection established from {}", remote_addr);
|
||||
let effective_addr = real_client_addr.unwrap_or_else(|| connection.remote_address());
|
||||
debug!("QUIC connection established from {}", effective_addr);
|
||||
|
||||
// Check if this route has HTTP/3 enabled
|
||||
let enable_http3 = route.action.udp.as_ref()
|
||||
@@ -158,7 +439,7 @@ async fn handle_quic_connection(
|
||||
if enable_http3 {
|
||||
if let Some(ref h3_svc) = h3_service {
|
||||
debug!("HTTP/3 enabled for route {:?}, dispatching to H3ProxyService", route.name);
|
||||
h3_svc.handle_connection(connection, &route, port).await
|
||||
h3_svc.handle_connection(connection, &route, port, real_client_addr).await
|
||||
} else {
|
||||
warn!("HTTP/3 enabled for route {:?} but H3ProxyService not initialized", route.name);
|
||||
// Keep connection alive until cancelled
|
||||
@@ -172,7 +453,7 @@ async fn handle_quic_connection(
|
||||
}
|
||||
} else {
|
||||
// Non-HTTP3 QUIC: bidirectional stream forwarding to TCP backend
|
||||
handle_quic_stream_forwarding(connection, route, port, metrics, cancel).await
|
||||
handle_quic_stream_forwarding(connection, route, port, metrics, cancel, real_client_addr).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -187,8 +468,9 @@ async fn handle_quic_stream_forwarding(
|
||||
port: u16,
|
||||
metrics: Arc<MetricsCollector>,
|
||||
cancel: &CancellationToken,
|
||||
real_client_addr: Option<SocketAddr>,
|
||||
) -> anyhow::Result<()> {
|
||||
let remote_addr = connection.remote_address();
|
||||
let effective_addr = real_client_addr.unwrap_or_else(|| connection.remote_address());
|
||||
let route_id = route.name.as_deref().or(route.id.as_deref());
|
||||
let metrics_arc = metrics;
|
||||
|
||||
@@ -209,7 +491,7 @@ async fn handle_quic_stream_forwarding(
|
||||
Err(quinn::ConnectionError::ApplicationClosed(_)) => break,
|
||||
Err(quinn::ConnectionError::LocallyClosed) => break,
|
||||
Err(e) => {
|
||||
debug!("QUIC stream accept error from {}: {}", remote_addr, e);
|
||||
debug!("QUIC stream accept error from {}: {}", effective_addr, e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -217,7 +499,7 @@ async fn handle_quic_stream_forwarding(
|
||||
};
|
||||
|
||||
let backend_addr = backend_addr.clone();
|
||||
let ip_str = remote_addr.ip().to_string();
|
||||
let ip_str = effective_addr.ip().to_string();
|
||||
let stream_metrics = Arc::clone(&metrics_arc);
|
||||
let stream_route_id = route_id.map(|s| s.to_string());
|
||||
|
||||
|
||||
Reference in New Issue
Block a user