Compare commits

...

16 Commits

Author SHA1 Message Date
2fce910795 v25.17.7
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-20 07:50:41 +00:00
ff09cef350 fix(readme): document QUIC and HTTP/3 compatibility caveats 2026-03-20 07:50:41 +00:00
d0148b2ac3 v25.17.6
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-20 07:48:26 +00:00
7217e15649 fix(rustproxy-http): disable HTTP/3 GREASE for client and server connections 2026-03-20 07:48:26 +00:00
bfcf92a855 v25.17.5
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-20 07:43:32 +00:00
8e0804cd20 fix(rustproxy): add HTTP/3 integration test for QUIC response stream FIN handling 2026-03-20 07:43:32 +00:00
c63f6fcd5f v25.17.4
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-20 03:19:57 +00:00
f3cd4d193e fix(rustproxy-http): prevent HTTP/3 response body streaming from hanging on backend completion 2026-03-20 03:19:57 +00:00
81de611255 v25.17.3
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-20 02:54:44 +00:00
91598b3be9 fix(repository): no changes detected 2026-03-20 02:54:44 +00:00
4e3c548012 v25.17.2
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-20 02:53:41 +00:00
1a2d7529db fix(rustproxy-http): enable TLS connections for HTTP/3 upstream requests when backend re-encryption or TLS is configured 2026-03-20 02:53:41 +00:00
31514f54ae v25.17.1
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-20 02:35:22 +00:00
247653c9d0 fix(rustproxy-routing): allow QUIC UDP TLS connections without SNI to match domain-restricted routes 2026-03-20 02:35:22 +00:00
07d88f6f6a v25.17.0
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 23:16:42 +00:00
4b64de2c67 feat(rustproxy-passthrough): add PROXY protocol v2 client IP handling for UDP and QUIC listeners 2026-03-19 23:16:42 +00:00
13 changed files with 883 additions and 96 deletions

View File

@@ -1,5 +1,55 @@
# Changelog
## 2026-03-20 - 25.17.7 - fix(readme)
document QUIC and HTTP/3 compatibility caveats
- Add notes explaining that GREASE frames are disabled on both server and client HTTP/3 paths to avoid interoperability issues
- Document that the current HTTP/3 stack depends on pre-1.0 h3 ecosystem components and may still have rough edges
## 2026-03-20 - 25.17.6 - fix(rustproxy-http)
disable HTTP/3 GREASE for client and server connections
- Switch the HTTP/3 server connection setup to use the builder API with send_grease(false)
- Switch the HTTP/3 client handshake to use the builder API with send_grease(false) to improve compatibility
## 2026-03-20 - 25.17.5 - fix(rustproxy)
add HTTP/3 integration test for QUIC response stream FIN handling
- adds an integration test covering HTTP/3 proxying over QUIC with TLS termination
- verifies response bodies fully arrive and the client receives stream termination instead of hanging
- adds test-only dependencies for quinn, h3, h3-quinn, rustls, bytes, and http
## 2026-03-20 - 25.17.4 - fix(rustproxy-http)
prevent HTTP/3 response body streaming from hanging on backend completion
- extract and track Content-Length before consuming the response body
- stop the HTTP/3 body loop when the stream reports end-of-stream or the expected byte count has been sent
- add a per-frame idle timeout to avoid indefinite waits on stalled or close-delimited backend bodies
## 2026-03-20 - 25.17.3 - fix(repository)
no changes detected
## 2026-03-20 - 25.17.2 - fix(rustproxy-http)
enable TLS connections for HTTP/3 upstream requests when backend re-encryption or TLS is configured
- Pass backend TLS client configuration into the HTTP/3 request handler.
- Detect TLS-required upstream targets using route and target TLS settings before connecting.
- Build backend request URIs with the correct http or https scheme to match the upstream connection.
## 2026-03-20 - 25.17.1 - fix(rustproxy-routing)
allow QUIC UDP TLS connections without SNI to match domain-restricted routes
- Exempts UDP transport from the no-SNI rejection logic because QUIC encrypts the TLS ClientHello and SNI is unavailable at accept time
- Adds regression tests to confirm QUIC route matching succeeds without SNI while TCP TLS without SNI remains rejected
## 2026-03-19 - 25.17.0 - feat(rustproxy-passthrough)
add PROXY protocol v2 client IP handling for UDP and QUIC listeners
- propagate trusted proxy IP configuration into UDP and QUIC listener managers
- extract and preserve real client addresses from PROXY protocol v2 headers for HTTP/3 and QUIC stream handling
- apply rate limiting, session limits, routing, and metrics using the resolved client IP while preserving correct proxy return-path routing
## 2026-03-19 - 25.16.3 - fix(rustproxy)
upgrade fallback UDP listeners to QUIC when TLS certificates become available

View File

