Compare commits
22 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 33fdf42a70 | |||
| fb1c59ac9a | |||
| ea8224c400 | |||
| da1cc58a3d | |||
| 606c620849 | |||
| 4ae09ac6ae | |||
| 2fce910795 | |||
| ff09cef350 | |||
| d0148b2ac3 | |||
| 7217e15649 | |||
| bfcf92a855 | |||
| 8e0804cd20 | |||
| c63f6fcd5f | |||
| f3cd4d193e | |||
| 81de611255 | |||
| 91598b3be9 | |||
| 4e3c548012 | |||
| 1a2d7529db | |||
| 31514f54ae | |||
| 247653c9d0 | |||
| 07d88f6f6a | |||
| 4b64de2c67 |
71
changelog.md
71
changelog.md
@@ -1,5 +1,76 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
|
## 2026-03-20 - 25.17.10 - fix(rustproxy-http)
|
||||||
|
reuse the shared HTTP proxy service for HTTP/3 request handling
|
||||||
|
|
||||||
|
- Refactors H3ProxyService to delegate requests to the shared HttpProxyService instead of maintaining separate routing and backend forwarding logic.
|
||||||
|
- Aligns HTTP/3 with the TCP/HTTP path for route matching, connection pooling, and ALPN-based upstream protocol detection.
|
||||||
|
- Generalizes request handling and filters to accept boxed/generic HTTP bodies so both HTTP/3 and existing HTTP paths share the same proxy pipeline.
|
||||||
|
- Updates the HTTP/3 integration route matcher to allow transport matching across shared HTTP and QUIC handling.
|
||||||
|
|
||||||
|
## 2026-03-20 - 25.17.9 - fix(rustproxy-http)
|
||||||
|
correct HTTP/3 host extraction and avoid protocol filtering during UDP route lookup
|
||||||
|
|
||||||
|
- Use the URI host or strip the port from the Host header so HTTP/3 requests match routes consistently with TCP/HTTP handling.
|
||||||
|
- Remove protocol filtering from HTTP/3 route lookup because QUIC transport already constrains routing to UDP and protocol validation happens earlier.
|
||||||
|
|
||||||
|
## 2026-03-20 - 25.17.8 - fix(rustproxy)
|
||||||
|
use SNI-based certificate resolution for QUIC TLS connections
|
||||||
|
|
||||||
|
- Replaces static first-certificate selection with the shared CertResolver used by the TCP/TLS path.
|
||||||
|
- Ensures QUIC connections can present the correct certificate per requested domain.
|
||||||
|
- Keeps HTTP/3 ALPN configuration while improving multi-domain TLS handling.
|
||||||
|
|
||||||
|
## 2026-03-20 - 25.17.7 - fix(readme)
|
||||||
|
document QUIC and HTTP/3 compatibility caveats
|
||||||
|
|
||||||
|
- Add notes explaining that GREASE frames are disabled on both server and client HTTP/3 paths to avoid interoperability issues
|
||||||
|
- Document that the current HTTP/3 stack depends on pre-1.0 h3 ecosystem components and may still have rough edges
|
||||||
|
|
||||||
|
## 2026-03-20 - 25.17.6 - fix(rustproxy-http)
|
||||||
|
disable HTTP/3 GREASE for client and server connections
|
||||||
|
|
||||||
|
- Switch the HTTP/3 server connection setup to use the builder API with send_grease(false)
|
||||||
|
- Switch the HTTP/3 client handshake to use the builder API with send_grease(false) to improve compatibility
|
||||||
|
|
||||||
|
## 2026-03-20 - 25.17.5 - fix(rustproxy)
|
||||||
|
add HTTP/3 integration test for QUIC response stream FIN handling
|
||||||
|
|
||||||
|
- adds an integration test covering HTTP/3 proxying over QUIC with TLS termination
|
||||||
|
- verifies response bodies fully arrive and the client receives stream termination instead of hanging
|
||||||
|
- adds test-only dependencies for quinn, h3, h3-quinn, rustls, bytes, and http
|
||||||
|
|
||||||
|
## 2026-03-20 - 25.17.4 - fix(rustproxy-http)
|
||||||
|
prevent HTTP/3 response body streaming from hanging on backend completion
|
||||||
|
|
||||||
|
- extract and track Content-Length before consuming the response body
|
||||||
|
- stop the HTTP/3 body loop when the stream reports end-of-stream or the expected byte count has been sent
|
||||||
|
- add a per-frame idle timeout to avoid indefinite waits on stalled or close-delimited backend bodies
|
||||||
|
|
||||||
|
## 2026-03-20 - 25.17.3 - fix(repository)
|
||||||
|
no changes detected
|
||||||
|
|
||||||
|
|
||||||
|
## 2026-03-20 - 25.17.2 - fix(rustproxy-http)
|
||||||
|
enable TLS connections for HTTP/3 upstream requests when backend re-encryption or TLS is configured
|
||||||
|
|
||||||
|
- Pass backend TLS client configuration into the HTTP/3 request handler.
|
||||||
|
- Detect TLS-required upstream targets using route and target TLS settings before connecting.
|
||||||
|
- Build backend request URIs with the correct http or https scheme to match the upstream connection.
|
||||||
|
|
||||||
|
## 2026-03-20 - 25.17.1 - fix(rustproxy-routing)
|
||||||
|
allow QUIC UDP TLS connections without SNI to match domain-restricted routes
|
||||||
|
|
||||||
|
- Exempts UDP transport from the no-SNI rejection logic because QUIC encrypts the TLS ClientHello and SNI is unavailable at accept time
|
||||||
|
- Adds regression tests to confirm QUIC route matching succeeds without SNI while TCP TLS without SNI remains rejected
|
||||||
|
|
||||||
|
## 2026-03-19 - 25.17.0 - feat(rustproxy-passthrough)
|
||||||
|
add PROXY protocol v2 client IP handling for UDP and QUIC listeners
|
||||||
|
|
||||||
|
- propagate trusted proxy IP configuration into UDP and QUIC listener managers
|
||||||
|
- extract and preserve real client addresses from PROXY protocol v2 headers for HTTP/3 and QUIC stream handling
|
||||||
|
- apply rate limiting, session limits, routing, and metrics using the resolved client IP while preserving correct proxy return-path routing
|
||||||
|
|
||||||
## 2026-03-19 - 25.16.3 - fix(rustproxy)
|
## 2026-03-19 - 25.16.3 - fix(rustproxy)
|
||||||
upgrade fallback UDP listeners to QUIC when TLS certificates become available
|
upgrade fallback UDP listeners to QUIC when TLS certificates become available
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@push.rocks/smartproxy",
|
"name": "@push.rocks/smartproxy",
|
||||||
"version": "25.16.3",
|
"version": "25.17.10",
|
||||||
"private": false,
|
"private": false,
|
||||||
"description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
|
"description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
|
||||||
"main": "dist_ts/index.js",
|
"main": "dist_ts/index.js",
|
||||||
|
|||||||
@@ -1111,6 +1111,10 @@ SmartProxy searches for the Rust binary in this order:
|
|||||||
5. Local dev build (`./rust/target/release/rustproxy`)
|
5. Local dev build (`./rust/target/release/rustproxy`)
|
||||||
6. System PATH (`rustproxy`)
|
6. System PATH (`rustproxy`)
|
||||||
|
|
||||||
|
### QUIC / HTTP3 Caveats
|
||||||
|
- **GREASE frames are disabled.** The underlying h3 crate sends [GREASE frames](https://www.rfc-editor.org/rfc/rfc9114.html#frame-reserved) by default to test protocol extensibility. However, some HTTP/3 clients and servers don't properly ignore unknown frame types, causing 400/500 errors or stream hangs ([h3#206](https://github.com/hyperium/h3/issues/206)). SmartProxy disables GREASE on both the server side (for incoming H3 requests) and the client side (for H3 backend connections) to maximize compatibility.
|
||||||
|
- **HTTP/3 is pre-release.** The h3 ecosystem (h3 0.0.8, h3-quinn 0.0.10, quinn 0.11) is still pre-1.0. Expect rough edges.
|
||||||
|
|
||||||
### Performance Tuning
|
### Performance Tuning
|
||||||
- ✅ Use NFTables forwarding for high-traffic routes (Linux only)
|
- ✅ Use NFTables forwarding for high-traffic routes (Linux only)
|
||||||
- ✅ Enable connection keep-alive where appropriate
|
- ✅ Enable connection keep-alive where appropriate
|
||||||
|
|||||||
4
rust/Cargo.lock
generated
4
rust/Cargo.lock
generated
@@ -1224,10 +1224,14 @@ dependencies = [
|
|||||||
"bytes",
|
"bytes",
|
||||||
"clap",
|
"clap",
|
||||||
"dashmap",
|
"dashmap",
|
||||||
|
"h3",
|
||||||
|
"h3-quinn",
|
||||||
|
"http",
|
||||||
"http-body-util",
|
"http-body-util",
|
||||||
"hyper",
|
"hyper",
|
||||||
"hyper-util",
|
"hyper-util",
|
||||||
"mimalloc",
|
"mimalloc",
|
||||||
|
"quinn",
|
||||||
"rcgen",
|
"rcgen",
|
||||||
"rustls",
|
"rustls",
|
||||||
"rustls-pemfile",
|
"rustls-pemfile",
|
||||||
|
|||||||
@@ -1,82 +1,59 @@
|
|||||||
//! HTTP/3 proxy service.
|
//! HTTP/3 proxy service.
|
||||||
//!
|
//!
|
||||||
//! Accepts QUIC connections via quinn, runs h3 server to handle HTTP/3 requests,
|
//! Accepts QUIC connections via quinn, runs h3 server to handle HTTP/3 requests,
|
||||||
//! and forwards them to backends using the same routing and pool infrastructure
|
//! and delegates backend forwarding to the shared `HttpProxyService` — same
|
||||||
//! as the HTTP/1+2 proxy.
|
//! route matching, connection pool, and protocol auto-detection as TCP/HTTP.
|
||||||
|
|
||||||
|
use std::net::SocketAddr;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
use arc_swap::ArcSwap;
|
|
||||||
use bytes::{Buf, Bytes};
|
use bytes::{Buf, Bytes};
|
||||||
use http_body::Frame;
|
use http_body::Frame;
|
||||||
|
use http_body_util::BodyExt;
|
||||||
|
use http_body_util::combinators::BoxBody;
|
||||||
use tracing::{debug, warn};
|
use tracing::{debug, warn};
|
||||||
|
|
||||||
use rustproxy_config::{RouteConfig, TransportProtocol};
|
use rustproxy_config::RouteConfig;
|
||||||
use rustproxy_metrics::MetricsCollector;
|
use tokio_util::sync::CancellationToken;
|
||||||
use rustproxy_routing::{MatchContext, RouteManager};
|
|
||||||
|
|
||||||
use crate::connection_pool::ConnectionPool;
|
use crate::proxy_service::{ConnActivity, HttpProxyService};
|
||||||
use crate::protocol_cache::ProtocolCache;
|
|
||||||
use crate::upstream_selector::UpstreamSelector;
|
|
||||||
|
|
||||||
/// HTTP/3 proxy service.
|
/// HTTP/3 proxy service.
|
||||||
///
|
///
|
||||||
/// Handles QUIC connections with the h3 crate, parses HTTP/3 requests,
|
/// Accepts QUIC connections, parses HTTP/3 requests, and delegates backend
|
||||||
/// and forwards them to backends using per-request route matching and
|
/// forwarding to the shared `HttpProxyService`.
|
||||||
/// shared connection pooling.
|
|
||||||
pub struct H3ProxyService {
|
pub struct H3ProxyService {
|
||||||
route_manager: Arc<ArcSwap<RouteManager>>,
|
http_proxy: Arc<HttpProxyService>,
|
||||||
metrics: Arc<MetricsCollector>,
|
|
||||||
connection_pool: Arc<ConnectionPool>,
|
|
||||||
#[allow(dead_code)]
|
|
||||||
protocol_cache: Arc<ProtocolCache>,
|
|
||||||
#[allow(dead_code)]
|
|
||||||
upstream_selector: UpstreamSelector,
|
|
||||||
#[allow(dead_code)]
|
|
||||||
backend_tls_config: Arc<rustls::ClientConfig>,
|
|
||||||
connect_timeout: Duration,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl H3ProxyService {
|
impl H3ProxyService {
|
||||||
pub fn new(
|
pub fn new(http_proxy: Arc<HttpProxyService>) -> Self {
|
||||||
route_manager: Arc<ArcSwap<RouteManager>>,
|
Self { http_proxy }
|
||||||
metrics: Arc<MetricsCollector>,
|
|
||||||
connection_pool: Arc<ConnectionPool>,
|
|
||||||
protocol_cache: Arc<ProtocolCache>,
|
|
||||||
backend_tls_config: Arc<rustls::ClientConfig>,
|
|
||||||
connect_timeout: Duration,
|
|
||||||
) -> Self {
|
|
||||||
Self {
|
|
||||||
route_manager: Arc::clone(&route_manager),
|
|
||||||
metrics: Arc::clone(&metrics),
|
|
||||||
connection_pool,
|
|
||||||
protocol_cache,
|
|
||||||
upstream_selector: UpstreamSelector::new(),
|
|
||||||
backend_tls_config,
|
|
||||||
connect_timeout,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handle an accepted QUIC connection as HTTP/3.
|
/// Handle an accepted QUIC connection as HTTP/3.
|
||||||
|
///
|
||||||
|
/// If `real_client_addr` is provided (from PROXY protocol), it overrides
|
||||||
|
/// `connection.remote_address()` for client IP attribution.
|
||||||
pub async fn handle_connection(
|
pub async fn handle_connection(
|
||||||
&self,
|
&self,
|
||||||
connection: quinn::Connection,
|
connection: quinn::Connection,
|
||||||
_fallback_route: &RouteConfig,
|
_fallback_route: &RouteConfig,
|
||||||
port: u16,
|
port: u16,
|
||||||
|
real_client_addr: Option<SocketAddr>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let remote_addr = connection.remote_address();
|
let remote_addr = real_client_addr.unwrap_or_else(|| connection.remote_address());
|
||||||
debug!("HTTP/3 connection from {} on port {}", remote_addr, port);
|
debug!("HTTP/3 connection from {} on port {}", remote_addr, port);
|
||||||
|
|
||||||
let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> =
|
let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> =
|
||||||
h3::server::Connection::new(h3_quinn::Connection::new(connection))
|
h3::server::builder()
|
||||||
|
.send_grease(false)
|
||||||
|
.build(h3_quinn::Connection::new(connection))
|
||||||
.await
|
.await
|
||||||
.map_err(|e| anyhow::anyhow!("H3 connection setup failed: {}", e))?;
|
.map_err(|e| anyhow::anyhow!("H3 connection setup failed: {}", e))?;
|
||||||
|
|
||||||
let client_ip = remote_addr.ip().to_string();
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match h3_conn.accept().await {
|
match h3_conn.accept().await {
|
||||||
Ok(Some(resolver)) => {
|
Ok(Some(resolver)) => {
|
||||||
@@ -88,19 +65,13 @@ impl H3ProxyService {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
self.metrics.record_http_request();
|
let http_proxy = Arc::clone(&self.http_proxy);
|
||||||
|
|
||||||
let rm = self.route_manager.load();
|
|
||||||
let pool = Arc::clone(&self.connection_pool);
|
|
||||||
let metrics = Arc::clone(&self.metrics);
|
|
||||||
let connect_timeout = self.connect_timeout;
|
|
||||||
let client_ip = client_ip.clone();
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(e) = handle_h3_request(
|
if let Err(e) = handle_h3_request(
|
||||||
request, stream, port, &client_ip, &rm, &pool, &metrics, connect_timeout,
|
request, stream, port, remote_addr, &http_proxy,
|
||||||
).await {
|
).await {
|
||||||
debug!("HTTP/3 request error from {}: {}", client_ip, e);
|
debug!("HTTP/3 request error from {}: {}", remote_addr, e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -119,86 +90,27 @@ impl H3ProxyService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handle a single HTTP/3 request with per-request route matching.
|
/// Handle a single HTTP/3 request by delegating to HttpProxyService.
|
||||||
|
///
|
||||||
|
/// 1. Read the H3 request body via an mpsc channel (streaming, not buffered)
|
||||||
|
/// 2. Build a `hyper::Request<BoxBody>` that HttpProxyService can handle
|
||||||
|
/// 3. Call `HttpProxyService::handle_request` — same route matching, connection
|
||||||
|
/// pool, ALPN protocol detection (H1/H2/H3) as the TCP/HTTP path
|
||||||
|
/// 4. Stream the response back over the H3 stream
|
||||||
async fn handle_h3_request(
|
async fn handle_h3_request(
|
||||||
request: hyper::Request<()>,
|
request: hyper::Request<()>,
|
||||||
mut stream: h3::server::RequestStream<h3_quinn::BidiStream<Bytes>, Bytes>,
|
mut stream: h3::server::RequestStream<h3_quinn::BidiStream<Bytes>, Bytes>,
|
||||||
port: u16,
|
port: u16,
|
||||||
client_ip: &str,
|
peer_addr: SocketAddr,
|
||||||
route_manager: &RouteManager,
|
http_proxy: &HttpProxyService,
|
||||||
_connection_pool: &ConnectionPool,
|
|
||||||
metrics: &MetricsCollector,
|
|
||||||
connect_timeout: Duration,
|
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let method = request.method().clone();
|
// Stream request body from H3 client via an mpsc channel.
|
||||||
let uri = request.uri().clone();
|
|
||||||
let path = uri.path().to_string();
|
|
||||||
|
|
||||||
// Extract host from :authority or Host header
|
|
||||||
let host = request.uri().authority()
|
|
||||||
.map(|a| a.as_str().to_string())
|
|
||||||
.or_else(|| request.headers().get("host").and_then(|v| v.to_str().ok()).map(|s| s.to_string()))
|
|
||||||
.unwrap_or_default();
|
|
||||||
|
|
||||||
debug!("HTTP/3 {} {} (host: {}, client: {})", method, path, host, client_ip);
|
|
||||||
|
|
||||||
// Per-request route matching
|
|
||||||
let ctx = MatchContext {
|
|
||||||
port,
|
|
||||||
domain: if host.is_empty() { None } else { Some(&host) },
|
|
||||||
path: Some(&path),
|
|
||||||
client_ip: Some(client_ip),
|
|
||||||
tls_version: Some("TLSv1.3"),
|
|
||||||
headers: None,
|
|
||||||
is_tls: true,
|
|
||||||
protocol: Some("http"),
|
|
||||||
transport: Some(TransportProtocol::Udp),
|
|
||||||
};
|
|
||||||
|
|
||||||
let route_match = route_manager.find_route(&ctx)
|
|
||||||
.ok_or_else(|| anyhow::anyhow!("No route matched for HTTP/3 request to {}{}", host, path))?;
|
|
||||||
let route = route_match.route;
|
|
||||||
|
|
||||||
// Resolve backend target (use matched target or first target)
|
|
||||||
let target = route_match.target
|
|
||||||
.or_else(|| route.action.targets.as_ref().and_then(|t| t.first()))
|
|
||||||
.ok_or_else(|| anyhow::anyhow!("No target for HTTP/3 route"))?;
|
|
||||||
|
|
||||||
let backend_host = target.host.first();
|
|
||||||
let backend_port = target.port.resolve(port);
|
|
||||||
let backend_addr = format!("{}:{}", backend_host, backend_port);
|
|
||||||
|
|
||||||
// Connect to backend via TCP HTTP/1.1 with timeout
|
|
||||||
let tcp_stream = tokio::time::timeout(
|
|
||||||
connect_timeout,
|
|
||||||
tokio::net::TcpStream::connect(&backend_addr),
|
|
||||||
).await
|
|
||||||
.map_err(|_| anyhow::anyhow!("Backend connect timeout to {}", backend_addr))?
|
|
||||||
.map_err(|e| anyhow::anyhow!("Backend connect to {} failed: {}", backend_addr, e))?;
|
|
||||||
|
|
||||||
let _ = tcp_stream.set_nodelay(true);
|
|
||||||
|
|
||||||
let io = hyper_util::rt::TokioIo::new(tcp_stream);
|
|
||||||
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await
|
|
||||||
.map_err(|e| anyhow::anyhow!("Backend handshake failed: {}", e))?;
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
if let Err(e) = conn.await {
|
|
||||||
debug!("Backend connection closed: {}", e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Stream request body from H3 client to backend via an mpsc channel.
|
|
||||||
// This avoids buffering the entire request body in memory.
|
|
||||||
let (body_tx, body_rx) = tokio::sync::mpsc::channel::<Bytes>(4);
|
let (body_tx, body_rx) = tokio::sync::mpsc::channel::<Bytes>(4);
|
||||||
let total_bytes_in = Arc::new(std::sync::atomic::AtomicU64::new(0));
|
|
||||||
let total_bytes_in_writer = Arc::clone(&total_bytes_in);
|
|
||||||
|
|
||||||
// Spawn the H3 body reader task
|
// Spawn the H3 body reader task
|
||||||
let body_reader = tokio::spawn(async move {
|
let body_reader = tokio::spawn(async move {
|
||||||
while let Ok(Some(mut chunk)) = stream.recv_data().await {
|
while let Ok(Some(mut chunk)) = stream.recv_data().await {
|
||||||
let data = Bytes::copy_from_slice(chunk.chunk());
|
let data = Bytes::copy_from_slice(chunk.chunk());
|
||||||
total_bytes_in_writer.fetch_add(data.len() as u64, std::sync::atomic::Ordering::Relaxed);
|
|
||||||
chunk.advance(chunk.remaining());
|
chunk.advance(chunk.remaining());
|
||||||
if body_tx.send(data).await.is_err() {
|
if body_tx.send(data).await.is_err() {
|
||||||
break;
|
break;
|
||||||
@@ -207,106 +119,64 @@ async fn handle_h3_request(
|
|||||||
stream
|
stream
|
||||||
});
|
});
|
||||||
|
|
||||||
// Create a body that polls from the mpsc receiver
|
// Build a hyper::Request<BoxBody> from the H3 request + streaming body.
|
||||||
|
// The URI already has scheme + authority + path set by the h3 crate.
|
||||||
let body = H3RequestBody { receiver: body_rx };
|
let body = H3RequestBody { receiver: body_rx };
|
||||||
let backend_req = build_backend_request(&method, &backend_addr, &path, &host, &request, body)?;
|
let (parts, _) = request.into_parts();
|
||||||
|
let boxed_body: BoxBody<Bytes, hyper::Error> = BoxBody::new(body);
|
||||||
|
let req = hyper::Request::from_parts(parts, boxed_body);
|
||||||
|
|
||||||
let response = sender.send_request(backend_req).await
|
// Delegate to HttpProxyService — same backend path as TCP/HTTP:
|
||||||
|
// route matching, ALPN protocol detection, connection pool, H1/H2/H3 auto.
|
||||||
|
let cancel = CancellationToken::new();
|
||||||
|
let conn_activity = ConnActivity::new_standalone();
|
||||||
|
let response = http_proxy.handle_request(req, peer_addr, port, cancel, conn_activity).await
|
||||||
.map_err(|e| anyhow::anyhow!("Backend request failed: {}", e))?;
|
.map_err(|e| anyhow::anyhow!("Backend request failed: {}", e))?;
|
||||||
|
|
||||||
// Await the body reader to get the stream back
|
// Await the body reader to get the H3 stream back
|
||||||
let mut stream = body_reader.await
|
let mut stream = body_reader.await
|
||||||
.map_err(|e| anyhow::anyhow!("Body reader task failed: {}", e))?;
|
.map_err(|e| anyhow::anyhow!("Body reader task failed: {}", e))?;
|
||||||
let total_bytes_in = total_bytes_in.load(std::sync::atomic::Ordering::Relaxed);
|
|
||||||
|
|
||||||
// Build H3 response
|
// Send response headers over H3 (skip hop-by-hop headers)
|
||||||
let status = response.status();
|
let (resp_parts, resp_body) = response.into_parts();
|
||||||
let mut h3_response = hyper::Response::builder().status(status);
|
let mut h3_response = hyper::Response::builder().status(resp_parts.status);
|
||||||
|
for (name, value) in &resp_parts.headers {
|
||||||
// Copy response headers (skip hop-by-hop)
|
let n = name.as_str();
|
||||||
for (name, value) in response.headers() {
|
|
||||||
let n = name.as_str().to_lowercase();
|
|
||||||
if n == "transfer-encoding" || n == "connection" || n == "keep-alive" || n == "upgrade" {
|
if n == "transfer-encoding" || n == "connection" || n == "keep-alive" || n == "upgrade" {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
h3_response = h3_response.header(name, value);
|
h3_response = h3_response.header(name, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add Alt-Svc for HTTP/3 advertisement
|
|
||||||
let alt_svc = route.action.udp.as_ref()
|
|
||||||
.and_then(|u| u.quic.as_ref())
|
|
||||||
.map(|q| {
|
|
||||||
let p = q.alt_svc_port.unwrap_or(port);
|
|
||||||
let ma = q.alt_svc_max_age.unwrap_or(86400);
|
|
||||||
format!("h3=\":{}\"; ma={}", p, ma)
|
|
||||||
})
|
|
||||||
.unwrap_or_else(|| format!("h3=\":{}\"; ma=86400", port));
|
|
||||||
h3_response = h3_response.header("alt-svc", alt_svc);
|
|
||||||
|
|
||||||
let h3_response = h3_response.body(())
|
let h3_response = h3_response.body(())
|
||||||
.map_err(|e| anyhow::anyhow!("Failed to build H3 response: {}", e))?;
|
.map_err(|e| anyhow::anyhow!("Failed to build H3 response: {}", e))?;
|
||||||
|
|
||||||
// Send response headers
|
|
||||||
stream.send_response(h3_response).await
|
stream.send_response(h3_response).await
|
||||||
.map_err(|e| anyhow::anyhow!("Failed to send H3 response: {}", e))?;
|
.map_err(|e| anyhow::anyhow!("Failed to send H3 response: {}", e))?;
|
||||||
|
|
||||||
// Stream response body back
|
// Stream response body back over H3
|
||||||
use http_body_util::BodyExt;
|
let mut resp_body = resp_body;
|
||||||
let mut body = response.into_body();
|
while let Some(frame) = resp_body.frame().await {
|
||||||
let mut total_bytes_out: u64 = 0;
|
|
||||||
while let Some(frame) = body.frame().await {
|
|
||||||
match frame {
|
match frame {
|
||||||
Ok(frame) => {
|
Ok(frame) => {
|
||||||
if let Some(data) = frame.data_ref() {
|
if let Some(data) = frame.data_ref() {
|
||||||
total_bytes_out += data.len() as u64;
|
|
||||||
stream.send_data(Bytes::copy_from_slice(data)).await
|
stream.send_data(Bytes::copy_from_slice(data)).await
|
||||||
.map_err(|e| anyhow::anyhow!("Failed to send H3 data: {}", e))?;
|
.map_err(|e| anyhow::anyhow!("Failed to send H3 data: {}", e))?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!("Backend body read error: {}", e);
|
warn!("Response body read error: {}", e);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Record metrics
|
// Finish the H3 stream (send QUIC FIN)
|
||||||
let route_id = route.name.as_deref().or(route.id.as_deref());
|
|
||||||
metrics.record_bytes(total_bytes_in, total_bytes_out, route_id, Some(client_ip));
|
|
||||||
|
|
||||||
// Finish the stream
|
|
||||||
stream.finish().await
|
stream.finish().await
|
||||||
.map_err(|e| anyhow::anyhow!("Failed to finish H3 stream: {}", e))?;
|
.map_err(|e| anyhow::anyhow!("Failed to finish H3 stream: {}", e))?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Build an HTTP/1.1 backend request from the H3 frontend request.
|
|
||||||
fn build_backend_request<B>(
|
|
||||||
method: &hyper::Method,
|
|
||||||
backend_addr: &str,
|
|
||||||
path: &str,
|
|
||||||
host: &str,
|
|
||||||
original_request: &hyper::Request<()>,
|
|
||||||
body: B,
|
|
||||||
) -> anyhow::Result<hyper::Request<B>> {
|
|
||||||
let mut req = hyper::Request::builder()
|
|
||||||
.method(method)
|
|
||||||
.uri(format!("http://{}{}", backend_addr, path))
|
|
||||||
.header("host", host);
|
|
||||||
|
|
||||||
// Forward non-pseudo headers
|
|
||||||
for (name, value) in original_request.headers() {
|
|
||||||
let n = name.as_str();
|
|
||||||
if !n.starts_with(':') && n != "host" {
|
|
||||||
req = req.header(name, value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
req.body(body)
|
|
||||||
.map_err(|e| anyhow::anyhow!("Failed to build backend request: {}", e))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A streaming request body backed by an mpsc channel receiver.
|
/// A streaming request body backed by an mpsc channel receiver.
|
||||||
///
|
///
|
||||||
/// Implements `http_body::Body` so hyper can poll chunks as they arrive
|
/// Implements `http_body::Body` so hyper can poll chunks as they arrive
|
||||||
|
|||||||
@@ -36,7 +36,7 @@ use crate::upstream_selector::UpstreamSelector;
|
|||||||
/// Per-connection context for keeping the idle watchdog alive during body streaming.
|
/// Per-connection context for keeping the idle watchdog alive during body streaming.
|
||||||
/// Passed through the forwarding chain so CountingBody can update the timestamp.
|
/// Passed through the forwarding chain so CountingBody can update the timestamp.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct ConnActivity {
|
pub struct ConnActivity {
|
||||||
last_activity: Arc<AtomicU64>,
|
last_activity: Arc<AtomicU64>,
|
||||||
start: std::time::Instant,
|
start: std::time::Instant,
|
||||||
/// Active-request counter from handle_io's idle watchdog. When set, CountingBody
|
/// Active-request counter from handle_io's idle watchdog. When set, CountingBody
|
||||||
@@ -49,6 +49,19 @@ struct ConnActivity {
|
|||||||
alt_svc_cache_key: Option<crate::protocol_cache::ProtocolCacheKey>,
|
alt_svc_cache_key: Option<crate::protocol_cache::ProtocolCacheKey>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl ConnActivity {
|
||||||
|
/// Create a minimal ConnActivity (no idle watchdog, no Alt-Svc cache).
|
||||||
|
/// Used by H3ProxyService where the TCP idle watchdog doesn't apply.
|
||||||
|
pub fn new_standalone() -> Self {
|
||||||
|
Self {
|
||||||
|
last_activity: Arc::new(AtomicU64::new(0)),
|
||||||
|
start: std::time::Instant::now(),
|
||||||
|
active_requests: None,
|
||||||
|
alt_svc_cache_key: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Default upstream connect timeout (30 seconds).
|
/// Default upstream connect timeout (30 seconds).
|
||||||
const DEFAULT_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
|
const DEFAULT_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
|
||||||
|
|
||||||
@@ -347,6 +360,7 @@ impl HttpProxyService {
|
|||||||
let st = start;
|
let st = start;
|
||||||
let ca = ConnActivity { last_activity: Arc::clone(&la_inner), start, active_requests: Some(Arc::clone(&ar_inner)), alt_svc_cache_key: None };
|
let ca = ConnActivity { last_activity: Arc::clone(&la_inner), start, active_requests: Some(Arc::clone(&ar_inner)), alt_svc_cache_key: None };
|
||||||
async move {
|
async move {
|
||||||
|
let req = req.map(|body| BoxBody::new(body));
|
||||||
let result = svc.handle_request(req, peer, port, cn, ca).await;
|
let result = svc.handle_request(req, peer, port, cn, ca).await;
|
||||||
// Mark request end — update activity timestamp before guard drops
|
// Mark request end — update activity timestamp before guard drops
|
||||||
la.store(st.elapsed().as_millis() as u64, Ordering::Relaxed);
|
la.store(st.elapsed().as_millis() as u64, Ordering::Relaxed);
|
||||||
@@ -416,9 +430,13 @@ impl HttpProxyService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Handle a single HTTP request.
|
/// Handle a single HTTP request.
|
||||||
async fn handle_request(
|
///
|
||||||
|
/// Accepts a generic body (`BoxBody`) so both the TCP/HTTP path (which boxes
|
||||||
|
/// `Incoming`) and the H3 path (which boxes the H3 request body stream) can
|
||||||
|
/// share the same backend forwarding logic.
|
||||||
|
pub async fn handle_request(
|
||||||
&self,
|
&self,
|
||||||
req: Request<Incoming>,
|
req: Request<BoxBody<Bytes, hyper::Error>>,
|
||||||
peer_addr: std::net::SocketAddr,
|
peer_addr: std::net::SocketAddr,
|
||||||
port: u16,
|
port: u16,
|
||||||
cancel: CancellationToken,
|
cancel: CancellationToken,
|
||||||
@@ -965,7 +983,7 @@ impl HttpProxyService {
|
|||||||
&self,
|
&self,
|
||||||
io: TokioIo<BackendStream>,
|
io: TokioIo<BackendStream>,
|
||||||
parts: hyper::http::request::Parts,
|
parts: hyper::http::request::Parts,
|
||||||
body: Incoming,
|
body: BoxBody<Bytes, hyper::Error>,
|
||||||
upstream_headers: hyper::HeaderMap,
|
upstream_headers: hyper::HeaderMap,
|
||||||
upstream_path: &str,
|
upstream_path: &str,
|
||||||
_upstream: &crate::upstream_selector::UpstreamSelection,
|
_upstream: &crate::upstream_selector::UpstreamSelection,
|
||||||
@@ -1013,7 +1031,7 @@ impl HttpProxyService {
|
|||||||
&self,
|
&self,
|
||||||
mut sender: hyper::client::conn::http1::SendRequest<BoxBody<Bytes, hyper::Error>>,
|
mut sender: hyper::client::conn::http1::SendRequest<BoxBody<Bytes, hyper::Error>>,
|
||||||
parts: hyper::http::request::Parts,
|
parts: hyper::http::request::Parts,
|
||||||
body: Incoming,
|
body: BoxBody<Bytes, hyper::Error>,
|
||||||
upstream_headers: hyper::HeaderMap,
|
upstream_headers: hyper::HeaderMap,
|
||||||
upstream_path: &str,
|
upstream_path: &str,
|
||||||
route: &rustproxy_config::RouteConfig,
|
route: &rustproxy_config::RouteConfig,
|
||||||
@@ -1077,7 +1095,7 @@ impl HttpProxyService {
|
|||||||
&self,
|
&self,
|
||||||
io: TokioIo<BackendStream>,
|
io: TokioIo<BackendStream>,
|
||||||
parts: hyper::http::request::Parts,
|
parts: hyper::http::request::Parts,
|
||||||
body: Incoming,
|
body: BoxBody<Bytes, hyper::Error>,
|
||||||
upstream_headers: hyper::HeaderMap,
|
upstream_headers: hyper::HeaderMap,
|
||||||
upstream_path: &str,
|
upstream_path: &str,
|
||||||
_upstream: &crate::upstream_selector::UpstreamSelection,
|
_upstream: &crate::upstream_selector::UpstreamSelection,
|
||||||
@@ -1151,7 +1169,7 @@ impl HttpProxyService {
|
|||||||
&self,
|
&self,
|
||||||
sender: hyper::client::conn::http2::SendRequest<BoxBody<Bytes, hyper::Error>>,
|
sender: hyper::client::conn::http2::SendRequest<BoxBody<Bytes, hyper::Error>>,
|
||||||
parts: hyper::http::request::Parts,
|
parts: hyper::http::request::Parts,
|
||||||
body: Incoming,
|
body: BoxBody<Bytes, hyper::Error>,
|
||||||
upstream_headers: hyper::HeaderMap,
|
upstream_headers: hyper::HeaderMap,
|
||||||
upstream_path: &str,
|
upstream_path: &str,
|
||||||
route: &rustproxy_config::RouteConfig,
|
route: &rustproxy_config::RouteConfig,
|
||||||
@@ -1344,7 +1362,7 @@ impl HttpProxyService {
|
|||||||
&self,
|
&self,
|
||||||
io: TokioIo<BackendStream>,
|
io: TokioIo<BackendStream>,
|
||||||
parts: hyper::http::request::Parts,
|
parts: hyper::http::request::Parts,
|
||||||
body: Incoming,
|
body: BoxBody<Bytes, hyper::Error>,
|
||||||
mut upstream_headers: hyper::HeaderMap,
|
mut upstream_headers: hyper::HeaderMap,
|
||||||
upstream_path: &str,
|
upstream_path: &str,
|
||||||
upstream: &crate::upstream_selector::UpstreamSelection,
|
upstream: &crate::upstream_selector::UpstreamSelection,
|
||||||
@@ -1675,7 +1693,7 @@ impl HttpProxyService {
|
|||||||
&self,
|
&self,
|
||||||
mut sender: hyper::client::conn::http2::SendRequest<BoxBody<Bytes, hyper::Error>>,
|
mut sender: hyper::client::conn::http2::SendRequest<BoxBody<Bytes, hyper::Error>>,
|
||||||
parts: hyper::http::request::Parts,
|
parts: hyper::http::request::Parts,
|
||||||
body: Incoming,
|
body: BoxBody<Bytes, hyper::Error>,
|
||||||
upstream_headers: hyper::HeaderMap,
|
upstream_headers: hyper::HeaderMap,
|
||||||
upstream_path: &str,
|
upstream_path: &str,
|
||||||
route: &rustproxy_config::RouteConfig,
|
route: &rustproxy_config::RouteConfig,
|
||||||
@@ -1816,7 +1834,7 @@ impl HttpProxyService {
|
|||||||
/// Handle a WebSocket upgrade request (H1 Upgrade or H2 Extended CONNECT per RFC 8441).
|
/// Handle a WebSocket upgrade request (H1 Upgrade or H2 Extended CONNECT per RFC 8441).
|
||||||
async fn handle_websocket_upgrade(
|
async fn handle_websocket_upgrade(
|
||||||
&self,
|
&self,
|
||||||
req: Request<Incoming>,
|
req: Request<BoxBody<Bytes, hyper::Error>>,
|
||||||
peer_addr: std::net::SocketAddr,
|
peer_addr: std::net::SocketAddr,
|
||||||
upstream: &crate::upstream_selector::UpstreamSelection,
|
upstream: &crate::upstream_selector::UpstreamSelection,
|
||||||
route: &rustproxy_config::RouteConfig,
|
route: &rustproxy_config::RouteConfig,
|
||||||
@@ -2538,7 +2556,7 @@ impl HttpProxyService {
|
|||||||
&self,
|
&self,
|
||||||
quic_conn: quinn::Connection,
|
quic_conn: quinn::Connection,
|
||||||
parts: hyper::http::request::Parts,
|
parts: hyper::http::request::Parts,
|
||||||
body: Incoming,
|
body: BoxBody<Bytes, hyper::Error>,
|
||||||
upstream_headers: hyper::HeaderMap,
|
upstream_headers: hyper::HeaderMap,
|
||||||
upstream_path: &str,
|
upstream_path: &str,
|
||||||
route: &rustproxy_config::RouteConfig,
|
route: &rustproxy_config::RouteConfig,
|
||||||
@@ -2550,7 +2568,11 @@ impl HttpProxyService {
|
|||||||
backend_key: &str,
|
backend_key: &str,
|
||||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||||
let h3_quinn_conn = h3_quinn::Connection::new(quic_conn.clone());
|
let h3_quinn_conn = h3_quinn::Connection::new(quic_conn.clone());
|
||||||
let (mut driver, mut send_request) = match h3::client::new(h3_quinn_conn).await {
|
let (mut driver, mut send_request) = match h3::client::builder()
|
||||||
|
.send_grease(false)
|
||||||
|
.build(h3_quinn_conn)
|
||||||
|
.await
|
||||||
|
{
|
||||||
Ok(pair) => pair,
|
Ok(pair) => pair,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!(backend = %backend_key, domain = %domain, error = %e, "H3 client handshake failed");
|
error!(backend = %backend_key, domain = %domain, error = %e, "H3 client handshake failed");
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ use std::sync::Arc;
|
|||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use http_body_util::Full;
|
use http_body_util::Full;
|
||||||
use http_body_util::BodyExt;
|
use http_body_util::BodyExt;
|
||||||
use hyper::body::Incoming;
|
|
||||||
use hyper::{Request, Response, StatusCode};
|
use hyper::{Request, Response, StatusCode};
|
||||||
use http_body_util::combinators::BoxBody;
|
use http_body_util::combinators::BoxBody;
|
||||||
|
|
||||||
@@ -19,7 +18,7 @@ impl RequestFilter {
|
|||||||
/// Apply security filters. Returns Some(response) if the request should be blocked.
|
/// Apply security filters. Returns Some(response) if the request should be blocked.
|
||||||
pub fn apply(
|
pub fn apply(
|
||||||
security: &RouteSecurity,
|
security: &RouteSecurity,
|
||||||
req: &Request<Incoming>,
|
req: &Request<impl hyper::body::Body>,
|
||||||
peer_addr: &SocketAddr,
|
peer_addr: &SocketAddr,
|
||||||
) -> Option<Response<BoxBody<Bytes, hyper::Error>>> {
|
) -> Option<Response<BoxBody<Bytes, hyper::Error>>> {
|
||||||
Self::apply_with_rate_limiter(security, req, peer_addr, None)
|
Self::apply_with_rate_limiter(security, req, peer_addr, None)
|
||||||
@@ -29,7 +28,7 @@ impl RequestFilter {
|
|||||||
/// Returns Some(response) if the request should be blocked.
|
/// Returns Some(response) if the request should be blocked.
|
||||||
pub fn apply_with_rate_limiter(
|
pub fn apply_with_rate_limiter(
|
||||||
security: &RouteSecurity,
|
security: &RouteSecurity,
|
||||||
req: &Request<Incoming>,
|
req: &Request<impl hyper::body::Body>,
|
||||||
peer_addr: &SocketAddr,
|
peer_addr: &SocketAddr,
|
||||||
rate_limiter: Option<&Arc<RateLimiter>>,
|
rate_limiter: Option<&Arc<RateLimiter>>,
|
||||||
) -> Option<Response<BoxBody<Bytes, hyper::Error>>> {
|
) -> Option<Response<BoxBody<Bytes, hyper::Error>>> {
|
||||||
@@ -182,7 +181,7 @@ impl RequestFilter {
|
|||||||
/// Determine the rate limit key based on configuration.
|
/// Determine the rate limit key based on configuration.
|
||||||
fn rate_limit_key(
|
fn rate_limit_key(
|
||||||
config: &rustproxy_config::RouteRateLimit,
|
config: &rustproxy_config::RouteRateLimit,
|
||||||
req: &Request<Incoming>,
|
req: &Request<impl hyper::body::Body>,
|
||||||
peer_addr: &SocketAddr,
|
peer_addr: &SocketAddr,
|
||||||
) -> String {
|
) -> String {
|
||||||
use rustproxy_config::RateLimitKeyBy;
|
use rustproxy_config::RateLimitKeyBy;
|
||||||
@@ -220,7 +219,7 @@ impl RequestFilter {
|
|||||||
/// Handle CORS preflight (OPTIONS) requests.
|
/// Handle CORS preflight (OPTIONS) requests.
|
||||||
/// Returns Some(response) if this is a CORS preflight that should be handled.
|
/// Returns Some(response) if this is a CORS preflight that should be handled.
|
||||||
pub fn handle_cors_preflight(
|
pub fn handle_cors_preflight(
|
||||||
req: &Request<Incoming>,
|
req: &Request<impl hyper::body::Body>,
|
||||||
) -> Option<Response<BoxBody<Bytes, hyper::Error>>> {
|
) -> Option<Response<BoxBody<Bytes, hyper::Error>>> {
|
||||||
if req.method() != hyper::Method::OPTIONS {
|
if req.method() != hyper::Method::OPTIONS {
|
||||||
return None;
|
return None;
|
||||||
|
|||||||
@@ -3,13 +3,21 @@
|
|||||||
//! Manages QUIC endpoints (via quinn), accepts connections, and either:
|
//! Manages QUIC endpoints (via quinn), accepts connections, and either:
|
||||||
//! - Forwards streams bidirectionally to TCP backends (QUIC termination)
|
//! - Forwards streams bidirectionally to TCP backends (QUIC termination)
|
||||||
//! - Dispatches to H3ProxyService for HTTP/3 handling (Phase 5)
|
//! - Dispatches to H3ProxyService for HTTP/3 handling (Phase 5)
|
||||||
|
//!
|
||||||
|
//! When `proxy_ips` is configured, a UDP relay layer intercepts PROXY protocol v2
|
||||||
|
//! headers before they reach quinn, extracting real client IPs for attribution.
|
||||||
|
|
||||||
use std::net::SocketAddr;
|
use std::net::{IpAddr, SocketAddr};
|
||||||
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::AsyncWriteExt;
|
||||||
|
use tokio::net::UdpSocket;
|
||||||
|
use tokio::task::JoinHandle;
|
||||||
|
|
||||||
use arc_swap::ArcSwap;
|
use arc_swap::ArcSwap;
|
||||||
|
use dashmap::DashMap;
|
||||||
use quinn::{Endpoint, ServerConfig as QuinnServerConfig};
|
use quinn::{Endpoint, ServerConfig as QuinnServerConfig};
|
||||||
use rustls::ServerConfig as RustlsServerConfig;
|
use rustls::ServerConfig as RustlsServerConfig;
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
@@ -47,9 +55,274 @@ pub fn create_quic_endpoint(
|
|||||||
Ok(endpoint)
|
Ok(endpoint)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ===== PROXY protocol relay for QUIC =====
|
||||||
|
|
||||||
|
/// Result of creating a QUIC endpoint with a PROXY protocol relay layer.
|
||||||
|
pub struct QuicProxyRelay {
|
||||||
|
/// The quinn endpoint (bound to 127.0.0.1:ephemeral).
|
||||||
|
pub endpoint: Endpoint,
|
||||||
|
/// The relay recv loop task handle.
|
||||||
|
pub relay_task: JoinHandle<()>,
|
||||||
|
/// Maps relay socket local addr → real client SocketAddr (from PROXY v2).
|
||||||
|
/// Consulted by `quic_accept_loop` to resolve real client IPs.
|
||||||
|
pub real_client_map: Arc<DashMap<SocketAddr, SocketAddr>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A single relay session for forwarding datagrams between an external source
|
||||||
|
/// and the internal quinn endpoint.
|
||||||
|
struct RelaySession {
|
||||||
|
socket: Arc<UdpSocket>,
|
||||||
|
last_activity: AtomicU64,
|
||||||
|
return_task: JoinHandle<()>,
|
||||||
|
cancel: CancellationToken,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a QUIC endpoint with a PROXY protocol v2 relay layer.
|
||||||
|
///
|
||||||
|
/// Instead of giving the external socket to quinn, we:
|
||||||
|
/// 1. Bind a raw UDP socket on 0.0.0.0:port (external)
|
||||||
|
/// 2. Bind quinn on 127.0.0.1:0 (internal, ephemeral)
|
||||||
|
/// 3. Run a relay loop that filters PROXY v2 headers and forwards datagrams
|
||||||
|
///
|
||||||
|
/// Only used when `proxy_ips` is non-empty.
|
||||||
|
pub fn create_quic_endpoint_with_proxy_relay(
|
||||||
|
port: u16,
|
||||||
|
tls_config: Arc<RustlsServerConfig>,
|
||||||
|
proxy_ips: Arc<Vec<IpAddr>>,
|
||||||
|
cancel: CancellationToken,
|
||||||
|
) -> anyhow::Result<QuicProxyRelay> {
|
||||||
|
// Bind external socket on the real port
|
||||||
|
let external_socket = std::net::UdpSocket::bind(SocketAddr::from(([0, 0, 0, 0], port)))?;
|
||||||
|
external_socket.set_nonblocking(true)?;
|
||||||
|
let external_socket = Arc::new(
|
||||||
|
UdpSocket::from_std(external_socket)
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to wrap external socket: {}", e))?,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Bind quinn on localhost ephemeral port
|
||||||
|
let internal_socket = std::net::UdpSocket::bind("127.0.0.1:0")?;
|
||||||
|
let quinn_internal_addr = internal_socket.local_addr()?;
|
||||||
|
|
||||||
|
let quic_crypto = quinn::crypto::rustls::QuicServerConfig::try_from(tls_config)
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to create QUIC crypto config: {}", e))?;
|
||||||
|
let server_config = QuinnServerConfig::with_crypto(Arc::new(quic_crypto));
|
||||||
|
|
||||||
|
let endpoint = Endpoint::new(
|
||||||
|
quinn::EndpointConfig::default(),
|
||||||
|
Some(server_config),
|
||||||
|
internal_socket,
|
||||||
|
quinn::default_runtime()
|
||||||
|
.ok_or_else(|| anyhow::anyhow!("No async runtime for quinn"))?,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let real_client_map = Arc::new(DashMap::new());
|
||||||
|
|
||||||
|
let relay_task = tokio::spawn(quic_proxy_relay_loop(
|
||||||
|
external_socket,
|
||||||
|
quinn_internal_addr,
|
||||||
|
proxy_ips,
|
||||||
|
Arc::clone(&real_client_map),
|
||||||
|
cancel,
|
||||||
|
));
|
||||||
|
|
||||||
|
info!("QUIC endpoint with PROXY relay on port {} (quinn internal: {})", port, quinn_internal_addr);
|
||||||
|
Ok(QuicProxyRelay { endpoint, relay_task, real_client_map })
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Main relay loop: reads datagrams from the external socket, filters PROXY v2
|
||||||
|
/// headers from trusted proxy IPs, and forwards everything else to quinn via
|
||||||
|
/// per-session relay sockets.
|
||||||
|
async fn quic_proxy_relay_loop(
|
||||||
|
external_socket: Arc<UdpSocket>,
|
||||||
|
quinn_internal_addr: SocketAddr,
|
||||||
|
proxy_ips: Arc<Vec<IpAddr>>,
|
||||||
|
real_client_map: Arc<DashMap<SocketAddr, SocketAddr>>,
|
||||||
|
cancel: CancellationToken,
|
||||||
|
) {
|
||||||
|
// Maps external source addr → real client addr (from PROXY v2 headers)
|
||||||
|
let proxy_addr_map: DashMap<SocketAddr, SocketAddr> = DashMap::new();
|
||||||
|
// Maps external source addr → relay session
|
||||||
|
let relay_sessions: DashMap<SocketAddr, Arc<RelaySession>> = DashMap::new();
|
||||||
|
let epoch = Instant::now();
|
||||||
|
let mut buf = vec![0u8; 65535];
|
||||||
|
|
||||||
|
// Inline cleanup: periodically scan relay_sessions for stale entries
|
||||||
|
let mut last_cleanup = Instant::now();
|
||||||
|
let cleanup_interval = std::time::Duration::from_secs(30);
|
||||||
|
let session_timeout_ms: u64 = 120_000;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let (len, src_addr) = tokio::select! {
|
||||||
|
_ = cancel.cancelled() => {
|
||||||
|
debug!("QUIC proxy relay loop cancelled");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
result = external_socket.recv_from(&mut buf) => {
|
||||||
|
match result {
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(e) => {
|
||||||
|
warn!("QUIC proxy relay recv error: {}", e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let datagram = &buf[..len];
|
||||||
|
|
||||||
|
// PROXY v2 handling: only on first datagram from a trusted proxy IP
|
||||||
|
// (before a relay session exists for this source)
|
||||||
|
if proxy_ips.contains(&src_addr.ip()) && relay_sessions.get(&src_addr).is_none() {
|
||||||
|
if crate::proxy_protocol::is_proxy_protocol_v2(datagram) {
|
||||||
|
match crate::proxy_protocol::parse_v2(datagram) {
|
||||||
|
Ok((header, _consumed)) => {
|
||||||
|
debug!("QUIC PROXY v2 from {}: real client {}", src_addr, header.source_addr);
|
||||||
|
proxy_addr_map.insert(src_addr, header.source_addr);
|
||||||
|
continue; // consume the PROXY v2 datagram
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
debug!("QUIC proxy relay: failed to parse PROXY v2 from {}: {}", src_addr, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Determine real client address
|
||||||
|
let real_client = proxy_addr_map.get(&src_addr)
|
||||||
|
.map(|r| *r)
|
||||||
|
.unwrap_or(src_addr);
|
||||||
|
|
||||||
|
// Get or create relay session for this external source
|
||||||
|
let session = match relay_sessions.get(&src_addr) {
|
||||||
|
Some(s) => {
|
||||||
|
s.last_activity.store(epoch.elapsed().as_millis() as u64, Ordering::Relaxed);
|
||||||
|
Arc::clone(s.value())
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
// Create new relay socket connected to quinn's internal address
|
||||||
|
let relay_socket = match UdpSocket::bind("127.0.0.1:0").await {
|
||||||
|
Ok(s) => s,
|
||||||
|
Err(e) => {
|
||||||
|
warn!("QUIC relay: failed to bind relay socket: {}", e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if let Err(e) = relay_socket.connect(quinn_internal_addr).await {
|
||||||
|
warn!("QUIC relay: failed to connect relay socket to {}: {}", quinn_internal_addr, e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let relay_local_addr = match relay_socket.local_addr() {
|
||||||
|
Ok(a) => a,
|
||||||
|
Err(e) => {
|
||||||
|
warn!("QUIC relay: failed to get relay socket local addr: {}", e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let relay_socket = Arc::new(relay_socket);
|
||||||
|
|
||||||
|
// Store the real client mapping for the QUIC accept loop
|
||||||
|
real_client_map.insert(relay_local_addr, real_client);
|
||||||
|
|
||||||
|
// Spawn return-path relay: quinn -> external socket -> original source
|
||||||
|
let session_cancel = cancel.child_token();
|
||||||
|
let return_task = tokio::spawn(relay_return_path(
|
||||||
|
Arc::clone(&relay_socket),
|
||||||
|
Arc::clone(&external_socket),
|
||||||
|
src_addr,
|
||||||
|
session_cancel.child_token(),
|
||||||
|
));
|
||||||
|
|
||||||
|
let session = Arc::new(RelaySession {
|
||||||
|
socket: relay_socket,
|
||||||
|
last_activity: AtomicU64::new(epoch.elapsed().as_millis() as u64),
|
||||||
|
return_task,
|
||||||
|
cancel: session_cancel,
|
||||||
|
});
|
||||||
|
|
||||||
|
relay_sessions.insert(src_addr, Arc::clone(&session));
|
||||||
|
debug!("QUIC relay: new session for {} (relay {}), real client {}",
|
||||||
|
src_addr, relay_local_addr, real_client);
|
||||||
|
|
||||||
|
session
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Forward datagram to quinn via the relay socket
|
||||||
|
if let Err(e) = session.socket.send(datagram).await {
|
||||||
|
debug!("QUIC relay: forward error to quinn for {}: {}", src_addr, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Periodic cleanup of stale relay sessions
|
||||||
|
if last_cleanup.elapsed() >= cleanup_interval {
|
||||||
|
last_cleanup = Instant::now();
|
||||||
|
let now_ms = epoch.elapsed().as_millis() as u64;
|
||||||
|
let stale_keys: Vec<SocketAddr> = relay_sessions.iter()
|
||||||
|
.filter(|entry| {
|
||||||
|
let age = now_ms.saturating_sub(entry.value().last_activity.load(Ordering::Relaxed));
|
||||||
|
age > session_timeout_ms
|
||||||
|
})
|
||||||
|
.map(|entry| *entry.key())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
for key in stale_keys {
|
||||||
|
if let Some((_, session)) = relay_sessions.remove(&key) {
|
||||||
|
session.cancel.cancel();
|
||||||
|
session.return_task.abort();
|
||||||
|
// Clean up real_client_map entry
|
||||||
|
if let Ok(addr) = session.socket.local_addr() {
|
||||||
|
real_client_map.remove(&addr);
|
||||||
|
}
|
||||||
|
proxy_addr_map.remove(&key);
|
||||||
|
debug!("QUIC relay: cleaned up stale session for {}", key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Shutdown: cancel all relay sessions
|
||||||
|
for entry in relay_sessions.iter() {
|
||||||
|
entry.value().cancel.cancel();
|
||||||
|
entry.value().return_task.abort();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return-path relay: receives datagrams from quinn (via the relay socket)
|
||||||
|
/// and forwards them back to the external client through the external socket.
|
||||||
|
async fn relay_return_path(
|
||||||
|
relay_socket: Arc<UdpSocket>,
|
||||||
|
external_socket: Arc<UdpSocket>,
|
||||||
|
external_src_addr: SocketAddr,
|
||||||
|
cancel: CancellationToken,
|
||||||
|
) {
|
||||||
|
let mut buf = vec![0u8; 65535];
|
||||||
|
loop {
|
||||||
|
let len = tokio::select! {
|
||||||
|
_ = cancel.cancelled() => break,
|
||||||
|
result = relay_socket.recv(&mut buf) => {
|
||||||
|
match result {
|
||||||
|
Ok(len) => len,
|
||||||
|
Err(e) => {
|
||||||
|
debug!("QUIC relay return recv error for {}: {}", external_src_addr, e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(e) = external_socket.send_to(&buf[..len], external_src_addr).await {
|
||||||
|
debug!("QUIC relay return send error to {}: {}", external_src_addr, e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ===== QUIC accept loop =====
|
||||||
|
|
||||||
/// Run the QUIC accept loop for a single endpoint.
|
/// Run the QUIC accept loop for a single endpoint.
|
||||||
///
|
///
|
||||||
/// Accepts incoming QUIC connections and spawns a task per connection.
|
/// Accepts incoming QUIC connections and spawns a task per connection.
|
||||||
|
/// When `real_client_map` is provided, it is consulted to resolve real client
|
||||||
|
/// IPs from PROXY protocol v2 headers (relay socket addr → real client addr).
|
||||||
pub async fn quic_accept_loop(
|
pub async fn quic_accept_loop(
|
||||||
endpoint: Endpoint,
|
endpoint: Endpoint,
|
||||||
port: u16,
|
port: u16,
|
||||||
@@ -58,6 +331,7 @@ pub async fn quic_accept_loop(
|
|||||||
conn_tracker: Arc<ConnectionTracker>,
|
conn_tracker: Arc<ConnectionTracker>,
|
||||||
cancel: CancellationToken,
|
cancel: CancellationToken,
|
||||||
h3_service: Option<Arc<H3ProxyService>>,
|
h3_service: Option<Arc<H3ProxyService>>,
|
||||||
|
real_client_map: Option<Arc<DashMap<SocketAddr, SocketAddr>>>,
|
||||||
) {
|
) {
|
||||||
loop {
|
loop {
|
||||||
let incoming = tokio::select! {
|
let incoming = tokio::select! {
|
||||||
@@ -77,11 +351,16 @@ pub async fn quic_accept_loop(
|
|||||||
};
|
};
|
||||||
|
|
||||||
let remote_addr = incoming.remote_address();
|
let remote_addr = incoming.remote_address();
|
||||||
let ip = remote_addr.ip();
|
|
||||||
|
// Resolve real client IP from PROXY protocol map if available
|
||||||
|
let real_addr = real_client_map.as_ref()
|
||||||
|
.and_then(|map| map.get(&remote_addr).map(|r| *r))
|
||||||
|
.unwrap_or(remote_addr);
|
||||||
|
let ip = real_addr.ip();
|
||||||
|
|
||||||
// Per-IP rate limiting
|
// Per-IP rate limiting
|
||||||
if !conn_tracker.try_accept(&ip) {
|
if !conn_tracker.try_accept(&ip) {
|
||||||
debug!("QUIC connection rejected from {} (rate limit)", remote_addr);
|
debug!("QUIC connection rejected from {} (rate limit)", real_addr);
|
||||||
// Drop `incoming` to refuse the connection
|
// Drop `incoming` to refuse the connection
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@@ -104,7 +383,7 @@ pub async fn quic_accept_loop(
|
|||||||
let route = match rm.find_route(&ctx) {
|
let route = match rm.find_route(&ctx) {
|
||||||
Some(m) => m.route.clone(),
|
Some(m) => m.route.clone(),
|
||||||
None => {
|
None => {
|
||||||
debug!("No QUIC route matched for port {} from {}", port, remote_addr);
|
debug!("No QUIC route matched for port {} from {}", port, real_addr);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -117,11 +396,12 @@ pub async fn quic_accept_loop(
|
|||||||
let conn_tracker = Arc::clone(&conn_tracker);
|
let conn_tracker = Arc::clone(&conn_tracker);
|
||||||
let cancel = cancel.child_token();
|
let cancel = cancel.child_token();
|
||||||
let h3_svc = h3_service.clone();
|
let h3_svc = h3_service.clone();
|
||||||
|
let real_client_addr = if real_addr != remote_addr { Some(real_addr) } else { None };
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
match handle_quic_connection(incoming, route, port, Arc::clone(&metrics), &cancel, h3_svc).await {
|
match handle_quic_connection(incoming, route, port, Arc::clone(&metrics), &cancel, h3_svc, real_client_addr).await {
|
||||||
Ok(()) => debug!("QUIC connection from {} completed", remote_addr),
|
Ok(()) => debug!("QUIC connection from {} completed", real_addr),
|
||||||
Err(e) => debug!("QUIC connection from {} error: {}", remote_addr, e),
|
Err(e) => debug!("QUIC connection from {} error: {}", real_addr, e),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cleanup
|
// Cleanup
|
||||||
@@ -144,10 +424,11 @@ async fn handle_quic_connection(
|
|||||||
metrics: Arc<MetricsCollector>,
|
metrics: Arc<MetricsCollector>,
|
||||||
cancel: &CancellationToken,
|
cancel: &CancellationToken,
|
||||||
h3_service: Option<Arc<H3ProxyService>>,
|
h3_service: Option<Arc<H3ProxyService>>,
|
||||||
|
real_client_addr: Option<SocketAddr>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let connection = incoming.await?;
|
let connection = incoming.await?;
|
||||||
let remote_addr = connection.remote_address();
|
let effective_addr = real_client_addr.unwrap_or_else(|| connection.remote_address());
|
||||||
debug!("QUIC connection established from {}", remote_addr);
|
debug!("QUIC connection established from {}", effective_addr);
|
||||||
|
|
||||||
// Check if this route has HTTP/3 enabled
|
// Check if this route has HTTP/3 enabled
|
||||||
let enable_http3 = route.action.udp.as_ref()
|
let enable_http3 = route.action.udp.as_ref()
|
||||||
@@ -158,7 +439,7 @@ async fn handle_quic_connection(
|
|||||||
if enable_http3 {
|
if enable_http3 {
|
||||||
if let Some(ref h3_svc) = h3_service {
|
if let Some(ref h3_svc) = h3_service {
|
||||||
debug!("HTTP/3 enabled for route {:?}, dispatching to H3ProxyService", route.name);
|
debug!("HTTP/3 enabled for route {:?}, dispatching to H3ProxyService", route.name);
|
||||||
h3_svc.handle_connection(connection, &route, port).await
|
h3_svc.handle_connection(connection, &route, port, real_client_addr).await
|
||||||
} else {
|
} else {
|
||||||
warn!("HTTP/3 enabled for route {:?} but H3ProxyService not initialized", route.name);
|
warn!("HTTP/3 enabled for route {:?} but H3ProxyService not initialized", route.name);
|
||||||
// Keep connection alive until cancelled
|
// Keep connection alive until cancelled
|
||||||
@@ -172,7 +453,7 @@ async fn handle_quic_connection(
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Non-HTTP3 QUIC: bidirectional stream forwarding to TCP backend
|
// Non-HTTP3 QUIC: bidirectional stream forwarding to TCP backend
|
||||||
handle_quic_stream_forwarding(connection, route, port, metrics, cancel).await
|
handle_quic_stream_forwarding(connection, route, port, metrics, cancel, real_client_addr).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -187,8 +468,9 @@ async fn handle_quic_stream_forwarding(
|
|||||||
port: u16,
|
port: u16,
|
||||||
metrics: Arc<MetricsCollector>,
|
metrics: Arc<MetricsCollector>,
|
||||||
cancel: &CancellationToken,
|
cancel: &CancellationToken,
|
||||||
|
real_client_addr: Option<SocketAddr>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let remote_addr = connection.remote_address();
|
let effective_addr = real_client_addr.unwrap_or_else(|| connection.remote_address());
|
||||||
let route_id = route.name.as_deref().or(route.id.as_deref());
|
let route_id = route.name.as_deref().or(route.id.as_deref());
|
||||||
let metrics_arc = metrics;
|
let metrics_arc = metrics;
|
||||||
|
|
||||||
@@ -209,7 +491,7 @@ async fn handle_quic_stream_forwarding(
|
|||||||
Err(quinn::ConnectionError::ApplicationClosed(_)) => break,
|
Err(quinn::ConnectionError::ApplicationClosed(_)) => break,
|
||||||
Err(quinn::ConnectionError::LocallyClosed) => break,
|
Err(quinn::ConnectionError::LocallyClosed) => break,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
debug!("QUIC stream accept error from {}: {}", remote_addr, e);
|
debug!("QUIC stream accept error from {}: {}", effective_addr, e);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -217,7 +499,7 @@ async fn handle_quic_stream_forwarding(
|
|||||||
};
|
};
|
||||||
|
|
||||||
let backend_addr = backend_addr.clone();
|
let backend_addr = backend_addr.clone();
|
||||||
let ip_str = remote_addr.ip().to_string();
|
let ip_str = effective_addr.ip().to_string();
|
||||||
let stream_metrics = Arc::clone(&metrics_arc);
|
let stream_metrics = Arc::clone(&metrics_arc);
|
||||||
let stream_route_id = route_id.map(|s| s.to_string());
|
let stream_route_id = route_id.map(|s| s.to_string());
|
||||||
|
|
||||||
|
|||||||
@@ -428,6 +428,11 @@ impl TcpListenerManager {
|
|||||||
self.http_proxy.prune_stale_routes(active_route_ids);
|
self.http_proxy.prune_stale_routes(active_route_ids);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get a reference to the HTTP proxy service (shared with H3).
|
||||||
|
pub fn http_proxy(&self) -> &Arc<HttpProxyService> {
|
||||||
|
&self.http_proxy
|
||||||
|
}
|
||||||
|
|
||||||
/// Get a reference to the connection tracker.
|
/// Get a reference to the connection tracker.
|
||||||
pub fn conn_tracker(&self) -> &Arc<ConnectionTracker> {
|
pub fn conn_tracker(&self) -> &Arc<ConnectionTracker> {
|
||||||
&self.conn_tracker
|
&self.conn_tracker
|
||||||
|
|||||||
@@ -2,12 +2,17 @@
|
|||||||
//!
|
//!
|
||||||
//! Binds UDP sockets on configured ports, receives datagrams, matches routes,
|
//! Binds UDP sockets on configured ports, receives datagrams, matches routes,
|
||||||
//! tracks sessions (flows), and forwards datagrams to backend UDP sockets.
|
//! tracks sessions (flows), and forwards datagrams to backend UDP sockets.
|
||||||
|
//!
|
||||||
|
//! Supports PROXY protocol v2 on both raw UDP and QUIC paths when `proxy_ips`
|
||||||
|
//! is configured. For QUIC, a relay layer intercepts datagrams before they
|
||||||
|
//! reach the quinn endpoint.
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::net::SocketAddr;
|
use std::net::{IpAddr, SocketAddr};
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use dashmap::DashMap;
|
||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
|
|
||||||
use arc_swap::ArcSwap;
|
use arc_swap::ArcSwap;
|
||||||
@@ -48,6 +53,9 @@ pub struct UdpListenerManager {
|
|||||||
relay_reader_cancel: Option<CancellationToken>,
|
relay_reader_cancel: Option<CancellationToken>,
|
||||||
/// H3 proxy service for HTTP/3 request handling
|
/// H3 proxy service for HTTP/3 request handling
|
||||||
h3_service: Option<Arc<H3ProxyService>>,
|
h3_service: Option<Arc<H3ProxyService>>,
|
||||||
|
/// Trusted proxy IPs that may send PROXY protocol v2 headers.
|
||||||
|
/// When non-empty, PROXY v2 detection is enabled on both raw UDP and QUIC paths.
|
||||||
|
proxy_ips: Arc<Vec<IpAddr>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for UdpListenerManager {
|
impl Drop for UdpListenerManager {
|
||||||
@@ -80,9 +88,18 @@ impl UdpListenerManager {
|
|||||||
relay_writer: Arc::new(Mutex::new(None)),
|
relay_writer: Arc::new(Mutex::new(None)),
|
||||||
relay_reader_cancel: None,
|
relay_reader_cancel: None,
|
||||||
h3_service: None,
|
h3_service: None,
|
||||||
|
proxy_ips: Arc::new(Vec::new()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Set the trusted proxy IPs for PROXY protocol v2 detection.
|
||||||
|
pub fn set_proxy_ips(&mut self, ips: Vec<IpAddr>) {
|
||||||
|
if !ips.is_empty() {
|
||||||
|
info!("UDP/QUIC PROXY protocol v2 enabled for {} trusted IPs", ips.len());
|
||||||
|
}
|
||||||
|
self.proxy_ips = Arc::new(ips);
|
||||||
|
}
|
||||||
|
|
||||||
/// Set the H3 proxy service for HTTP/3 request handling.
|
/// Set the H3 proxy service for HTTP/3 request handling.
|
||||||
pub fn set_h3_service(&mut self, svc: Arc<H3ProxyService>) {
|
pub fn set_h3_service(&mut self, svc: Arc<H3ProxyService>) {
|
||||||
self.h3_service = Some(svc);
|
self.h3_service = Some(svc);
|
||||||
@@ -122,20 +139,44 @@ impl UdpListenerManager {
|
|||||||
|
|
||||||
if has_quic {
|
if has_quic {
|
||||||
if let Some(tls) = tls_config {
|
if let Some(tls) = tls_config {
|
||||||
// Create QUIC endpoint; clone it so we can hot-swap TLS later
|
if self.proxy_ips.is_empty() {
|
||||||
let endpoint = crate::quic_handler::create_quic_endpoint(port, tls)?;
|
// Direct path: quinn owns the external socket (zero overhead)
|
||||||
let endpoint_for_updates = endpoint.clone(); // quinn::Endpoint is Arc-based
|
let endpoint = crate::quic_handler::create_quic_endpoint(port, tls)?;
|
||||||
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
|
let endpoint_for_updates = endpoint.clone();
|
||||||
endpoint,
|
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
|
||||||
port,
|
endpoint,
|
||||||
Arc::clone(&self.route_manager),
|
port,
|
||||||
Arc::clone(&self.metrics),
|
Arc::clone(&self.route_manager),
|
||||||
Arc::clone(&self.conn_tracker),
|
Arc::clone(&self.metrics),
|
||||||
self.cancel_token.child_token(),
|
Arc::clone(&self.conn_tracker),
|
||||||
self.h3_service.clone(),
|
self.cancel_token.child_token(),
|
||||||
));
|
self.h3_service.clone(),
|
||||||
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
|
None,
|
||||||
info!("QUIC endpoint started on port {}", port);
|
));
|
||||||
|
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
|
||||||
|
info!("QUIC endpoint started on port {}", port);
|
||||||
|
} else {
|
||||||
|
// Proxy relay path: we own external socket, quinn on localhost
|
||||||
|
let relay = crate::quic_handler::create_quic_endpoint_with_proxy_relay(
|
||||||
|
port,
|
||||||
|
tls,
|
||||||
|
Arc::clone(&self.proxy_ips),
|
||||||
|
self.cancel_token.child_token(),
|
||||||
|
)?;
|
||||||
|
let endpoint_for_updates = relay.endpoint.clone();
|
||||||
|
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
|
||||||
|
relay.endpoint,
|
||||||
|
port,
|
||||||
|
Arc::clone(&self.route_manager),
|
||||||
|
Arc::clone(&self.metrics),
|
||||||
|
Arc::clone(&self.conn_tracker),
|
||||||
|
self.cancel_token.child_token(),
|
||||||
|
self.h3_service.clone(),
|
||||||
|
Some(relay.real_client_map),
|
||||||
|
));
|
||||||
|
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
|
||||||
|
info!("QUIC endpoint with PROXY relay started on port {}", port);
|
||||||
|
}
|
||||||
return Ok(());
|
return Ok(());
|
||||||
} else {
|
} else {
|
||||||
warn!("QUIC routes on port {} but no TLS config provided, falling back to raw UDP", port);
|
warn!("QUIC routes on port {} but no TLS config provided, falling back to raw UDP", port);
|
||||||
@@ -158,6 +199,7 @@ impl UdpListenerManager {
|
|||||||
Arc::clone(&self.datagram_handler_relay),
|
Arc::clone(&self.datagram_handler_relay),
|
||||||
Arc::clone(&self.relay_writer),
|
Arc::clone(&self.relay_writer),
|
||||||
self.cancel_token.child_token(),
|
self.cancel_token.child_token(),
|
||||||
|
Arc::clone(&self.proxy_ips),
|
||||||
));
|
));
|
||||||
|
|
||||||
self.listeners.insert(port, (handle, None));
|
self.listeners.insert(port, (handle, None));
|
||||||
@@ -262,19 +304,14 @@ impl UdpListenerManager {
|
|||||||
tokio::task::yield_now().await;
|
tokio::task::yield_now().await;
|
||||||
|
|
||||||
// Create QUIC endpoint on the now-free port
|
// Create QUIC endpoint on the now-free port
|
||||||
match crate::quic_handler::create_quic_endpoint(port, Arc::clone(&tls_config)) {
|
let create_result = if self.proxy_ips.is_empty() {
|
||||||
Ok(endpoint) => {
|
self.create_quic_direct(port, Arc::clone(&tls_config))
|
||||||
let endpoint_for_updates = endpoint.clone();
|
} else {
|
||||||
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
|
self.create_quic_with_relay(port, Arc::clone(&tls_config))
|
||||||
endpoint,
|
};
|
||||||
port,
|
|
||||||
Arc::clone(&self.route_manager),
|
match create_result {
|
||||||
Arc::clone(&self.metrics),
|
Ok(()) => {
|
||||||
Arc::clone(&self.conn_tracker),
|
|
||||||
self.cancel_token.child_token(),
|
|
||||||
self.h3_service.clone(),
|
|
||||||
));
|
|
||||||
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
|
|
||||||
info!("QUIC endpoint started on port {} (upgraded from raw UDP)", port);
|
info!("QUIC endpoint started on port {} (upgraded from raw UDP)", port);
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@@ -282,19 +319,14 @@ impl UdpListenerManager {
|
|||||||
warn!("QUIC endpoint creation failed on port {}, retrying: {}", port, e);
|
warn!("QUIC endpoint creation failed on port {}, retrying: {}", port, e);
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||||
|
|
||||||
match crate::quic_handler::create_quic_endpoint(port, Arc::clone(&tls_config)) {
|
let retry_result = if self.proxy_ips.is_empty() {
|
||||||
Ok(endpoint) => {
|
self.create_quic_direct(port, Arc::clone(&tls_config))
|
||||||
let endpoint_for_updates = endpoint.clone();
|
} else {
|
||||||
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
|
self.create_quic_with_relay(port, Arc::clone(&tls_config))
|
||||||
endpoint,
|
};
|
||||||
port,
|
|
||||||
Arc::clone(&self.route_manager),
|
match retry_result {
|
||||||
Arc::clone(&self.metrics),
|
Ok(()) => {
|
||||||
Arc::clone(&self.conn_tracker),
|
|
||||||
self.cancel_token.child_token(),
|
|
||||||
self.h3_service.clone(),
|
|
||||||
));
|
|
||||||
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
|
|
||||||
info!("QUIC endpoint started on port {} (upgraded from raw UDP, retry)", port);
|
info!("QUIC endpoint started on port {} (upgraded from raw UDP, retry)", port);
|
||||||
}
|
}
|
||||||
Err(e2) => {
|
Err(e2) => {
|
||||||
@@ -311,6 +343,47 @@ impl UdpListenerManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Create a direct QUIC endpoint (quinn owns the socket).
|
||||||
|
fn create_quic_direct(&mut self, port: u16, tls_config: Arc<rustls::ServerConfig>) -> anyhow::Result<()> {
|
||||||
|
let endpoint = crate::quic_handler::create_quic_endpoint(port, tls_config)?;
|
||||||
|
let endpoint_for_updates = endpoint.clone();
|
||||||
|
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
|
||||||
|
endpoint,
|
||||||
|
port,
|
||||||
|
Arc::clone(&self.route_manager),
|
||||||
|
Arc::clone(&self.metrics),
|
||||||
|
Arc::clone(&self.conn_tracker),
|
||||||
|
self.cancel_token.child_token(),
|
||||||
|
self.h3_service.clone(),
|
||||||
|
None,
|
||||||
|
));
|
||||||
|
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a QUIC endpoint with PROXY protocol relay.
|
||||||
|
fn create_quic_with_relay(&mut self, port: u16, tls_config: Arc<rustls::ServerConfig>) -> anyhow::Result<()> {
|
||||||
|
let relay = crate::quic_handler::create_quic_endpoint_with_proxy_relay(
|
||||||
|
port,
|
||||||
|
tls_config,
|
||||||
|
Arc::clone(&self.proxy_ips),
|
||||||
|
self.cancel_token.child_token(),
|
||||||
|
)?;
|
||||||
|
let endpoint_for_updates = relay.endpoint.clone();
|
||||||
|
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
|
||||||
|
relay.endpoint,
|
||||||
|
port,
|
||||||
|
Arc::clone(&self.route_manager),
|
||||||
|
Arc::clone(&self.metrics),
|
||||||
|
Arc::clone(&self.conn_tracker),
|
||||||
|
self.cancel_token.child_token(),
|
||||||
|
self.h3_service.clone(),
|
||||||
|
Some(relay.real_client_map),
|
||||||
|
));
|
||||||
|
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Rebind a port as a raw UDP listener (fallback when QUIC upgrade fails).
|
/// Rebind a port as a raw UDP listener (fallback when QUIC upgrade fails).
|
||||||
async fn rebind_raw_udp(&mut self, port: u16) -> anyhow::Result<()> {
|
async fn rebind_raw_udp(&mut self, port: u16) -> anyhow::Result<()> {
|
||||||
let addr: std::net::SocketAddr = ([0, 0, 0, 0], port).into();
|
let addr: std::net::SocketAddr = ([0, 0, 0, 0], port).into();
|
||||||
@@ -327,6 +400,7 @@ impl UdpListenerManager {
|
|||||||
Arc::clone(&self.datagram_handler_relay),
|
Arc::clone(&self.datagram_handler_relay),
|
||||||
Arc::clone(&self.relay_writer),
|
Arc::clone(&self.relay_writer),
|
||||||
self.cancel_token.child_token(),
|
self.cancel_token.child_token(),
|
||||||
|
Arc::clone(&self.proxy_ips),
|
||||||
));
|
));
|
||||||
|
|
||||||
self.listeners.insert(port, (handle, None));
|
self.listeners.insert(port, (handle, None));
|
||||||
@@ -407,6 +481,10 @@ impl UdpListenerManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Main receive loop for a UDP port.
|
/// Main receive loop for a UDP port.
|
||||||
|
///
|
||||||
|
/// When `proxy_ips` is non-empty, the first datagram from a trusted proxy IP
|
||||||
|
/// is checked for PROXY protocol v2. If found, the real client IP is extracted
|
||||||
|
/// and used for all subsequent session handling for that source address.
|
||||||
async fn recv_loop(
|
async fn recv_loop(
|
||||||
socket: Arc<UdpSocket>,
|
socket: Arc<UdpSocket>,
|
||||||
port: u16,
|
port: u16,
|
||||||
@@ -417,10 +495,15 @@ impl UdpListenerManager {
|
|||||||
_datagram_handler_relay: Arc<RwLock<Option<String>>>,
|
_datagram_handler_relay: Arc<RwLock<Option<String>>>,
|
||||||
relay_writer: Arc<Mutex<Option<tokio::net::unix::OwnedWriteHalf>>>,
|
relay_writer: Arc<Mutex<Option<tokio::net::unix::OwnedWriteHalf>>>,
|
||||||
cancel: CancellationToken,
|
cancel: CancellationToken,
|
||||||
|
proxy_ips: Arc<Vec<IpAddr>>,
|
||||||
) {
|
) {
|
||||||
// Use a reasonably large buffer; actual max is per-route but we need a single buffer
|
// Use a reasonably large buffer; actual max is per-route but we need a single buffer
|
||||||
let mut buf = vec![0u8; 65535];
|
let mut buf = vec![0u8; 65535];
|
||||||
|
|
||||||
|
// Maps proxy source addr → real client addr (from PROXY v2 headers).
|
||||||
|
// Only populated when proxy_ips is non-empty.
|
||||||
|
let proxy_addr_map: DashMap<SocketAddr, SocketAddr> = DashMap::new();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let (len, client_addr) = tokio::select! {
|
let (len, client_addr) = tokio::select! {
|
||||||
_ = cancel.cancelled() => {
|
_ = cancel.cancelled() => {
|
||||||
@@ -440,9 +523,39 @@ impl UdpListenerManager {
|
|||||||
|
|
||||||
let datagram = &buf[..len];
|
let datagram = &buf[..len];
|
||||||
|
|
||||||
// Route matching
|
// PROXY protocol v2 detection for datagrams from trusted proxy IPs
|
||||||
|
let effective_client_ip = if !proxy_ips.is_empty() && proxy_ips.contains(&client_addr.ip()) {
|
||||||
|
let session_key: SessionKey = (client_addr, port);
|
||||||
|
if session_table.get(&session_key).is_none() && !proxy_addr_map.contains_key(&client_addr) {
|
||||||
|
// No session and no prior PROXY header — check for PROXY v2
|
||||||
|
if crate::proxy_protocol::is_proxy_protocol_v2(datagram) {
|
||||||
|
match crate::proxy_protocol::parse_v2(datagram) {
|
||||||
|
Ok((header, _consumed)) => {
|
||||||
|
debug!("UDP PROXY v2 from {}: real client {}", client_addr, header.source_addr);
|
||||||
|
proxy_addr_map.insert(client_addr, header.source_addr);
|
||||||
|
continue; // discard the PROXY v2 datagram
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
debug!("UDP PROXY v2 parse error from {}: {}", client_addr, e);
|
||||||
|
client_addr.ip()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
client_addr.ip()
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Use real client IP if we've previously seen a PROXY v2 header
|
||||||
|
proxy_addr_map.get(&client_addr)
|
||||||
|
.map(|r| r.ip())
|
||||||
|
.unwrap_or_else(|| client_addr.ip())
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
client_addr.ip()
|
||||||
|
};
|
||||||
|
|
||||||
|
// Route matching — use effective (real) client IP
|
||||||
let rm = route_manager.load();
|
let rm = route_manager.load();
|
||||||
let ip_str = client_addr.ip().to_string();
|
let ip_str = effective_client_ip.to_string();
|
||||||
let ctx = MatchContext {
|
let ctx = MatchContext {
|
||||||
port,
|
port,
|
||||||
domain: None,
|
domain: None,
|
||||||
@@ -491,20 +604,21 @@ impl UdpListenerManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Session lookup or create
|
// Session lookup or create
|
||||||
|
// Session key uses the proxy's source addr for correct return-path routing
|
||||||
let session_key: SessionKey = (client_addr, port);
|
let session_key: SessionKey = (client_addr, port);
|
||||||
let session = match session_table.get(&session_key) {
|
let session = match session_table.get(&session_key) {
|
||||||
Some(s) => s,
|
Some(s) => s,
|
||||||
None => {
|
None => {
|
||||||
// New session — check per-IP limits
|
// New session — check per-IP limits using the real client IP
|
||||||
if !conn_tracker.try_accept(&client_addr.ip()) {
|
if !conn_tracker.try_accept(&effective_client_ip) {
|
||||||
debug!("UDP session rejected for {} (rate limit)", client_addr);
|
debug!("UDP session rejected for {} (rate limit)", effective_client_ip);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if !session_table.can_create_session(
|
if !session_table.can_create_session(
|
||||||
&client_addr.ip(),
|
&effective_client_ip,
|
||||||
udp_config.max_sessions_per_ip,
|
udp_config.max_sessions_per_ip,
|
||||||
) {
|
) {
|
||||||
debug!("UDP session rejected for {} (per-IP session limit)", client_addr);
|
debug!("UDP session rejected for {} (per-IP session limit)", effective_client_ip);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -537,8 +651,8 @@ impl UdpListenerManager {
|
|||||||
}
|
}
|
||||||
let backend_socket = Arc::new(backend_socket);
|
let backend_socket = Arc::new(backend_socket);
|
||||||
|
|
||||||
debug!("New UDP session: {} -> {} (via port {})",
|
debug!("New UDP session: {} -> {} (via port {}, real client {})",
|
||||||
client_addr, backend_addr, port);
|
client_addr, backend_addr, port, effective_client_ip);
|
||||||
|
|
||||||
// Spawn return-path relay task
|
// Spawn return-path relay task
|
||||||
let session_cancel = CancellationToken::new();
|
let session_cancel = CancellationToken::new();
|
||||||
@@ -558,7 +672,7 @@ impl UdpListenerManager {
|
|||||||
last_activity: std::sync::atomic::AtomicU64::new(session_table.elapsed_ms()),
|
last_activity: std::sync::atomic::AtomicU64::new(session_table.elapsed_ms()),
|
||||||
created_at: std::time::Instant::now(),
|
created_at: std::time::Instant::now(),
|
||||||
route_id: route_id.map(|s| s.to_string()),
|
route_id: route_id.map(|s| s.to_string()),
|
||||||
source_ip: client_addr.ip(),
|
source_ip: effective_client_ip,
|
||||||
client_addr,
|
client_addr,
|
||||||
return_task,
|
return_task,
|
||||||
cancel: session_cancel,
|
cancel: session_cancel,
|
||||||
@@ -569,8 +683,8 @@ impl UdpListenerManager {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Track in metrics
|
// Track in metrics using the real client IP
|
||||||
conn_tracker.connection_opened(&client_addr.ip());
|
conn_tracker.connection_opened(&effective_client_ip);
|
||||||
metrics.connection_opened(route_id, Some(&ip_str));
|
metrics.connection_opened(route_id, Some(&ip_str));
|
||||||
metrics.udp_session_opened();
|
metrics.udp_session_opened();
|
||||||
|
|
||||||
|
|||||||
@@ -122,10 +122,16 @@ impl RouteManager {
|
|||||||
// This prevents session-ticket resumption from misrouting when clients
|
// This prevents session-ticket resumption from misrouting when clients
|
||||||
// omit SNI (RFC 8446 recommends but doesn't mandate SNI on resumption).
|
// omit SNI (RFC 8446 recommends but doesn't mandate SNI on resumption).
|
||||||
// Wildcard-only routes (domains: ["*"]) still match since they accept all.
|
// Wildcard-only routes (domains: ["*"]) still match since they accept all.
|
||||||
let patterns = domains.to_vec();
|
//
|
||||||
let is_wildcard_only = patterns.iter().all(|d| *d == "*");
|
// Exception: QUIC (UDP transport) encrypts the TLS ClientHello, so SNI
|
||||||
if !is_wildcard_only {
|
// is unavailable at accept time. Domain verification happens per-request
|
||||||
return false;
|
// in H3ProxyService via the :authority header.
|
||||||
|
if ctx.transport != Some(TransportProtocol::Udp) {
|
||||||
|
let patterns = domains.to_vec();
|
||||||
|
let is_wildcard_only = patterns.iter().all(|d| *d == "*");
|
||||||
|
if !is_wildcard_only {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -997,4 +1003,52 @@ mod tests {
|
|||||||
let result = manager.find_route(&udp_ctx).unwrap();
|
let result = manager.find_route(&udp_ctx).unwrap();
|
||||||
assert_eq!(result.route.name.as_deref(), Some("udp-route"));
|
assert_eq!(result.route.name.as_deref(), Some("udp-route"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_quic_tls_no_sni_matches_domain_restricted_route() {
|
||||||
|
// QUIC accept-level matching: is_tls=true, domain=None, transport=Udp.
|
||||||
|
// Should match because QUIC encrypts the ClientHello — SNI is unavailable
|
||||||
|
// at accept time but verified per-request in H3ProxyService.
|
||||||
|
let mut route = make_route(443, Some("example.com"), 0);
|
||||||
|
route.route_match.transport = Some(TransportProtocol::Udp);
|
||||||
|
let routes = vec![route];
|
||||||
|
let manager = RouteManager::new(routes);
|
||||||
|
|
||||||
|
let ctx = MatchContext {
|
||||||
|
port: 443,
|
||||||
|
domain: None,
|
||||||
|
path: None,
|
||||||
|
client_ip: None,
|
||||||
|
tls_version: None,
|
||||||
|
headers: None,
|
||||||
|
is_tls: true,
|
||||||
|
protocol: Some("quic"),
|
||||||
|
transport: Some(TransportProtocol::Udp),
|
||||||
|
};
|
||||||
|
|
||||||
|
assert!(manager.find_route(&ctx).is_some(),
|
||||||
|
"QUIC (UDP) with is_tls=true and domain=None should match domain-restricted routes");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_tcp_tls_no_sni_still_rejects_domain_restricted_route() {
|
||||||
|
// TCP TLS without SNI must still be rejected (no QUIC exemption).
|
||||||
|
let routes = vec![make_route(443, Some("example.com"), 0)];
|
||||||
|
let manager = RouteManager::new(routes);
|
||||||
|
|
||||||
|
let ctx = MatchContext {
|
||||||
|
port: 443,
|
||||||
|
domain: None,
|
||||||
|
path: None,
|
||||||
|
client_ip: None,
|
||||||
|
tls_version: None,
|
||||||
|
headers: None,
|
||||||
|
is_tls: true,
|
||||||
|
protocol: None,
|
||||||
|
transport: None, // TCP (default)
|
||||||
|
};
|
||||||
|
|
||||||
|
assert!(manager.find_route(&ctx).is_none(),
|
||||||
|
"TCP TLS without SNI should NOT match domain-restricted routes");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -44,3 +44,9 @@ mimalloc = { workspace = true }
|
|||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
rcgen = { workspace = true }
|
rcgen = { workspace = true }
|
||||||
|
quinn = { workspace = true }
|
||||||
|
h3 = { workspace = true }
|
||||||
|
h3-quinn = { workspace = true }
|
||||||
|
bytes = { workspace = true }
|
||||||
|
rustls = { workspace = true }
|
||||||
|
http = "1"
|
||||||
|
|||||||
@@ -264,6 +264,8 @@ impl RustProxy {
|
|||||||
conn_config.socket_timeout_ms,
|
conn_config.socket_timeout_ms,
|
||||||
conn_config.max_connection_lifetime_ms,
|
conn_config.max_connection_lifetime_ms,
|
||||||
);
|
);
|
||||||
|
// Clone proxy_ips before conn_config is moved into the TCP listener
|
||||||
|
let udp_proxy_ips = conn_config.proxy_ips.clone();
|
||||||
listener.set_connection_config(conn_config);
|
listener.set_connection_config(conn_config);
|
||||||
|
|
||||||
// Share the socket-handler relay path with the listener
|
// Share the socket-handler relay path with the listener
|
||||||
@@ -339,16 +341,12 @@ impl RustProxy {
|
|||||||
conn_tracker,
|
conn_tracker,
|
||||||
self.cancel_token.clone(),
|
self.cancel_token.clone(),
|
||||||
);
|
);
|
||||||
|
udp_mgr.set_proxy_ips(udp_proxy_ips.clone());
|
||||||
|
|
||||||
// Construct H3ProxyService for HTTP/3 request handling
|
// Share HttpProxyService with H3 — same route matching, connection
|
||||||
let h3_svc = rustproxy_http::h3_service::H3ProxyService::new(
|
// pool, and ALPN protocol detection as the TCP/HTTP path.
|
||||||
Arc::new(ArcSwap::from(Arc::clone(&*self.route_table.load()))),
|
let http_proxy = self.listener_manager.as_ref().unwrap().http_proxy().clone();
|
||||||
Arc::clone(&self.metrics),
|
let h3_svc = rustproxy_http::h3_service::H3ProxyService::new(http_proxy);
|
||||||
Arc::new(rustproxy_http::connection_pool::ConnectionPool::new()),
|
|
||||||
Arc::new(rustproxy_http::protocol_cache::ProtocolCache::new()),
|
|
||||||
rustproxy_passthrough::tls_handler::shared_backend_tls_config(),
|
|
||||||
std::time::Duration::from_secs(30),
|
|
||||||
);
|
|
||||||
udp_mgr.set_h3_service(Arc::new(h3_svc));
|
udp_mgr.set_h3_service(Arc::new(h3_svc));
|
||||||
|
|
||||||
for port in &udp_ports {
|
for port in &udp_ports {
|
||||||
@@ -774,12 +772,15 @@ impl RustProxy {
|
|||||||
if self.udp_listener_manager.is_none() {
|
if self.udp_listener_manager.is_none() {
|
||||||
if let Some(ref listener) = self.listener_manager {
|
if let Some(ref listener) = self.listener_manager {
|
||||||
let conn_tracker = listener.conn_tracker().clone();
|
let conn_tracker = listener.conn_tracker().clone();
|
||||||
self.udp_listener_manager = Some(UdpListenerManager::new(
|
let conn_config = Self::build_connection_config(&self.options);
|
||||||
|
let mut udp_mgr = UdpListenerManager::new(
|
||||||
Arc::clone(&new_manager),
|
Arc::clone(&new_manager),
|
||||||
Arc::clone(&self.metrics),
|
Arc::clone(&self.metrics),
|
||||||
conn_tracker,
|
conn_tracker,
|
||||||
self.cancel_token.clone(),
|
self.cancel_token.clone(),
|
||||||
));
|
);
|
||||||
|
udp_mgr.set_proxy_ips(conn_config.proxy_ips);
|
||||||
|
self.udp_listener_manager = Some(udp_mgr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -997,44 +998,25 @@ impl RustProxy {
|
|||||||
fn build_quic_tls_config(
|
fn build_quic_tls_config(
|
||||||
tls_configs: &HashMap<String, TlsCertConfig>,
|
tls_configs: &HashMap<String, TlsCertConfig>,
|
||||||
) -> Option<Arc<rustls::ServerConfig>> {
|
) -> Option<Arc<rustls::ServerConfig>> {
|
||||||
// Find the first available cert (prefer wildcard, then any)
|
if tls_configs.is_empty() {
|
||||||
let cert_config = tls_configs.get("*")
|
|
||||||
.or_else(|| tls_configs.values().next());
|
|
||||||
|
|
||||||
let cert_config = match cert_config {
|
|
||||||
Some(c) => c,
|
|
||||||
None => return None,
|
|
||||||
};
|
|
||||||
|
|
||||||
// Parse cert chain from PEM
|
|
||||||
let mut cert_reader = std::io::BufReader::new(cert_config.cert_pem.as_bytes());
|
|
||||||
let certs: Vec<rustls::pki_types::CertificateDer<'static>> =
|
|
||||||
rustls_pemfile::certs(&mut cert_reader)
|
|
||||||
.filter_map(|r| r.ok())
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
if certs.is_empty() {
|
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse private key from PEM
|
// Reuse CertResolver for SNI-based cert selection (same as TCP/TLS path).
|
||||||
let mut key_reader = std::io::BufReader::new(cert_config.key_pem.as_bytes());
|
// This ensures QUIC connections get the correct certificate for each domain
|
||||||
let key = match rustls_pemfile::private_key(&mut key_reader) {
|
// instead of a single static cert.
|
||||||
Ok(Some(key)) => key,
|
let resolver = match rustproxy_passthrough::tls_handler::CertResolver::new(tls_configs) {
|
||||||
_ => return None,
|
Ok(r) => r,
|
||||||
};
|
|
||||||
|
|
||||||
let mut tls_config = match rustls::ServerConfig::builder()
|
|
||||||
.with_no_client_auth()
|
|
||||||
.with_single_cert(certs, key)
|
|
||||||
{
|
|
||||||
Ok(c) => c,
|
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!("Failed to build QUIC TLS config: {}", e);
|
warn!("Failed to build QUIC cert resolver: {}", e);
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let mut tls_config = rustls::ServerConfig::builder()
|
||||||
|
.with_no_client_auth()
|
||||||
|
.with_cert_resolver(Arc::new(resolver));
|
||||||
|
|
||||||
// QUIC requires h3 ALPN
|
// QUIC requires h3 ALPN
|
||||||
tls_config.alpn_protocols = vec![b"h3".to_vec()];
|
tls_config.alpn_protocols = vec![b"h3".to_vec()];
|
||||||
|
|
||||||
|
|||||||
195
rust/crates/rustproxy/tests/integration_h3_proxy.rs
Normal file
195
rust/crates/rustproxy/tests/integration_h3_proxy.rs
Normal file
@@ -0,0 +1,195 @@
|
|||||||
|
mod common;
|
||||||
|
|
||||||
|
use common::*;
|
||||||
|
use rustproxy::RustProxy;
|
||||||
|
use rustproxy_config::{RustProxyOptions, TransportProtocol, RouteUdp, RouteQuic};
|
||||||
|
use bytes::Buf;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
/// Build a route that listens on UDP with HTTP/3 enabled and TLS terminate.
|
||||||
|
fn make_h3_route(
|
||||||
|
port: u16,
|
||||||
|
target_host: &str,
|
||||||
|
target_port: u16,
|
||||||
|
cert_pem: &str,
|
||||||
|
key_pem: &str,
|
||||||
|
) -> rustproxy_config::RouteConfig {
|
||||||
|
let mut route = make_tls_terminate_route(port, "localhost", target_host, target_port, cert_pem, key_pem);
|
||||||
|
route.route_match.transport = Some(TransportProtocol::All);
|
||||||
|
// Keep domain="localhost" from make_tls_terminate_route — needed for TLS cert extraction
|
||||||
|
route.action.udp = Some(RouteUdp {
|
||||||
|
session_timeout: None,
|
||||||
|
max_sessions_per_ip: None,
|
||||||
|
max_datagram_size: None,
|
||||||
|
quic: Some(RouteQuic {
|
||||||
|
max_idle_timeout: Some(30000),
|
||||||
|
max_concurrent_bidi_streams: None,
|
||||||
|
max_concurrent_uni_streams: None,
|
||||||
|
enable_http3: Some(true),
|
||||||
|
alt_svc_port: None,
|
||||||
|
alt_svc_max_age: None,
|
||||||
|
initial_congestion_window: None,
|
||||||
|
}),
|
||||||
|
});
|
||||||
|
route
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Build a quinn client endpoint with insecure TLS for testing.
|
||||||
|
fn make_h3_client_endpoint() -> quinn::Endpoint {
|
||||||
|
let mut tls_config = rustls::ClientConfig::builder()
|
||||||
|
.dangerous()
|
||||||
|
.with_custom_certificate_verifier(Arc::new(InsecureVerifier))
|
||||||
|
.with_no_client_auth();
|
||||||
|
tls_config.alpn_protocols = vec![b"h3".to_vec()];
|
||||||
|
|
||||||
|
let quic_client_config = quinn::crypto::rustls::QuicClientConfig::try_from(tls_config)
|
||||||
|
.expect("Failed to build QUIC client config");
|
||||||
|
let client_config = quinn::ClientConfig::new(Arc::new(quic_client_config));
|
||||||
|
|
||||||
|
let mut endpoint = quinn::Endpoint::client("0.0.0.0:0".parse().unwrap())
|
||||||
|
.expect("Failed to create QUIC client endpoint");
|
||||||
|
endpoint.set_default_client_config(client_config);
|
||||||
|
endpoint
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test that HTTP/3 response streams properly finish (FIN is received by client).
|
||||||
|
///
|
||||||
|
/// This is the critical regression test for the FIN bug: the proxy must send
|
||||||
|
/// a QUIC stream FIN after the response body so the client's `recv_data()`
|
||||||
|
/// returns `None` instead of hanging forever.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_h3_response_stream_finishes() {
|
||||||
|
let backend_port = next_port();
|
||||||
|
let proxy_port = next_port();
|
||||||
|
let body_text = "Hello from HTTP/3 backend! This body has a known length for testing.";
|
||||||
|
|
||||||
|
// 1. Start plain HTTP backend with known body + content-length
|
||||||
|
let _backend = start_http_server(backend_port, 200, body_text).await;
|
||||||
|
|
||||||
|
// 2. Generate self-signed cert and configure H3 route
|
||||||
|
let (cert_pem, key_pem) = generate_self_signed_cert("localhost");
|
||||||
|
let route = make_h3_route(proxy_port, "127.0.0.1", backend_port, &cert_pem, &key_pem);
|
||||||
|
|
||||||
|
let options = RustProxyOptions {
|
||||||
|
routes: vec![route],
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
// 3. Start proxy and wait for UDP bind
|
||||||
|
let mut proxy = RustProxy::new(options).unwrap();
|
||||||
|
proxy.start().await.unwrap();
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
|
||||||
|
|
||||||
|
// 4. Connect QUIC/H3 client
|
||||||
|
let endpoint = make_h3_client_endpoint();
|
||||||
|
let addr: std::net::SocketAddr = format!("127.0.0.1:{}", proxy_port).parse().unwrap();
|
||||||
|
let connection = endpoint
|
||||||
|
.connect(addr, "localhost")
|
||||||
|
.expect("Failed to initiate QUIC connection")
|
||||||
|
.await
|
||||||
|
.expect("QUIC handshake failed");
|
||||||
|
|
||||||
|
let (mut driver, mut send_request) = h3::client::new(
|
||||||
|
h3_quinn::Connection::new(connection),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.expect("H3 connection setup failed");
|
||||||
|
|
||||||
|
// Drive the H3 connection in background
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let _ = driver.wait_idle().await;
|
||||||
|
});
|
||||||
|
|
||||||
|
// 5. Send GET request
|
||||||
|
let req = http::Request::builder()
|
||||||
|
.method("GET")
|
||||||
|
.uri("https://localhost/")
|
||||||
|
.header("host", "localhost")
|
||||||
|
.body(())
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let mut stream = send_request.send_request(req).await
|
||||||
|
.expect("Failed to send H3 request");
|
||||||
|
stream.finish().await
|
||||||
|
.expect("Failed to finish sending H3 request body");
|
||||||
|
|
||||||
|
// 6. Read response headers
|
||||||
|
let resp = stream.recv_response().await
|
||||||
|
.expect("Failed to receive H3 response");
|
||||||
|
assert_eq!(resp.status(), http::StatusCode::OK,
|
||||||
|
"Expected 200 OK, got {}", resp.status());
|
||||||
|
|
||||||
|
// 7. Read body and verify stream ends (FIN received)
|
||||||
|
// This is the critical assertion: recv_data() must return None (stream ended)
|
||||||
|
// within the timeout, NOT hang forever waiting for a FIN that never arrives.
|
||||||
|
let result = with_timeout(async {
|
||||||
|
let mut total = 0usize;
|
||||||
|
while let Some(chunk) = stream.recv_data().await.expect("H3 data receive error") {
|
||||||
|
total += chunk.remaining();
|
||||||
|
}
|
||||||
|
// recv_data() returned None => stream ended (FIN received)
|
||||||
|
total
|
||||||
|
}, 10)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let bytes_received = result.expect(
|
||||||
|
"TIMEOUT: H3 stream never ended (FIN not received by client). \
|
||||||
|
The proxy sent all response data but failed to send the QUIC stream FIN."
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
bytes_received,
|
||||||
|
body_text.len(),
|
||||||
|
"Expected {} bytes, got {}",
|
||||||
|
body_text.len(),
|
||||||
|
bytes_received
|
||||||
|
);
|
||||||
|
|
||||||
|
// 8. Cleanup
|
||||||
|
endpoint.close(quinn::VarInt::from_u32(0), b"test done");
|
||||||
|
proxy.stop().await.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Insecure TLS verifier that accepts any certificate (for tests only).
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct InsecureVerifier;
|
||||||
|
|
||||||
|
impl rustls::client::danger::ServerCertVerifier for InsecureVerifier {
|
||||||
|
fn verify_server_cert(
|
||||||
|
&self,
|
||||||
|
_end_entity: &rustls::pki_types::CertificateDer<'_>,
|
||||||
|
_intermediates: &[rustls::pki_types::CertificateDer<'_>],
|
||||||
|
_server_name: &rustls::pki_types::ServerName<'_>,
|
||||||
|
_ocsp_response: &[u8],
|
||||||
|
_now: rustls::pki_types::UnixTime,
|
||||||
|
) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
|
||||||
|
Ok(rustls::client::danger::ServerCertVerified::assertion())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn verify_tls12_signature(
|
||||||
|
&self,
|
||||||
|
_message: &[u8],
|
||||||
|
_cert: &rustls::pki_types::CertificateDer<'_>,
|
||||||
|
_dss: &rustls::DigitallySignedStruct,
|
||||||
|
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
|
||||||
|
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn verify_tls13_signature(
|
||||||
|
&self,
|
||||||
|
_message: &[u8],
|
||||||
|
_cert: &rustls::pki_types::CertificateDer<'_>,
|
||||||
|
_dss: &rustls::DigitallySignedStruct,
|
||||||
|
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
|
||||||
|
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
|
||||||
|
vec![
|
||||||
|
rustls::SignatureScheme::RSA_PKCS1_SHA256,
|
||||||
|
rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
|
||||||
|
rustls::SignatureScheme::ECDSA_NISTP384_SHA384,
|
||||||
|
rustls::SignatureScheme::ED25519,
|
||||||
|
rustls::SignatureScheme::RSA_PSS_SHA256,
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -3,6 +3,6 @@
|
|||||||
*/
|
*/
|
||||||
export const commitinfo = {
|
export const commitinfo = {
|
||||||
name: '@push.rocks/smartproxy',
|
name: '@push.rocks/smartproxy',
|
||||||
version: '25.16.3',
|
version: '25.17.10',
|
||||||
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
|
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user