feat(quic,http3): add HTTP/3 proxy handling and hot-reload QUIC TLS configuration
This commit is contained in:
@@ -58,6 +58,18 @@ const DEFAULT_WS_INACTIVITY_TIMEOUT: std::time::Duration = std::time::Duration::
|
||||
/// Default WebSocket max lifetime (24 hours).
|
||||
const DEFAULT_WS_MAX_LIFETIME: std::time::Duration = std::time::Duration::from_secs(86400);
|
||||
|
||||
/// Timeout for QUIC (H3) backend connections. Short because UDP is often firewalled.
|
||||
const QUIC_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3);
|
||||
|
||||
/// Protocol decision for backend connection.
|
||||
#[derive(Debug)]
|
||||
enum ProtocolDecision {
|
||||
H1,
|
||||
H2,
|
||||
H3 { port: u16 },
|
||||
AlpnProbe,
|
||||
}
|
||||
|
||||
/// RAII guard that decrements the active request counter on drop.
|
||||
/// Ensures the counter is correct even if the request handler panics.
|
||||
struct ActiveRequestGuard {
|
||||
@@ -190,6 +202,9 @@ pub struct HttpProxyService {
|
||||
ws_inactivity_timeout: std::time::Duration,
|
||||
/// WebSocket maximum connection lifetime.
|
||||
ws_max_lifetime: std::time::Duration,
|
||||
/// Shared QUIC client endpoint for outbound H3 backend connections.
|
||||
/// Lazily initialized on first H3 backend attempt.
|
||||
quinn_client_endpoint: Arc<quinn::Endpoint>,
|
||||
}
|
||||
|
||||
impl HttpProxyService {
|
||||
@@ -209,6 +224,7 @@ impl HttpProxyService {
|
||||
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
|
||||
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
|
||||
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
|
||||
quinn_client_endpoint: Arc::new(Self::create_quinn_client_endpoint()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -233,6 +249,7 @@ impl HttpProxyService {
|
||||
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
|
||||
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
|
||||
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
|
||||
quinn_client_endpoint: Arc::new(Self::create_quinn_client_endpoint()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -645,37 +662,94 @@ impl HttpProxyService {
|
||||
|
||||
// --- Resolve protocol decision based on backend protocol mode ---
|
||||
let is_auto_detect_mode = matches!(backend_protocol_mode, rustproxy_config::BackendProtocol::Auto);
|
||||
let (use_h2, needs_alpn_probe) = match backend_protocol_mode {
|
||||
rustproxy_config::BackendProtocol::Http1 => (false, false),
|
||||
rustproxy_config::BackendProtocol::Http2 => (true, false),
|
||||
rustproxy_config::BackendProtocol::Http3 => {
|
||||
// HTTP/3 (QUIC) backend connections not yet implemented — fall back to H1
|
||||
warn!("backendProtocol 'http3' not yet implemented, falling back to http1");
|
||||
(false, false)
|
||||
}
|
||||
let protocol_cache_key = crate::protocol_cache::ProtocolCacheKey {
|
||||
host: upstream.host.clone(),
|
||||
port: upstream.port,
|
||||
requested_host: host.clone(),
|
||||
};
|
||||
let protocol_decision = match backend_protocol_mode {
|
||||
rustproxy_config::BackendProtocol::Http1 => ProtocolDecision::H1,
|
||||
rustproxy_config::BackendProtocol::Http2 => ProtocolDecision::H2,
|
||||
rustproxy_config::BackendProtocol::Http3 => ProtocolDecision::H3 { port: upstream.port },
|
||||
rustproxy_config::BackendProtocol::Auto => {
|
||||
if !upstream.use_tls {
|
||||
// No ALPN without TLS — default to H1
|
||||
(false, false)
|
||||
// No ALPN without TLS, no QUIC without TLS — default to H1
|
||||
ProtocolDecision::H1
|
||||
} else {
|
||||
let cache_key = crate::protocol_cache::ProtocolCacheKey {
|
||||
host: upstream.host.clone(),
|
||||
port: upstream.port,
|
||||
requested_host: host.clone(),
|
||||
};
|
||||
match self.protocol_cache.get(&cache_key) {
|
||||
Some(crate::protocol_cache::DetectedProtocol::H2) => (true, false),
|
||||
Some(crate::protocol_cache::DetectedProtocol::H1) => (false, false),
|
||||
Some(crate::protocol_cache::DetectedProtocol::H3) => {
|
||||
// H3 cached but we're on TCP — fall back to H2 probe
|
||||
(false, true)
|
||||
}
|
||||
None => (false, true), // needs ALPN probe
|
||||
match self.protocol_cache.get(&protocol_cache_key) {
|
||||
Some(cached) => match cached.protocol {
|
||||
crate::protocol_cache::DetectedProtocol::H3 => {
|
||||
if let Some(h3_port) = cached.h3_port {
|
||||
ProtocolDecision::H3 { port: h3_port }
|
||||
} else {
|
||||
// H3 cached but no port — fall back to ALPN probe
|
||||
ProtocolDecision::AlpnProbe
|
||||
}
|
||||
}
|
||||
crate::protocol_cache::DetectedProtocol::H2 => ProtocolDecision::H2,
|
||||
crate::protocol_cache::DetectedProtocol::H1 => ProtocolDecision::H1,
|
||||
},
|
||||
None => ProtocolDecision::AlpnProbe,
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Derive legacy flags for the existing H1/H2 connection path
|
||||
let (use_h2, needs_alpn_probe) = match &protocol_decision {
|
||||
ProtocolDecision::H1 => (false, false),
|
||||
ProtocolDecision::H2 => (true, false),
|
||||
ProtocolDecision::H3 { .. } => (false, false), // H3 path handled separately below
|
||||
ProtocolDecision::AlpnProbe => (false, true),
|
||||
};
|
||||
|
||||
// --- H3 path: try QUIC connection before TCP ---
|
||||
if let ProtocolDecision::H3 { port: h3_port } = protocol_decision {
|
||||
let h3_pool_key = crate::connection_pool::PoolKey {
|
||||
host: upstream.host.clone(),
|
||||
port: h3_port,
|
||||
use_tls: true,
|
||||
protocol: crate::connection_pool::PoolProtocol::H3,
|
||||
};
|
||||
|
||||
// Try H3 pool checkout first
|
||||
if let Some((quic_conn, _age)) = self.connection_pool.checkout_h3(&h3_pool_key) {
|
||||
self.metrics.backend_pool_hit(&upstream_key);
|
||||
let result = self.forward_h3(
|
||||
quic_conn, parts, body, upstream_headers, &upstream_path,
|
||||
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
|
||||
).await;
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
return result;
|
||||
}
|
||||
|
||||
// Try fresh QUIC connection
|
||||
match self.connect_quic_backend(&upstream.host, h3_port).await {
|
||||
Ok(quic_conn) => {
|
||||
self.metrics.backend_pool_miss(&upstream_key);
|
||||
self.metrics.backend_connection_opened(&upstream_key, std::time::Instant::now().elapsed());
|
||||
let result = self.forward_h3(
|
||||
quic_conn, parts, body, upstream_headers, &upstream_path,
|
||||
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
|
||||
).await;
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
return result;
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(backend = %upstream_key, error = %e,
|
||||
"H3 backend connect failed, falling back to H2/H1");
|
||||
// Invalidate H3 from cache — next request will ALPN probe for H2/H1
|
||||
if is_auto_detect_mode {
|
||||
self.protocol_cache.insert(
|
||||
protocol_cache_key.clone(),
|
||||
crate::protocol_cache::DetectedProtocol::H1,
|
||||
);
|
||||
}
|
||||
// Fall through to TCP path (ALPN probe for auto, or H1 for explicit)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// --- Connection pooling: try reusing an existing connection first ---
|
||||
// For ALPN probe mode, skip pool checkout (we don't know the protocol yet)
|
||||
if !needs_alpn_probe {
|
||||
@@ -870,6 +944,19 @@ impl HttpProxyService {
|
||||
};
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
self.metrics.backend_connection_closed(&upstream_key);
|
||||
|
||||
// --- Alt-Svc discovery: check if backend advertises H3 ---
|
||||
if is_auto_detect_mode {
|
||||
if let Ok(ref resp) = result {
|
||||
if let Some(alt_svc) = resp.headers().get("alt-svc").and_then(|v| v.to_str().ok()) {
|
||||
if let Some(h3_port) = parse_alt_svc_h3_port(alt_svc) {
|
||||
debug!(backend = %upstream_key, h3_port, "Backend advertises H3 via Alt-Svc");
|
||||
self.protocol_cache.insert_h3(protocol_cache_key, h3_port);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
@@ -2393,6 +2480,252 @@ impl HttpProxyService {
|
||||
config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
|
||||
Arc::new(config)
|
||||
}
|
||||
|
||||
/// Create a shared QUIC client endpoint for outbound H3 backend connections.
|
||||
fn create_quinn_client_endpoint() -> quinn::Endpoint {
|
||||
let _ = rustls::crypto::ring::default_provider().install_default();
|
||||
let mut tls_config = rustls::ClientConfig::builder()
|
||||
.dangerous()
|
||||
.with_custom_certificate_verifier(Arc::new(InsecureBackendVerifier))
|
||||
.with_no_client_auth();
|
||||
tls_config.alpn_protocols = vec![b"h3".to_vec()];
|
||||
|
||||
let quic_crypto = quinn::crypto::rustls::QuicClientConfig::try_from(tls_config)
|
||||
.expect("Failed to create QUIC client crypto config");
|
||||
let client_config = quinn::ClientConfig::new(Arc::new(quic_crypto));
|
||||
|
||||
let mut endpoint = quinn::Endpoint::client("0.0.0.0:0".parse().unwrap())
|
||||
.expect("Failed to create QUIC client endpoint");
|
||||
endpoint.set_default_client_config(client_config);
|
||||
endpoint
|
||||
}
|
||||
|
||||
/// Connect to a backend via QUIC (H3).
|
||||
async fn connect_quic_backend(
|
||||
&self,
|
||||
host: &str,
|
||||
port: u16,
|
||||
) -> Result<quinn::Connection, Box<dyn std::error::Error + Send + Sync>> {
|
||||
let addr = tokio::net::lookup_host(format!("{}:{}", host, port))
|
||||
.await?
|
||||
.next()
|
||||
.ok_or("DNS resolution returned no addresses")?;
|
||||
|
||||
let server_name = host.to_string();
|
||||
let connecting = self.quinn_client_endpoint.connect(addr, &server_name)?;
|
||||
|
||||
let connection = tokio::time::timeout(QUIC_CONNECT_TIMEOUT, connecting).await
|
||||
.map_err(|_| "QUIC connect timeout (3s)")??;
|
||||
|
||||
debug!("QUIC backend connection established to {}:{}", host, port);
|
||||
Ok(connection)
|
||||
}
|
||||
|
||||
/// Forward request to backend via HTTP/3 over QUIC.
|
||||
async fn forward_h3(
|
||||
&self,
|
||||
quic_conn: quinn::Connection,
|
||||
parts: hyper::http::request::Parts,
|
||||
body: Incoming,
|
||||
upstream_headers: hyper::HeaderMap,
|
||||
upstream_path: &str,
|
||||
route: &rustproxy_config::RouteConfig,
|
||||
route_id: Option<&str>,
|
||||
source_ip: &str,
|
||||
pool_key: &crate::connection_pool::PoolKey,
|
||||
domain: &str,
|
||||
conn_activity: &ConnActivity,
|
||||
backend_key: &str,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
let h3_quinn_conn = h3_quinn::Connection::new(quic_conn.clone());
|
||||
let (mut driver, mut send_request) = match h3::client::new(h3_quinn_conn).await {
|
||||
Ok(pair) => pair,
|
||||
Err(e) => {
|
||||
error!(backend = %backend_key, domain = %domain, error = %e, "H3 client handshake failed");
|
||||
self.metrics.backend_handshake_error(backend_key);
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 handshake failed"));
|
||||
}
|
||||
};
|
||||
|
||||
// Spawn the h3 connection driver
|
||||
let driver_pool = Arc::clone(&self.connection_pool);
|
||||
let driver_pool_key = pool_key.clone();
|
||||
let gen_holder = Arc::new(std::sync::atomic::AtomicU64::new(u64::MAX));
|
||||
let driver_gen = Arc::clone(&gen_holder);
|
||||
tokio::spawn(async move {
|
||||
let close_err = std::future::poll_fn(|cx| driver.poll_close(cx)).await;
|
||||
debug!("H3 connection driver closed: {:?}", close_err);
|
||||
let g = driver_gen.load(std::sync::atomic::Ordering::Relaxed);
|
||||
if g != u64::MAX {
|
||||
driver_pool.remove_h3_if_generation(&driver_pool_key, g);
|
||||
}
|
||||
});
|
||||
|
||||
// Build the H3 request
|
||||
let uri = hyper::Uri::builder()
|
||||
.scheme("https")
|
||||
.authority(domain)
|
||||
.path_and_query(upstream_path)
|
||||
.build()
|
||||
.unwrap_or_else(|_| upstream_path.parse().unwrap_or_default());
|
||||
|
||||
let mut h3_req = hyper::Request::builder()
|
||||
.method(parts.method.clone())
|
||||
.uri(uri);
|
||||
|
||||
if let Some(headers) = h3_req.headers_mut() {
|
||||
*headers = upstream_headers;
|
||||
}
|
||||
|
||||
let h3_req = h3_req.body(()).unwrap();
|
||||
|
||||
// Send the request
|
||||
let mut stream = match send_request.send_request(h3_req).await {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
error!(backend = %backend_key, domain = %domain, error = %e, "H3 send_request failed");
|
||||
self.metrics.backend_request_error(backend_key);
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 request failed"));
|
||||
}
|
||||
};
|
||||
|
||||
// Stream request body
|
||||
let rid: Option<Arc<str>> = route_id.map(Arc::from);
|
||||
let sip: Arc<str> = Arc::from(source_ip);
|
||||
|
||||
{
|
||||
use http_body_util::BodyExt;
|
||||
let mut body = body;
|
||||
while let Some(frame) = body.frame().await {
|
||||
match frame {
|
||||
Ok(frame) => {
|
||||
if let Some(data) = frame.data_ref() {
|
||||
self.metrics.record_bytes(data.len() as u64, 0, rid.as_deref(), Some(&sip));
|
||||
if let Err(e) = stream.send_data(Bytes::copy_from_slice(data)).await {
|
||||
error!(backend = %backend_key, error = %e, "H3 send_data failed");
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 body send failed"));
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(backend = %backend_key, error = %e, "Client body read error during H3 forward");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Signal end of body
|
||||
stream.finish().await.ok();
|
||||
}
|
||||
|
||||
// Read response
|
||||
let h3_response = match stream.recv_response().await {
|
||||
Ok(resp) => resp,
|
||||
Err(e) => {
|
||||
error!(backend = %backend_key, domain = %domain, error = %e, "H3 recv_response failed");
|
||||
self.metrics.backend_request_error(backend_key);
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 response failed"));
|
||||
}
|
||||
};
|
||||
|
||||
// Build the response for the client
|
||||
let status = h3_response.status();
|
||||
let mut response = Response::builder().status(status);
|
||||
|
||||
if let Some(headers) = response.headers_mut() {
|
||||
for (name, value) in h3_response.headers() {
|
||||
let n = name.as_str();
|
||||
// Skip hop-by-hop headers
|
||||
if n == "transfer-encoding" || n == "connection" || n == "keep-alive" {
|
||||
continue;
|
||||
}
|
||||
headers.insert(name.clone(), value.clone());
|
||||
}
|
||||
ResponseFilter::apply_headers(route, headers, None);
|
||||
}
|
||||
|
||||
// Stream response body back via an adapter
|
||||
let h3_body = H3ClientResponseBody { stream };
|
||||
let counting_body = CountingBody::new(
|
||||
h3_body,
|
||||
Arc::clone(&self.metrics),
|
||||
rid,
|
||||
Some(sip),
|
||||
Direction::Out,
|
||||
).with_connection_activity(Arc::clone(&conn_activity.last_activity), conn_activity.start);
|
||||
|
||||
let counting_body = if let Some(ref ar) = conn_activity.active_requests {
|
||||
counting_body.with_active_requests(Arc::clone(ar))
|
||||
} else {
|
||||
counting_body
|
||||
};
|
||||
|
||||
let body: BoxBody<Bytes, hyper::Error> = BoxBody::new(counting_body);
|
||||
|
||||
// Register connection in pool on success
|
||||
if status != StatusCode::BAD_GATEWAY {
|
||||
let g = self.connection_pool.register_h3(pool_key.clone(), quic_conn);
|
||||
gen_holder.store(g, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
|
||||
self.metrics.set_backend_protocol(backend_key, "h3");
|
||||
Ok(response.body(body).unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse an Alt-Svc header value to extract the H3 port.
|
||||
/// Handles formats like `h3=":443"; ma=86400` and `h3=":8443", h2=":443"`.
|
||||
fn parse_alt_svc_h3_port(header_value: &str) -> Option<u16> {
|
||||
for directive in header_value.split(',') {
|
||||
let directive = directive.trim();
|
||||
// Match h3=":<port>" or h3-29=":<port>" etc.
|
||||
if directive.starts_with("h3=") || directive.starts_with("h3-") {
|
||||
// Find the port in ":<port>"
|
||||
if let Some(start) = directive.find("\":") {
|
||||
let rest = &directive[start + 2..];
|
||||
if let Some(end) = rest.find('"') {
|
||||
if let Ok(port) = rest[..end].parse::<u16>() {
|
||||
return Some(port);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Response body adapter for H3 client responses.
|
||||
/// Reads data from the h3 `RequestStream` recv side and presents it as an `http_body::Body`.
|
||||
struct H3ClientResponseBody {
|
||||
stream: h3::client::RequestStream<h3_quinn::BidiStream<Bytes>, Bytes>,
|
||||
}
|
||||
|
||||
impl http_body::Body for H3ClientResponseBody {
|
||||
type Data = Bytes;
|
||||
type Error = hyper::Error;
|
||||
|
||||
fn poll_frame(
|
||||
mut self: Pin<&mut Self>,
|
||||
_cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
|
||||
// h3's recv_data is async, so we need to poll it manually.
|
||||
// Use a small future to poll the recv_data call.
|
||||
use std::future::Future;
|
||||
let mut fut = Box::pin(self.stream.recv_data());
|
||||
match fut.as_mut().poll(_cx) {
|
||||
Poll::Ready(Ok(Some(mut buf))) => {
|
||||
use bytes::Buf;
|
||||
let data = Bytes::copy_from_slice(buf.chunk());
|
||||
buf.advance(buf.remaining());
|
||||
Poll::Ready(Some(Ok(http_body::Frame::data(data))))
|
||||
}
|
||||
Poll::Ready(Ok(None)) => Poll::Ready(None),
|
||||
Poll::Ready(Err(e)) => {
|
||||
warn!("H3 response body recv error: {}", e);
|
||||
Poll::Ready(None)
|
||||
}
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Insecure certificate verifier for backend TLS connections (fallback only).
|
||||
@@ -2463,6 +2796,7 @@ impl Default for HttpProxyService {
|
||||
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
|
||||
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
|
||||
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
|
||||
quinn_client_endpoint: Arc::new(Self::create_quinn_client_endpoint()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user