Compare commits
18 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c63f6fcd5f | |||
| f3cd4d193e | |||
| 81de611255 | |||
| 91598b3be9 | |||
| 4e3c548012 | |||
| 1a2d7529db | |||
| 31514f54ae | |||
| 247653c9d0 | |||
| 07d88f6f6a | |||
| 4b64de2c67 | |||
| e8db7bc96d | |||
| 2621dea9fa | |||
| bb5b9b3d12 | |||
| d70c2d77ed | |||
| 4cf13c36f8 | |||
| 37c7233780 | |||
| 15d0a721d5 | |||
| af970c447e |
60
changelog.md
60
changelog.md
@@ -1,5 +1,65 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
|
## 2026-03-20 - 25.17.4 - fix(rustproxy-http)
|
||||||
|
prevent HTTP/3 response body streaming from hanging on backend completion
|
||||||
|
|
||||||
|
- extract and track Content-Length before consuming the response body
|
||||||
|
- stop the HTTP/3 body loop when the stream reports end-of-stream or the expected byte count has been sent
|
||||||
|
- add a per-frame idle timeout to avoid indefinite waits on stalled or close-delimited backend bodies
|
||||||
|
|
||||||
|
## 2026-03-20 - 25.17.3 - fix(repository)
|
||||||
|
no changes detected
|
||||||
|
|
||||||
|
|
||||||
|
## 2026-03-20 - 25.17.2 - fix(rustproxy-http)
|
||||||
|
enable TLS connections for HTTP/3 upstream requests when backend re-encryption or TLS is configured
|
||||||
|
|
||||||
|
- Pass backend TLS client configuration into the HTTP/3 request handler.
|
||||||
|
- Detect TLS-required upstream targets using route and target TLS settings before connecting.
|
||||||
|
- Build backend request URIs with the correct http or https scheme to match the upstream connection.
|
||||||
|
|
||||||
|
## 2026-03-20 - 25.17.1 - fix(rustproxy-routing)
|
||||||
|
allow QUIC UDP TLS connections without SNI to match domain-restricted routes
|
||||||
|
|
||||||
|
- Exempts UDP transport from the no-SNI rejection logic because QUIC encrypts the TLS ClientHello and SNI is unavailable at accept time
|
||||||
|
- Adds regression tests to confirm QUIC route matching succeeds without SNI while TCP TLS without SNI remains rejected
|
||||||
|
|
||||||
|
## 2026-03-19 - 25.17.0 - feat(rustproxy-passthrough)
|
||||||
|
add PROXY protocol v2 client IP handling for UDP and QUIC listeners
|
||||||
|
|
||||||
|
- propagate trusted proxy IP configuration into UDP and QUIC listener managers
|
||||||
|
- extract and preserve real client addresses from PROXY protocol v2 headers for HTTP/3 and QUIC stream handling
|
||||||
|
- apply rate limiting, session limits, routing, and metrics using the resolved client IP while preserving correct proxy return-path routing
|
||||||
|
|
||||||
|
## 2026-03-19 - 25.16.3 - fix(rustproxy)
|
||||||
|
upgrade fallback UDP listeners to QUIC when TLS certificates become available
|
||||||
|
|
||||||
|
- Rebuild and apply QUIC TLS configuration during route and certificate updates instead of only when adding new UDP ports.
|
||||||
|
- Add logic to drain UDP sessions, stop raw fallback listeners, and start QUIC endpoints on existing ports once TLS is available.
|
||||||
|
- Retry QUIC endpoint creation during upgrade and fall back to rebinding raw UDP if the upgrade cannot complete.
|
||||||
|
|
||||||
|
## 2026-03-19 - 25.16.2 - fix(rustproxy-http)
|
||||||
|
cache backend Alt-Svc only from original upstream responses during protocol auto-detection
|
||||||
|
|
||||||
|
- Moves Alt-Svc discovery into streaming response construction so it reads backend headers before response filters inject client-facing Alt-Svc values
|
||||||
|
- Stores the protocol cache key in connection activity during auto-detect mode and clears it after HTTP/3 connection failure to avoid re-caching failed H3 routes
|
||||||
|
- Prevents fallback requests from reintroducing stale or self-injected Alt-Svc entries that could cause repeated H3 retry loops
|
||||||
|
|
||||||
|
## 2026-03-19 - 25.16.1 - fix(http-proxy)
|
||||||
|
avoid repeated HTTP/3 recaching after QUIC fallback and document backend protocol selection
|
||||||
|
|
||||||
|
- Suppress Alt-Svc HTTP/3 recaching after a failed QUIC backend connection to prevent repeated H3 timeout fallback loops
|
||||||
|
- Force an ALPN probe on TCP fallback so auto detection correctly reselects HTTP/2 or HTTP/1.1 after H3 connection failure
|
||||||
|
- Add README documentation for best-effort backendProtocol selection and supported protocol modes
|
||||||
|
|
||||||
|
## 2026-03-19 - 25.16.0 - feat(quic,http3)
|
||||||
|
add HTTP/3 proxy handling and hot-reload QUIC TLS configuration
|
||||||
|
|
||||||
|
- initialize and wire H3ProxyService into QUIC listeners so HTTP/3 requests are handled instead of being kept as placeholder connections
|
||||||
|
- add backend HTTP/3 support with protocol caching that stores Alt-Svc advertised H3 ports for auto-detection
|
||||||
|
- hot-swap TLS certificates across active QUIC endpoints and require terminating TLS for QUIC route validation
|
||||||
|
- document QUIC route setup with required TLS and ACME configuration
|
||||||
|
|
||||||
## 2026-03-19 - 25.15.0 - feat(readme)
|
## 2026-03-19 - 25.15.0 - feat(readme)
|
||||||
document UDP, QUIC, and HTTP/3 support in the README
|
document UDP, QUIC, and HTTP/3 support in the README
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@push.rocks/smartproxy",
|
"name": "@push.rocks/smartproxy",
|
||||||
"version": "25.15.0",
|
"version": "25.17.4",
|
||||||
"private": false,
|
"private": false,
|
||||||
"description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
|
"description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
|
||||||
"main": "dist_ts/index.js",
|
"main": "dist_ts/index.js",
|
||||||
|
|||||||
66
readme.md
66
readme.md
@@ -306,6 +306,10 @@ const quicRoute: IRouteConfig = {
|
|||||||
port: 8443,
|
port: 8443,
|
||||||
backendTransport: 'tcp' // 👈 Translate QUIC → TCP for backend
|
backendTransport: 'tcp' // 👈 Translate QUIC → TCP for backend
|
||||||
}],
|
}],
|
||||||
|
tls: {
|
||||||
|
mode: 'terminate',
|
||||||
|
certificate: 'auto' // 👈 QUIC requires TLS 1.3
|
||||||
|
},
|
||||||
udp: {
|
udp: {
|
||||||
quic: {
|
quic: {
|
||||||
enableHttp3: true,
|
enableHttp3: true,
|
||||||
@@ -318,9 +322,47 @@ const quicRoute: IRouteConfig = {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
const proxy = new SmartProxy({ routes: [quicRoute] });
|
const proxy = new SmartProxy({
|
||||||
|
acme: { email: 'ssl@example.com' },
|
||||||
|
routes: [quicRoute]
|
||||||
|
});
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### 🚄 Best-Effort Backend Protocol (H3 > H2 > H1)
|
||||||
|
|
||||||
|
SmartProxy automatically uses the **highest protocol your backend supports** for HTTP requests. The backend protocol is independent of the client protocol — a client using HTTP/1.1 can be forwarded over HTTP/3 to the backend, and vice versa.
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
const route: IRouteConfig = {
|
||||||
|
name: 'auto-protocol',
|
||||||
|
match: { ports: 443, domains: 'app.example.com' },
|
||||||
|
action: {
|
||||||
|
type: 'forward',
|
||||||
|
targets: [{ host: 'backend', port: 8443 }],
|
||||||
|
tls: { mode: 'terminate', certificate: 'auto' },
|
||||||
|
options: {
|
||||||
|
backendProtocol: 'auto' // 👈 Default — best-effort selection
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
```
|
||||||
|
|
||||||
|
**How protocol discovery works (browser model):**
|
||||||
|
|
||||||
|
1. First request → TLS ALPN probe detects H2 or H1
|
||||||
|
2. Backend response inspected for `Alt-Svc: h3=":port"` header
|
||||||
|
3. If H3 advertised → cached and used for subsequent requests via QUIC
|
||||||
|
4. Graceful fallback: H3 failure → H2 → H1 with automatic cache invalidation
|
||||||
|
|
||||||
|
| `backendProtocol` | Behavior |
|
||||||
|
|---|---|
|
||||||
|
| `'auto'` (default) | Best-effort: H3 > H2 > H1 with Alt-Svc discovery |
|
||||||
|
| `'http1'` | Always HTTP/1.1 |
|
||||||
|
| `'http2'` | Always HTTP/2 (hard-fail if unsupported) |
|
||||||
|
| `'http3'` | Always HTTP/3 via QUIC (hard-fail if unsupported) |
|
||||||
|
|
||||||
|
> **Note:** WebSocket upgrades always use HTTP/1.1 to the backend regardless of `backendProtocol`, since there's no performance benefit from H2/H3 Extended CONNECT for tunneled connections, and backend support is rare.
|
||||||
|
|
||||||
### 🔁 Dual-Stack TCP + UDP Route
|
### 🔁 Dual-Stack TCP + UDP Route
|
||||||
|
|
||||||
Listen on both TCP and UDP with a single route — handle each transport with its own handler:
|
Listen on both TCP and UDP with a single route — handle each transport with its own handler:
|
||||||
@@ -769,6 +811,28 @@ interface IRouteLoadBalancing {
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### Backend Protocol Options
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
// Set on action.options
|
||||||
|
{
|
||||||
|
action: {
|
||||||
|
type: 'forward',
|
||||||
|
targets: [...],
|
||||||
|
options: {
|
||||||
|
backendProtocol: 'auto' | 'http1' | 'http2' | 'http3'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
| Value | Backend Behavior |
|
||||||
|
|-------|-----------------|
|
||||||
|
| `'auto'` | Best-effort: discovers H3 via Alt-Svc, probes H2 via ALPN, falls back to H1 |
|
||||||
|
| `'http1'` | Always HTTP/1.1 (no ALPN probe) |
|
||||||
|
| `'http2'` | Always HTTP/2 (hard-fail if handshake fails) |
|
||||||
|
| `'http3'` | Always HTTP/3 over QUIC (3s connect timeout, hard-fail if unreachable) |
|
||||||
|
|
||||||
### UDP & QUIC Options
|
### UDP & QUIC Options
|
||||||
|
|
||||||
```typescript
|
```typescript
|
||||||
|
|||||||
@@ -4,6 +4,7 @@
|
|||||||
//! and forwards them to backends using the same routing and pool infrastructure
|
//! and forwards them to backends using the same routing and pool infrastructure
|
||||||
//! as the HTTP/1+2 proxy.
|
//! as the HTTP/1+2 proxy.
|
||||||
|
|
||||||
|
use std::net::SocketAddr;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
@@ -35,7 +36,6 @@ pub struct H3ProxyService {
|
|||||||
protocol_cache: Arc<ProtocolCache>,
|
protocol_cache: Arc<ProtocolCache>,
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
upstream_selector: UpstreamSelector,
|
upstream_selector: UpstreamSelector,
|
||||||
#[allow(dead_code)]
|
|
||||||
backend_tls_config: Arc<rustls::ClientConfig>,
|
backend_tls_config: Arc<rustls::ClientConfig>,
|
||||||
connect_timeout: Duration,
|
connect_timeout: Duration,
|
||||||
}
|
}
|
||||||
@@ -61,13 +61,17 @@ impl H3ProxyService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Handle an accepted QUIC connection as HTTP/3.
|
/// Handle an accepted QUIC connection as HTTP/3.
|
||||||
|
///
|
||||||
|
/// If `real_client_addr` is provided (from PROXY protocol), it overrides
|
||||||
|
/// `connection.remote_address()` for client IP attribution.
|
||||||
pub async fn handle_connection(
|
pub async fn handle_connection(
|
||||||
&self,
|
&self,
|
||||||
connection: quinn::Connection,
|
connection: quinn::Connection,
|
||||||
_fallback_route: &RouteConfig,
|
_fallback_route: &RouteConfig,
|
||||||
port: u16,
|
port: u16,
|
||||||
|
real_client_addr: Option<SocketAddr>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let remote_addr = connection.remote_address();
|
let remote_addr = real_client_addr.unwrap_or_else(|| connection.remote_address());
|
||||||
debug!("HTTP/3 connection from {} on port {}", remote_addr, port);
|
debug!("HTTP/3 connection from {} on port {}", remote_addr, port);
|
||||||
|
|
||||||
let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> =
|
let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> =
|
||||||
@@ -93,12 +97,14 @@ impl H3ProxyService {
|
|||||||
let rm = self.route_manager.load();
|
let rm = self.route_manager.load();
|
||||||
let pool = Arc::clone(&self.connection_pool);
|
let pool = Arc::clone(&self.connection_pool);
|
||||||
let metrics = Arc::clone(&self.metrics);
|
let metrics = Arc::clone(&self.metrics);
|
||||||
|
let backend_tls = Arc::clone(&self.backend_tls_config);
|
||||||
let connect_timeout = self.connect_timeout;
|
let connect_timeout = self.connect_timeout;
|
||||||
let client_ip = client_ip.clone();
|
let client_ip = client_ip.clone();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(e) = handle_h3_request(
|
if let Err(e) = handle_h3_request(
|
||||||
request, stream, port, &client_ip, &rm, &pool, &metrics, connect_timeout,
|
request, stream, port, &client_ip, &rm, &pool, &metrics,
|
||||||
|
&backend_tls, connect_timeout,
|
||||||
).await {
|
).await {
|
||||||
debug!("HTTP/3 request error from {}: {}", client_ip, e);
|
debug!("HTTP/3 request error from {}: {}", client_ip, e);
|
||||||
}
|
}
|
||||||
@@ -128,6 +134,7 @@ async fn handle_h3_request(
|
|||||||
route_manager: &RouteManager,
|
route_manager: &RouteManager,
|
||||||
_connection_pool: &ConnectionPool,
|
_connection_pool: &ConnectionPool,
|
||||||
metrics: &MetricsCollector,
|
metrics: &MetricsCollector,
|
||||||
|
backend_tls_config: &Arc<rustls::ClientConfig>,
|
||||||
connect_timeout: Duration,
|
connect_timeout: Duration,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let method = request.method().clone();
|
let method = request.method().clone();
|
||||||
@@ -168,7 +175,15 @@ async fn handle_h3_request(
|
|||||||
let backend_port = target.port.resolve(port);
|
let backend_port = target.port.resolve(port);
|
||||||
let backend_addr = format!("{}:{}", backend_host, backend_port);
|
let backend_addr = format!("{}:{}", backend_host, backend_port);
|
||||||
|
|
||||||
// Connect to backend via TCP HTTP/1.1 with timeout
|
// Determine if backend requires TLS (same logic as proxy_service.rs)
|
||||||
|
let mut use_tls = target.tls.is_some();
|
||||||
|
if let Some(ref tls) = route.action.tls {
|
||||||
|
if tls.mode == rustproxy_config::TlsMode::TerminateAndReencrypt {
|
||||||
|
use_tls = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect to backend via TCP with timeout
|
||||||
let tcp_stream = tokio::time::timeout(
|
let tcp_stream = tokio::time::timeout(
|
||||||
connect_timeout,
|
connect_timeout,
|
||||||
tokio::net::TcpStream::connect(&backend_addr),
|
tokio::net::TcpStream::connect(&backend_addr),
|
||||||
@@ -178,15 +193,27 @@ async fn handle_h3_request(
|
|||||||
|
|
||||||
let _ = tcp_stream.set_nodelay(true);
|
let _ = tcp_stream.set_nodelay(true);
|
||||||
|
|
||||||
let io = hyper_util::rt::TokioIo::new(tcp_stream);
|
// Branch: wrap in TLS if backend requires it, then HTTP/1.1 handshake.
|
||||||
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await
|
// hyper's SendRequest<B> is NOT generic over the IO type, so both branches
|
||||||
.map_err(|e| anyhow::anyhow!("Backend handshake failed: {}", e))?;
|
// produce the same type and can be unified.
|
||||||
|
let mut sender = if use_tls {
|
||||||
tokio::spawn(async move {
|
let connector = tokio_rustls::TlsConnector::from(Arc::clone(backend_tls_config));
|
||||||
if let Err(e) = conn.await {
|
let server_name = rustls::pki_types::ServerName::try_from(backend_host.to_string())
|
||||||
debug!("Backend connection closed: {}", e);
|
.map_err(|e| anyhow::anyhow!("Invalid backend SNI '{}': {}", backend_host, e))?;
|
||||||
}
|
let tls_stream = connector.connect(server_name, tcp_stream).await
|
||||||
});
|
.map_err(|e| anyhow::anyhow!("Backend TLS handshake to {} failed: {}", backend_addr, e))?;
|
||||||
|
let io = hyper_util::rt::TokioIo::new(tls_stream);
|
||||||
|
let (sender, conn) = hyper::client::conn::http1::handshake(io).await
|
||||||
|
.map_err(|e| anyhow::anyhow!("Backend handshake failed: {}", e))?;
|
||||||
|
tokio::spawn(async move { let _ = conn.await; });
|
||||||
|
sender
|
||||||
|
} else {
|
||||||
|
let io = hyper_util::rt::TokioIo::new(tcp_stream);
|
||||||
|
let (sender, conn) = hyper::client::conn::http1::handshake(io).await
|
||||||
|
.map_err(|e| anyhow::anyhow!("Backend handshake failed: {}", e))?;
|
||||||
|
tokio::spawn(async move { let _ = conn.await; });
|
||||||
|
sender
|
||||||
|
};
|
||||||
|
|
||||||
// Stream request body from H3 client to backend via an mpsc channel.
|
// Stream request body from H3 client to backend via an mpsc channel.
|
||||||
// This avoids buffering the entire request body in memory.
|
// This avoids buffering the entire request body in memory.
|
||||||
@@ -209,7 +236,7 @@ async fn handle_h3_request(
|
|||||||
|
|
||||||
// Create a body that polls from the mpsc receiver
|
// Create a body that polls from the mpsc receiver
|
||||||
let body = H3RequestBody { receiver: body_rx };
|
let body = H3RequestBody { receiver: body_rx };
|
||||||
let backend_req = build_backend_request(&method, &backend_addr, &path, &host, &request, body)?;
|
let backend_req = build_backend_request(&method, &backend_addr, &path, &host, &request, body, use_tls)?;
|
||||||
|
|
||||||
let response = sender.send_request(backend_req).await
|
let response = sender.send_request(backend_req).await
|
||||||
.map_err(|e| anyhow::anyhow!("Backend request failed: {}", e))?;
|
.map_err(|e| anyhow::anyhow!("Backend request failed: {}", e))?;
|
||||||
@@ -232,6 +259,12 @@ async fn handle_h3_request(
|
|||||||
h3_response = h3_response.header(name, value);
|
h3_response = h3_response.header(name, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Extract content-length for body loop termination (must be before into_body())
|
||||||
|
let content_length: Option<u64> = response.headers()
|
||||||
|
.get(hyper::header::CONTENT_LENGTH)
|
||||||
|
.and_then(|v| v.to_str().ok())
|
||||||
|
.and_then(|s| s.parse().ok());
|
||||||
|
|
||||||
// Add Alt-Svc for HTTP/3 advertisement
|
// Add Alt-Svc for HTTP/3 advertisement
|
||||||
let alt_svc = route.action.udp.as_ref()
|
let alt_svc = route.action.udp.as_ref()
|
||||||
.and_then(|u| u.quic.as_ref())
|
.and_then(|u| u.quic.as_ref())
|
||||||
@@ -252,21 +285,52 @@ async fn handle_h3_request(
|
|||||||
|
|
||||||
// Stream response body back
|
// Stream response body back
|
||||||
use http_body_util::BodyExt;
|
use http_body_util::BodyExt;
|
||||||
|
use http_body::Body as _;
|
||||||
let mut body = response.into_body();
|
let mut body = response.into_body();
|
||||||
let mut total_bytes_out: u64 = 0;
|
let mut total_bytes_out: u64 = 0;
|
||||||
while let Some(frame) = body.frame().await {
|
|
||||||
match frame {
|
// Per-frame idle timeout: if no frame arrives within this duration, assume
|
||||||
Ok(frame) => {
|
// the body is complete (or the backend has stalled). This prevents indefinite
|
||||||
|
// hangs on close-delimited bodies or when hyper's internal trailers oneshot
|
||||||
|
// never resolves after all data has been received.
|
||||||
|
const FRAME_IDLE_TIMEOUT: Duration = Duration::from_secs(30);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
// Layer 1: If the body already knows it is finished (Content-Length
|
||||||
|
// bodies track remaining bytes internally), break immediately to
|
||||||
|
// avoid blocking on hyper's internal trailers oneshot.
|
||||||
|
if body.is_end_stream() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Layer 3: Per-frame idle timeout safety net
|
||||||
|
match tokio::time::timeout(FRAME_IDLE_TIMEOUT, body.frame()).await {
|
||||||
|
Ok(Some(Ok(frame))) => {
|
||||||
if let Some(data) = frame.data_ref() {
|
if let Some(data) = frame.data_ref() {
|
||||||
total_bytes_out += data.len() as u64;
|
total_bytes_out += data.len() as u64;
|
||||||
stream.send_data(Bytes::copy_from_slice(data)).await
|
stream.send_data(Bytes::copy_from_slice(data)).await
|
||||||
.map_err(|e| anyhow::anyhow!("Failed to send H3 data: {}", e))?;
|
.map_err(|e| anyhow::anyhow!("Failed to send H3 data: {}", e))?;
|
||||||
|
|
||||||
|
// Layer 2: Content-Length byte count check
|
||||||
|
if let Some(cl) = content_length {
|
||||||
|
if total_bytes_out >= cl {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Ok(Some(Err(e))) => {
|
||||||
warn!("Backend body read error: {}", e);
|
warn!("Backend body read error: {}", e);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
Ok(None) => break, // Body ended naturally
|
||||||
|
Err(_) => {
|
||||||
|
debug!(
|
||||||
|
"H3 body frame idle timeout ({:?}) after {} bytes; finishing stream",
|
||||||
|
FRAME_IDLE_TIMEOUT, total_bytes_out
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -289,10 +353,12 @@ fn build_backend_request<B>(
|
|||||||
host: &str,
|
host: &str,
|
||||||
original_request: &hyper::Request<()>,
|
original_request: &hyper::Request<()>,
|
||||||
body: B,
|
body: B,
|
||||||
|
use_tls: bool,
|
||||||
) -> anyhow::Result<hyper::Request<B>> {
|
) -> anyhow::Result<hyper::Request<B>> {
|
||||||
|
let scheme = if use_tls { "https" } else { "http" };
|
||||||
let mut req = hyper::Request::builder()
|
let mut req = hyper::Request::builder()
|
||||||
.method(method)
|
.method(method)
|
||||||
.uri(format!("http://{}{}", backend_addr, path))
|
.uri(format!("{}://{}{}", scheme, backend_addr, path))
|
||||||
.header("host", host);
|
.header("host", host);
|
||||||
|
|
||||||
// Forward non-pseudo headers
|
// Forward non-pseudo headers
|
||||||
|
|||||||
@@ -1,8 +1,11 @@
|
|||||||
//! Bounded, TTL-based protocol detection cache for HTTP/2 auto-detection.
|
//! Bounded, TTL-based protocol detection cache for backend protocol auto-detection.
|
||||||
//!
|
//!
|
||||||
//! Caches the ALPN-negotiated protocol (H1 or H2) per backend endpoint and requested
|
//! Caches the detected protocol (H1, H2, or H3) per backend endpoint and requested
|
||||||
//! domain (host:port + requested_host). This prevents cache oscillation when multiple
|
//! domain (host:port + requested_host). This prevents cache oscillation when multiple
|
||||||
//! frontend domains share the same backend but differ in HTTP/2 support.
|
//! frontend domains share the same backend but differ in protocol support.
|
||||||
|
//!
|
||||||
|
//! H3 detection uses the browser model: Alt-Svc headers from H1/H2 responses are
|
||||||
|
//! parsed and cached, including the advertised H3 port (which may differ from TCP).
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
@@ -29,6 +32,14 @@ pub enum DetectedProtocol {
|
|||||||
H3,
|
H3,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Result of a protocol cache lookup.
|
||||||
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
pub struct CachedProtocol {
|
||||||
|
pub protocol: DetectedProtocol,
|
||||||
|
/// For H3: the port advertised by Alt-Svc (may differ from TCP port).
|
||||||
|
pub h3_port: Option<u16>,
|
||||||
|
}
|
||||||
|
|
||||||
/// Key for the protocol cache: (host, port, requested_host).
|
/// Key for the protocol cache: (host, port, requested_host).
|
||||||
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
|
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
|
||||||
pub struct ProtocolCacheKey {
|
pub struct ProtocolCacheKey {
|
||||||
@@ -43,6 +54,8 @@ pub struct ProtocolCacheKey {
|
|||||||
struct CachedEntry {
|
struct CachedEntry {
|
||||||
protocol: DetectedProtocol,
|
protocol: DetectedProtocol,
|
||||||
detected_at: Instant,
|
detected_at: Instant,
|
||||||
|
/// For H3: the port advertised by Alt-Svc (may differ from TCP port).
|
||||||
|
h3_port: Option<u16>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Bounded, TTL-based protocol detection cache.
|
/// Bounded, TTL-based protocol detection cache.
|
||||||
@@ -75,11 +88,14 @@ impl ProtocolCache {
|
|||||||
|
|
||||||
/// Look up the cached protocol for a backend endpoint.
|
/// Look up the cached protocol for a backend endpoint.
|
||||||
/// Returns `None` if not cached or expired (caller should probe via ALPN).
|
/// Returns `None` if not cached or expired (caller should probe via ALPN).
|
||||||
pub fn get(&self, key: &ProtocolCacheKey) -> Option<DetectedProtocol> {
|
pub fn get(&self, key: &ProtocolCacheKey) -> Option<CachedProtocol> {
|
||||||
let entry = self.cache.get(key)?;
|
let entry = self.cache.get(key)?;
|
||||||
if entry.detected_at.elapsed() < PROTOCOL_CACHE_TTL {
|
if entry.detected_at.elapsed() < PROTOCOL_CACHE_TTL {
|
||||||
debug!("Protocol cache hit: {:?} for {}:{} (requested: {:?})", entry.protocol, key.host, key.port, key.requested_host);
|
debug!("Protocol cache hit: {:?} for {}:{} (requested: {:?})", entry.protocol, key.host, key.port, key.requested_host);
|
||||||
Some(entry.protocol)
|
Some(CachedProtocol {
|
||||||
|
protocol: entry.protocol,
|
||||||
|
h3_port: entry.h3_port,
|
||||||
|
})
|
||||||
} else {
|
} else {
|
||||||
// Expired — remove and return None to trigger re-probe
|
// Expired — remove and return None to trigger re-probe
|
||||||
drop(entry); // release DashMap ref before remove
|
drop(entry); // release DashMap ref before remove
|
||||||
@@ -91,6 +107,16 @@ impl ProtocolCache {
|
|||||||
/// Insert a detected protocol into the cache.
|
/// Insert a detected protocol into the cache.
|
||||||
/// If the cache is at capacity, evict the oldest entry first.
|
/// If the cache is at capacity, evict the oldest entry first.
|
||||||
pub fn insert(&self, key: ProtocolCacheKey, protocol: DetectedProtocol) {
|
pub fn insert(&self, key: ProtocolCacheKey, protocol: DetectedProtocol) {
|
||||||
|
self.insert_with_h3_port(key, protocol, None);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Insert an H3 detection result with the Alt-Svc advertised port.
|
||||||
|
pub fn insert_h3(&self, key: ProtocolCacheKey, h3_port: u16) {
|
||||||
|
self.insert_with_h3_port(key, DetectedProtocol::H3, Some(h3_port));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Insert a protocol detection result with an optional H3 port.
|
||||||
|
fn insert_with_h3_port(&self, key: ProtocolCacheKey, protocol: DetectedProtocol, h3_port: Option<u16>) {
|
||||||
if self.cache.len() >= PROTOCOL_CACHE_MAX_ENTRIES && !self.cache.contains_key(&key) {
|
if self.cache.len() >= PROTOCOL_CACHE_MAX_ENTRIES && !self.cache.contains_key(&key) {
|
||||||
// Evict the oldest entry to stay within bounds
|
// Evict the oldest entry to stay within bounds
|
||||||
let oldest = self.cache.iter()
|
let oldest = self.cache.iter()
|
||||||
@@ -103,6 +129,7 @@ impl ProtocolCache {
|
|||||||
self.cache.insert(key, CachedEntry {
|
self.cache.insert(key, CachedEntry {
|
||||||
protocol,
|
protocol,
|
||||||
detected_at: Instant::now(),
|
detected_at: Instant::now(),
|
||||||
|
h3_port,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -43,6 +43,10 @@ struct ConnActivity {
|
|||||||
/// increments on creation and decrements on Drop, keeping the watchdog aware that
|
/// increments on creation and decrements on Drop, keeping the watchdog aware that
|
||||||
/// a response body is still streaming after the request handler has returned.
|
/// a response body is still streaming after the request handler has returned.
|
||||||
active_requests: Option<Arc<AtomicU64>>,
|
active_requests: Option<Arc<AtomicU64>>,
|
||||||
|
/// Protocol cache key for Alt-Svc discovery. When set, `build_streaming_response`
|
||||||
|
/// checks the backend's original response headers for Alt-Svc before our
|
||||||
|
/// ResponseFilter injects its own. None when not in auto-detect mode or after H3 failure.
|
||||||
|
alt_svc_cache_key: Option<crate::protocol_cache::ProtocolCacheKey>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Default upstream connect timeout (30 seconds).
|
/// Default upstream connect timeout (30 seconds).
|
||||||
@@ -58,6 +62,18 @@ const DEFAULT_WS_INACTIVITY_TIMEOUT: std::time::Duration = std::time::Duration::
|
|||||||
/// Default WebSocket max lifetime (24 hours).
|
/// Default WebSocket max lifetime (24 hours).
|
||||||
const DEFAULT_WS_MAX_LIFETIME: std::time::Duration = std::time::Duration::from_secs(86400);
|
const DEFAULT_WS_MAX_LIFETIME: std::time::Duration = std::time::Duration::from_secs(86400);
|
||||||
|
|
||||||
|
/// Timeout for QUIC (H3) backend connections. Short because UDP is often firewalled.
|
||||||
|
const QUIC_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3);
|
||||||
|
|
||||||
|
/// Protocol decision for backend connection.
|
||||||
|
#[derive(Debug)]
|
||||||
|
enum ProtocolDecision {
|
||||||
|
H1,
|
||||||
|
H2,
|
||||||
|
H3 { port: u16 },
|
||||||
|
AlpnProbe,
|
||||||
|
}
|
||||||
|
|
||||||
/// RAII guard that decrements the active request counter on drop.
|
/// RAII guard that decrements the active request counter on drop.
|
||||||
/// Ensures the counter is correct even if the request handler panics.
|
/// Ensures the counter is correct even if the request handler panics.
|
||||||
struct ActiveRequestGuard {
|
struct ActiveRequestGuard {
|
||||||
@@ -190,6 +206,9 @@ pub struct HttpProxyService {
|
|||||||
ws_inactivity_timeout: std::time::Duration,
|
ws_inactivity_timeout: std::time::Duration,
|
||||||
/// WebSocket maximum connection lifetime.
|
/// WebSocket maximum connection lifetime.
|
||||||
ws_max_lifetime: std::time::Duration,
|
ws_max_lifetime: std::time::Duration,
|
||||||
|
/// Shared QUIC client endpoint for outbound H3 backend connections.
|
||||||
|
/// Lazily initialized on first H3 backend attempt.
|
||||||
|
quinn_client_endpoint: Arc<quinn::Endpoint>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl HttpProxyService {
|
impl HttpProxyService {
|
||||||
@@ -209,6 +228,7 @@ impl HttpProxyService {
|
|||||||
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
|
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
|
||||||
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
|
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
|
||||||
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
|
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
|
||||||
|
quinn_client_endpoint: Arc::new(Self::create_quinn_client_endpoint()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -233,6 +253,7 @@ impl HttpProxyService {
|
|||||||
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
|
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
|
||||||
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
|
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
|
||||||
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
|
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
|
||||||
|
quinn_client_endpoint: Arc::new(Self::create_quinn_client_endpoint()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -324,7 +345,7 @@ impl HttpProxyService {
|
|||||||
let cn = cancel_inner.clone();
|
let cn = cancel_inner.clone();
|
||||||
let la = Arc::clone(&la_inner);
|
let la = Arc::clone(&la_inner);
|
||||||
let st = start;
|
let st = start;
|
||||||
let ca = ConnActivity { last_activity: Arc::clone(&la_inner), start, active_requests: Some(Arc::clone(&ar_inner)) };
|
let ca = ConnActivity { last_activity: Arc::clone(&la_inner), start, active_requests: Some(Arc::clone(&ar_inner)), alt_svc_cache_key: None };
|
||||||
async move {
|
async move {
|
||||||
let result = svc.handle_request(req, peer, port, cn, ca).await;
|
let result = svc.handle_request(req, peer, port, cn, ca).await;
|
||||||
// Mark request end — update activity timestamp before guard drops
|
// Mark request end — update activity timestamp before guard drops
|
||||||
@@ -401,7 +422,7 @@ impl HttpProxyService {
|
|||||||
peer_addr: std::net::SocketAddr,
|
peer_addr: std::net::SocketAddr,
|
||||||
port: u16,
|
port: u16,
|
||||||
cancel: CancellationToken,
|
cancel: CancellationToken,
|
||||||
conn_activity: ConnActivity,
|
mut conn_activity: ConnActivity,
|
||||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||||
let host = req.headers()
|
let host = req.headers()
|
||||||
.get("host")
|
.get("host")
|
||||||
@@ -645,37 +666,101 @@ impl HttpProxyService {
|
|||||||
|
|
||||||
// --- Resolve protocol decision based on backend protocol mode ---
|
// --- Resolve protocol decision based on backend protocol mode ---
|
||||||
let is_auto_detect_mode = matches!(backend_protocol_mode, rustproxy_config::BackendProtocol::Auto);
|
let is_auto_detect_mode = matches!(backend_protocol_mode, rustproxy_config::BackendProtocol::Auto);
|
||||||
let (use_h2, needs_alpn_probe) = match backend_protocol_mode {
|
let protocol_cache_key = crate::protocol_cache::ProtocolCacheKey {
|
||||||
rustproxy_config::BackendProtocol::Http1 => (false, false),
|
host: upstream.host.clone(),
|
||||||
rustproxy_config::BackendProtocol::Http2 => (true, false),
|
port: upstream.port,
|
||||||
rustproxy_config::BackendProtocol::Http3 => {
|
requested_host: host.clone(),
|
||||||
// HTTP/3 (QUIC) backend connections not yet implemented — fall back to H1
|
};
|
||||||
warn!("backendProtocol 'http3' not yet implemented, falling back to http1");
|
let protocol_decision = match backend_protocol_mode {
|
||||||
(false, false)
|
rustproxy_config::BackendProtocol::Http1 => ProtocolDecision::H1,
|
||||||
}
|
rustproxy_config::BackendProtocol::Http2 => ProtocolDecision::H2,
|
||||||
|
rustproxy_config::BackendProtocol::Http3 => ProtocolDecision::H3 { port: upstream.port },
|
||||||
rustproxy_config::BackendProtocol::Auto => {
|
rustproxy_config::BackendProtocol::Auto => {
|
||||||
if !upstream.use_tls {
|
if !upstream.use_tls {
|
||||||
// No ALPN without TLS — default to H1
|
// No ALPN without TLS, no QUIC without TLS — default to H1
|
||||||
(false, false)
|
ProtocolDecision::H1
|
||||||
} else {
|
} else {
|
||||||
let cache_key = crate::protocol_cache::ProtocolCacheKey {
|
match self.protocol_cache.get(&protocol_cache_key) {
|
||||||
host: upstream.host.clone(),
|
Some(cached) => match cached.protocol {
|
||||||
port: upstream.port,
|
crate::protocol_cache::DetectedProtocol::H3 => {
|
||||||
requested_host: host.clone(),
|
if let Some(h3_port) = cached.h3_port {
|
||||||
};
|
ProtocolDecision::H3 { port: h3_port }
|
||||||
match self.protocol_cache.get(&cache_key) {
|
} else {
|
||||||
Some(crate::protocol_cache::DetectedProtocol::H2) => (true, false),
|
// H3 cached but no port — fall back to ALPN probe
|
||||||
Some(crate::protocol_cache::DetectedProtocol::H1) => (false, false),
|
ProtocolDecision::AlpnProbe
|
||||||
Some(crate::protocol_cache::DetectedProtocol::H3) => {
|
}
|
||||||
// H3 cached but we're on TCP — fall back to H2 probe
|
}
|
||||||
(false, true)
|
crate::protocol_cache::DetectedProtocol::H2 => ProtocolDecision::H2,
|
||||||
}
|
crate::protocol_cache::DetectedProtocol::H1 => ProtocolDecision::H1,
|
||||||
None => (false, true), // needs ALPN probe
|
},
|
||||||
|
None => ProtocolDecision::AlpnProbe,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Derive legacy flags for the existing H1/H2 connection path
|
||||||
|
let (use_h2, mut needs_alpn_probe) = match &protocol_decision {
|
||||||
|
ProtocolDecision::H1 => (false, false),
|
||||||
|
ProtocolDecision::H2 => (true, false),
|
||||||
|
ProtocolDecision::H3 { .. } => (false, false), // H3 path handled separately below
|
||||||
|
ProtocolDecision::AlpnProbe => (false, true),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Set Alt-Svc cache key on conn_activity so build_streaming_response can check
|
||||||
|
// the backend's original Alt-Svc header before ResponseFilter injects our own.
|
||||||
|
if is_auto_detect_mode {
|
||||||
|
conn_activity.alt_svc_cache_key = Some(protocol_cache_key.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- H3 path: try QUIC connection before TCP ---
|
||||||
|
if let ProtocolDecision::H3 { port: h3_port } = protocol_decision {
|
||||||
|
let h3_pool_key = crate::connection_pool::PoolKey {
|
||||||
|
host: upstream.host.clone(),
|
||||||
|
port: h3_port,
|
||||||
|
use_tls: true,
|
||||||
|
protocol: crate::connection_pool::PoolProtocol::H3,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Try H3 pool checkout first
|
||||||
|
if let Some((quic_conn, _age)) = self.connection_pool.checkout_h3(&h3_pool_key) {
|
||||||
|
self.metrics.backend_pool_hit(&upstream_key);
|
||||||
|
let result = self.forward_h3(
|
||||||
|
quic_conn, parts, body, upstream_headers, &upstream_path,
|
||||||
|
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
|
||||||
|
).await;
|
||||||
|
self.upstream_selector.connection_ended(&upstream_key);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try fresh QUIC connection
|
||||||
|
match self.connect_quic_backend(&upstream.host, h3_port).await {
|
||||||
|
Ok(quic_conn) => {
|
||||||
|
self.metrics.backend_pool_miss(&upstream_key);
|
||||||
|
self.metrics.backend_connection_opened(&upstream_key, std::time::Instant::now().elapsed());
|
||||||
|
let result = self.forward_h3(
|
||||||
|
quic_conn, parts, body, upstream_headers, &upstream_path,
|
||||||
|
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
|
||||||
|
).await;
|
||||||
|
self.upstream_selector.connection_ended(&upstream_key);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!(backend = %upstream_key, error = %e,
|
||||||
|
"H3 backend connect failed, falling back to H2/H1");
|
||||||
|
// Suppress Alt-Svc caching for the fallback to prevent re-caching H3
|
||||||
|
// from our own injected Alt-Svc header or a stale backend Alt-Svc
|
||||||
|
conn_activity.alt_svc_cache_key = None;
|
||||||
|
// Force ALPN probe on TCP fallback so we correctly detect H2 vs H1
|
||||||
|
// (don't cache anything yet — let the ALPN probe decide)
|
||||||
|
if is_auto_detect_mode && upstream.use_tls {
|
||||||
|
needs_alpn_probe = true;
|
||||||
|
}
|
||||||
|
// Fall through to TCP path
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// --- Connection pooling: try reusing an existing connection first ---
|
// --- Connection pooling: try reusing an existing connection first ---
|
||||||
// For ALPN probe mode, skip pool checkout (we don't know the protocol yet)
|
// For ALPN probe mode, skip pool checkout (we don't know the protocol yet)
|
||||||
if !needs_alpn_probe {
|
if !needs_alpn_probe {
|
||||||
@@ -870,6 +955,7 @@ impl HttpProxyService {
|
|||||||
};
|
};
|
||||||
self.upstream_selector.connection_ended(&upstream_key);
|
self.upstream_selector.connection_ended(&upstream_key);
|
||||||
self.metrics.backend_connection_closed(&upstream_key);
|
self.metrics.backend_connection_closed(&upstream_key);
|
||||||
|
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1668,6 +1754,19 @@ impl HttpProxyService {
|
|||||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||||
let (resp_parts, resp_body) = upstream_response.into_parts();
|
let (resp_parts, resp_body) = upstream_response.into_parts();
|
||||||
|
|
||||||
|
// Check for Alt-Svc in the backend's ORIGINAL response headers BEFORE
|
||||||
|
// ResponseFilter::apply_headers runs — the filter may inject our own Alt-Svc
|
||||||
|
// for client-facing HTTP/3 advertisement, which must not be confused with
|
||||||
|
// backend-originated Alt-Svc.
|
||||||
|
if let Some(ref cache_key) = conn_activity.alt_svc_cache_key {
|
||||||
|
if let Some(alt_svc) = resp_parts.headers.get("alt-svc").and_then(|v| v.to_str().ok()) {
|
||||||
|
if let Some(h3_port) = parse_alt_svc_h3_port(alt_svc) {
|
||||||
|
debug!(h3_port, "Backend advertises H3 via Alt-Svc");
|
||||||
|
self.protocol_cache.insert_h3(cache_key.clone(), h3_port);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let mut response = Response::builder()
|
let mut response = Response::builder()
|
||||||
.status(resp_parts.status);
|
.status(resp_parts.status);
|
||||||
|
|
||||||
@@ -2393,6 +2492,252 @@ impl HttpProxyService {
|
|||||||
config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
|
config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
|
||||||
Arc::new(config)
|
Arc::new(config)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Create a shared QUIC client endpoint for outbound H3 backend connections.
|
||||||
|
fn create_quinn_client_endpoint() -> quinn::Endpoint {
|
||||||
|
let _ = rustls::crypto::ring::default_provider().install_default();
|
||||||
|
let mut tls_config = rustls::ClientConfig::builder()
|
||||||
|
.dangerous()
|
||||||
|
.with_custom_certificate_verifier(Arc::new(InsecureBackendVerifier))
|
||||||
|
.with_no_client_auth();
|
||||||
|
tls_config.alpn_protocols = vec![b"h3".to_vec()];
|
||||||
|
|
||||||
|
let quic_crypto = quinn::crypto::rustls::QuicClientConfig::try_from(tls_config)
|
||||||
|
.expect("Failed to create QUIC client crypto config");
|
||||||
|
let client_config = quinn::ClientConfig::new(Arc::new(quic_crypto));
|
||||||
|
|
||||||
|
let mut endpoint = quinn::Endpoint::client("0.0.0.0:0".parse().unwrap())
|
||||||
|
.expect("Failed to create QUIC client endpoint");
|
||||||
|
endpoint.set_default_client_config(client_config);
|
||||||
|
endpoint
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Connect to a backend via QUIC (H3).
|
||||||
|
async fn connect_quic_backend(
|
||||||
|
&self,
|
||||||
|
host: &str,
|
||||||
|
port: u16,
|
||||||
|
) -> Result<quinn::Connection, Box<dyn std::error::Error + Send + Sync>> {
|
||||||
|
let addr = tokio::net::lookup_host(format!("{}:{}", host, port))
|
||||||
|
.await?
|
||||||
|
.next()
|
||||||
|
.ok_or("DNS resolution returned no addresses")?;
|
||||||
|
|
||||||
|
let server_name = host.to_string();
|
||||||
|
let connecting = self.quinn_client_endpoint.connect(addr, &server_name)?;
|
||||||
|
|
||||||
|
let connection = tokio::time::timeout(QUIC_CONNECT_TIMEOUT, connecting).await
|
||||||
|
.map_err(|_| "QUIC connect timeout (3s)")??;
|
||||||
|
|
||||||
|
debug!("QUIC backend connection established to {}:{}", host, port);
|
||||||
|
Ok(connection)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Forward request to backend via HTTP/3 over QUIC.
|
||||||
|
async fn forward_h3(
|
||||||
|
&self,
|
||||||
|
quic_conn: quinn::Connection,
|
||||||
|
parts: hyper::http::request::Parts,
|
||||||
|
body: Incoming,
|
||||||
|
upstream_headers: hyper::HeaderMap,
|
||||||
|
upstream_path: &str,
|
||||||
|
route: &rustproxy_config::RouteConfig,
|
||||||
|
route_id: Option<&str>,
|
||||||
|
source_ip: &str,
|
||||||
|
pool_key: &crate::connection_pool::PoolKey,
|
||||||
|
domain: &str,
|
||||||
|
conn_activity: &ConnActivity,
|
||||||
|
backend_key: &str,
|
||||||
|
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||||
|
let h3_quinn_conn = h3_quinn::Connection::new(quic_conn.clone());
|
||||||
|
let (mut driver, mut send_request) = match h3::client::new(h3_quinn_conn).await {
|
||||||
|
Ok(pair) => pair,
|
||||||
|
Err(e) => {
|
||||||
|
error!(backend = %backend_key, domain = %domain, error = %e, "H3 client handshake failed");
|
||||||
|
self.metrics.backend_handshake_error(backend_key);
|
||||||
|
return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 handshake failed"));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Spawn the h3 connection driver
|
||||||
|
let driver_pool = Arc::clone(&self.connection_pool);
|
||||||
|
let driver_pool_key = pool_key.clone();
|
||||||
|
let gen_holder = Arc::new(std::sync::atomic::AtomicU64::new(u64::MAX));
|
||||||
|
let driver_gen = Arc::clone(&gen_holder);
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let close_err = std::future::poll_fn(|cx| driver.poll_close(cx)).await;
|
||||||
|
debug!("H3 connection driver closed: {:?}", close_err);
|
||||||
|
let g = driver_gen.load(std::sync::atomic::Ordering::Relaxed);
|
||||||
|
if g != u64::MAX {
|
||||||
|
driver_pool.remove_h3_if_generation(&driver_pool_key, g);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Build the H3 request
|
||||||
|
let uri = hyper::Uri::builder()
|
||||||
|
.scheme("https")
|
||||||
|
.authority(domain)
|
||||||
|
.path_and_query(upstream_path)
|
||||||
|
.build()
|
||||||
|
.unwrap_or_else(|_| upstream_path.parse().unwrap_or_default());
|
||||||
|
|
||||||
|
let mut h3_req = hyper::Request::builder()
|
||||||
|
.method(parts.method.clone())
|
||||||
|
.uri(uri);
|
||||||
|
|
||||||
|
if let Some(headers) = h3_req.headers_mut() {
|
||||||
|
*headers = upstream_headers;
|
||||||
|
}
|
||||||
|
|
||||||
|
let h3_req = h3_req.body(()).unwrap();
|
||||||
|
|
||||||
|
// Send the request
|
||||||
|
let mut stream = match send_request.send_request(h3_req).await {
|
||||||
|
Ok(s) => s,
|
||||||
|
Err(e) => {
|
||||||
|
error!(backend = %backend_key, domain = %domain, error = %e, "H3 send_request failed");
|
||||||
|
self.metrics.backend_request_error(backend_key);
|
||||||
|
return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 request failed"));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Stream request body
|
||||||
|
let rid: Option<Arc<str>> = route_id.map(Arc::from);
|
||||||
|
let sip: Arc<str> = Arc::from(source_ip);
|
||||||
|
|
||||||
|
{
|
||||||
|
use http_body_util::BodyExt;
|
||||||
|
let mut body = body;
|
||||||
|
while let Some(frame) = body.frame().await {
|
||||||
|
match frame {
|
||||||
|
Ok(frame) => {
|
||||||
|
if let Some(data) = frame.data_ref() {
|
||||||
|
self.metrics.record_bytes(data.len() as u64, 0, rid.as_deref(), Some(&sip));
|
||||||
|
if let Err(e) = stream.send_data(Bytes::copy_from_slice(data)).await {
|
||||||
|
error!(backend = %backend_key, error = %e, "H3 send_data failed");
|
||||||
|
return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 body send failed"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!(backend = %backend_key, error = %e, "Client body read error during H3 forward");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Signal end of body
|
||||||
|
stream.finish().await.ok();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read response
|
||||||
|
let h3_response = match stream.recv_response().await {
|
||||||
|
Ok(resp) => resp,
|
||||||
|
Err(e) => {
|
||||||
|
error!(backend = %backend_key, domain = %domain, error = %e, "H3 recv_response failed");
|
||||||
|
self.metrics.backend_request_error(backend_key);
|
||||||
|
return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 response failed"));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Build the response for the client
|
||||||
|
let status = h3_response.status();
|
||||||
|
let mut response = Response::builder().status(status);
|
||||||
|
|
||||||
|
if let Some(headers) = response.headers_mut() {
|
||||||
|
for (name, value) in h3_response.headers() {
|
||||||
|
let n = name.as_str();
|
||||||
|
// Skip hop-by-hop headers
|
||||||
|
if n == "transfer-encoding" || n == "connection" || n == "keep-alive" {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
headers.insert(name.clone(), value.clone());
|
||||||
|
}
|
||||||
|
ResponseFilter::apply_headers(route, headers, None);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stream response body back via an adapter
|
||||||
|
let h3_body = H3ClientResponseBody { stream };
|
||||||
|
let counting_body = CountingBody::new(
|
||||||
|
h3_body,
|
||||||
|
Arc::clone(&self.metrics),
|
||||||
|
rid,
|
||||||
|
Some(sip),
|
||||||
|
Direction::Out,
|
||||||
|
).with_connection_activity(Arc::clone(&conn_activity.last_activity), conn_activity.start);
|
||||||
|
|
||||||
|
let counting_body = if let Some(ref ar) = conn_activity.active_requests {
|
||||||
|
counting_body.with_active_requests(Arc::clone(ar))
|
||||||
|
} else {
|
||||||
|
counting_body
|
||||||
|
};
|
||||||
|
|
||||||
|
let body: BoxBody<Bytes, hyper::Error> = BoxBody::new(counting_body);
|
||||||
|
|
||||||
|
// Register connection in pool on success
|
||||||
|
if status != StatusCode::BAD_GATEWAY {
|
||||||
|
let g = self.connection_pool.register_h3(pool_key.clone(), quic_conn);
|
||||||
|
gen_holder.store(g, std::sync::atomic::Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.metrics.set_backend_protocol(backend_key, "h3");
|
||||||
|
Ok(response.body(body).unwrap())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Parse an Alt-Svc header value to extract the H3 port.
|
||||||
|
/// Handles formats like `h3=":443"; ma=86400` and `h3=":8443", h2=":443"`.
|
||||||
|
fn parse_alt_svc_h3_port(header_value: &str) -> Option<u16> {
|
||||||
|
for directive in header_value.split(',') {
|
||||||
|
let directive = directive.trim();
|
||||||
|
// Match h3=":<port>" or h3-29=":<port>" etc.
|
||||||
|
if directive.starts_with("h3=") || directive.starts_with("h3-") {
|
||||||
|
// Find the port in ":<port>"
|
||||||
|
if let Some(start) = directive.find("\":") {
|
||||||
|
let rest = &directive[start + 2..];
|
||||||
|
if let Some(end) = rest.find('"') {
|
||||||
|
if let Ok(port) = rest[..end].parse::<u16>() {
|
||||||
|
return Some(port);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Response body adapter for H3 client responses.
|
||||||
|
/// Reads data from the h3 `RequestStream` recv side and presents it as an `http_body::Body`.
|
||||||
|
struct H3ClientResponseBody {
|
||||||
|
stream: h3::client::RequestStream<h3_quinn::BidiStream<Bytes>, Bytes>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl http_body::Body for H3ClientResponseBody {
|
||||||
|
type Data = Bytes;
|
||||||
|
type Error = hyper::Error;
|
||||||
|
|
||||||
|
fn poll_frame(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
_cx: &mut Context<'_>,
|
||||||
|
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
|
||||||
|
// h3's recv_data is async, so we need to poll it manually.
|
||||||
|
// Use a small future to poll the recv_data call.
|
||||||
|
use std::future::Future;
|
||||||
|
let mut fut = Box::pin(self.stream.recv_data());
|
||||||
|
match fut.as_mut().poll(_cx) {
|
||||||
|
Poll::Ready(Ok(Some(mut buf))) => {
|
||||||
|
use bytes::Buf;
|
||||||
|
let data = Bytes::copy_from_slice(buf.chunk());
|
||||||
|
buf.advance(buf.remaining());
|
||||||
|
Poll::Ready(Some(Ok(http_body::Frame::data(data))))
|
||||||
|
}
|
||||||
|
Poll::Ready(Ok(None)) => Poll::Ready(None),
|
||||||
|
Poll::Ready(Err(e)) => {
|
||||||
|
warn!("H3 response body recv error: {}", e);
|
||||||
|
Poll::Ready(None)
|
||||||
|
}
|
||||||
|
Poll::Pending => Poll::Pending,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Insecure certificate verifier for backend TLS connections (fallback only).
|
/// Insecure certificate verifier for backend TLS connections (fallback only).
|
||||||
@@ -2463,6 +2808,7 @@ impl Default for HttpProxyService {
|
|||||||
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
|
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
|
||||||
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
|
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
|
||||||
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
|
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
|
||||||
|
quinn_client_endpoint: Arc::new(Self::create_quinn_client_endpoint()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,13 +3,21 @@
|
|||||||
//! Manages QUIC endpoints (via quinn), accepts connections, and either:
|
//! Manages QUIC endpoints (via quinn), accepts connections, and either:
|
||||||
//! - Forwards streams bidirectionally to TCP backends (QUIC termination)
|
//! - Forwards streams bidirectionally to TCP backends (QUIC termination)
|
||||||
//! - Dispatches to H3ProxyService for HTTP/3 handling (Phase 5)
|
//! - Dispatches to H3ProxyService for HTTP/3 handling (Phase 5)
|
||||||
|
//!
|
||||||
|
//! When `proxy_ips` is configured, a UDP relay layer intercepts PROXY protocol v2
|
||||||
|
//! headers before they reach quinn, extracting real client IPs for attribution.
|
||||||
|
|
||||||
use std::net::SocketAddr;
|
use std::net::{IpAddr, SocketAddr};
|
||||||
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::AsyncWriteExt;
|
||||||
|
use tokio::net::UdpSocket;
|
||||||
|
use tokio::task::JoinHandle;
|
||||||
|
|
||||||
use arc_swap::ArcSwap;
|
use arc_swap::ArcSwap;
|
||||||
|
use dashmap::DashMap;
|
||||||
use quinn::{Endpoint, ServerConfig as QuinnServerConfig};
|
use quinn::{Endpoint, ServerConfig as QuinnServerConfig};
|
||||||
use rustls::ServerConfig as RustlsServerConfig;
|
use rustls::ServerConfig as RustlsServerConfig;
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
@@ -19,6 +27,8 @@ use rustproxy_config::{RouteConfig, TransportProtocol};
|
|||||||
use rustproxy_metrics::MetricsCollector;
|
use rustproxy_metrics::MetricsCollector;
|
||||||
use rustproxy_routing::{MatchContext, RouteManager};
|
use rustproxy_routing::{MatchContext, RouteManager};
|
||||||
|
|
||||||
|
use rustproxy_http::h3_service::H3ProxyService;
|
||||||
|
|
||||||
use crate::connection_tracker::ConnectionTracker;
|
use crate::connection_tracker::ConnectionTracker;
|
||||||
|
|
||||||
/// Create a QUIC server endpoint on the given port with the provided TLS config.
|
/// Create a QUIC server endpoint on the given port with the provided TLS config.
|
||||||
@@ -45,9 +55,274 @@ pub fn create_quic_endpoint(
|
|||||||
Ok(endpoint)
|
Ok(endpoint)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ===== PROXY protocol relay for QUIC =====
|
||||||
|
|
||||||
|
/// Result of creating a QUIC endpoint with a PROXY protocol relay layer.
|
||||||
|
pub struct QuicProxyRelay {
|
||||||
|
/// The quinn endpoint (bound to 127.0.0.1:ephemeral).
|
||||||
|
pub endpoint: Endpoint,
|
||||||
|
/// The relay recv loop task handle.
|
||||||
|
pub relay_task: JoinHandle<()>,
|
||||||
|
/// Maps relay socket local addr → real client SocketAddr (from PROXY v2).
|
||||||
|
/// Consulted by `quic_accept_loop` to resolve real client IPs.
|
||||||
|
pub real_client_map: Arc<DashMap<SocketAddr, SocketAddr>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A single relay session for forwarding datagrams between an external source
|
||||||
|
/// and the internal quinn endpoint.
|
||||||
|
struct RelaySession {
|
||||||
|
socket: Arc<UdpSocket>,
|
||||||
|
last_activity: AtomicU64,
|
||||||
|
return_task: JoinHandle<()>,
|
||||||
|
cancel: CancellationToken,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a QUIC endpoint with a PROXY protocol v2 relay layer.
|
||||||
|
///
|
||||||
|
/// Instead of giving the external socket to quinn, we:
|
||||||
|
/// 1. Bind a raw UDP socket on 0.0.0.0:port (external)
|
||||||
|
/// 2. Bind quinn on 127.0.0.1:0 (internal, ephemeral)
|
||||||
|
/// 3. Run a relay loop that filters PROXY v2 headers and forwards datagrams
|
||||||
|
///
|
||||||
|
/// Only used when `proxy_ips` is non-empty.
|
||||||
|
pub fn create_quic_endpoint_with_proxy_relay(
|
||||||
|
port: u16,
|
||||||
|
tls_config: Arc<RustlsServerConfig>,
|
||||||
|
proxy_ips: Arc<Vec<IpAddr>>,
|
||||||
|
cancel: CancellationToken,
|
||||||
|
) -> anyhow::Result<QuicProxyRelay> {
|
||||||
|
// Bind external socket on the real port
|
||||||
|
let external_socket = std::net::UdpSocket::bind(SocketAddr::from(([0, 0, 0, 0], port)))?;
|
||||||
|
external_socket.set_nonblocking(true)?;
|
||||||
|
let external_socket = Arc::new(
|
||||||
|
UdpSocket::from_std(external_socket)
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to wrap external socket: {}", e))?,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Bind quinn on localhost ephemeral port
|
||||||
|
let internal_socket = std::net::UdpSocket::bind("127.0.0.1:0")?;
|
||||||
|
let quinn_internal_addr = internal_socket.local_addr()?;
|
||||||
|
|
||||||
|
let quic_crypto = quinn::crypto::rustls::QuicServerConfig::try_from(tls_config)
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to create QUIC crypto config: {}", e))?;
|
||||||
|
let server_config = QuinnServerConfig::with_crypto(Arc::new(quic_crypto));
|
||||||
|
|
||||||
|
let endpoint = Endpoint::new(
|
||||||
|
quinn::EndpointConfig::default(),
|
||||||
|
Some(server_config),
|
||||||
|
internal_socket,
|
||||||
|
quinn::default_runtime()
|
||||||
|
.ok_or_else(|| anyhow::anyhow!("No async runtime for quinn"))?,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let real_client_map = Arc::new(DashMap::new());
|
||||||
|
|
||||||
|
let relay_task = tokio::spawn(quic_proxy_relay_loop(
|
||||||
|
external_socket,
|
||||||
|
quinn_internal_addr,
|
||||||
|
proxy_ips,
|
||||||
|
Arc::clone(&real_client_map),
|
||||||
|
cancel,
|
||||||
|
));
|
||||||
|
|
||||||
|
info!("QUIC endpoint with PROXY relay on port {} (quinn internal: {})", port, quinn_internal_addr);
|
||||||
|
Ok(QuicProxyRelay { endpoint, relay_task, real_client_map })
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Main relay loop: reads datagrams from the external socket, filters PROXY v2
|
||||||
|
/// headers from trusted proxy IPs, and forwards everything else to quinn via
|
||||||
|
/// per-session relay sockets.
|
||||||
|
async fn quic_proxy_relay_loop(
|
||||||
|
external_socket: Arc<UdpSocket>,
|
||||||
|
quinn_internal_addr: SocketAddr,
|
||||||
|
proxy_ips: Arc<Vec<IpAddr>>,
|
||||||
|
real_client_map: Arc<DashMap<SocketAddr, SocketAddr>>,
|
||||||
|
cancel: CancellationToken,
|
||||||
|
) {
|
||||||
|
// Maps external source addr → real client addr (from PROXY v2 headers)
|
||||||
|
let proxy_addr_map: DashMap<SocketAddr, SocketAddr> = DashMap::new();
|
||||||
|
// Maps external source addr → relay session
|
||||||
|
let relay_sessions: DashMap<SocketAddr, Arc<RelaySession>> = DashMap::new();
|
||||||
|
let epoch = Instant::now();
|
||||||
|
let mut buf = vec![0u8; 65535];
|
||||||
|
|
||||||
|
// Inline cleanup: periodically scan relay_sessions for stale entries
|
||||||
|
let mut last_cleanup = Instant::now();
|
||||||
|
let cleanup_interval = std::time::Duration::from_secs(30);
|
||||||
|
let session_timeout_ms: u64 = 120_000;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let (len, src_addr) = tokio::select! {
|
||||||
|
_ = cancel.cancelled() => {
|
||||||
|
debug!("QUIC proxy relay loop cancelled");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
result = external_socket.recv_from(&mut buf) => {
|
||||||
|
match result {
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(e) => {
|
||||||
|
warn!("QUIC proxy relay recv error: {}", e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let datagram = &buf[..len];
|
||||||
|
|
||||||
|
// PROXY v2 handling: only on first datagram from a trusted proxy IP
|
||||||
|
// (before a relay session exists for this source)
|
||||||
|
if proxy_ips.contains(&src_addr.ip()) && relay_sessions.get(&src_addr).is_none() {
|
||||||
|
if crate::proxy_protocol::is_proxy_protocol_v2(datagram) {
|
||||||
|
match crate::proxy_protocol::parse_v2(datagram) {
|
||||||
|
Ok((header, _consumed)) => {
|
||||||
|
debug!("QUIC PROXY v2 from {}: real client {}", src_addr, header.source_addr);
|
||||||
|
proxy_addr_map.insert(src_addr, header.source_addr);
|
||||||
|
continue; // consume the PROXY v2 datagram
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
debug!("QUIC proxy relay: failed to parse PROXY v2 from {}: {}", src_addr, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Determine real client address
|
||||||
|
let real_client = proxy_addr_map.get(&src_addr)
|
||||||
|
.map(|r| *r)
|
||||||
|
.unwrap_or(src_addr);
|
||||||
|
|
||||||
|
// Get or create relay session for this external source
|
||||||
|
let session = match relay_sessions.get(&src_addr) {
|
||||||
|
Some(s) => {
|
||||||
|
s.last_activity.store(epoch.elapsed().as_millis() as u64, Ordering::Relaxed);
|
||||||
|
Arc::clone(s.value())
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
// Create new relay socket connected to quinn's internal address
|
||||||
|
let relay_socket = match UdpSocket::bind("127.0.0.1:0").await {
|
||||||
|
Ok(s) => s,
|
||||||
|
Err(e) => {
|
||||||
|
warn!("QUIC relay: failed to bind relay socket: {}", e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if let Err(e) = relay_socket.connect(quinn_internal_addr).await {
|
||||||
|
warn!("QUIC relay: failed to connect relay socket to {}: {}", quinn_internal_addr, e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let relay_local_addr = match relay_socket.local_addr() {
|
||||||
|
Ok(a) => a,
|
||||||
|
Err(e) => {
|
||||||
|
warn!("QUIC relay: failed to get relay socket local addr: {}", e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let relay_socket = Arc::new(relay_socket);
|
||||||
|
|
||||||
|
// Store the real client mapping for the QUIC accept loop
|
||||||
|
real_client_map.insert(relay_local_addr, real_client);
|
||||||
|
|
||||||
|
// Spawn return-path relay: quinn -> external socket -> original source
|
||||||
|
let session_cancel = cancel.child_token();
|
||||||
|
let return_task = tokio::spawn(relay_return_path(
|
||||||
|
Arc::clone(&relay_socket),
|
||||||
|
Arc::clone(&external_socket),
|
||||||
|
src_addr,
|
||||||
|
session_cancel.child_token(),
|
||||||
|
));
|
||||||
|
|
||||||
|
let session = Arc::new(RelaySession {
|
||||||
|
socket: relay_socket,
|
||||||
|
last_activity: AtomicU64::new(epoch.elapsed().as_millis() as u64),
|
||||||
|
return_task,
|
||||||
|
cancel: session_cancel,
|
||||||
|
});
|
||||||
|
|
||||||
|
relay_sessions.insert(src_addr, Arc::clone(&session));
|
||||||
|
debug!("QUIC relay: new session for {} (relay {}), real client {}",
|
||||||
|
src_addr, relay_local_addr, real_client);
|
||||||
|
|
||||||
|
session
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Forward datagram to quinn via the relay socket
|
||||||
|
if let Err(e) = session.socket.send(datagram).await {
|
||||||
|
debug!("QUIC relay: forward error to quinn for {}: {}", src_addr, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Periodic cleanup of stale relay sessions
|
||||||
|
if last_cleanup.elapsed() >= cleanup_interval {
|
||||||
|
last_cleanup = Instant::now();
|
||||||
|
let now_ms = epoch.elapsed().as_millis() as u64;
|
||||||
|
let stale_keys: Vec<SocketAddr> = relay_sessions.iter()
|
||||||
|
.filter(|entry| {
|
||||||
|
let age = now_ms.saturating_sub(entry.value().last_activity.load(Ordering::Relaxed));
|
||||||
|
age > session_timeout_ms
|
||||||
|
})
|
||||||
|
.map(|entry| *entry.key())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
for key in stale_keys {
|
||||||
|
if let Some((_, session)) = relay_sessions.remove(&key) {
|
||||||
|
session.cancel.cancel();
|
||||||
|
session.return_task.abort();
|
||||||
|
// Clean up real_client_map entry
|
||||||
|
if let Ok(addr) = session.socket.local_addr() {
|
||||||
|
real_client_map.remove(&addr);
|
||||||
|
}
|
||||||
|
proxy_addr_map.remove(&key);
|
||||||
|
debug!("QUIC relay: cleaned up stale session for {}", key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Shutdown: cancel all relay sessions
|
||||||
|
for entry in relay_sessions.iter() {
|
||||||
|
entry.value().cancel.cancel();
|
||||||
|
entry.value().return_task.abort();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return-path relay: receives datagrams from quinn (via the relay socket)
|
||||||
|
/// and forwards them back to the external client through the external socket.
|
||||||
|
async fn relay_return_path(
|
||||||
|
relay_socket: Arc<UdpSocket>,
|
||||||
|
external_socket: Arc<UdpSocket>,
|
||||||
|
external_src_addr: SocketAddr,
|
||||||
|
cancel: CancellationToken,
|
||||||
|
) {
|
||||||
|
let mut buf = vec![0u8; 65535];
|
||||||
|
loop {
|
||||||
|
let len = tokio::select! {
|
||||||
|
_ = cancel.cancelled() => break,
|
||||||
|
result = relay_socket.recv(&mut buf) => {
|
||||||
|
match result {
|
||||||
|
Ok(len) => len,
|
||||||
|
Err(e) => {
|
||||||
|
debug!("QUIC relay return recv error for {}: {}", external_src_addr, e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(e) = external_socket.send_to(&buf[..len], external_src_addr).await {
|
||||||
|
debug!("QUIC relay return send error to {}: {}", external_src_addr, e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ===== QUIC accept loop =====
|
||||||
|
|
||||||
/// Run the QUIC accept loop for a single endpoint.
|
/// Run the QUIC accept loop for a single endpoint.
|
||||||
///
|
///
|
||||||
/// Accepts incoming QUIC connections and spawns a task per connection.
|
/// Accepts incoming QUIC connections and spawns a task per connection.
|
||||||
|
/// When `real_client_map` is provided, it is consulted to resolve real client
|
||||||
|
/// IPs from PROXY protocol v2 headers (relay socket addr → real client addr).
|
||||||
pub async fn quic_accept_loop(
|
pub async fn quic_accept_loop(
|
||||||
endpoint: Endpoint,
|
endpoint: Endpoint,
|
||||||
port: u16,
|
port: u16,
|
||||||
@@ -55,6 +330,8 @@ pub async fn quic_accept_loop(
|
|||||||
metrics: Arc<MetricsCollector>,
|
metrics: Arc<MetricsCollector>,
|
||||||
conn_tracker: Arc<ConnectionTracker>,
|
conn_tracker: Arc<ConnectionTracker>,
|
||||||
cancel: CancellationToken,
|
cancel: CancellationToken,
|
||||||
|
h3_service: Option<Arc<H3ProxyService>>,
|
||||||
|
real_client_map: Option<Arc<DashMap<SocketAddr, SocketAddr>>>,
|
||||||
) {
|
) {
|
||||||
loop {
|
loop {
|
||||||
let incoming = tokio::select! {
|
let incoming = tokio::select! {
|
||||||
@@ -74,11 +351,16 @@ pub async fn quic_accept_loop(
|
|||||||
};
|
};
|
||||||
|
|
||||||
let remote_addr = incoming.remote_address();
|
let remote_addr = incoming.remote_address();
|
||||||
let ip = remote_addr.ip();
|
|
||||||
|
// Resolve real client IP from PROXY protocol map if available
|
||||||
|
let real_addr = real_client_map.as_ref()
|
||||||
|
.and_then(|map| map.get(&remote_addr).map(|r| *r))
|
||||||
|
.unwrap_or(remote_addr);
|
||||||
|
let ip = real_addr.ip();
|
||||||
|
|
||||||
// Per-IP rate limiting
|
// Per-IP rate limiting
|
||||||
if !conn_tracker.try_accept(&ip) {
|
if !conn_tracker.try_accept(&ip) {
|
||||||
debug!("QUIC connection rejected from {} (rate limit)", remote_addr);
|
debug!("QUIC connection rejected from {} (rate limit)", real_addr);
|
||||||
// Drop `incoming` to refuse the connection
|
// Drop `incoming` to refuse the connection
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@@ -101,7 +383,7 @@ pub async fn quic_accept_loop(
|
|||||||
let route = match rm.find_route(&ctx) {
|
let route = match rm.find_route(&ctx) {
|
||||||
Some(m) => m.route.clone(),
|
Some(m) => m.route.clone(),
|
||||||
None => {
|
None => {
|
||||||
debug!("No QUIC route matched for port {} from {}", port, remote_addr);
|
debug!("No QUIC route matched for port {} from {}", port, real_addr);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -113,11 +395,13 @@ pub async fn quic_accept_loop(
|
|||||||
let metrics = Arc::clone(&metrics);
|
let metrics = Arc::clone(&metrics);
|
||||||
let conn_tracker = Arc::clone(&conn_tracker);
|
let conn_tracker = Arc::clone(&conn_tracker);
|
||||||
let cancel = cancel.child_token();
|
let cancel = cancel.child_token();
|
||||||
|
let h3_svc = h3_service.clone();
|
||||||
|
let real_client_addr = if real_addr != remote_addr { Some(real_addr) } else { None };
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
match handle_quic_connection(incoming, route, port, Arc::clone(&metrics), &cancel).await {
|
match handle_quic_connection(incoming, route, port, Arc::clone(&metrics), &cancel, h3_svc, real_client_addr).await {
|
||||||
Ok(()) => debug!("QUIC connection from {} completed", remote_addr),
|
Ok(()) => debug!("QUIC connection from {} completed", real_addr),
|
||||||
Err(e) => debug!("QUIC connection from {} error: {}", remote_addr, e),
|
Err(e) => debug!("QUIC connection from {} error: {}", real_addr, e),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cleanup
|
// Cleanup
|
||||||
@@ -139,10 +423,12 @@ async fn handle_quic_connection(
|
|||||||
port: u16,
|
port: u16,
|
||||||
metrics: Arc<MetricsCollector>,
|
metrics: Arc<MetricsCollector>,
|
||||||
cancel: &CancellationToken,
|
cancel: &CancellationToken,
|
||||||
|
h3_service: Option<Arc<H3ProxyService>>,
|
||||||
|
real_client_addr: Option<SocketAddr>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let connection = incoming.await?;
|
let connection = incoming.await?;
|
||||||
let remote_addr = connection.remote_address();
|
let effective_addr = real_client_addr.unwrap_or_else(|| connection.remote_address());
|
||||||
debug!("QUIC connection established from {}", remote_addr);
|
debug!("QUIC connection established from {}", effective_addr);
|
||||||
|
|
||||||
// Check if this route has HTTP/3 enabled
|
// Check if this route has HTTP/3 enabled
|
||||||
let enable_http3 = route.action.udp.as_ref()
|
let enable_http3 = route.action.udp.as_ref()
|
||||||
@@ -151,13 +437,23 @@ async fn handle_quic_connection(
|
|||||||
.unwrap_or(false);
|
.unwrap_or(false);
|
||||||
|
|
||||||
if enable_http3 {
|
if enable_http3 {
|
||||||
// Phase 5: dispatch to H3ProxyService
|
if let Some(ref h3_svc) = h3_service {
|
||||||
// For now, log and accept streams for basic handling
|
debug!("HTTP/3 enabled for route {:?}, dispatching to H3ProxyService", route.name);
|
||||||
debug!("HTTP/3 enabled for route {:?}, dispatching to H3 handler", route.name);
|
h3_svc.handle_connection(connection, &route, port, real_client_addr).await
|
||||||
handle_h3_connection(connection, route, port, &metrics, cancel).await
|
} else {
|
||||||
|
warn!("HTTP/3 enabled for route {:?} but H3ProxyService not initialized", route.name);
|
||||||
|
// Keep connection alive until cancelled
|
||||||
|
tokio::select! {
|
||||||
|
_ = cancel.cancelled() => {}
|
||||||
|
reason = connection.closed() => {
|
||||||
|
debug!("HTTP/3 connection closed (no service): {}", reason);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// Non-HTTP3 QUIC: bidirectional stream forwarding to TCP backend
|
// Non-HTTP3 QUIC: bidirectional stream forwarding to TCP backend
|
||||||
handle_quic_stream_forwarding(connection, route, port, metrics, cancel).await
|
handle_quic_stream_forwarding(connection, route, port, metrics, cancel, real_client_addr).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -172,8 +468,9 @@ async fn handle_quic_stream_forwarding(
|
|||||||
port: u16,
|
port: u16,
|
||||||
metrics: Arc<MetricsCollector>,
|
metrics: Arc<MetricsCollector>,
|
||||||
cancel: &CancellationToken,
|
cancel: &CancellationToken,
|
||||||
|
real_client_addr: Option<SocketAddr>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let remote_addr = connection.remote_address();
|
let effective_addr = real_client_addr.unwrap_or_else(|| connection.remote_address());
|
||||||
let route_id = route.name.as_deref().or(route.id.as_deref());
|
let route_id = route.name.as_deref().or(route.id.as_deref());
|
||||||
let metrics_arc = metrics;
|
let metrics_arc = metrics;
|
||||||
|
|
||||||
@@ -194,7 +491,7 @@ async fn handle_quic_stream_forwarding(
|
|||||||
Err(quinn::ConnectionError::ApplicationClosed(_)) => break,
|
Err(quinn::ConnectionError::ApplicationClosed(_)) => break,
|
||||||
Err(quinn::ConnectionError::LocallyClosed) => break,
|
Err(quinn::ConnectionError::LocallyClosed) => break,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
debug!("QUIC stream accept error from {}: {}", remote_addr, e);
|
debug!("QUIC stream accept error from {}: {}", effective_addr, e);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -202,7 +499,7 @@ async fn handle_quic_stream_forwarding(
|
|||||||
};
|
};
|
||||||
|
|
||||||
let backend_addr = backend_addr.clone();
|
let backend_addr = backend_addr.clone();
|
||||||
let ip_str = remote_addr.ip().to_string();
|
let ip_str = effective_addr.ip().to_string();
|
||||||
let stream_metrics = Arc::clone(&metrics_arc);
|
let stream_metrics = Arc::clone(&metrics_arc);
|
||||||
let stream_route_id = route_id.map(|s| s.to_string());
|
let stream_route_id = route_id.map(|s| s.to_string());
|
||||||
|
|
||||||
@@ -257,29 +554,6 @@ async fn forward_quic_stream_to_tcp(
|
|||||||
Ok((bytes_in, bytes_out))
|
Ok((bytes_in, bytes_out))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Placeholder for HTTP/3 connection handling (Phase 5).
|
|
||||||
///
|
|
||||||
/// Once h3_service is implemented, this will delegate to it.
|
|
||||||
async fn handle_h3_connection(
|
|
||||||
connection: quinn::Connection,
|
|
||||||
_route: RouteConfig,
|
|
||||||
_port: u16,
|
|
||||||
_metrics: &MetricsCollector,
|
|
||||||
cancel: &CancellationToken,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
warn!("HTTP/3 handling not yet fully implemented — accepting connection but no request processing");
|
|
||||||
|
|
||||||
// Keep the connection alive until cancelled or closed
|
|
||||||
tokio::select! {
|
|
||||||
_ = cancel.cancelled() => {}
|
|
||||||
reason = connection.closed() => {
|
|
||||||
debug!("HTTP/3 connection closed: {}", reason);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|||||||
@@ -2,12 +2,17 @@
|
|||||||
//!
|
//!
|
||||||
//! Binds UDP sockets on configured ports, receives datagrams, matches routes,
|
//! Binds UDP sockets on configured ports, receives datagrams, matches routes,
|
||||||
//! tracks sessions (flows), and forwards datagrams to backend UDP sockets.
|
//! tracks sessions (flows), and forwards datagrams to backend UDP sockets.
|
||||||
|
//!
|
||||||
|
//! Supports PROXY protocol v2 on both raw UDP and QUIC paths when `proxy_ips`
|
||||||
|
//! is configured. For QUIC, a relay layer intercepts datagrams before they
|
||||||
|
//! reach the quinn endpoint.
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::net::SocketAddr;
|
use std::net::{IpAddr, SocketAddr};
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use dashmap::DashMap;
|
||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
|
|
||||||
use arc_swap::ArcSwap;
|
use arc_swap::ArcSwap;
|
||||||
@@ -21,13 +26,15 @@ use rustproxy_config::{RouteActionType, TransportProtocol};
|
|||||||
use rustproxy_metrics::MetricsCollector;
|
use rustproxy_metrics::MetricsCollector;
|
||||||
use rustproxy_routing::{MatchContext, RouteManager};
|
use rustproxy_routing::{MatchContext, RouteManager};
|
||||||
|
|
||||||
|
use rustproxy_http::h3_service::H3ProxyService;
|
||||||
|
|
||||||
use crate::connection_tracker::ConnectionTracker;
|
use crate::connection_tracker::ConnectionTracker;
|
||||||
use crate::udp_session::{SessionKey, UdpSession, UdpSessionConfig, UdpSessionTable};
|
use crate::udp_session::{SessionKey, UdpSession, UdpSessionConfig, UdpSessionTable};
|
||||||
|
|
||||||
/// Manages UDP listeners across all configured ports.
|
/// Manages UDP listeners across all configured ports.
|
||||||
pub struct UdpListenerManager {
|
pub struct UdpListenerManager {
|
||||||
/// Port → recv loop task handle
|
/// Port → (recv loop task handle, optional QUIC endpoint for TLS updates)
|
||||||
listeners: HashMap<u16, JoinHandle<()>>,
|
listeners: HashMap<u16, (JoinHandle<()>, Option<quinn::Endpoint>)>,
|
||||||
/// Hot-reloadable route table
|
/// Hot-reloadable route table
|
||||||
route_manager: Arc<ArcSwap<RouteManager>>,
|
route_manager: Arc<ArcSwap<RouteManager>>,
|
||||||
/// Shared metrics collector
|
/// Shared metrics collector
|
||||||
@@ -44,13 +51,21 @@ pub struct UdpListenerManager {
|
|||||||
relay_writer: Arc<Mutex<Option<tokio::net::unix::OwnedWriteHalf>>>,
|
relay_writer: Arc<Mutex<Option<tokio::net::unix::OwnedWriteHalf>>>,
|
||||||
/// Cancel token for the current relay reply reader task
|
/// Cancel token for the current relay reply reader task
|
||||||
relay_reader_cancel: Option<CancellationToken>,
|
relay_reader_cancel: Option<CancellationToken>,
|
||||||
|
/// H3 proxy service for HTTP/3 request handling
|
||||||
|
h3_service: Option<Arc<H3ProxyService>>,
|
||||||
|
/// Trusted proxy IPs that may send PROXY protocol v2 headers.
|
||||||
|
/// When non-empty, PROXY v2 detection is enabled on both raw UDP and QUIC paths.
|
||||||
|
proxy_ips: Arc<Vec<IpAddr>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for UdpListenerManager {
|
impl Drop for UdpListenerManager {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
self.cancel_token.cancel();
|
self.cancel_token.cancel();
|
||||||
for (_, handle) in self.listeners.drain() {
|
for (_, (handle, endpoint)) in self.listeners.drain() {
|
||||||
handle.abort();
|
handle.abort();
|
||||||
|
if let Some(ep) = endpoint {
|
||||||
|
ep.close(quinn::VarInt::from_u32(0), b"shutdown");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -72,9 +87,24 @@ impl UdpListenerManager {
|
|||||||
datagram_handler_relay: Arc::new(RwLock::new(None)),
|
datagram_handler_relay: Arc::new(RwLock::new(None)),
|
||||||
relay_writer: Arc::new(Mutex::new(None)),
|
relay_writer: Arc::new(Mutex::new(None)),
|
||||||
relay_reader_cancel: None,
|
relay_reader_cancel: None,
|
||||||
|
h3_service: None,
|
||||||
|
proxy_ips: Arc::new(Vec::new()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Set the trusted proxy IPs for PROXY protocol v2 detection.
|
||||||
|
pub fn set_proxy_ips(&mut self, ips: Vec<IpAddr>) {
|
||||||
|
if !ips.is_empty() {
|
||||||
|
info!("UDP/QUIC PROXY protocol v2 enabled for {} trusted IPs", ips.len());
|
||||||
|
}
|
||||||
|
self.proxy_ips = Arc::new(ips);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set the H3 proxy service for HTTP/3 request handling.
|
||||||
|
pub fn set_h3_service(&mut self, svc: Arc<H3ProxyService>) {
|
||||||
|
self.h3_service = Some(svc);
|
||||||
|
}
|
||||||
|
|
||||||
/// Update the route manager (for hot-reload).
|
/// Update the route manager (for hot-reload).
|
||||||
pub fn update_routes(&self, route_manager: Arc<RouteManager>) {
|
pub fn update_routes(&self, route_manager: Arc<RouteManager>) {
|
||||||
self.route_manager.store(route_manager);
|
self.route_manager.store(route_manager);
|
||||||
@@ -109,18 +139,44 @@ impl UdpListenerManager {
|
|||||||
|
|
||||||
if has_quic {
|
if has_quic {
|
||||||
if let Some(tls) = tls_config {
|
if let Some(tls) = tls_config {
|
||||||
// Create QUIC endpoint
|
if self.proxy_ips.is_empty() {
|
||||||
let endpoint = crate::quic_handler::create_quic_endpoint(port, tls)?;
|
// Direct path: quinn owns the external socket (zero overhead)
|
||||||
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
|
let endpoint = crate::quic_handler::create_quic_endpoint(port, tls)?;
|
||||||
endpoint,
|
let endpoint_for_updates = endpoint.clone();
|
||||||
port,
|
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
|
||||||
Arc::clone(&self.route_manager),
|
endpoint,
|
||||||
Arc::clone(&self.metrics),
|
port,
|
||||||
Arc::clone(&self.conn_tracker),
|
Arc::clone(&self.route_manager),
|
||||||
self.cancel_token.child_token(),
|
Arc::clone(&self.metrics),
|
||||||
));
|
Arc::clone(&self.conn_tracker),
|
||||||
self.listeners.insert(port, handle);
|
self.cancel_token.child_token(),
|
||||||
info!("QUIC endpoint started on port {}", port);
|
self.h3_service.clone(),
|
||||||
|
None,
|
||||||
|
));
|
||||||
|
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
|
||||||
|
info!("QUIC endpoint started on port {}", port);
|
||||||
|
} else {
|
||||||
|
// Proxy relay path: we own external socket, quinn on localhost
|
||||||
|
let relay = crate::quic_handler::create_quic_endpoint_with_proxy_relay(
|
||||||
|
port,
|
||||||
|
tls,
|
||||||
|
Arc::clone(&self.proxy_ips),
|
||||||
|
self.cancel_token.child_token(),
|
||||||
|
)?;
|
||||||
|
let endpoint_for_updates = relay.endpoint.clone();
|
||||||
|
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
|
||||||
|
relay.endpoint,
|
||||||
|
port,
|
||||||
|
Arc::clone(&self.route_manager),
|
||||||
|
Arc::clone(&self.metrics),
|
||||||
|
Arc::clone(&self.conn_tracker),
|
||||||
|
self.cancel_token.child_token(),
|
||||||
|
self.h3_service.clone(),
|
||||||
|
Some(relay.real_client_map),
|
||||||
|
));
|
||||||
|
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
|
||||||
|
info!("QUIC endpoint with PROXY relay started on port {}", port);
|
||||||
|
}
|
||||||
return Ok(());
|
return Ok(());
|
||||||
} else {
|
} else {
|
||||||
warn!("QUIC routes on port {} but no TLS config provided, falling back to raw UDP", port);
|
warn!("QUIC routes on port {} but no TLS config provided, falling back to raw UDP", port);
|
||||||
@@ -143,9 +199,10 @@ impl UdpListenerManager {
|
|||||||
Arc::clone(&self.datagram_handler_relay),
|
Arc::clone(&self.datagram_handler_relay),
|
||||||
Arc::clone(&self.relay_writer),
|
Arc::clone(&self.relay_writer),
|
||||||
self.cancel_token.child_token(),
|
self.cancel_token.child_token(),
|
||||||
|
Arc::clone(&self.proxy_ips),
|
||||||
));
|
));
|
||||||
|
|
||||||
self.listeners.insert(port, handle);
|
self.listeners.insert(port, (handle, None));
|
||||||
|
|
||||||
// Start the session cleanup task if this is the first port
|
// Start the session cleanup task if this is the first port
|
||||||
if self.listeners.len() == 1 {
|
if self.listeners.len() == 1 {
|
||||||
@@ -157,8 +214,11 @@ impl UdpListenerManager {
|
|||||||
|
|
||||||
/// Stop listening on a UDP port.
|
/// Stop listening on a UDP port.
|
||||||
pub fn remove_port(&mut self, port: u16) {
|
pub fn remove_port(&mut self, port: u16) {
|
||||||
if let Some(handle) = self.listeners.remove(&port) {
|
if let Some((handle, endpoint)) = self.listeners.remove(&port) {
|
||||||
handle.abort();
|
handle.abort();
|
||||||
|
if let Some(ep) = endpoint {
|
||||||
|
ep.close(quinn::VarInt::from_u32(0), b"port removed");
|
||||||
|
}
|
||||||
info!("UDP listener removed from port {}", port);
|
info!("UDP listener removed from port {}", port);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -173,14 +233,180 @@ impl UdpListenerManager {
|
|||||||
/// Stop all listeners and clean up.
|
/// Stop all listeners and clean up.
|
||||||
pub async fn stop(&mut self) {
|
pub async fn stop(&mut self) {
|
||||||
self.cancel_token.cancel();
|
self.cancel_token.cancel();
|
||||||
for (port, handle) in self.listeners.drain() {
|
for (port, (handle, endpoint)) in self.listeners.drain() {
|
||||||
handle.abort();
|
handle.abort();
|
||||||
|
if let Some(ep) = endpoint {
|
||||||
|
ep.close(quinn::VarInt::from_u32(0), b"shutdown");
|
||||||
|
}
|
||||||
debug!("UDP listener stopped on port {}", port);
|
debug!("UDP listener stopped on port {}", port);
|
||||||
}
|
}
|
||||||
info!("All UDP listeners stopped, {} sessions remaining",
|
info!("All UDP listeners stopped, {} sessions remaining",
|
||||||
self.session_table.session_count());
|
self.session_table.session_count());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Update TLS config on all active QUIC endpoints (cert refresh).
|
||||||
|
/// Only affects new incoming connections — existing connections are undisturbed.
|
||||||
|
/// Uses quinn's Endpoint::set_server_config() for zero-downtime hot-swap.
|
||||||
|
pub fn update_quic_tls(&self, tls_config: Arc<rustls::ServerConfig>) {
|
||||||
|
for (port, (_handle, endpoint)) in &self.listeners {
|
||||||
|
if let Some(ep) = endpoint {
|
||||||
|
match quinn::crypto::rustls::QuicServerConfig::try_from(Arc::clone(&tls_config)) {
|
||||||
|
Ok(quic_crypto) => {
|
||||||
|
let server_config = quinn::ServerConfig::with_crypto(Arc::new(quic_crypto));
|
||||||
|
ep.set_server_config(Some(server_config));
|
||||||
|
info!("Updated QUIC TLS config on port {}", port);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Failed to update QUIC TLS config on port {}: {}", port, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Upgrade raw UDP fallback listeners to QUIC endpoints.
|
||||||
|
///
|
||||||
|
/// At startup, if no TLS certs are available, QUIC routes fall back to raw UDP.
|
||||||
|
/// When certs become available later (via loadCertificate IPC or ACME), this method
|
||||||
|
/// stops the raw UDP listener, drains sessions, and creates a proper QUIC endpoint.
|
||||||
|
///
|
||||||
|
/// This is idempotent — ports that already have QUIC endpoints are skipped.
|
||||||
|
pub async fn upgrade_raw_to_quic(&mut self, tls_config: Arc<rustls::ServerConfig>) {
|
||||||
|
// Find ports that are raw UDP fallback (endpoint=None) but have QUIC routes
|
||||||
|
let rm = self.route_manager.load();
|
||||||
|
let upgrade_ports: Vec<u16> = self.listeners.iter()
|
||||||
|
.filter(|(_, (_, endpoint))| endpoint.is_none())
|
||||||
|
.filter(|(port, _)| {
|
||||||
|
rm.routes_for_port(**port).iter().any(|r| {
|
||||||
|
r.action.udp.as_ref()
|
||||||
|
.and_then(|u| u.quic.as_ref())
|
||||||
|
.is_some()
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.map(|(port, _)| *port)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
for port in upgrade_ports {
|
||||||
|
info!("Upgrading raw UDP listener on port {} to QUIC endpoint", port);
|
||||||
|
|
||||||
|
// Stop the raw UDP listener task and drain sessions to release the socket
|
||||||
|
if let Some((handle, _)) = self.listeners.remove(&port) {
|
||||||
|
handle.abort();
|
||||||
|
}
|
||||||
|
let drained = self.session_table.drain_port(
|
||||||
|
port, &self.metrics, &self.conn_tracker,
|
||||||
|
);
|
||||||
|
if drained > 0 {
|
||||||
|
debug!("Drained {} UDP sessions on port {} for QUIC upgrade", drained, port);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Brief yield to let aborted tasks drop their socket references
|
||||||
|
tokio::task::yield_now().await;
|
||||||
|
|
||||||
|
// Create QUIC endpoint on the now-free port
|
||||||
|
let create_result = if self.proxy_ips.is_empty() {
|
||||||
|
self.create_quic_direct(port, Arc::clone(&tls_config))
|
||||||
|
} else {
|
||||||
|
self.create_quic_with_relay(port, Arc::clone(&tls_config))
|
||||||
|
};
|
||||||
|
|
||||||
|
match create_result {
|
||||||
|
Ok(()) => {
|
||||||
|
info!("QUIC endpoint started on port {} (upgraded from raw UDP)", port);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
// Port may still be held — retry once after a brief delay
|
||||||
|
warn!("QUIC endpoint creation failed on port {}, retrying: {}", port, e);
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||||
|
|
||||||
|
let retry_result = if self.proxy_ips.is_empty() {
|
||||||
|
self.create_quic_direct(port, Arc::clone(&tls_config))
|
||||||
|
} else {
|
||||||
|
self.create_quic_with_relay(port, Arc::clone(&tls_config))
|
||||||
|
};
|
||||||
|
|
||||||
|
match retry_result {
|
||||||
|
Ok(()) => {
|
||||||
|
info!("QUIC endpoint started on port {} (upgraded from raw UDP, retry)", port);
|
||||||
|
}
|
||||||
|
Err(e2) => {
|
||||||
|
error!("Failed to upgrade port {} to QUIC after retry: {}. \
|
||||||
|
Rebinding as raw UDP.", port, e2);
|
||||||
|
// Fallback: rebind as raw UDP so the port isn't dead
|
||||||
|
if let Ok(()) = self.rebind_raw_udp(port).await {
|
||||||
|
warn!("Port {} rebound as raw UDP (QUIC upgrade failed)", port);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a direct QUIC endpoint (quinn owns the socket).
|
||||||
|
fn create_quic_direct(&mut self, port: u16, tls_config: Arc<rustls::ServerConfig>) -> anyhow::Result<()> {
|
||||||
|
let endpoint = crate::quic_handler::create_quic_endpoint(port, tls_config)?;
|
||||||
|
let endpoint_for_updates = endpoint.clone();
|
||||||
|
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
|
||||||
|
endpoint,
|
||||||
|
port,
|
||||||
|
Arc::clone(&self.route_manager),
|
||||||
|
Arc::clone(&self.metrics),
|
||||||
|
Arc::clone(&self.conn_tracker),
|
||||||
|
self.cancel_token.child_token(),
|
||||||
|
self.h3_service.clone(),
|
||||||
|
None,
|
||||||
|
));
|
||||||
|
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a QUIC endpoint with PROXY protocol relay.
|
||||||
|
fn create_quic_with_relay(&mut self, port: u16, tls_config: Arc<rustls::ServerConfig>) -> anyhow::Result<()> {
|
||||||
|
let relay = crate::quic_handler::create_quic_endpoint_with_proxy_relay(
|
||||||
|
port,
|
||||||
|
tls_config,
|
||||||
|
Arc::clone(&self.proxy_ips),
|
||||||
|
self.cancel_token.child_token(),
|
||||||
|
)?;
|
||||||
|
let endpoint_for_updates = relay.endpoint.clone();
|
||||||
|
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
|
||||||
|
relay.endpoint,
|
||||||
|
port,
|
||||||
|
Arc::clone(&self.route_manager),
|
||||||
|
Arc::clone(&self.metrics),
|
||||||
|
Arc::clone(&self.conn_tracker),
|
||||||
|
self.cancel_token.child_token(),
|
||||||
|
self.h3_service.clone(),
|
||||||
|
Some(relay.real_client_map),
|
||||||
|
));
|
||||||
|
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Rebind a port as a raw UDP listener (fallback when QUIC upgrade fails).
|
||||||
|
async fn rebind_raw_udp(&mut self, port: u16) -> anyhow::Result<()> {
|
||||||
|
let addr: std::net::SocketAddr = ([0, 0, 0, 0], port).into();
|
||||||
|
let socket = UdpSocket::bind(addr).await?;
|
||||||
|
let socket = Arc::new(socket);
|
||||||
|
|
||||||
|
let handle = tokio::spawn(Self::recv_loop(
|
||||||
|
socket,
|
||||||
|
port,
|
||||||
|
Arc::clone(&self.route_manager),
|
||||||
|
Arc::clone(&self.metrics),
|
||||||
|
Arc::clone(&self.conn_tracker),
|
||||||
|
Arc::clone(&self.session_table),
|
||||||
|
Arc::clone(&self.datagram_handler_relay),
|
||||||
|
Arc::clone(&self.relay_writer),
|
||||||
|
self.cancel_token.child_token(),
|
||||||
|
Arc::clone(&self.proxy_ips),
|
||||||
|
));
|
||||||
|
|
||||||
|
self.listeners.insert(port, (handle, None));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Set the datagram handler relay socket path and establish connection.
|
/// Set the datagram handler relay socket path and establish connection.
|
||||||
pub async fn set_datagram_handler_relay(&mut self, path: String) {
|
pub async fn set_datagram_handler_relay(&mut self, path: String) {
|
||||||
// Cancel previous relay reader task if any
|
// Cancel previous relay reader task if any
|
||||||
@@ -255,6 +481,10 @@ impl UdpListenerManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Main receive loop for a UDP port.
|
/// Main receive loop for a UDP port.
|
||||||
|
///
|
||||||
|
/// When `proxy_ips` is non-empty, the first datagram from a trusted proxy IP
|
||||||
|
/// is checked for PROXY protocol v2. If found, the real client IP is extracted
|
||||||
|
/// and used for all subsequent session handling for that source address.
|
||||||
async fn recv_loop(
|
async fn recv_loop(
|
||||||
socket: Arc<UdpSocket>,
|
socket: Arc<UdpSocket>,
|
||||||
port: u16,
|
port: u16,
|
||||||
@@ -265,10 +495,15 @@ impl UdpListenerManager {
|
|||||||
_datagram_handler_relay: Arc<RwLock<Option<String>>>,
|
_datagram_handler_relay: Arc<RwLock<Option<String>>>,
|
||||||
relay_writer: Arc<Mutex<Option<tokio::net::unix::OwnedWriteHalf>>>,
|
relay_writer: Arc<Mutex<Option<tokio::net::unix::OwnedWriteHalf>>>,
|
||||||
cancel: CancellationToken,
|
cancel: CancellationToken,
|
||||||
|
proxy_ips: Arc<Vec<IpAddr>>,
|
||||||
) {
|
) {
|
||||||
// Use a reasonably large buffer; actual max is per-route but we need a single buffer
|
// Use a reasonably large buffer; actual max is per-route but we need a single buffer
|
||||||
let mut buf = vec![0u8; 65535];
|
let mut buf = vec![0u8; 65535];
|
||||||
|
|
||||||
|
// Maps proxy source addr → real client addr (from PROXY v2 headers).
|
||||||
|
// Only populated when proxy_ips is non-empty.
|
||||||
|
let proxy_addr_map: DashMap<SocketAddr, SocketAddr> = DashMap::new();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let (len, client_addr) = tokio::select! {
|
let (len, client_addr) = tokio::select! {
|
||||||
_ = cancel.cancelled() => {
|
_ = cancel.cancelled() => {
|
||||||
@@ -288,9 +523,39 @@ impl UdpListenerManager {
|
|||||||
|
|
||||||
let datagram = &buf[..len];
|
let datagram = &buf[..len];
|
||||||
|
|
||||||
// Route matching
|
// PROXY protocol v2 detection for datagrams from trusted proxy IPs
|
||||||
|
let effective_client_ip = if !proxy_ips.is_empty() && proxy_ips.contains(&client_addr.ip()) {
|
||||||
|
let session_key: SessionKey = (client_addr, port);
|
||||||
|
if session_table.get(&session_key).is_none() && !proxy_addr_map.contains_key(&client_addr) {
|
||||||
|
// No session and no prior PROXY header — check for PROXY v2
|
||||||
|
if crate::proxy_protocol::is_proxy_protocol_v2(datagram) {
|
||||||
|
match crate::proxy_protocol::parse_v2(datagram) {
|
||||||
|
Ok((header, _consumed)) => {
|
||||||
|
debug!("UDP PROXY v2 from {}: real client {}", client_addr, header.source_addr);
|
||||||
|
proxy_addr_map.insert(client_addr, header.source_addr);
|
||||||
|
continue; // discard the PROXY v2 datagram
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
debug!("UDP PROXY v2 parse error from {}: {}", client_addr, e);
|
||||||
|
client_addr.ip()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
client_addr.ip()
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Use real client IP if we've previously seen a PROXY v2 header
|
||||||
|
proxy_addr_map.get(&client_addr)
|
||||||
|
.map(|r| r.ip())
|
||||||
|
.unwrap_or_else(|| client_addr.ip())
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
client_addr.ip()
|
||||||
|
};
|
||||||
|
|
||||||
|
// Route matching — use effective (real) client IP
|
||||||
let rm = route_manager.load();
|
let rm = route_manager.load();
|
||||||
let ip_str = client_addr.ip().to_string();
|
let ip_str = effective_client_ip.to_string();
|
||||||
let ctx = MatchContext {
|
let ctx = MatchContext {
|
||||||
port,
|
port,
|
||||||
domain: None,
|
domain: None,
|
||||||
@@ -339,20 +604,21 @@ impl UdpListenerManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Session lookup or create
|
// Session lookup or create
|
||||||
|
// Session key uses the proxy's source addr for correct return-path routing
|
||||||
let session_key: SessionKey = (client_addr, port);
|
let session_key: SessionKey = (client_addr, port);
|
||||||
let session = match session_table.get(&session_key) {
|
let session = match session_table.get(&session_key) {
|
||||||
Some(s) => s,
|
Some(s) => s,
|
||||||
None => {
|
None => {
|
||||||
// New session — check per-IP limits
|
// New session — check per-IP limits using the real client IP
|
||||||
if !conn_tracker.try_accept(&client_addr.ip()) {
|
if !conn_tracker.try_accept(&effective_client_ip) {
|
||||||
debug!("UDP session rejected for {} (rate limit)", client_addr);
|
debug!("UDP session rejected for {} (rate limit)", effective_client_ip);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if !session_table.can_create_session(
|
if !session_table.can_create_session(
|
||||||
&client_addr.ip(),
|
&effective_client_ip,
|
||||||
udp_config.max_sessions_per_ip,
|
udp_config.max_sessions_per_ip,
|
||||||
) {
|
) {
|
||||||
debug!("UDP session rejected for {} (per-IP session limit)", client_addr);
|
debug!("UDP session rejected for {} (per-IP session limit)", effective_client_ip);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -385,8 +651,8 @@ impl UdpListenerManager {
|
|||||||
}
|
}
|
||||||
let backend_socket = Arc::new(backend_socket);
|
let backend_socket = Arc::new(backend_socket);
|
||||||
|
|
||||||
debug!("New UDP session: {} -> {} (via port {})",
|
debug!("New UDP session: {} -> {} (via port {}, real client {})",
|
||||||
client_addr, backend_addr, port);
|
client_addr, backend_addr, port, effective_client_ip);
|
||||||
|
|
||||||
// Spawn return-path relay task
|
// Spawn return-path relay task
|
||||||
let session_cancel = CancellationToken::new();
|
let session_cancel = CancellationToken::new();
|
||||||
@@ -406,7 +672,7 @@ impl UdpListenerManager {
|
|||||||
last_activity: std::sync::atomic::AtomicU64::new(session_table.elapsed_ms()),
|
last_activity: std::sync::atomic::AtomicU64::new(session_table.elapsed_ms()),
|
||||||
created_at: std::time::Instant::now(),
|
created_at: std::time::Instant::now(),
|
||||||
route_id: route_id.map(|s| s.to_string()),
|
route_id: route_id.map(|s| s.to_string()),
|
||||||
source_ip: client_addr.ip(),
|
source_ip: effective_client_ip,
|
||||||
client_addr,
|
client_addr,
|
||||||
return_task,
|
return_task,
|
||||||
cancel: session_cancel,
|
cancel: session_cancel,
|
||||||
@@ -417,8 +683,8 @@ impl UdpListenerManager {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Track in metrics
|
// Track in metrics using the real client IP
|
||||||
conn_tracker.connection_opened(&client_addr.ip());
|
conn_tracker.connection_opened(&effective_client_ip);
|
||||||
metrics.connection_opened(route_id, Some(&ip_str));
|
metrics.connection_opened(route_id, Some(&ip_str));
|
||||||
metrics.udp_session_opened();
|
metrics.udp_session_opened();
|
||||||
|
|
||||||
|
|||||||
@@ -201,6 +201,36 @@ impl UdpSessionTable {
|
|||||||
removed
|
removed
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Drain all sessions on a given listening port, releasing socket references.
|
||||||
|
/// Used when upgrading a raw UDP listener to QUIC — the raw UDP socket's
|
||||||
|
/// Arc refcount must drop to zero so the port can be rebound.
|
||||||
|
pub fn drain_port(
|
||||||
|
&self,
|
||||||
|
port: u16,
|
||||||
|
metrics: &MetricsCollector,
|
||||||
|
conn_tracker: &ConnectionTracker,
|
||||||
|
) -> usize {
|
||||||
|
let keys: Vec<SessionKey> = self.sessions.iter()
|
||||||
|
.filter(|entry| entry.key().1 == port)
|
||||||
|
.map(|entry| *entry.key())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let mut removed = 0;
|
||||||
|
for key in keys {
|
||||||
|
if let Some(session) = self.remove(&key) {
|
||||||
|
session.cancel.cancel();
|
||||||
|
conn_tracker.connection_closed(&session.source_ip);
|
||||||
|
metrics.connection_closed(
|
||||||
|
session.route_id.as_deref(),
|
||||||
|
Some(&session.source_ip.to_string()),
|
||||||
|
);
|
||||||
|
metrics.udp_session_closed();
|
||||||
|
removed += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
removed
|
||||||
|
}
|
||||||
|
|
||||||
/// Total number of active sessions.
|
/// Total number of active sessions.
|
||||||
pub fn session_count(&self) -> usize {
|
pub fn session_count(&self) -> usize {
|
||||||
self.sessions.len()
|
self.sessions.len()
|
||||||
|
|||||||
@@ -122,10 +122,16 @@ impl RouteManager {
|
|||||||
// This prevents session-ticket resumption from misrouting when clients
|
// This prevents session-ticket resumption from misrouting when clients
|
||||||
// omit SNI (RFC 8446 recommends but doesn't mandate SNI on resumption).
|
// omit SNI (RFC 8446 recommends but doesn't mandate SNI on resumption).
|
||||||
// Wildcard-only routes (domains: ["*"]) still match since they accept all.
|
// Wildcard-only routes (domains: ["*"]) still match since they accept all.
|
||||||
let patterns = domains.to_vec();
|
//
|
||||||
let is_wildcard_only = patterns.iter().all(|d| *d == "*");
|
// Exception: QUIC (UDP transport) encrypts the TLS ClientHello, so SNI
|
||||||
if !is_wildcard_only {
|
// is unavailable at accept time. Domain verification happens per-request
|
||||||
return false;
|
// in H3ProxyService via the :authority header.
|
||||||
|
if ctx.transport != Some(TransportProtocol::Udp) {
|
||||||
|
let patterns = domains.to_vec();
|
||||||
|
let is_wildcard_only = patterns.iter().all(|d| *d == "*");
|
||||||
|
if !is_wildcard_only {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -997,4 +1003,52 @@ mod tests {
|
|||||||
let result = manager.find_route(&udp_ctx).unwrap();
|
let result = manager.find_route(&udp_ctx).unwrap();
|
||||||
assert_eq!(result.route.name.as_deref(), Some("udp-route"));
|
assert_eq!(result.route.name.as_deref(), Some("udp-route"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_quic_tls_no_sni_matches_domain_restricted_route() {
|
||||||
|
// QUIC accept-level matching: is_tls=true, domain=None, transport=Udp.
|
||||||
|
// Should match because QUIC encrypts the ClientHello — SNI is unavailable
|
||||||
|
// at accept time but verified per-request in H3ProxyService.
|
||||||
|
let mut route = make_route(443, Some("example.com"), 0);
|
||||||
|
route.route_match.transport = Some(TransportProtocol::Udp);
|
||||||
|
let routes = vec![route];
|
||||||
|
let manager = RouteManager::new(routes);
|
||||||
|
|
||||||
|
let ctx = MatchContext {
|
||||||
|
port: 443,
|
||||||
|
domain: None,
|
||||||
|
path: None,
|
||||||
|
client_ip: None,
|
||||||
|
tls_version: None,
|
||||||
|
headers: None,
|
||||||
|
is_tls: true,
|
||||||
|
protocol: Some("quic"),
|
||||||
|
transport: Some(TransportProtocol::Udp),
|
||||||
|
};
|
||||||
|
|
||||||
|
assert!(manager.find_route(&ctx).is_some(),
|
||||||
|
"QUIC (UDP) with is_tls=true and domain=None should match domain-restricted routes");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_tcp_tls_no_sni_still_rejects_domain_restricted_route() {
|
||||||
|
// TCP TLS without SNI must still be rejected (no QUIC exemption).
|
||||||
|
let routes = vec![make_route(443, Some("example.com"), 0)];
|
||||||
|
let manager = RouteManager::new(routes);
|
||||||
|
|
||||||
|
let ctx = MatchContext {
|
||||||
|
port: 443,
|
||||||
|
domain: None,
|
||||||
|
path: None,
|
||||||
|
client_ip: None,
|
||||||
|
tls_version: None,
|
||||||
|
headers: None,
|
||||||
|
is_tls: true,
|
||||||
|
protocol: None,
|
||||||
|
transport: None, // TCP (default)
|
||||||
|
};
|
||||||
|
|
||||||
|
assert!(manager.find_route(&ctx).is_none(),
|
||||||
|
"TCP TLS without SNI should NOT match domain-restricted routes");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -264,6 +264,8 @@ impl RustProxy {
|
|||||||
conn_config.socket_timeout_ms,
|
conn_config.socket_timeout_ms,
|
||||||
conn_config.max_connection_lifetime_ms,
|
conn_config.max_connection_lifetime_ms,
|
||||||
);
|
);
|
||||||
|
// Clone proxy_ips before conn_config is moved into the TCP listener
|
||||||
|
let udp_proxy_ips = conn_config.proxy_ips.clone();
|
||||||
listener.set_connection_config(conn_config);
|
listener.set_connection_config(conn_config);
|
||||||
|
|
||||||
// Share the socket-handler relay path with the listener
|
// Share the socket-handler relay path with the listener
|
||||||
@@ -339,6 +341,18 @@ impl RustProxy {
|
|||||||
conn_tracker,
|
conn_tracker,
|
||||||
self.cancel_token.clone(),
|
self.cancel_token.clone(),
|
||||||
);
|
);
|
||||||
|
udp_mgr.set_proxy_ips(udp_proxy_ips.clone());
|
||||||
|
|
||||||
|
// Construct H3ProxyService for HTTP/3 request handling
|
||||||
|
let h3_svc = rustproxy_http::h3_service::H3ProxyService::new(
|
||||||
|
Arc::new(ArcSwap::from(Arc::clone(&*self.route_table.load()))),
|
||||||
|
Arc::clone(&self.metrics),
|
||||||
|
Arc::new(rustproxy_http::connection_pool::ConnectionPool::new()),
|
||||||
|
Arc::new(rustproxy_http::protocol_cache::ProtocolCache::new()),
|
||||||
|
rustproxy_passthrough::tls_handler::shared_backend_tls_config(),
|
||||||
|
std::time::Duration::from_secs(30),
|
||||||
|
);
|
||||||
|
udp_mgr.set_h3_service(Arc::new(h3_svc));
|
||||||
|
|
||||||
for port in &udp_ports {
|
for port in &udp_ports {
|
||||||
udp_mgr.add_port_with_tls(*port, quic_tls_config.clone()).await?;
|
udp_mgr.add_port_with_tls(*port, quic_tls_config.clone()).await?;
|
||||||
@@ -763,22 +777,31 @@ impl RustProxy {
|
|||||||
if self.udp_listener_manager.is_none() {
|
if self.udp_listener_manager.is_none() {
|
||||||
if let Some(ref listener) = self.listener_manager {
|
if let Some(ref listener) = self.listener_manager {
|
||||||
let conn_tracker = listener.conn_tracker().clone();
|
let conn_tracker = listener.conn_tracker().clone();
|
||||||
self.udp_listener_manager = Some(UdpListenerManager::new(
|
let conn_config = Self::build_connection_config(&self.options);
|
||||||
|
let mut udp_mgr = UdpListenerManager::new(
|
||||||
Arc::clone(&new_manager),
|
Arc::clone(&new_manager),
|
||||||
Arc::clone(&self.metrics),
|
Arc::clone(&self.metrics),
|
||||||
conn_tracker,
|
conn_tracker,
|
||||||
self.cancel_token.clone(),
|
self.cancel_token.clone(),
|
||||||
));
|
);
|
||||||
|
udp_mgr.set_proxy_ips(conn_config.proxy_ips);
|
||||||
|
self.udp_listener_manager = Some(udp_mgr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Build TLS config for QUIC (needed for new ports and upgrading existing raw UDP)
|
||||||
|
let quic_tls = {
|
||||||
|
let tls_configs = self.current_tls_configs().await;
|
||||||
|
Self::build_quic_tls_config(&tls_configs)
|
||||||
|
};
|
||||||
|
|
||||||
if let Some(ref mut udp_mgr) = self.udp_listener_manager {
|
if let Some(ref mut udp_mgr) = self.udp_listener_manager {
|
||||||
udp_mgr.update_routes(Arc::clone(&new_manager));
|
udp_mgr.update_routes(Arc::clone(&new_manager));
|
||||||
|
|
||||||
// Add new UDP ports
|
// Add new UDP ports (with TLS for QUIC)
|
||||||
for port in &new_udp_ports {
|
for port in &new_udp_ports {
|
||||||
if !old_udp_ports.contains(port) {
|
if !old_udp_ports.contains(port) {
|
||||||
udp_mgr.add_port(*port).await?;
|
udp_mgr.add_port_with_tls(*port, quic_tls.clone()).await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Remove old UDP ports
|
// Remove old UDP ports
|
||||||
@@ -787,6 +810,12 @@ impl RustProxy {
|
|||||||
udp_mgr.remove_port(*port);
|
udp_mgr.remove_port(*port);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Upgrade existing raw UDP fallback listeners to QUIC if TLS is now available
|
||||||
|
if let Some(ref quic_config) = quic_tls {
|
||||||
|
udp_mgr.update_quic_tls(Arc::clone(quic_config));
|
||||||
|
udp_mgr.upgrade_raw_to_quic(Arc::clone(quic_config)).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if self.udp_listener_manager.is_some() {
|
} else if self.udp_listener_manager.is_some() {
|
||||||
// All UDP routes removed — shut down UDP manager
|
// All UDP routes removed — shut down UDP manager
|
||||||
@@ -843,12 +872,12 @@ impl RustProxy {
|
|||||||
.map_err(|e| anyhow::anyhow!("ACME provisioning failed: {}", e))?;
|
.map_err(|e| anyhow::anyhow!("ACME provisioning failed: {}", e))?;
|
||||||
|
|
||||||
// Hot-swap into TLS configs
|
// Hot-swap into TLS configs
|
||||||
if let Some(ref mut listener) = self.listener_manager {
|
let mut tls_configs = Self::extract_tls_configs(&self.options.routes);
|
||||||
let mut tls_configs = Self::extract_tls_configs(&self.options.routes);
|
tls_configs.insert(domain.clone(), TlsCertConfig {
|
||||||
tls_configs.insert(domain.clone(), TlsCertConfig {
|
cert_pem: bundle.cert_pem.clone(),
|
||||||
cert_pem: bundle.cert_pem.clone(),
|
key_pem: bundle.key_pem.clone(),
|
||||||
key_pem: bundle.key_pem.clone(),
|
});
|
||||||
});
|
{
|
||||||
let cm = cm_arc.lock().await;
|
let cm = cm_arc.lock().await;
|
||||||
for (d, b) in cm.store().iter() {
|
for (d, b) in cm.store().iter() {
|
||||||
if !tls_configs.contains_key(d) {
|
if !tls_configs.contains_key(d) {
|
||||||
@@ -858,9 +887,22 @@ impl RustProxy {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let quic_tls = Self::build_quic_tls_config(&tls_configs);
|
||||||
|
|
||||||
|
if let Some(ref listener) = self.listener_manager {
|
||||||
listener.set_tls_configs(tls_configs);
|
listener.set_tls_configs(tls_configs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update existing QUIC endpoints and upgrade raw UDP fallback listeners
|
||||||
|
if let Some(ref mut udp_mgr) = self.udp_listener_manager {
|
||||||
|
if let Some(ref quic_config) = quic_tls {
|
||||||
|
udp_mgr.update_quic_tls(Arc::clone(quic_config));
|
||||||
|
udp_mgr.upgrade_raw_to_quic(Arc::clone(quic_config)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
info!("Certificate provisioned and loaded for route '{}'", route_name);
|
info!("Certificate provisioned and loaded for route '{}'", route_name);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -1005,6 +1047,33 @@ impl RustProxy {
|
|||||||
Some(Arc::new(tls_config))
|
Some(Arc::new(tls_config))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Build the current full TLS config map from all sources (route configs, loaded certs, cert manager).
|
||||||
|
async fn current_tls_configs(&self) -> HashMap<String, TlsCertConfig> {
|
||||||
|
let mut configs = Self::extract_tls_configs(&self.options.routes);
|
||||||
|
|
||||||
|
// Merge dynamically loaded certs (from loadCertificate IPC)
|
||||||
|
for (d, c) in &self.loaded_certs {
|
||||||
|
if !configs.contains_key(d) {
|
||||||
|
configs.insert(d.clone(), c.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Merge certs from cert manager store
|
||||||
|
if let Some(ref cm_arc) = self.cert_manager {
|
||||||
|
let cm = cm_arc.lock().await;
|
||||||
|
for (d, b) in cm.store().iter() {
|
||||||
|
if !configs.contains_key(d) {
|
||||||
|
configs.insert(d.clone(), TlsCertConfig {
|
||||||
|
cert_pem: b.cert_pem.clone(),
|
||||||
|
key_pem: b.key_pem.clone(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
configs
|
||||||
|
}
|
||||||
|
|
||||||
/// Set the Unix domain socket path for relaying UDP datagrams to TypeScript datagramHandler callbacks.
|
/// Set the Unix domain socket path for relaying UDP datagrams to TypeScript datagramHandler callbacks.
|
||||||
pub async fn set_datagram_handler_relay_path(&mut self, path: Option<String>) {
|
pub async fn set_datagram_handler_relay_path(&mut self, path: Option<String>) {
|
||||||
info!("Datagram handler relay path set to: {:?}", path);
|
info!("Datagram handler relay path set to: {:?}", path);
|
||||||
@@ -1055,39 +1124,24 @@ impl RustProxy {
|
|||||||
key_pem: key_pem.clone(),
|
key_pem: key_pem.clone(),
|
||||||
});
|
});
|
||||||
|
|
||||||
// Hot-swap TLS config on the listener
|
// Hot-swap TLS config on TCP and QUIC listeners
|
||||||
if let Some(ref mut listener) = self.listener_manager {
|
let tls_configs = self.current_tls_configs().await;
|
||||||
let mut tls_configs = Self::extract_tls_configs(&self.options.routes);
|
|
||||||
|
|
||||||
// Add the new cert
|
// Build QUIC TLS config before TCP consumes the map
|
||||||
tls_configs.insert(domain.to_string(), TlsCertConfig {
|
let quic_tls = Self::build_quic_tls_config(&tls_configs);
|
||||||
cert_pem: cert_pem.clone(),
|
|
||||||
key_pem: key_pem.clone(),
|
|
||||||
});
|
|
||||||
|
|
||||||
// Also include all existing certs from cert manager
|
|
||||||
if let Some(ref cm_arc) = self.cert_manager {
|
|
||||||
let cm = cm_arc.lock().await;
|
|
||||||
for (d, b) in cm.store().iter() {
|
|
||||||
if !tls_configs.contains_key(d) {
|
|
||||||
tls_configs.insert(d.clone(), TlsCertConfig {
|
|
||||||
cert_pem: b.cert_pem.clone(),
|
|
||||||
key_pem: b.key_pem.clone(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Merge dynamically loaded certs from previous loadCertificate calls
|
|
||||||
for (d, c) in &self.loaded_certs {
|
|
||||||
if !tls_configs.contains_key(d) {
|
|
||||||
tls_configs.insert(d.clone(), c.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
if let Some(ref listener) = self.listener_manager {
|
||||||
listener.set_tls_configs(tls_configs);
|
listener.set_tls_configs(tls_configs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update existing QUIC endpoints and upgrade raw UDP fallback listeners
|
||||||
|
if let Some(ref mut udp_mgr) = self.udp_listener_manager {
|
||||||
|
if let Some(ref quic_config) = quic_tls {
|
||||||
|
udp_mgr.update_quic_tls(Arc::clone(quic_config));
|
||||||
|
udp_mgr.upgrade_raw_to_quic(Arc::clone(quic_config)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
info!("Certificate loaded and TLS config updated for {}", domain);
|
info!("Certificate loaded and TLS config updated for {}", domain);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,6 +3,6 @@
|
|||||||
*/
|
*/
|
||||||
export const commitinfo = {
|
export const commitinfo = {
|
||||||
name: '@push.rocks/smartproxy',
|
name: '@push.rocks/smartproxy',
|
||||||
version: '25.15.0',
|
version: '25.17.4',
|
||||||
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
|
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import type { IRouteConfig, IRouteMatch, IRouteAction, TPortRange } from '../mod
|
|||||||
export class RouteValidator {
|
export class RouteValidator {
|
||||||
private static readonly VALID_TLS_MODES = ['terminate', 'passthrough', 'terminate-and-reencrypt'];
|
private static readonly VALID_TLS_MODES = ['terminate', 'passthrough', 'terminate-and-reencrypt'];
|
||||||
private static readonly VALID_ACTION_TYPES = ['forward', 'socket-handler'];
|
private static readonly VALID_ACTION_TYPES = ['forward', 'socket-handler'];
|
||||||
private static readonly VALID_PROTOCOLS = ['tcp', 'http', 'https', 'ws', 'wss'];
|
private static readonly VALID_PROTOCOLS = ['tcp', 'http', 'https', 'ws', 'wss', 'udp', 'quic', 'http3'];
|
||||||
private static readonly MAX_PORTS = 100;
|
private static readonly MAX_PORTS = 100;
|
||||||
private static readonly MAX_DOMAINS = 1000;
|
private static readonly MAX_DOMAINS = 1000;
|
||||||
private static readonly MAX_HEADER_SIZE = 8192;
|
private static readonly MAX_HEADER_SIZE = 8192;
|
||||||
@@ -173,6 +173,22 @@ export class RouteValidator {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// QUIC routes require TLS with termination (QUIC mandates TLS 1.3)
|
||||||
|
if (route.action.udp?.quic && route.action.type === 'forward') {
|
||||||
|
if (!route.action.tls) {
|
||||||
|
errors.push('QUIC routes require TLS configuration (action.tls) — QUIC mandates TLS 1.3');
|
||||||
|
} else if (route.action.tls.mode === 'passthrough') {
|
||||||
|
errors.push('QUIC routes cannot use TLS mode "passthrough" — use "terminate" or "terminate-and-reencrypt"');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Protocol quic/http3 requires transport udp or all
|
||||||
|
if (route.match?.protocol && ['quic', 'http3'].includes(route.match.protocol)) {
|
||||||
|
if (route.match.transport && route.match.transport !== 'udp' && route.match.transport !== 'all') {
|
||||||
|
errors.push(`Protocol "${route.match.protocol}" requires transport "udp" or "all"`);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate security settings
|
// Validate security settings
|
||||||
@@ -619,6 +635,15 @@ export function validateRouteAction(action: IRouteAction): { valid: boolean; err
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// QUIC routes require TLS with termination
|
||||||
|
if (action.udp?.quic && action.type === 'forward') {
|
||||||
|
if (!action.tls) {
|
||||||
|
errors.push('QUIC routes require TLS configuration — QUIC mandates TLS 1.3');
|
||||||
|
} else if (action.tls.mode === 'passthrough') {
|
||||||
|
errors.push('QUIC routes cannot use TLS mode "passthrough"');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (action.type === 'socket-handler') {
|
if (action.type === 'socket-handler') {
|
||||||
if (!action.socketHandler && !action.datagramHandler) {
|
if (!action.socketHandler && !action.datagramHandler) {
|
||||||
errors.push('Socket handler or datagram handler function is required for socket-handler action');
|
errors.push('Socket handler or datagram handler function is required for socket-handler action');
|
||||||
|
|||||||
Reference in New Issue
Block a user