@@ -1,6 +1,6 @@
{
"name": "@push.rocks/smartproxy",
"version": "25.16.3",
"version": "25.17.7",
"private": false,
"description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
"main": "dist_ts/index.js",

View File

@@ -1111,6 +1111,10 @@ SmartProxy searches for the Rust binary in this order:
5. Local dev build (`./rust/target/release/rustproxy`)
6. System PATH (`rustproxy`)
### QUIC / HTTP3 Caveats
- **GREASE frames are disabled.** The underlying h3 crate sends [GREASE frames](https://www.rfc-editor.org/rfc/rfc9114.html#frame-reserved) by default to test protocol extensibility. However, some HTTP/3 clients and servers don't properly ignore unknown frame types, causing 400/500 errors or stream hangs ([h3#206](https://github.com/hyperium/h3/issues/206)). SmartProxy disables GREASE on both the server side (for incoming H3 requests) and the client side (for H3 backend connections) to maximize compatibility.
- **HTTP/3 is pre-release.** The h3 ecosystem (h3 0.0.8, h3-quinn 0.0.10, quinn 0.11) is still pre-1.0. Expect rough edges.
### Performance Tuning
- ✅ Use NFTables forwarding for high-traffic routes (Linux only)
- ✅ Enable connection keep-alive where appropriate

4
rust/Cargo.lock generated
View File

@@ -1224,10 +1224,14 @@ dependencies = [
"bytes",
"clap",
"dashmap",
"h3",
"h3-quinn",
"http",
"http-body-util",
"hyper",
"hyper-util",
"mimalloc",
"quinn",
"rcgen",
"rustls",
"rustls-pemfile",

View File

@@ -4,6 +4,7 @@
//! and forwards them to backends using the same routing and pool infrastructure
//! as the HTTP/1+2 proxy.
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
@@ -35,7 +36,6 @@ pub struct H3ProxyService {
protocol_cache: Arc<ProtocolCache>,
#[allow(dead_code)]
upstream_selector: UpstreamSelector,
#[allow(dead_code)]
backend_tls_config: Arc<rustls::ClientConfig>,
connect_timeout: Duration,
}
@@ -61,17 +61,23 @@ impl H3ProxyService {
}
/// Handle an accepted QUIC connection as HTTP/3.
///
/// If `real_client_addr` is provided (from PROXY protocol), it overrides
/// `connection.remote_address()` for client IP attribution.
pub async fn handle_connection(
&self,
connection: quinn::Connection,
_fallback_route: &RouteConfig,
port: u16,
real_client_addr: Option<SocketAddr>,
) -> anyhow::Result<()> {
let remote_addr = connection.remote_address();
let remote_addr = real_client_addr.unwrap_or_else(|| connection.remote_address());
debug!("HTTP/3 connection from {} on port {}", remote_addr, port);
let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> =
h3::server::Connection::new(h3_quinn::Connection::new(connection))
h3::server::builder()
.send_grease(false)
.build(h3_quinn::Connection::new(connection))
.await
.map_err(|e| anyhow::anyhow!("H3 connection setup failed: {}", e))?;
@@ -93,12 +99,14 @@ impl H3ProxyService {
let rm = self.route_manager.load();
let pool = Arc::clone(&self.connection_pool);
let metrics = Arc::clone(&self.metrics);
let backend_tls = Arc::clone(&self.backend_tls_config);
let connect_timeout = self.connect_timeout;
let client_ip = client_ip.clone();
tokio::spawn(async move {
if let Err(e) = handle_h3_request(
request, stream, port, &client_ip, &rm, &pool, &metrics, connect_timeout,
request, stream, port, &client_ip, &rm, &pool, &metrics,
&backend_tls, connect_timeout,
).await {
debug!("HTTP/3 request error from {}: {}", client_ip, e);
}
@@ -128,6 +136,7 @@ async fn handle_h3_request(
route_manager: &RouteManager,
_connection_pool: &ConnectionPool,
metrics: &MetricsCollector,
backend_tls_config: &Arc<rustls::ClientConfig>,
connect_timeout: Duration,
) -> anyhow::Result<()> {
let method = request.method().clone();
@@ -168,7 +177,15 @@ async fn handle_h3_request(
let backend_port = target.port.resolve(port);
let backend_addr = format!("{}:{}", backend_host, backend_port);
// Connect to backend via TCP HTTP/1.1 with timeout
// Determine if backend requires TLS (same logic as proxy_service.rs)
let mut use_tls = target.tls.is_some();
if let Some(ref tls) = route.action.tls {
if tls.mode == rustproxy_config::TlsMode::TerminateAndReencrypt {
use_tls = true;
}
}
// Connect to backend via TCP with timeout
let tcp_stream = tokio::time::timeout(
connect_timeout,
tokio::net::TcpStream::connect(&backend_addr),
@@ -178,15 +195,27 @@ async fn handle_h3_request(
let _ = tcp_stream.set_nodelay(true);
let io = hyper_util::rt::TokioIo::new(tcp_stream);
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await
.map_err(|e| anyhow::anyhow!("Backend handshake failed: {}", e))?;
tokio::spawn(async move {
if let Err(e) = conn.await {
debug!("Backend connection closed: {}", e);
}
});
// Branch: wrap in TLS if backend requires it, then HTTP/1.1 handshake.
// hyper's SendRequest<B> is NOT generic over the IO type, so both branches
// produce the same type and can be unified.
let mut sender = if use_tls {
let connector = tokio_rustls::TlsConnector::from(Arc::clone(backend_tls_config));
let server_name = rustls::pki_types::ServerName::try_from(backend_host.to_string())
.map_err(|e| anyhow::anyhow!("Invalid backend SNI '{}': {}", backend_host, e))?;
let tls_stream = connector.connect(server_name, tcp_stream).await
.map_err(|e| anyhow::anyhow!("Backend TLS handshake to {} failed: {}", backend_addr, e))?;
let io = hyper_util::rt::TokioIo::new(tls_stream);
let (sender, conn) = hyper::client::conn::http1::handshake(io).await
.map_err(|e| anyhow::anyhow!("Backend handshake failed: {}", e))?;
tokio::spawn(async move { let _ = conn.await; });
sender
} else {
let io = hyper_util::rt::TokioIo::new(tcp_stream);
let (sender, conn) = hyper::client::conn::http1::handshake(io).await
.map_err(|e| anyhow::anyhow!("Backend handshake failed: {}", e))?;
tokio::spawn(async move { let _ = conn.await; });
sender
};
// Stream request body from H3 client to backend via an mpsc channel.
// This avoids buffering the entire request body in memory.
@@ -209,7 +238,7 @@ async fn handle_h3_request(
// Create a body that polls from the mpsc receiver
let body = H3RequestBody { receiver: body_rx };
let backend_req = build_backend_request(&method, &backend_addr, &path, &host, &request, body)?;
let backend_req = build_backend_request(&method, &backend_addr, &path, &host, &request, body, use_tls)?;
let response = sender.send_request(backend_req).await
.map_err(|e| anyhow::anyhow!("Backend request failed: {}", e))?;
@@ -232,6 +261,12 @@ async fn handle_h3_request(
h3_response = h3_response.header(name, value);
}
// Extract content-length for body loop termination (must be before into_body())
let content_length: Option<u64> = response.headers()
.get(hyper::header::CONTENT_LENGTH)
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse().ok());
// Add Alt-Svc for HTTP/3 advertisement
let alt_svc = route.action.udp.as_ref()
.and_then(|u| u.quic.as_ref())
@@ -252,21 +287,52 @@ async fn handle_h3_request(
// Stream response body back
use http_body_util::BodyExt;
use http_body::Body as _;
let mut body = response.into_body();
let mut total_bytes_out: u64 = 0;
while let Some(frame) = body.frame().await {
match frame {
Ok(frame) => {
// Per-frame idle timeout: if no frame arrives within this duration, assume
// the body is complete (or the backend has stalled). This prevents indefinite
// hangs on close-delimited bodies or when hyper's internal trailers oneshot
// never resolves after all data has been received.
const FRAME_IDLE_TIMEOUT: Duration = Duration::from_secs(30);
loop {
// Layer 1: If the body already knows it is finished (Content-Length
// bodies track remaining bytes internally), break immediately to
// avoid blocking on hyper's internal trailers oneshot.
if body.is_end_stream() {
break;
}
// Layer 3: Per-frame idle timeout safety net
match tokio::time::timeout(FRAME_IDLE_TIMEOUT, body.frame()).await {
Ok(Some(Ok(frame))) => {
if let Some(data) = frame.data_ref() {
total_bytes_out += data.len() as u64;
stream.send_data(Bytes::copy_from_slice(data)).await
.map_err(|e| anyhow::anyhow!("Failed to send H3 data: {}", e))?;
// Layer 2: Content-Length byte count check
if let Some(cl) = content_length {
if total_bytes_out >= cl {
break;
}
}
}
}
Err(e) => {
Ok(Some(Err(e))) => {
warn!("Backend body read error: {}", e);
break;
}
Ok(None) => break, // Body ended naturally
Err(_) => {
debug!(
"H3 body frame idle timeout ({:?}) after {} bytes; finishing stream",
FRAME_IDLE_TIMEOUT, total_bytes_out
);
break;
}
}
}
@@ -289,10 +355,12 @@ fn build_backend_request<B>(
host: &str,
original_request: &hyper::Request<()>,
body: B,
use_tls: bool,
) -> anyhow::Result<hyper::Request<B>> {
let scheme = if use_tls { "https" } else { "http" };
let mut req = hyper::Request::builder()
.method(method)
.uri(format!("http://{}{}", backend_addr, path))
.uri(format!("{}://{}{}", scheme, backend_addr, path))
.header("host", host);
// Forward non-pseudo headers

View File

@@ -2550,7 +2550,11 @@ impl HttpProxyService {
backend_key: &str,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let h3_quinn_conn = h3_quinn::Connection::new(quic_conn.clone());
let (mut driver, mut send_request) = match h3::client::new(h3_quinn_conn).await {
let (mut driver, mut send_request) = match h3::client::builder()
.send_grease(false)
.build(h3_quinn_conn)
.await
{
Ok(pair) => pair,
Err(e) => {
error!(backend = %backend_key, domain = %domain, error = %e, "H3 client handshake failed");

View File

@@ -3,13 +3,21 @@
//! Manages QUIC endpoints (via quinn), accepts connections, and either:
//! - Forwards streams bidirectionally to TCP backends (QUIC termination)
//! - Dispatches to H3ProxyService for HTTP/3 handling (Phase 5)
//!
//! When `proxy_ips` is configured, a UDP relay layer intercepts PROXY protocol v2
//! headers before they reach quinn, extracting real client IPs for attribution.
use std::net::SocketAddr;
use std::net::{IpAddr, SocketAddr};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
use tokio::io::AsyncWriteExt;
use tokio::net::UdpSocket;
use tokio::task::JoinHandle;
use arc_swap::ArcSwap;
use dashmap::DashMap;
use quinn::{Endpoint, ServerConfig as QuinnServerConfig};
use rustls::ServerConfig as RustlsServerConfig;
use tokio_util::sync::CancellationToken;
@@ -47,9 +55,274 @@ pub fn create_quic_endpoint(
Ok(endpoint)
}
// ===== PROXY protocol relay for QUIC =====
/// Result of creating a QUIC endpoint with a PROXY protocol relay layer.
pub struct QuicProxyRelay {
/// The quinn endpoint (bound to 127.0.0.1:ephemeral).
pub endpoint: Endpoint,
/// The relay recv loop task handle.
pub relay_task: JoinHandle<()>,
/// Maps relay socket local addr → real client SocketAddr (from PROXY v2).
/// Consulted by `quic_accept_loop` to resolve real client IPs.
pub real_client_map: Arc<DashMap<SocketAddr, SocketAddr>>,
}
/// A single relay session for forwarding datagrams between an external source
/// and the internal quinn endpoint.
struct RelaySession {
socket: Arc<UdpSocket>,
last_activity: AtomicU64,
return_task: JoinHandle<()>,
cancel: CancellationToken,
}
/// Create a QUIC endpoint with a PROXY protocol v2 relay layer.
///
/// Instead of giving the external socket to quinn, we:
/// 1. Bind a raw UDP socket on 0.0.0.0:port (external)
/// 2. Bind quinn on 127.0.0.1:0 (internal, ephemeral)
/// 3. Run a relay loop that filters PROXY v2 headers and forwards datagrams
///
/// Only used when `proxy_ips` is non-empty.
pub fn create_quic_endpoint_with_proxy_relay(
port: u16,
tls_config: Arc<RustlsServerConfig>,
proxy_ips: Arc<Vec<IpAddr>>,
cancel: CancellationToken,
) -> anyhow::Result<QuicProxyRelay> {
// Bind external socket on the real port
let external_socket = std::net::UdpSocket::bind(SocketAddr::from(([0, 0, 0, 0], port)))?;
external_socket.set_nonblocking(true)?;
let external_socket = Arc::new(
UdpSocket::from_std(external_socket)
.map_err(|e| anyhow::anyhow!("Failed to wrap external socket: {}", e))?,
);
// Bind quinn on localhost ephemeral port
let internal_socket = std::net::UdpSocket::bind("127.0.0.1:0")?;
let quinn_internal_addr = internal_socket.local_addr()?;
let quic_crypto = quinn::crypto::rustls::QuicServerConfig::try_from(tls_config)
.map_err(|e| anyhow::anyhow!("Failed to create QUIC crypto config: {}", e))?;
let server_config = QuinnServerConfig::with_crypto(Arc::new(quic_crypto));
let endpoint = Endpoint::new(
quinn::EndpointConfig::default(),
Some(server_config),
internal_socket,
quinn::default_runtime()
.ok_or_else(|| anyhow::anyhow!("No async runtime for quinn"))?,
)?;
let real_client_map = Arc::new(DashMap::new());
let relay_task = tokio::spawn(quic_proxy_relay_loop(
external_socket,
quinn_internal_addr,
proxy_ips,
Arc::clone(&real_client_map),
cancel,
));
info!("QUIC endpoint with PROXY relay on port {} (quinn internal: {})", port, quinn_internal_addr);
Ok(QuicProxyRelay { endpoint, relay_task, real_client_map })
}
/// Main relay loop: reads datagrams from the external socket, filters PROXY v2
/// headers from trusted proxy IPs, and forwards everything else to quinn via
/// per-session relay sockets.
async fn quic_proxy_relay_loop(
external_socket: Arc<UdpSocket>,
quinn_internal_addr: SocketAddr,
proxy_ips: Arc<Vec<IpAddr>>,
real_client_map: Arc<DashMap<SocketAddr, SocketAddr>>,
cancel: CancellationToken,
) {
// Maps external source addr → real client addr (from PROXY v2 headers)
let proxy_addr_map: DashMap<SocketAddr, SocketAddr> = DashMap::new();
// Maps external source addr → relay session
let relay_sessions: DashMap<SocketAddr, Arc<RelaySession>> = DashMap::new();
let epoch = Instant::now();
let mut buf = vec![0u8; 65535];
// Inline cleanup: periodically scan relay_sessions for stale entries
let mut last_cleanup = Instant::now();
let cleanup_interval = std::time::Duration::from_secs(30);
let session_timeout_ms: u64 = 120_000;
loop {
let (len, src_addr) = tokio::select! {
_ = cancel.cancelled() => {
debug!("QUIC proxy relay loop cancelled");
break;
}
result = external_socket.recv_from(&mut buf) => {
match result {
Ok(r) => r,
Err(e) => {
warn!("QUIC proxy relay recv error: {}", e);
continue;
}
}
}
};
let datagram = &buf[..len];
// PROXY v2 handling: only on first datagram from a trusted proxy IP
// (before a relay session exists for this source)
if proxy_ips.contains(&src_addr.ip()) && relay_sessions.get(&src_addr).is_none() {
if crate::proxy_protocol::is_proxy_protocol_v2(datagram) {
match crate::proxy_protocol::parse_v2(datagram) {
Ok((header, _consumed)) => {
debug!("QUIC PROXY v2 from {}: real client {}", src_addr, header.source_addr);
proxy_addr_map.insert(src_addr, header.source_addr);
continue; // consume the PROXY v2 datagram
}
Err(e) => {
debug!("QUIC proxy relay: failed to parse PROXY v2 from {}: {}", src_addr, e);
}
}
}
}
// Determine real client address
let real_client = proxy_addr_map.get(&src_addr)
.map(|r| *r)
.unwrap_or(src_addr);
// Get or create relay session for this external source
let session = match relay_sessions.get(&src_addr) {
Some(s) => {
s.last_activity.store(epoch.elapsed().as_millis() as u64, Ordering::Relaxed);
Arc::clone(s.value())
}
None => {
// Create new relay socket connected to quinn's internal address
let relay_socket = match UdpSocket::bind("127.0.0.1:0").await {
Ok(s) => s,
Err(e) => {
warn!("QUIC relay: failed to bind relay socket: {}", e);
continue;
}
};
if let Err(e) = relay_socket.connect(quinn_internal_addr).await {
warn!("QUIC relay: failed to connect relay socket to {}: {}", quinn_internal_addr, e);
continue;
}
let relay_local_addr = match relay_socket.local_addr() {
Ok(a) => a,
Err(e) => {
warn!("QUIC relay: failed to get relay socket local addr: {}", e);
continue;
}
};
let relay_socket = Arc::new(relay_socket);
// Store the real client mapping for the QUIC accept loop
real_client_map.insert(relay_local_addr, real_client);
// Spawn return-path relay: quinn -> external socket -> original source
let session_cancel = cancel.child_token();
let return_task = tokio::spawn(relay_return_path(
Arc::clone(&relay_socket),
Arc::clone(&external_socket),
src_addr,
session_cancel.child_token(),
));
let session = Arc::new(RelaySession {
socket: relay_socket,
last_activity: AtomicU64::new(epoch.elapsed().as_millis() as u64),
return_task,
cancel: session_cancel,
});
relay_sessions.insert(src_addr, Arc::clone(&session));
debug!("QUIC relay: new session for {} (relay {}), real client {}",
src_addr, relay_local_addr, real_client);
session
}
};
// Forward datagram to quinn via the relay socket
if let Err(e) = session.socket.send(datagram).await {
debug!("QUIC relay: forward error to quinn for {}: {}", src_addr, e);
}
// Periodic cleanup of stale relay sessions
if last_cleanup.elapsed() >= cleanup_interval {
last_cleanup = Instant::now();
let now_ms = epoch.elapsed().as_millis() as u64;
let stale_keys: Vec<SocketAddr> = relay_sessions.iter()
.filter(|entry| {
let age = now_ms.saturating_sub(entry.value().last_activity.load(Ordering::Relaxed));
age > session_timeout_ms
})
.map(|entry| *entry.key())
.collect();
for key in stale_keys {
if let Some((_, session)) = relay_sessions.remove(&key) {
session.cancel.cancel();
session.return_task.abort();
// Clean up real_client_map entry
if let Ok(addr) = session.socket.local_addr() {
real_client_map.remove(&addr);
}
proxy_addr_map.remove(&key);
debug!("QUIC relay: cleaned up stale session for {}", key);
}
}
}
}
// Shutdown: cancel all relay sessions
for entry in relay_sessions.iter() {
entry.value().cancel.cancel();
entry.value().return_task.abort();
}
}
/// Return-path relay: receives datagrams from quinn (via the relay socket)
/// and forwards them back to the external client through the external socket.
async fn relay_return_path(
relay_socket: Arc<UdpSocket>,
external_socket: Arc<UdpSocket>,
external_src_addr: SocketAddr,
cancel: CancellationToken,
) {
let mut buf = vec![0u8; 65535];
loop {
let len = tokio::select! {
_ = cancel.cancelled() => break,
result = relay_socket.recv(&mut buf) => {
match result {
Ok(len) => len,
Err(e) => {
debug!("QUIC relay return recv error for {}: {}", external_src_addr, e);
break;
}
}
}
};
if let Err(e) = external_socket.send_to(&buf[..len], external_src_addr).await {
debug!("QUIC relay return send error to {}: {}", external_src_addr, e);
break;
}
}
}
// ===== QUIC accept loop =====
/// Run the QUIC accept loop for a single endpoint.
///
/// Accepts incoming QUIC connections and spawns a task per connection.
/// When `real_client_map` is provided, it is consulted to resolve real client
/// IPs from PROXY protocol v2 headers (relay socket addr → real client addr).
pub async fn quic_accept_loop(
endpoint: Endpoint,
port: u16,
@@ -58,6 +331,7 @@ pub async fn quic_accept_loop(
conn_tracker: Arc<ConnectionTracker>,
cancel: CancellationToken,
h3_service: Option<Arc<H3ProxyService>>,
real_client_map: Option<Arc<DashMap<SocketAddr, SocketAddr>>>,
) {
loop {
let incoming = tokio::select! {
@@ -77,11 +351,16 @@ pub async fn quic_accept_loop(
};
let remote_addr = incoming.remote_address();
let ip = remote_addr.ip();
// Resolve real client IP from PROXY protocol map if available
let real_addr = real_client_map.as_ref()
.and_then(|map| map.get(&remote_addr).map(|r| *r))
.unwrap_or(remote_addr);
let ip = real_addr.ip();
// Per-IP rate limiting
if !conn_tracker.try_accept(&ip) {
debug!("QUIC connection rejected from {} (rate limit)", remote_addr);
debug!("QUIC connection rejected from {} (rate limit)", real_addr);
// Drop `incoming` to refuse the connection
continue;
}
@@ -104,7 +383,7 @@ pub async fn quic_accept_loop(
let route = match rm.find_route(&ctx) {
Some(m) => m.route.clone(),
None => {
debug!("No QUIC route matched for port {} from {}", port, remote_addr);
debug!("No QUIC route matched for port {} from {}", port, real_addr);
continue;
}
};
@@ -117,11 +396,12 @@ pub async fn quic_accept_loop(
let conn_tracker = Arc::clone(&conn_tracker);
let cancel = cancel.child_token();
let h3_svc = h3_service.clone();
let real_client_addr = if real_addr != remote_addr { Some(real_addr) } else { None };
tokio::spawn(async move {
match handle_quic_connection(incoming, route, port, Arc::clone(&metrics), &cancel, h3_svc).await {
Ok(()) => debug!("QUIC connection from {} completed", remote_addr),
Err(e) => debug!("QUIC connection from {} error: {}", remote_addr, e),
match handle_quic_connection(incoming, route, port, Arc::clone(&metrics), &cancel, h3_svc, real_client_addr).await {
Ok(()) => debug!("QUIC connection from {} completed", real_addr),
Err(e) => debug!("QUIC connection from {} error: {}", real_addr, e),
}
// Cleanup
@@ -144,10 +424,11 @@ async fn handle_quic_connection(
metrics: Arc<MetricsCollector>,
cancel: &CancellationToken,
h3_service: Option<Arc<H3ProxyService>>,
real_client_addr: Option<SocketAddr>,
) -> anyhow::Result<()> {
let connection = incoming.await?;
let remote_addr = connection.remote_address();
debug!("QUIC connection established from {}", remote_addr);
let effective_addr = real_client_addr.unwrap_or_else(|| connection.remote_address());
debug!("QUIC connection established from {}", effective_addr);
// Check if this route has HTTP/3 enabled
let enable_http3 = route.action.udp.as_ref()
@@ -158,7 +439,7 @@ async fn handle_quic_connection(
if enable_http3 {
if let Some(ref h3_svc) = h3_service {
debug!("HTTP/3 enabled for route {:?}, dispatching to H3ProxyService", route.name);
h3_svc.handle_connection(connection, &route, port).await
h3_svc.handle_connection(connection, &route, port, real_client_addr).await
} else {
warn!("HTTP/3 enabled for route {:?} but H3ProxyService not initialized", route.name);
// Keep connection alive until cancelled
@@ -172,7 +453,7 @@ async fn handle_quic_connection(
}
} else {
// Non-HTTP3 QUIC: bidirectional stream forwarding to TCP backend
handle_quic_stream_forwarding(connection, route, port, metrics, cancel).await
handle_quic_stream_forwarding(connection, route, port, metrics, cancel, real_client_addr).await
}
}
@@ -187,8 +468,9 @@ async fn handle_quic_stream_forwarding(
port: u16,
metrics: Arc<MetricsCollector>,
cancel: &CancellationToken,
real_client_addr: Option<SocketAddr>,
) -> anyhow::Result<()> {
let remote_addr = connection.remote_address();
let effective_addr = real_client_addr.unwrap_or_else(|| connection.remote_address());
let route_id = route.name.as_deref().or(route.id.as_deref());
let metrics_arc = metrics;
@@ -209,7 +491,7 @@ async fn handle_quic_stream_forwarding(
Err(quinn::ConnectionError::ApplicationClosed(_)) => break,
Err(quinn::ConnectionError::LocallyClosed) => break,
Err(e) => {
debug!("QUIC stream accept error from {}: {}", remote_addr, e);
debug!("QUIC stream accept error from {}: {}", effective_addr, e);
break;
}
}
@@ -217,7 +499,7 @@ async fn handle_quic_stream_forwarding(
};
let backend_addr = backend_addr.clone();
let ip_str = remote_addr.ip().to_string();
let ip_str = effective_addr.ip().to_string();
let stream_metrics = Arc::clone(&metrics_arc);
let stream_route_id = route_id.map(|s| s.to_string());

View File

@@ -2,12 +2,17 @@
//!
//! Binds UDP sockets on configured ports, receives datagrams, matches routes,
//! tracks sessions (flows), and forwards datagrams to backend UDP sockets.
//!
//! Supports PROXY protocol v2 on both raw UDP and QUIC paths when `proxy_ips`
//! is configured. For QUIC, a relay layer intercepts datagrams before they
//! reach the quinn endpoint.
use std::collections::HashMap;
use std::net::SocketAddr;
use std::net::{IpAddr, SocketAddr};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use dashmap::DashMap;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use arc_swap::ArcSwap;
@@ -48,6 +53,9 @@ pub struct UdpListenerManager {
relay_reader_cancel: Option<CancellationToken>,
/// H3 proxy service for HTTP/3 request handling
h3_service: Option<Arc<H3ProxyService>>,
/// Trusted proxy IPs that may send PROXY protocol v2 headers.
/// When non-empty, PROXY v2 detection is enabled on both raw UDP and QUIC paths.
proxy_ips: Arc<Vec<IpAddr>>,
}
impl Drop for UdpListenerManager {
@@ -80,9 +88,18 @@ impl UdpListenerManager {
relay_writer: Arc::new(Mutex::new(None)),
relay_reader_cancel: None,
h3_service: None,
proxy_ips: Arc::new(Vec::new()),
}
}
/// Set the trusted proxy IPs for PROXY protocol v2 detection.
pub fn set_proxy_ips(&mut self, ips: Vec<IpAddr>) {
if !ips.is_empty() {
info!("UDP/QUIC PROXY protocol v2 enabled for {} trusted IPs", ips.len());
}
self.proxy_ips = Arc::new(ips);
}
/// Set the H3 proxy service for HTTP/3 request handling.
pub fn set_h3_service(&mut self, svc: Arc<H3ProxyService>) {
self.h3_service = Some(svc);
@@ -122,20 +139,44 @@ impl UdpListenerManager {
if has_quic {
if let Some(tls) = tls_config {
// Create QUIC endpoint; clone it so we can hot-swap TLS later
let endpoint = crate::quic_handler::create_quic_endpoint(port, tls)?;
let endpoint_for_updates = endpoint.clone(); // quinn::Endpoint is Arc-based
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
endpoint,
port,
Arc::clone(&self.route_manager),
Arc::clone(&self.metrics),
Arc::clone(&self.conn_tracker),
self.cancel_token.child_token(),
self.h3_service.clone(),
));
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
info!("QUIC endpoint started on port {}", port);
if self.proxy_ips.is_empty() {
// Direct path: quinn owns the external socket (zero overhead)
let endpoint = crate::quic_handler::create_quic_endpoint(port, tls)?;
let endpoint_for_updates = endpoint.clone();
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
endpoint,
port,
Arc::clone(&self.route_manager),
Arc::clone(&self.metrics),
Arc::clone(&self.conn_tracker),
self.cancel_token.child_token(),
self.h3_service.clone(),
None,
));
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
info!("QUIC endpoint started on port {}", port);
} else {
// Proxy relay path: we own external socket, quinn on localhost
let relay = crate::quic_handler::create_quic_endpoint_with_proxy_relay(
port,
tls,
Arc::clone(&self.proxy_ips),
self.cancel_token.child_token(),
)?;
let endpoint_for_updates = relay.endpoint.clone();
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
relay.endpoint,
port,
Arc::clone(&self.route_manager),
Arc::clone(&self.metrics),
Arc::clone(&self.conn_tracker),
self.cancel_token.child_token(),
self.h3_service.clone(),
Some(relay.real_client_map),
));
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
info!("QUIC endpoint with PROXY relay started on port {}", port);
}
return Ok(());
} else {
warn!("QUIC routes on port {} but no TLS config provided, falling back to raw UDP", port);
@@ -158,6 +199,7 @@ impl UdpListenerManager {
Arc::clone(&self.datagram_handler_relay),
Arc::clone(&self.relay_writer),
self.cancel_token.child_token(),
Arc::clone(&self.proxy_ips),
));
self.listeners.insert(port, (handle, None));
@@ -262,19 +304,14 @@ impl UdpListenerManager {
tokio::task::yield_now().await;
// Create QUIC endpoint on the now-free port
match crate::quic_handler::create_quic_endpoint(port, Arc::clone(&tls_config)) {
Ok(endpoint) => {
let endpoint_for_updates = endpoint.clone();
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
endpoint,
port,
Arc::clone(&self.route_manager),
Arc::clone(&self.metrics),
Arc::clone(&self.conn_tracker),
self.cancel_token.child_token(),
self.h3_service.clone(),
));
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
let create_result = if self.proxy_ips.is_empty() {
self.create_quic_direct(port, Arc::clone(&tls_config))
} else {
self.create_quic_with_relay(port, Arc::clone(&tls_config))
};
match create_result {
Ok(()) => {
info!("QUIC endpoint started on port {} (upgraded from raw UDP)", port);
}
Err(e) => {
@@ -282,19 +319,14 @@ impl UdpListenerManager {
warn!("QUIC endpoint creation failed on port {}, retrying: {}", port, e);
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
match crate::quic_handler::create_quic_endpoint(port, Arc::clone(&tls_config)) {
Ok(endpoint) => {
let endpoint_for_updates = endpoint.clone();
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
endpoint,
port,
Arc::clone(&self.route_manager),
Arc::clone(&self.metrics),
Arc::clone(&self.conn_tracker),
self.cancel_token.child_token(),
self.h3_service.clone(),
));
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
let retry_result = if self.proxy_ips.is_empty() {
self.create_quic_direct(port, Arc::clone(&tls_config))
} else {
self.create_quic_with_relay(port, Arc::clone(&tls_config))
};
match retry_result {
Ok(()) => {
info!("QUIC endpoint started on port {} (upgraded from raw UDP, retry)", port);
}
Err(e2) => {
@@ -311,6 +343,47 @@ impl UdpListenerManager {
}
}
/// Create a direct QUIC endpoint (quinn owns the socket).
fn create_quic_direct(&mut self, port: u16, tls_config: Arc<rustls::ServerConfig>) -> anyhow::Result<()> {
let endpoint = crate::quic_handler::create_quic_endpoint(port, tls_config)?;
let endpoint_for_updates = endpoint.clone();
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
endpoint,
port,
Arc::clone(&self.route_manager),
Arc::clone(&self.metrics),
Arc::clone(&self.conn_tracker),
self.cancel_token.child_token(),
self.h3_service.clone(),
None,
));
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
Ok(())
}
/// Create a QUIC endpoint with PROXY protocol relay.
fn create_quic_with_relay(&mut self, port: u16, tls_config: Arc<rustls::ServerConfig>) -> anyhow::Result<()> {
let relay = crate::quic_handler::create_quic_endpoint_with_proxy_relay(
port,
tls_config,
Arc::clone(&self.proxy_ips),
self.cancel_token.child_token(),
)?;
let endpoint_for_updates = relay.endpoint.clone();
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
relay.endpoint,
port,
Arc::clone(&self.route_manager),
Arc::clone(&self.metrics),
Arc::clone(&self.conn_tracker),
self.cancel_token.child_token(),
self.h3_service.clone(),
Some(relay.real_client_map),
));
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
Ok(())
}
/// Rebind a port as a raw UDP listener (fallback when QUIC upgrade fails).
async fn rebind_raw_udp(&mut self, port: u16) -> anyhow::Result<()> {
let addr: std::net::SocketAddr = ([0, 0, 0, 0], port).into();
@@ -327,6 +400,7 @@ impl UdpListenerManager {
Arc::clone(&self.datagram_handler_relay),
Arc::clone(&self.relay_writer),
self.cancel_token.child_token(),
Arc::clone(&self.proxy_ips),
));
self.listeners.insert(port, (handle, None));
@@ -407,6 +481,10 @@ impl UdpListenerManager {
}
/// Main receive loop for a UDP port.
///
/// When `proxy_ips` is non-empty, the first datagram from a trusted proxy IP
/// is checked for PROXY protocol v2. If found, the real client IP is extracted
/// and used for all subsequent session handling for that source address.
async fn recv_loop(
socket: Arc<UdpSocket>,
port: u16,
@@ -417,10 +495,15 @@ impl UdpListenerManager {
_datagram_handler_relay: Arc<RwLock<Option<String>>>,
relay_writer: Arc<Mutex<Option<tokio::net::unix::OwnedWriteHalf>>>,
cancel: CancellationToken,
proxy_ips: Arc<Vec<IpAddr>>,
) {
// Use a reasonably large buffer; actual max is per-route but we need a single buffer
let mut buf = vec![0u8; 65535];
// Maps proxy source addr → real client addr (from PROXY v2 headers).
// Only populated when proxy_ips is non-empty.
let proxy_addr_map: DashMap<SocketAddr, SocketAddr> = DashMap::new();
loop {
let (len, client_addr) = tokio::select! {
_ = cancel.cancelled() => {
@@ -440,9 +523,39 @@ impl UdpListenerManager {
let datagram = &buf[..len];
// Route matching
// PROXY protocol v2 detection for datagrams from trusted proxy IPs
let effective_client_ip = if !proxy_ips.is_empty() && proxy_ips.contains(&client_addr.ip()) {
let session_key: SessionKey = (client_addr, port);
if session_table.get(&session_key).is_none() && !proxy_addr_map.contains_key(&client_addr) {
// No session and no prior PROXY header — check for PROXY v2
if crate::proxy_protocol::is_proxy_protocol_v2(datagram) {
match crate::proxy_protocol::parse_v2(datagram) {
Ok((header, _consumed)) => {
debug!("UDP PROXY v2 from {}: real client {}", client_addr, header.source_addr);
proxy_addr_map.insert(client_addr, header.source_addr);
continue; // discard the PROXY v2 datagram
}
Err(e) => {
debug!("UDP PROXY v2 parse error from {}: {}", client_addr, e);
client_addr.ip()
}
}
} else {
client_addr.ip()
}
} else {
// Use real client IP if we've previously seen a PROXY v2 header
proxy_addr_map.get(&client_addr)
.map(|r| r.ip())
.unwrap_or_else(|| client_addr.ip())
}
} else {
client_addr.ip()
};
// Route matching — use effective (real) client IP
let rm = route_manager.load();
let ip_str = client_addr.ip().to_string();
let ip_str = effective_client_ip.to_string();
let ctx = MatchContext {
port,
domain: None,
@@ -491,20 +604,21 @@ impl UdpListenerManager {
}
// Session lookup or create
// Session key uses the proxy's source addr for correct return-path routing
let session_key: SessionKey = (client_addr, port);
let session = match session_table.get(&session_key) {
Some(s) => s,
None => {
// New session — check per-IP limits
if !conn_tracker.try_accept(&client_addr.ip()) {
debug!("UDP session rejected for {} (rate limit)", client_addr);
// New session — check per-IP limits using the real client IP
if !conn_tracker.try_accept(&effective_client_ip) {
debug!("UDP session rejected for {} (rate limit)", effective_client_ip);
continue;
}
if !session_table.can_create_session(
&client_addr.ip(),
&effective_client_ip,
udp_config.max_sessions_per_ip,
) {
debug!("UDP session rejected for {} (per-IP session limit)", client_addr);
debug!("UDP session rejected for {} (per-IP session limit)", effective_client_ip);
continue;
}
@@ -537,8 +651,8 @@ impl UdpListenerManager {
}
let backend_socket = Arc::new(backend_socket);
debug!("New UDP session: {} -> {} (via port {})",
client_addr, backend_addr, port);
debug!("New UDP session: {} -> {} (via port {}, real client {})",
client_addr, backend_addr, port, effective_client_ip);
// Spawn return-path relay task
let session_cancel = CancellationToken::new();
@@ -558,7 +672,7 @@ impl UdpListenerManager {
last_activity: std::sync::atomic::AtomicU64::new(session_table.elapsed_ms()),
created_at: std::time::Instant::now(),
route_id: route_id.map(|s| s.to_string()),
source_ip: client_addr.ip(),
source_ip: effective_client_ip,
client_addr,
return_task,
cancel: session_cancel,
@@ -569,8 +683,8 @@ impl UdpListenerManager {
continue;
}
// Track in metrics
conn_tracker.connection_opened(&client_addr.ip());
// Track in metrics using the real client IP
conn_tracker.connection_opened(&effective_client_ip);
metrics.connection_opened(route_id, Some(&ip_str));
metrics.udp_session_opened();

View File

@@ -122,10 +122,16 @@ impl RouteManager {
// This prevents session-ticket resumption from misrouting when clients
// omit SNI (RFC 8446 recommends but doesn't mandate SNI on resumption).
// Wildcard-only routes (domains: ["*"]) still match since they accept all.
let patterns = domains.to_vec();
let is_wildcard_only = patterns.iter().all(|d| *d == "*");
if !is_wildcard_only {
return false;
//
// Exception: QUIC (UDP transport) encrypts the TLS ClientHello, so SNI
// is unavailable at accept time. Domain verification happens per-request
// in H3ProxyService via the :authority header.
if ctx.transport != Some(TransportProtocol::Udp) {
let patterns = domains.to_vec();
let is_wildcard_only = patterns.iter().all(|d| *d == "*");
if !is_wildcard_only {
return false;
}
}
}
}
@@ -997,4 +1003,52 @@ mod tests {
let result = manager.find_route(&udp_ctx).unwrap();
assert_eq!(result.route.name.as_deref(), Some("udp-route"));
}
#[test]
fn test_quic_tls_no_sni_matches_domain_restricted_route() {
// QUIC accept-level matching: is_tls=true, domain=None, transport=Udp.
// Should match because QUIC encrypts the ClientHello — SNI is unavailable
// at accept time but verified per-request in H3ProxyService.
let mut route = make_route(443, Some("example.com"), 0);
route.route_match.transport = Some(TransportProtocol::Udp);
let routes = vec![route];
let manager = RouteManager::new(routes);
let ctx = MatchContext {
port: 443,
domain: None,
path: None,
client_ip: None,
tls_version: None,
headers: None,
is_tls: true,
protocol: Some("quic"),
transport: Some(TransportProtocol::Udp),
};
assert!(manager.find_route(&ctx).is_some(),
"QUIC (UDP) with is_tls=true and domain=None should match domain-restricted routes");
}
#[test]
fn test_tcp_tls_no_sni_still_rejects_domain_restricted_route() {
// TCP TLS without SNI must still be rejected (no QUIC exemption).
let routes = vec![make_route(443, Some("example.com"), 0)];
let manager = RouteManager::new(routes);
let ctx = MatchContext {
port: 443,
domain: None,
path: None,
client_ip: None,
tls_version: None,
headers: None,
is_tls: true,
protocol: None,
transport: None, // TCP (default)
};
assert!(manager.find_route(&ctx).is_none(),
"TCP TLS without SNI should NOT match domain-restricted routes");
}
}

View File

@@ -44,3 +44,9 @@ mimalloc = { workspace = true }
[dev-dependencies]
rcgen = { workspace = true }
quinn = { workspace = true }
h3 = { workspace = true }
h3-quinn = { workspace = true }
bytes = { workspace = true }
rustls = { workspace = true }
http = "1"

View File

@@ -264,6 +264,8 @@ impl RustProxy {
conn_config.socket_timeout_ms,
conn_config.max_connection_lifetime_ms,
);
// Clone proxy_ips before conn_config is moved into the TCP listener
let udp_proxy_ips = conn_config.proxy_ips.clone();
listener.set_connection_config(conn_config);
// Share the socket-handler relay path with the listener
@@ -339,6 +341,7 @@ impl RustProxy {
conn_tracker,
self.cancel_token.clone(),
);
udp_mgr.set_proxy_ips(udp_proxy_ips.clone());
// Construct H3ProxyService for HTTP/3 request handling
let h3_svc = rustproxy_http::h3_service::H3ProxyService::new(
@@ -774,12 +777,15 @@ impl RustProxy {
if self.udp_listener_manager.is_none() {
if let Some(ref listener) = self.listener_manager {
let conn_tracker = listener.conn_tracker().clone();
self.udp_listener_manager = Some(UdpListenerManager::new(
let conn_config = Self::build_connection_config(&self.options);
let mut udp_mgr = UdpListenerManager::new(
Arc::clone(&new_manager),
Arc::clone(&self.metrics),
conn_tracker,
self.cancel_token.clone(),
));
);
udp_mgr.set_proxy_ips(conn_config.proxy_ips);
self.udp_listener_manager = Some(udp_mgr);
}
}

View File

@@ -0,0 +1,195 @@
mod common;
use common::*;
use rustproxy::RustProxy;
use rustproxy_config::{RustProxyOptions, TransportProtocol, RouteUdp, RouteQuic};
use bytes::Buf;
use std::sync::Arc;
/// Build a route that listens on UDP with HTTP/3 enabled and TLS terminate.
fn make_h3_route(
port: u16,
target_host: &str,
target_port: u16,
cert_pem: &str,
key_pem: &str,
) -> rustproxy_config::RouteConfig {
let mut route = make_tls_terminate_route(port, "localhost", target_host, target_port, cert_pem, key_pem);
route.route_match.transport = Some(TransportProtocol::Udp);
// Keep domain="localhost" from make_tls_terminate_route — needed for TLS cert extraction
route.action.udp = Some(RouteUdp {
session_timeout: None,
max_sessions_per_ip: None,
max_datagram_size: None,
quic: Some(RouteQuic {
max_idle_timeout: Some(30000),
max_concurrent_bidi_streams: None,
max_concurrent_uni_streams: None,
enable_http3: Some(true),
alt_svc_port: None,
alt_svc_max_age: None,
initial_congestion_window: None,
}),
});
route
}
/// Build a quinn client endpoint with insecure TLS for testing.
fn make_h3_client_endpoint() -> quinn::Endpoint {
let mut tls_config = rustls::ClientConfig::builder()
.dangerous()
.with_custom_certificate_verifier(Arc::new(InsecureVerifier))
.with_no_client_auth();
tls_config.alpn_protocols = vec![b"h3".to_vec()];
let quic_client_config = quinn::crypto::rustls::QuicClientConfig::try_from(tls_config)
.expect("Failed to build QUIC client config");
let client_config = quinn::ClientConfig::new(Arc::new(quic_client_config));
let mut endpoint = quinn::Endpoint::client("0.0.0.0:0".parse().unwrap())
.expect("Failed to create QUIC client endpoint");
endpoint.set_default_client_config(client_config);
endpoint
}
/// Test that HTTP/3 response streams properly finish (FIN is received by client).
///
/// This is the critical regression test for the FIN bug: the proxy must send
/// a QUIC stream FIN after the response body so the client's `recv_data()`
/// returns `None` instead of hanging forever.
#[tokio::test]
async fn test_h3_response_stream_finishes() {
let backend_port = next_port();
let proxy_port = next_port();
let body_text = "Hello from HTTP/3 backend! This body has a known length for testing.";
// 1. Start plain HTTP backend with known body + content-length
let _backend = start_http_server(backend_port, 200, body_text).await;
// 2. Generate self-signed cert and configure H3 route
let (cert_pem, key_pem) = generate_self_signed_cert("localhost");
let route = make_h3_route(proxy_port, "127.0.0.1", backend_port, &cert_pem, &key_pem);
let options = RustProxyOptions {
routes: vec![route],
..Default::default()
};
// 3. Start proxy and wait for UDP bind
let mut proxy = RustProxy::new(options).unwrap();
proxy.start().await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
// 4. Connect QUIC/H3 client
let endpoint = make_h3_client_endpoint();
let addr: std::net::SocketAddr = format!("127.0.0.1:{}", proxy_port).parse().unwrap();
let connection = endpoint
.connect(addr, "localhost")
.expect("Failed to initiate QUIC connection")
.await
.expect("QUIC handshake failed");
let (mut driver, mut send_request) = h3::client::new(
h3_quinn::Connection::new(connection),
)
.await
.expect("H3 connection setup failed");
// Drive the H3 connection in background
tokio::spawn(async move {
let _ = driver.wait_idle().await;
});
// 5. Send GET request
let req = http::Request::builder()
.method("GET")
.uri("https://localhost/")
.header("host", "localhost")
.body(())
.unwrap();
let mut stream = send_request.send_request(req).await
.expect("Failed to send H3 request");
stream.finish().await
.expect("Failed to finish sending H3 request body");
// 6. Read response headers
let resp = stream.recv_response().await
.expect("Failed to receive H3 response");
assert_eq!(resp.status(), http::StatusCode::OK,
"Expected 200 OK, got {}", resp.status());
// 7. Read body and verify stream ends (FIN received)
// This is the critical assertion: recv_data() must return None (stream ended)
// within the timeout, NOT hang forever waiting for a FIN that never arrives.
let result = with_timeout(async {
let mut total = 0usize;
while let Some(chunk) = stream.recv_data().await.expect("H3 data receive error") {
total += chunk.remaining();
}
// recv_data() returned None => stream ended (FIN received)
total
}, 10)
.await;
let bytes_received = result.expect(
"TIMEOUT: H3 stream never ended (FIN not received by client). \
The proxy sent all response data but failed to send the QUIC stream FIN."
);
assert_eq!(
bytes_received,
body_text.len(),
"Expected {} bytes, got {}",
body_text.len(),
bytes_received
);
// 8. Cleanup
endpoint.close(quinn::VarInt::from_u32(0), b"test done");
proxy.stop().await.unwrap();
}
/// Insecure TLS verifier that accepts any certificate (for tests only).
#[derive(Debug)]
struct InsecureVerifier;
impl rustls::client::danger::ServerCertVerifier for InsecureVerifier {
fn verify_server_cert(
&self,
_end_entity: &rustls::pki_types::CertificateDer<'_>,
_intermediates: &[rustls::pki_types::CertificateDer<'_>],
_server_name: &rustls::pki_types::ServerName<'_>,
_ocsp_response: &[u8],
_now: rustls::pki_types::UnixTime,
) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
Ok(rustls::client::danger::ServerCertVerified::assertion())
}
fn verify_tls12_signature(
&self,
_message: &[u8],
_cert: &rustls::pki_types::CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
}
fn verify_tls13_signature(
&self,
_message: &[u8],
_cert: &rustls::pki_types::CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
}
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
vec![
rustls::SignatureScheme::RSA_PKCS1_SHA256,
rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
rustls::SignatureScheme::ECDSA_NISTP384_SHA384,
rustls::SignatureScheme::ED25519,
rustls::SignatureScheme::RSA_PSS_SHA256,
]
}
}

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartproxy',
version: '25.16.3',
version: '25.17.7',
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
}