diff --git a/changelog.md b/changelog.md index 6720e8d..14d21e7 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-03-19 - 25.17.0 - feat(rustproxy-passthrough) +add PROXY protocol v2 client IP handling for UDP and QUIC listeners + +- propagate trusted proxy IP configuration into UDP and QUIC listener managers +- extract and preserve real client addresses from PROXY protocol v2 headers for HTTP/3 and QUIC stream handling +- apply rate limiting, session limits, routing, and metrics using the resolved client IP while preserving correct proxy return-path routing + ## 2026-03-19 - 25.16.3 - fix(rustproxy) upgrade fallback UDP listeners to QUIC when TLS certificates become available diff --git a/rust/crates/rustproxy-http/src/h3_service.rs b/rust/crates/rustproxy-http/src/h3_service.rs index 86beec5..8a940ac 100644 --- a/rust/crates/rustproxy-http/src/h3_service.rs +++ b/rust/crates/rustproxy-http/src/h3_service.rs @@ -4,6 +4,7 @@ //! and forwards them to backends using the same routing and pool infrastructure //! as the HTTP/1+2 proxy. +use std::net::SocketAddr; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -61,13 +62,17 @@ impl H3ProxyService { } /// Handle an accepted QUIC connection as HTTP/3. + /// + /// If `real_client_addr` is provided (from PROXY protocol), it overrides + /// `connection.remote_address()` for client IP attribution. pub async fn handle_connection( &self, connection: quinn::Connection, _fallback_route: &RouteConfig, port: u16, + real_client_addr: Option, ) -> anyhow::Result<()> { - let remote_addr = connection.remote_address(); + let remote_addr = real_client_addr.unwrap_or_else(|| connection.remote_address()); debug!("HTTP/3 connection from {} on port {}", remote_addr, port); let mut h3_conn: h3::server::Connection = diff --git a/rust/crates/rustproxy-passthrough/src/quic_handler.rs b/rust/crates/rustproxy-passthrough/src/quic_handler.rs index bbc48f7..d228348 100644 --- a/rust/crates/rustproxy-passthrough/src/quic_handler.rs +++ b/rust/crates/rustproxy-passthrough/src/quic_handler.rs @@ -3,13 +3,21 @@ //! Manages QUIC endpoints (via quinn), accepts connections, and either: //! - Forwards streams bidirectionally to TCP backends (QUIC termination) //! - Dispatches to H3ProxyService for HTTP/3 handling (Phase 5) +//! +//! When `proxy_ips` is configured, a UDP relay layer intercepts PROXY protocol v2 +//! headers before they reach quinn, extracting real client IPs for attribution. -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; +use std::time::Instant; use tokio::io::AsyncWriteExt; +use tokio::net::UdpSocket; +use tokio::task::JoinHandle; use arc_swap::ArcSwap; +use dashmap::DashMap; use quinn::{Endpoint, ServerConfig as QuinnServerConfig}; use rustls::ServerConfig as RustlsServerConfig; use tokio_util::sync::CancellationToken; @@ -47,9 +55,274 @@ pub fn create_quic_endpoint( Ok(endpoint) } +// ===== PROXY protocol relay for QUIC ===== + +/// Result of creating a QUIC endpoint with a PROXY protocol relay layer. +pub struct QuicProxyRelay { + /// The quinn endpoint (bound to 127.0.0.1:ephemeral). + pub endpoint: Endpoint, + /// The relay recv loop task handle. + pub relay_task: JoinHandle<()>, + /// Maps relay socket local addr → real client SocketAddr (from PROXY v2). + /// Consulted by `quic_accept_loop` to resolve real client IPs. + pub real_client_map: Arc>, +} + +/// A single relay session for forwarding datagrams between an external source +/// and the internal quinn endpoint. +struct RelaySession { + socket: Arc, + last_activity: AtomicU64, + return_task: JoinHandle<()>, + cancel: CancellationToken, +} + +/// Create a QUIC endpoint with a PROXY protocol v2 relay layer. +/// +/// Instead of giving the external socket to quinn, we: +/// 1. Bind a raw UDP socket on 0.0.0.0:port (external) +/// 2. Bind quinn on 127.0.0.1:0 (internal, ephemeral) +/// 3. Run a relay loop that filters PROXY v2 headers and forwards datagrams +/// +/// Only used when `proxy_ips` is non-empty. +pub fn create_quic_endpoint_with_proxy_relay( + port: u16, + tls_config: Arc, + proxy_ips: Arc>, + cancel: CancellationToken, +) -> anyhow::Result { + // Bind external socket on the real port + let external_socket = std::net::UdpSocket::bind(SocketAddr::from(([0, 0, 0, 0], port)))?; + external_socket.set_nonblocking(true)?; + let external_socket = Arc::new( + UdpSocket::from_std(external_socket) + .map_err(|e| anyhow::anyhow!("Failed to wrap external socket: {}", e))?, + ); + + // Bind quinn on localhost ephemeral port + let internal_socket = std::net::UdpSocket::bind("127.0.0.1:0")?; + let quinn_internal_addr = internal_socket.local_addr()?; + + let quic_crypto = quinn::crypto::rustls::QuicServerConfig::try_from(tls_config) + .map_err(|e| anyhow::anyhow!("Failed to create QUIC crypto config: {}", e))?; + let server_config = QuinnServerConfig::with_crypto(Arc::new(quic_crypto)); + + let endpoint = Endpoint::new( + quinn::EndpointConfig::default(), + Some(server_config), + internal_socket, + quinn::default_runtime() + .ok_or_else(|| anyhow::anyhow!("No async runtime for quinn"))?, + )?; + + let real_client_map = Arc::new(DashMap::new()); + + let relay_task = tokio::spawn(quic_proxy_relay_loop( + external_socket, + quinn_internal_addr, + proxy_ips, + Arc::clone(&real_client_map), + cancel, + )); + + info!("QUIC endpoint with PROXY relay on port {} (quinn internal: {})", port, quinn_internal_addr); + Ok(QuicProxyRelay { endpoint, relay_task, real_client_map }) +} + +/// Main relay loop: reads datagrams from the external socket, filters PROXY v2 +/// headers from trusted proxy IPs, and forwards everything else to quinn via +/// per-session relay sockets. +async fn quic_proxy_relay_loop( + external_socket: Arc, + quinn_internal_addr: SocketAddr, + proxy_ips: Arc>, + real_client_map: Arc>, + cancel: CancellationToken, +) { + // Maps external source addr → real client addr (from PROXY v2 headers) + let proxy_addr_map: DashMap = DashMap::new(); + // Maps external source addr → relay session + let relay_sessions: DashMap> = DashMap::new(); + let epoch = Instant::now(); + let mut buf = vec![0u8; 65535]; + + // Inline cleanup: periodically scan relay_sessions for stale entries + let mut last_cleanup = Instant::now(); + let cleanup_interval = std::time::Duration::from_secs(30); + let session_timeout_ms: u64 = 120_000; + + loop { + let (len, src_addr) = tokio::select! { + _ = cancel.cancelled() => { + debug!("QUIC proxy relay loop cancelled"); + break; + } + result = external_socket.recv_from(&mut buf) => { + match result { + Ok(r) => r, + Err(e) => { + warn!("QUIC proxy relay recv error: {}", e); + continue; + } + } + } + }; + + let datagram = &buf[..len]; + + // PROXY v2 handling: only on first datagram from a trusted proxy IP + // (before a relay session exists for this source) + if proxy_ips.contains(&src_addr.ip()) && relay_sessions.get(&src_addr).is_none() { + if crate::proxy_protocol::is_proxy_protocol_v2(datagram) { + match crate::proxy_protocol::parse_v2(datagram) { + Ok((header, _consumed)) => { + debug!("QUIC PROXY v2 from {}: real client {}", src_addr, header.source_addr); + proxy_addr_map.insert(src_addr, header.source_addr); + continue; // consume the PROXY v2 datagram + } + Err(e) => { + debug!("QUIC proxy relay: failed to parse PROXY v2 from {}: {}", src_addr, e); + } + } + } + } + + // Determine real client address + let real_client = proxy_addr_map.get(&src_addr) + .map(|r| *r) + .unwrap_or(src_addr); + + // Get or create relay session for this external source + let session = match relay_sessions.get(&src_addr) { + Some(s) => { + s.last_activity.store(epoch.elapsed().as_millis() as u64, Ordering::Relaxed); + Arc::clone(s.value()) + } + None => { + // Create new relay socket connected to quinn's internal address + let relay_socket = match UdpSocket::bind("127.0.0.1:0").await { + Ok(s) => s, + Err(e) => { + warn!("QUIC relay: failed to bind relay socket: {}", e); + continue; + } + }; + if let Err(e) = relay_socket.connect(quinn_internal_addr).await { + warn!("QUIC relay: failed to connect relay socket to {}: {}", quinn_internal_addr, e); + continue; + } + let relay_local_addr = match relay_socket.local_addr() { + Ok(a) => a, + Err(e) => { + warn!("QUIC relay: failed to get relay socket local addr: {}", e); + continue; + } + }; + let relay_socket = Arc::new(relay_socket); + + // Store the real client mapping for the QUIC accept loop + real_client_map.insert(relay_local_addr, real_client); + + // Spawn return-path relay: quinn -> external socket -> original source + let session_cancel = cancel.child_token(); + let return_task = tokio::spawn(relay_return_path( + Arc::clone(&relay_socket), + Arc::clone(&external_socket), + src_addr, + session_cancel.child_token(), + )); + + let session = Arc::new(RelaySession { + socket: relay_socket, + last_activity: AtomicU64::new(epoch.elapsed().as_millis() as u64), + return_task, + cancel: session_cancel, + }); + + relay_sessions.insert(src_addr, Arc::clone(&session)); + debug!("QUIC relay: new session for {} (relay {}), real client {}", + src_addr, relay_local_addr, real_client); + + session + } + }; + + // Forward datagram to quinn via the relay socket + if let Err(e) = session.socket.send(datagram).await { + debug!("QUIC relay: forward error to quinn for {}: {}", src_addr, e); + } + + // Periodic cleanup of stale relay sessions + if last_cleanup.elapsed() >= cleanup_interval { + last_cleanup = Instant::now(); + let now_ms = epoch.elapsed().as_millis() as u64; + let stale_keys: Vec = relay_sessions.iter() + .filter(|entry| { + let age = now_ms.saturating_sub(entry.value().last_activity.load(Ordering::Relaxed)); + age > session_timeout_ms + }) + .map(|entry| *entry.key()) + .collect(); + + for key in stale_keys { + if let Some((_, session)) = relay_sessions.remove(&key) { + session.cancel.cancel(); + session.return_task.abort(); + // Clean up real_client_map entry + if let Ok(addr) = session.socket.local_addr() { + real_client_map.remove(&addr); + } + proxy_addr_map.remove(&key); + debug!("QUIC relay: cleaned up stale session for {}", key); + } + } + } + } + + // Shutdown: cancel all relay sessions + for entry in relay_sessions.iter() { + entry.value().cancel.cancel(); + entry.value().return_task.abort(); + } +} + +/// Return-path relay: receives datagrams from quinn (via the relay socket) +/// and forwards them back to the external client through the external socket. +async fn relay_return_path( + relay_socket: Arc, + external_socket: Arc, + external_src_addr: SocketAddr, + cancel: CancellationToken, +) { + let mut buf = vec![0u8; 65535]; + loop { + let len = tokio::select! { + _ = cancel.cancelled() => break, + result = relay_socket.recv(&mut buf) => { + match result { + Ok(len) => len, + Err(e) => { + debug!("QUIC relay return recv error for {}: {}", external_src_addr, e); + break; + } + } + } + }; + + if let Err(e) = external_socket.send_to(&buf[..len], external_src_addr).await { + debug!("QUIC relay return send error to {}: {}", external_src_addr, e); + break; + } + } +} + +// ===== QUIC accept loop ===== + /// Run the QUIC accept loop for a single endpoint. /// /// Accepts incoming QUIC connections and spawns a task per connection. +/// When `real_client_map` is provided, it is consulted to resolve real client +/// IPs from PROXY protocol v2 headers (relay socket addr → real client addr). pub async fn quic_accept_loop( endpoint: Endpoint, port: u16, @@ -58,6 +331,7 @@ pub async fn quic_accept_loop( conn_tracker: Arc, cancel: CancellationToken, h3_service: Option>, + real_client_map: Option>>, ) { loop { let incoming = tokio::select! { @@ -77,11 +351,16 @@ pub async fn quic_accept_loop( }; let remote_addr = incoming.remote_address(); - let ip = remote_addr.ip(); + + // Resolve real client IP from PROXY protocol map if available + let real_addr = real_client_map.as_ref() + .and_then(|map| map.get(&remote_addr).map(|r| *r)) + .unwrap_or(remote_addr); + let ip = real_addr.ip(); // Per-IP rate limiting if !conn_tracker.try_accept(&ip) { - debug!("QUIC connection rejected from {} (rate limit)", remote_addr); + debug!("QUIC connection rejected from {} (rate limit)", real_addr); // Drop `incoming` to refuse the connection continue; } @@ -104,7 +383,7 @@ pub async fn quic_accept_loop( let route = match rm.find_route(&ctx) { Some(m) => m.route.clone(), None => { - debug!("No QUIC route matched for port {} from {}", port, remote_addr); + debug!("No QUIC route matched for port {} from {}", port, real_addr); continue; } }; @@ -117,11 +396,12 @@ pub async fn quic_accept_loop( let conn_tracker = Arc::clone(&conn_tracker); let cancel = cancel.child_token(); let h3_svc = h3_service.clone(); + let real_client_addr = if real_addr != remote_addr { Some(real_addr) } else { None }; tokio::spawn(async move { - match handle_quic_connection(incoming, route, port, Arc::clone(&metrics), &cancel, h3_svc).await { - Ok(()) => debug!("QUIC connection from {} completed", remote_addr), - Err(e) => debug!("QUIC connection from {} error: {}", remote_addr, e), + match handle_quic_connection(incoming, route, port, Arc::clone(&metrics), &cancel, h3_svc, real_client_addr).await { + Ok(()) => debug!("QUIC connection from {} completed", real_addr), + Err(e) => debug!("QUIC connection from {} error: {}", real_addr, e), } // Cleanup @@ -144,10 +424,11 @@ async fn handle_quic_connection( metrics: Arc, cancel: &CancellationToken, h3_service: Option>, + real_client_addr: Option, ) -> anyhow::Result<()> { let connection = incoming.await?; - let remote_addr = connection.remote_address(); - debug!("QUIC connection established from {}", remote_addr); + let effective_addr = real_client_addr.unwrap_or_else(|| connection.remote_address()); + debug!("QUIC connection established from {}", effective_addr); // Check if this route has HTTP/3 enabled let enable_http3 = route.action.udp.as_ref() @@ -158,7 +439,7 @@ async fn handle_quic_connection( if enable_http3 { if let Some(ref h3_svc) = h3_service { debug!("HTTP/3 enabled for route {:?}, dispatching to H3ProxyService", route.name); - h3_svc.handle_connection(connection, &route, port).await + h3_svc.handle_connection(connection, &route, port, real_client_addr).await } else { warn!("HTTP/3 enabled for route {:?} but H3ProxyService not initialized", route.name); // Keep connection alive until cancelled @@ -172,7 +453,7 @@ async fn handle_quic_connection( } } else { // Non-HTTP3 QUIC: bidirectional stream forwarding to TCP backend - handle_quic_stream_forwarding(connection, route, port, metrics, cancel).await + handle_quic_stream_forwarding(connection, route, port, metrics, cancel, real_client_addr).await } } @@ -187,8 +468,9 @@ async fn handle_quic_stream_forwarding( port: u16, metrics: Arc, cancel: &CancellationToken, + real_client_addr: Option, ) -> anyhow::Result<()> { - let remote_addr = connection.remote_address(); + let effective_addr = real_client_addr.unwrap_or_else(|| connection.remote_address()); let route_id = route.name.as_deref().or(route.id.as_deref()); let metrics_arc = metrics; @@ -209,7 +491,7 @@ async fn handle_quic_stream_forwarding( Err(quinn::ConnectionError::ApplicationClosed(_)) => break, Err(quinn::ConnectionError::LocallyClosed) => break, Err(e) => { - debug!("QUIC stream accept error from {}: {}", remote_addr, e); + debug!("QUIC stream accept error from {}: {}", effective_addr, e); break; } } @@ -217,7 +499,7 @@ async fn handle_quic_stream_forwarding( }; let backend_addr = backend_addr.clone(); - let ip_str = remote_addr.ip().to_string(); + let ip_str = effective_addr.ip().to_string(); let stream_metrics = Arc::clone(&metrics_arc); let stream_route_id = route_id.map(|s| s.to_string()); diff --git a/rust/crates/rustproxy-passthrough/src/udp_listener.rs b/rust/crates/rustproxy-passthrough/src/udp_listener.rs index b6225d7..f9697a5 100644 --- a/rust/crates/rustproxy-passthrough/src/udp_listener.rs +++ b/rust/crates/rustproxy-passthrough/src/udp_listener.rs @@ -2,12 +2,17 @@ //! //! Binds UDP sockets on configured ports, receives datagrams, matches routes, //! tracks sessions (flows), and forwards datagrams to backend UDP sockets. +//! +//! Supports PROXY protocol v2 on both raw UDP and QUIC paths when `proxy_ips` +//! is configured. For QUIC, a relay layer intercepts datagrams before they +//! reach the quinn endpoint. use std::collections::HashMap; -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::sync::atomic::Ordering; use std::sync::Arc; +use dashmap::DashMap; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use arc_swap::ArcSwap; @@ -48,6 +53,9 @@ pub struct UdpListenerManager { relay_reader_cancel: Option, /// H3 proxy service for HTTP/3 request handling h3_service: Option>, + /// Trusted proxy IPs that may send PROXY protocol v2 headers. + /// When non-empty, PROXY v2 detection is enabled on both raw UDP and QUIC paths. + proxy_ips: Arc>, } impl Drop for UdpListenerManager { @@ -80,9 +88,18 @@ impl UdpListenerManager { relay_writer: Arc::new(Mutex::new(None)), relay_reader_cancel: None, h3_service: None, + proxy_ips: Arc::new(Vec::new()), } } + /// Set the trusted proxy IPs for PROXY protocol v2 detection. + pub fn set_proxy_ips(&mut self, ips: Vec) { + if !ips.is_empty() { + info!("UDP/QUIC PROXY protocol v2 enabled for {} trusted IPs", ips.len()); + } + self.proxy_ips = Arc::new(ips); + } + /// Set the H3 proxy service for HTTP/3 request handling. pub fn set_h3_service(&mut self, svc: Arc) { self.h3_service = Some(svc); @@ -122,20 +139,44 @@ impl UdpListenerManager { if has_quic { if let Some(tls) = tls_config { - // Create QUIC endpoint; clone it so we can hot-swap TLS later - let endpoint = crate::quic_handler::create_quic_endpoint(port, tls)?; - let endpoint_for_updates = endpoint.clone(); // quinn::Endpoint is Arc-based - let handle = tokio::spawn(crate::quic_handler::quic_accept_loop( - endpoint, - port, - Arc::clone(&self.route_manager), - Arc::clone(&self.metrics), - Arc::clone(&self.conn_tracker), - self.cancel_token.child_token(), - self.h3_service.clone(), - )); - self.listeners.insert(port, (handle, Some(endpoint_for_updates))); - info!("QUIC endpoint started on port {}", port); + if self.proxy_ips.is_empty() { + // Direct path: quinn owns the external socket (zero overhead) + let endpoint = crate::quic_handler::create_quic_endpoint(port, tls)?; + let endpoint_for_updates = endpoint.clone(); + let handle = tokio::spawn(crate::quic_handler::quic_accept_loop( + endpoint, + port, + Arc::clone(&self.route_manager), + Arc::clone(&self.metrics), + Arc::clone(&self.conn_tracker), + self.cancel_token.child_token(), + self.h3_service.clone(), + None, + )); + self.listeners.insert(port, (handle, Some(endpoint_for_updates))); + info!("QUIC endpoint started on port {}", port); + } else { + // Proxy relay path: we own external socket, quinn on localhost + let relay = crate::quic_handler::create_quic_endpoint_with_proxy_relay( + port, + tls, + Arc::clone(&self.proxy_ips), + self.cancel_token.child_token(), + )?; + let endpoint_for_updates = relay.endpoint.clone(); + let handle = tokio::spawn(crate::quic_handler::quic_accept_loop( + relay.endpoint, + port, + Arc::clone(&self.route_manager), + Arc::clone(&self.metrics), + Arc::clone(&self.conn_tracker), + self.cancel_token.child_token(), + self.h3_service.clone(), + Some(relay.real_client_map), + )); + self.listeners.insert(port, (handle, Some(endpoint_for_updates))); + info!("QUIC endpoint with PROXY relay started on port {}", port); + } return Ok(()); } else { warn!("QUIC routes on port {} but no TLS config provided, falling back to raw UDP", port); @@ -158,6 +199,7 @@ impl UdpListenerManager { Arc::clone(&self.datagram_handler_relay), Arc::clone(&self.relay_writer), self.cancel_token.child_token(), + Arc::clone(&self.proxy_ips), )); self.listeners.insert(port, (handle, None)); @@ -262,19 +304,14 @@ impl UdpListenerManager { tokio::task::yield_now().await; // Create QUIC endpoint on the now-free port - match crate::quic_handler::create_quic_endpoint(port, Arc::clone(&tls_config)) { - Ok(endpoint) => { - let endpoint_for_updates = endpoint.clone(); - let handle = tokio::spawn(crate::quic_handler::quic_accept_loop( - endpoint, - port, - Arc::clone(&self.route_manager), - Arc::clone(&self.metrics), - Arc::clone(&self.conn_tracker), - self.cancel_token.child_token(), - self.h3_service.clone(), - )); - self.listeners.insert(port, (handle, Some(endpoint_for_updates))); + let create_result = if self.proxy_ips.is_empty() { + self.create_quic_direct(port, Arc::clone(&tls_config)) + } else { + self.create_quic_with_relay(port, Arc::clone(&tls_config)) + }; + + match create_result { + Ok(()) => { info!("QUIC endpoint started on port {} (upgraded from raw UDP)", port); } Err(e) => { @@ -282,19 +319,14 @@ impl UdpListenerManager { warn!("QUIC endpoint creation failed on port {}, retrying: {}", port, e); tokio::time::sleep(std::time::Duration::from_millis(50)).await; - match crate::quic_handler::create_quic_endpoint(port, Arc::clone(&tls_config)) { - Ok(endpoint) => { - let endpoint_for_updates = endpoint.clone(); - let handle = tokio::spawn(crate::quic_handler::quic_accept_loop( - endpoint, - port, - Arc::clone(&self.route_manager), - Arc::clone(&self.metrics), - Arc::clone(&self.conn_tracker), - self.cancel_token.child_token(), - self.h3_service.clone(), - )); - self.listeners.insert(port, (handle, Some(endpoint_for_updates))); + let retry_result = if self.proxy_ips.is_empty() { + self.create_quic_direct(port, Arc::clone(&tls_config)) + } else { + self.create_quic_with_relay(port, Arc::clone(&tls_config)) + }; + + match retry_result { + Ok(()) => { info!("QUIC endpoint started on port {} (upgraded from raw UDP, retry)", port); } Err(e2) => { @@ -311,6 +343,47 @@ impl UdpListenerManager { } } + /// Create a direct QUIC endpoint (quinn owns the socket). + fn create_quic_direct(&mut self, port: u16, tls_config: Arc) -> anyhow::Result<()> { + let endpoint = crate::quic_handler::create_quic_endpoint(port, tls_config)?; + let endpoint_for_updates = endpoint.clone(); + let handle = tokio::spawn(crate::quic_handler::quic_accept_loop( + endpoint, + port, + Arc::clone(&self.route_manager), + Arc::clone(&self.metrics), + Arc::clone(&self.conn_tracker), + self.cancel_token.child_token(), + self.h3_service.clone(), + None, + )); + self.listeners.insert(port, (handle, Some(endpoint_for_updates))); + Ok(()) + } + + /// Create a QUIC endpoint with PROXY protocol relay. + fn create_quic_with_relay(&mut self, port: u16, tls_config: Arc) -> anyhow::Result<()> { + let relay = crate::quic_handler::create_quic_endpoint_with_proxy_relay( + port, + tls_config, + Arc::clone(&self.proxy_ips), + self.cancel_token.child_token(), + )?; + let endpoint_for_updates = relay.endpoint.clone(); + let handle = tokio::spawn(crate::quic_handler::quic_accept_loop( + relay.endpoint, + port, + Arc::clone(&self.route_manager), + Arc::clone(&self.metrics), + Arc::clone(&self.conn_tracker), + self.cancel_token.child_token(), + self.h3_service.clone(), + Some(relay.real_client_map), + )); + self.listeners.insert(port, (handle, Some(endpoint_for_updates))); + Ok(()) + } + /// Rebind a port as a raw UDP listener (fallback when QUIC upgrade fails). async fn rebind_raw_udp(&mut self, port: u16) -> anyhow::Result<()> { let addr: std::net::SocketAddr = ([0, 0, 0, 0], port).into(); @@ -327,6 +400,7 @@ impl UdpListenerManager { Arc::clone(&self.datagram_handler_relay), Arc::clone(&self.relay_writer), self.cancel_token.child_token(), + Arc::clone(&self.proxy_ips), )); self.listeners.insert(port, (handle, None)); @@ -407,6 +481,10 @@ impl UdpListenerManager { } /// Main receive loop for a UDP port. + /// + /// When `proxy_ips` is non-empty, the first datagram from a trusted proxy IP + /// is checked for PROXY protocol v2. If found, the real client IP is extracted + /// and used for all subsequent session handling for that source address. async fn recv_loop( socket: Arc, port: u16, @@ -417,10 +495,15 @@ impl UdpListenerManager { _datagram_handler_relay: Arc>>, relay_writer: Arc>>, cancel: CancellationToken, + proxy_ips: Arc>, ) { // Use a reasonably large buffer; actual max is per-route but we need a single buffer let mut buf = vec![0u8; 65535]; + // Maps proxy source addr → real client addr (from PROXY v2 headers). + // Only populated when proxy_ips is non-empty. + let proxy_addr_map: DashMap = DashMap::new(); + loop { let (len, client_addr) = tokio::select! { _ = cancel.cancelled() => { @@ -440,9 +523,39 @@ impl UdpListenerManager { let datagram = &buf[..len]; - // Route matching + // PROXY protocol v2 detection for datagrams from trusted proxy IPs + let effective_client_ip = if !proxy_ips.is_empty() && proxy_ips.contains(&client_addr.ip()) { + let session_key: SessionKey = (client_addr, port); + if session_table.get(&session_key).is_none() && !proxy_addr_map.contains_key(&client_addr) { + // No session and no prior PROXY header — check for PROXY v2 + if crate::proxy_protocol::is_proxy_protocol_v2(datagram) { + match crate::proxy_protocol::parse_v2(datagram) { + Ok((header, _consumed)) => { + debug!("UDP PROXY v2 from {}: real client {}", client_addr, header.source_addr); + proxy_addr_map.insert(client_addr, header.source_addr); + continue; // discard the PROXY v2 datagram + } + Err(e) => { + debug!("UDP PROXY v2 parse error from {}: {}", client_addr, e); + client_addr.ip() + } + } + } else { + client_addr.ip() + } + } else { + // Use real client IP if we've previously seen a PROXY v2 header + proxy_addr_map.get(&client_addr) + .map(|r| r.ip()) + .unwrap_or_else(|| client_addr.ip()) + } + } else { + client_addr.ip() + }; + + // Route matching — use effective (real) client IP let rm = route_manager.load(); - let ip_str = client_addr.ip().to_string(); + let ip_str = effective_client_ip.to_string(); let ctx = MatchContext { port, domain: None, @@ -491,20 +604,21 @@ impl UdpListenerManager { } // Session lookup or create + // Session key uses the proxy's source addr for correct return-path routing let session_key: SessionKey = (client_addr, port); let session = match session_table.get(&session_key) { Some(s) => s, None => { - // New session — check per-IP limits - if !conn_tracker.try_accept(&client_addr.ip()) { - debug!("UDP session rejected for {} (rate limit)", client_addr); + // New session — check per-IP limits using the real client IP + if !conn_tracker.try_accept(&effective_client_ip) { + debug!("UDP session rejected for {} (rate limit)", effective_client_ip); continue; } if !session_table.can_create_session( - &client_addr.ip(), + &effective_client_ip, udp_config.max_sessions_per_ip, ) { - debug!("UDP session rejected for {} (per-IP session limit)", client_addr); + debug!("UDP session rejected for {} (per-IP session limit)", effective_client_ip); continue; } @@ -537,8 +651,8 @@ impl UdpListenerManager { } let backend_socket = Arc::new(backend_socket); - debug!("New UDP session: {} -> {} (via port {})", - client_addr, backend_addr, port); + debug!("New UDP session: {} -> {} (via port {}, real client {})", + client_addr, backend_addr, port, effective_client_ip); // Spawn return-path relay task let session_cancel = CancellationToken::new(); @@ -558,7 +672,7 @@ impl UdpListenerManager { last_activity: std::sync::atomic::AtomicU64::new(session_table.elapsed_ms()), created_at: std::time::Instant::now(), route_id: route_id.map(|s| s.to_string()), - source_ip: client_addr.ip(), + source_ip: effective_client_ip, client_addr, return_task, cancel: session_cancel, @@ -569,8 +683,8 @@ impl UdpListenerManager { continue; } - // Track in metrics - conn_tracker.connection_opened(&client_addr.ip()); + // Track in metrics using the real client IP + conn_tracker.connection_opened(&effective_client_ip); metrics.connection_opened(route_id, Some(&ip_str)); metrics.udp_session_opened(); diff --git a/rust/crates/rustproxy/src/lib.rs b/rust/crates/rustproxy/src/lib.rs index a45c58f..e2fa057 100644 --- a/rust/crates/rustproxy/src/lib.rs +++ b/rust/crates/rustproxy/src/lib.rs @@ -264,6 +264,8 @@ impl RustProxy { conn_config.socket_timeout_ms, conn_config.max_connection_lifetime_ms, ); + // Clone proxy_ips before conn_config is moved into the TCP listener + let udp_proxy_ips = conn_config.proxy_ips.clone(); listener.set_connection_config(conn_config); // Share the socket-handler relay path with the listener @@ -339,6 +341,7 @@ impl RustProxy { conn_tracker, self.cancel_token.clone(), ); + udp_mgr.set_proxy_ips(udp_proxy_ips.clone()); // Construct H3ProxyService for HTTP/3 request handling let h3_svc = rustproxy_http::h3_service::H3ProxyService::new( @@ -774,12 +777,15 @@ impl RustProxy { if self.udp_listener_manager.is_none() { if let Some(ref listener) = self.listener_manager { let conn_tracker = listener.conn_tracker().clone(); - self.udp_listener_manager = Some(UdpListenerManager::new( + let conn_config = Self::build_connection_config(&self.options); + let mut udp_mgr = UdpListenerManager::new( Arc::clone(&new_manager), Arc::clone(&self.metrics), conn_tracker, self.cancel_token.clone(), - )); + ); + udp_mgr.set_proxy_ips(conn_config.proxy_ips); + self.udp_listener_manager = Some(udp_mgr); } } diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 4cf54a3..9deab00 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartproxy', - version: '25.16.3', + version: '25.17.0', description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.' }