fix(rustproxy-http): improve HTTP/3 connection reuse and clean up stale proxy state

This commit is contained in:
2026-03-26 07:05:57 +00:00
parent 437d1a3329
commit a3d8a3a388
14 changed files with 185 additions and 225 deletions

View File

@@ -30,3 +30,4 @@ socket2 = { workspace = true }
quinn = { workspace = true }
h3 = { workspace = true }
h3-quinn = { workspace = true }
futures = { version = "0.3", default-features = false, features = ["std"] }

View File

@@ -56,7 +56,11 @@ struct PooledH2 {
}
/// A pooled QUIC/HTTP/3 connection (multiplexed like H2).
/// Stores the h3 `SendRequest` handle so pool hits skip the h3 SETTINGS handshake.
pub struct PooledH3 {
/// Multiplexed h3 request handle — clone to open a new stream.
pub send_request: h3::client::SendRequest<h3_quinn::OpenStreams, Bytes>,
/// Raw QUIC connection — kept for liveness probing (close_reason) only.
pub connection: quinn::Connection,
pub created_at: Instant,
pub generation: u64,
@@ -197,7 +201,10 @@ impl ConnectionPool {
/// Try to get a pooled QUIC connection for the given key.
/// QUIC connections are multiplexed — the connection is shared, not removed.
pub fn checkout_h3(&self, key: &PoolKey) -> Option<(quinn::Connection, Duration)> {
pub fn checkout_h3(
&self,
key: &PoolKey,
) -> Option<(h3::client::SendRequest<h3_quinn::OpenStreams, Bytes>, quinn::Connection, Duration)> {
let entry = self.h3_pool.get(key)?;
let pooled = entry.value();
let age = pooled.created_at.elapsed();
@@ -215,13 +222,20 @@ impl ConnectionPool {
return None;
}
Some((pooled.connection.clone(), age))
Some((pooled.send_request.clone(), pooled.connection.clone(), age))
}
/// Register a QUIC connection in the pool. Returns the generation ID.
pub fn register_h3(&self, key: PoolKey, connection: quinn::Connection) -> u64 {
/// Register a QUIC connection and its h3 SendRequest handle in the pool.
/// Returns the generation ID.
pub fn register_h3(
&self,
key: PoolKey,
connection: quinn::Connection,
send_request: h3::client::SendRequest<h3_quinn::OpenStreams, Bytes>,
) -> u64 {
let gen = self.h2_generation.fetch_add(1, Ordering::Relaxed);
self.h3_pool.insert(key, PooledH3 {
send_request,
connection,
created_at: Instant::now(),
generation: gen,

View File

@@ -116,7 +116,7 @@ async fn handle_h3_request(
cancel: CancellationToken,
) -> anyhow::Result<()> {
// Stream request body from H3 client via an mpsc channel.
let (body_tx, body_rx) = tokio::sync::mpsc::channel::<Bytes>(4);
let (body_tx, body_rx) = tokio::sync::mpsc::channel::<Bytes>(32);
// Spawn the H3 body reader task with cancellation
let body_cancel = cancel.clone();
@@ -132,8 +132,7 @@ async fn handle_h3_request(
}
};
let mut chunk = chunk;
let data = Bytes::copy_from_slice(chunk.chunk());
chunk.advance(chunk.remaining());
let data = chunk.copy_to_bytes(chunk.remaining());
if body_tx.send(data).await.is_err() {
break;
}
@@ -179,8 +178,8 @@ async fn handle_h3_request(
while let Some(frame) = resp_body.frame().await {
match frame {
Ok(frame) => {
if let Some(data) = frame.data_ref() {
stream.send_data(Bytes::copy_from_slice(data)).await
if let Ok(data) = frame.into_data() {
stream.send_data(data).await
.map_err(|e| anyhow::anyhow!("Failed to send H3 data: {}", e))?;
}
}

View File

@@ -72,15 +72,16 @@ const DEFAULT_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_s
/// If no new request arrives within this duration, the connection is closed.
const DEFAULT_HTTP_IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
/// Default HTTP max connection lifetime (1 hour).
/// HTTP connections are forcefully closed after this duration regardless of activity.
const DEFAULT_HTTP_MAX_LIFETIME: std::time::Duration = std::time::Duration::from_secs(3600);
/// Default WebSocket inactivity timeout (1 hour).
const DEFAULT_WS_INACTIVITY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3600);
/// Default WebSocket max lifetime (24 hours).
const DEFAULT_WS_MAX_LIFETIME: std::time::Duration = std::time::Duration::from_secs(86400);
/// Timeout for QUIC (H3) backend connections. Short because UDP is often firewalled.
const QUIC_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3);
/// Protocol decision for backend connection.
#[derive(Debug)]
enum ProtocolDecision {
@@ -222,6 +223,8 @@ pub struct HttpProxyService {
protocol_cache: Arc<crate::protocol_cache::ProtocolCache>,
/// HTTP keep-alive idle timeout: close connection if no new request arrives within this duration.
http_idle_timeout: std::time::Duration,
/// HTTP max connection lifetime: forcefully close connection after this duration regardless of activity.
http_max_lifetime: std::time::Duration,
/// WebSocket inactivity timeout (no data in either direction).
ws_inactivity_timeout: std::time::Duration,
/// WebSocket maximum connection lifetime.
@@ -248,6 +251,7 @@ impl HttpProxyService {
connection_pool: Arc::new(crate::connection_pool::ConnectionPool::new()),
protocol_cache: Arc::new(crate::protocol_cache::ProtocolCache::new()),
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
http_max_lifetime: DEFAULT_HTTP_MAX_LIFETIME,
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
quinn_client_endpoint: Arc::new(Self::create_quinn_client_endpoint()),
@@ -275,21 +279,24 @@ impl HttpProxyService {
connection_pool: Arc::new(crate::connection_pool::ConnectionPool::new()),
protocol_cache: Arc::new(crate::protocol_cache::ProtocolCache::new()),
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
http_max_lifetime: DEFAULT_HTTP_MAX_LIFETIME,
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
quinn_client_endpoint: Arc::new(Self::create_quinn_client_endpoint()),
}
}
/// Set the HTTP keep-alive idle timeout, WebSocket inactivity timeout, and
/// WebSocket max lifetime from connection config values.
/// Set the HTTP keep-alive idle timeout, HTTP max lifetime, WebSocket inactivity
/// timeout, and WebSocket max lifetime from connection config values.
pub fn set_connection_timeouts(
&mut self,
http_idle_timeout: std::time::Duration,
http_max_lifetime: std::time::Duration,
ws_inactivity_timeout: std::time::Duration,
ws_max_lifetime: std::time::Duration,
) {
self.http_idle_timeout = http_idle_timeout;
self.http_max_lifetime = http_max_lifetime;
self.ws_inactivity_timeout = ws_inactivity_timeout;
self.ws_max_lifetime = ws_max_lifetime;
}
@@ -314,6 +321,15 @@ impl HttpProxyService {
self.protocol_cache.clear();
}
/// Clean up expired entries in all per-route rate limiters.
/// Called from the background sampling task to prevent unbounded growth
/// when traffic stops after a burst of unique IPs.
pub fn cleanup_all_rate_limiters(&self) {
for entry in self.route_rate_limiters.iter() {
entry.value().cleanup();
}
}
/// Snapshot the protocol cache for metrics/UI display.
pub fn protocol_cache_snapshot(&self) -> Vec<crate::protocol_cache::ProtocolCacheEntry> {
self.protocol_cache.snapshot()
@@ -354,6 +370,7 @@ impl HttpProxyService {
// Capture timeouts before `self` is moved into the service closure.
let idle_timeout = self.http_idle_timeout;
let max_lifetime = self.http_max_lifetime;
// Activity tracker: updated at the START and END of each request.
// The idle watchdog checks this to determine if the connection is idle
@@ -412,15 +429,23 @@ impl HttpProxyService {
}
}
_ = async {
// Idle watchdog: check every 5s whether the connection has been idle
// (no active requests AND no activity for idle_timeout).
// This avoids killing long-running requests or upgraded connections.
// Idle + lifetime watchdog: check every 5s whether the connection has been
// idle (no active requests AND no activity for idle_timeout) or exceeded
// the max connection lifetime.
let check_interval = std::time::Duration::from_secs(5);
let mut last_seen = 0u64;
loop {
tokio::time::sleep(check_interval).await;
// Never close while a request is in progress
// Check max connection lifetime (unconditional — even active connections
// must eventually be recycled to prevent resource accumulation).
if start.elapsed() >= max_lifetime {
debug!("HTTP connection exceeded max lifetime ({}s) from {}",
max_lifetime.as_secs(), peer_addr);
return;
}
// Never close for idleness while a request is in progress
if active_requests.load(Ordering::Relaxed) > 0 {
last_seen = last_activity.load(Ordering::Relaxed);
continue;
@@ -437,7 +462,7 @@ impl HttpProxyService {
last_seen = current;
}
} => {
debug!("HTTP connection idle timeout ({}s) from {}", idle_timeout.as_secs(), peer_addr);
debug!("HTTP connection timeout from {}", peer_addr);
conn.as_mut().graceful_shutdown();
// Give any in-flight work 5s to drain after graceful shutdown
let _ = tokio::time::timeout(std::time::Duration::from_secs(5), conn).await;
@@ -791,10 +816,10 @@ impl HttpProxyService {
};
// Try H3 pool checkout first
if let Some((quic_conn, _age)) = self.connection_pool.checkout_h3(&h3_pool_key) {
if let Some((pooled_sr, quic_conn, _age)) = self.connection_pool.checkout_h3(&h3_pool_key) {
self.metrics.backend_pool_hit(&upstream_key);
let result = self.forward_h3(
quic_conn, parts, body, upstream_headers, &upstream_path,
quic_conn, Some(pooled_sr), parts, body, upstream_headers, &upstream_path,
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
).await;
self.upstream_selector.connection_ended(&upstream_key);
@@ -807,7 +832,7 @@ impl HttpProxyService {
self.metrics.backend_pool_miss(&upstream_key);
self.metrics.backend_connection_opened(&upstream_key, std::time::Instant::now().elapsed());
let result = self.forward_h3(
quic_conn, parts, body, upstream_headers, &upstream_path,
quic_conn, None, parts, body, upstream_headers, &upstream_path,
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
).await;
self.upstream_selector.connection_ended(&upstream_key);
@@ -966,7 +991,7 @@ impl HttpProxyService {
protocol: crate::connection_pool::PoolProtocol::H3,
};
let result = self.forward_h3(
quic_conn, parts, body, upstream_headers, &upstream_path,
quic_conn, None, parts, body, upstream_headers, &upstream_path,
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
).await;
self.upstream_selector.connection_ended(&upstream_key);
@@ -1009,7 +1034,7 @@ impl HttpProxyService {
protocol: crate::connection_pool::PoolProtocol::H3,
};
let result = self.forward_h3(
quic_conn, parts, body, upstream_headers, &upstream_path,
quic_conn, None, parts, body, upstream_headers, &upstream_path,
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
).await;
self.upstream_selector.connection_ended(&upstream_key);
@@ -1068,7 +1093,7 @@ impl HttpProxyService {
protocol: crate::connection_pool::PoolProtocol::H3,
};
let result = self.forward_h3(
quic_conn, parts, body, upstream_headers, &upstream_path,
quic_conn, None, parts, body, upstream_headers, &upstream_path,
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
).await;
self.upstream_selector.connection_ended(&upstream_key);
@@ -1111,7 +1136,7 @@ impl HttpProxyService {
protocol: crate::connection_pool::PoolProtocol::H3,
};
let result = self.forward_h3(
quic_conn, parts, body, upstream_headers, &upstream_path,
quic_conn, None, parts, body, upstream_headers, &upstream_path,
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
).await;
self.upstream_selector.connection_ended(&upstream_key);
@@ -2744,7 +2769,12 @@ impl HttpProxyService {
let quic_crypto = quinn::crypto::rustls::QuicClientConfig::try_from(tls_config)
.expect("Failed to create QUIC client crypto config");
let client_config = quinn::ClientConfig::new(Arc::new(quic_crypto));
// Tune QUIC transport to match H2 flow-control: 2 MB per-stream receive window.
let mut transport = quinn::TransportConfig::default();
transport.stream_receive_window(quinn::VarInt::from_u32(2 * 1024 * 1024));
let mut client_config = quinn::ClientConfig::new(Arc::new(quic_crypto));
client_config.transport_config(Arc::new(transport));
let mut endpoint = quinn::Endpoint::client("0.0.0.0:0".parse().unwrap())
.expect("Failed to create QUIC client endpoint");
@@ -2766,8 +2796,8 @@ impl HttpProxyService {
let server_name = host.to_string();
let connecting = self.quinn_client_endpoint.connect(addr, &server_name)?;
let connection = tokio::time::timeout(QUIC_CONNECT_TIMEOUT, connecting).await
.map_err(|_| format!("QUIC connect timeout (3s) for {}", host))??;
let connection = tokio::time::timeout(self.connect_timeout, connecting).await
.map_err(|_| format!("QUIC connect timeout ({:?}) for {}", self.connect_timeout, host))??;
debug!("QUIC backend connection established to {}:{}", host, port);
Ok(connection)
@@ -2777,6 +2807,7 @@ impl HttpProxyService {
async fn forward_h3(
&self,
quic_conn: quinn::Connection,
pooled_sender: Option<h3::client::SendRequest<h3_quinn::OpenStreams, Bytes>>,
parts: hyper::http::request::Parts,
body: BoxBody<Bytes, hyper::Error>,
upstream_headers: hyper::HeaderMap,
@@ -2789,33 +2820,42 @@ impl HttpProxyService {
conn_activity: &ConnActivity,
backend_key: &str,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let h3_quinn_conn = h3_quinn::Connection::new(quic_conn.clone());
let (mut driver, mut send_request) = match h3::client::builder()
.send_grease(false)
.build(h3_quinn_conn)
.await
{
Ok(pair) => pair,
Err(e) => {
error!(backend = %backend_key, domain = %domain, error = %e, "H3 client handshake failed");
self.metrics.backend_handshake_error(backend_key);
return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 handshake failed"));
}
};
// Obtain the h3 SendRequest handle: skip handshake + driver on pool hit.
let (mut send_request, gen_holder) = if let Some(sr) = pooled_sender {
// Pool hit — reuse existing h3 session, no SETTINGS round-trip
(sr, None)
} else {
// Fresh QUIC connection — full h3 handshake + driver spawn
let h3_quinn_conn = h3_quinn::Connection::new(quic_conn.clone());
let (mut driver, sr) = match h3::client::builder()
.send_grease(false)
.build(h3_quinn_conn)
.await
{
Ok(pair) => pair,
Err(e) => {
error!(backend = %backend_key, domain = %domain, error = %e, "H3 client handshake failed");
self.metrics.backend_handshake_error(backend_key);
return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 handshake failed"));
}
};
// Spawn the h3 connection driver
let driver_pool = Arc::clone(&self.connection_pool);
let driver_pool_key = pool_key.clone();
let gen_holder = Arc::new(std::sync::atomic::AtomicU64::new(u64::MAX));
let driver_gen = Arc::clone(&gen_holder);
tokio::spawn(async move {
let close_err = std::future::poll_fn(|cx| driver.poll_close(cx)).await;
debug!("H3 connection driver closed: {:?}", close_err);
let g = driver_gen.load(std::sync::atomic::Ordering::Relaxed);
if g != u64::MAX {
driver_pool.remove_h3_if_generation(&driver_pool_key, g);
let gen_holder = Arc::new(std::sync::atomic::AtomicU64::new(u64::MAX));
{
let driver_pool = Arc::clone(&self.connection_pool);
let driver_pool_key = pool_key.clone();
let driver_gen = Arc::clone(&gen_holder);
tokio::spawn(async move {
let close_err = std::future::poll_fn(|cx| driver.poll_close(cx)).await;
debug!("H3 connection driver closed: {:?}", close_err);
let g = driver_gen.load(std::sync::atomic::Ordering::Relaxed);
if g != u64::MAX {
driver_pool.remove_h3_if_generation(&driver_pool_key, g);
}
});
}
});
(sr, Some(gen_holder))
};
// Build the H3 request
let uri = hyper::Uri::builder()
@@ -2845,7 +2885,7 @@ impl HttpProxyService {
}
};
// Stream request body
// Stream request body (zero-copy: into_data yields owned Bytes)
let rid: Option<Arc<str>> = route_id.map(Arc::from);
let sip: Arc<str> = Arc::from(source_ip);
@@ -2855,9 +2895,9 @@ impl HttpProxyService {
while let Some(frame) = body.frame().await {
match frame {
Ok(frame) => {
if let Some(data) = frame.data_ref() {
if let Ok(data) = frame.into_data() {
self.metrics.record_bytes(data.len() as u64, 0, rid.as_deref(), Some(&sip));
if let Err(e) = stream.send_data(Bytes::copy_from_slice(data)).await {
if let Err(e) = stream.send_data(data).await {
error!(backend = %backend_key, error = %e, "H3 send_data failed");
return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 body send failed"));
}
@@ -2899,8 +2939,23 @@ impl HttpProxyService {
ResponseFilter::apply_headers(route, headers, None);
}
// Stream response body back via an adapter
let h3_body = H3ClientResponseBody { stream };
// Stream response body back via unfold — correctly preserves waker across polls
let body_stream = futures::stream::unfold(stream, |mut s| async move {
match s.recv_data().await {
Ok(Some(mut buf)) => {
use bytes::Buf;
let data = buf.copy_to_bytes(buf.remaining());
Some((Ok::<_, hyper::Error>(http_body::Frame::data(data)), s))
}
Ok(None) => None,
Err(e) => {
warn!("H3 response body recv error: {}", e);
None
}
}
});
let h3_body = http_body_util::StreamBody::new(body_stream);
let counting_body = CountingBody::new(
h3_body,
Arc::clone(&self.metrics),
@@ -2917,10 +2972,16 @@ impl HttpProxyService {
let body: BoxBody<Bytes, hyper::Error> = BoxBody::new(counting_body);
// Register connection in pool on success
// Register connection in pool on success (fresh connections only)
if status != StatusCode::BAD_GATEWAY {
let g = self.connection_pool.register_h3(pool_key.clone(), quic_conn);
gen_holder.store(g, std::sync::atomic::Ordering::Relaxed);
if let Some(gh) = gen_holder {
let g = self.connection_pool.register_h3(
pool_key.clone(),
quic_conn,
send_request,
);
gh.store(g, std::sync::atomic::Ordering::Relaxed);
}
}
self.metrics.set_backend_protocol(backend_key, "h3");
@@ -2949,41 +3010,6 @@ fn parse_alt_svc_h3_port(header_value: &str) -> Option<u16> {
None
}
/// Response body adapter for H3 client responses.
/// Reads data from the h3 `RequestStream` recv side and presents it as an `http_body::Body`.
struct H3ClientResponseBody {
stream: h3::client::RequestStream<h3_quinn::BidiStream<Bytes>, Bytes>,
}
impl http_body::Body for H3ClientResponseBody {
type Data = Bytes;
type Error = hyper::Error;
fn poll_frame(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
// h3's recv_data is async, so we need to poll it manually.
// Use a small future to poll the recv_data call.
use std::future::Future;
let mut fut = Box::pin(self.stream.recv_data());
match fut.as_mut().poll(_cx) {
Poll::Ready(Ok(Some(mut buf))) => {
use bytes::Buf;
let data = Bytes::copy_from_slice(buf.chunk());
buf.advance(buf.remaining());
Poll::Ready(Some(Ok(http_body::Frame::data(data))))
}
Poll::Ready(Ok(None)) => Poll::Ready(None),
Poll::Ready(Err(e)) => {
warn!("H3 response body recv error: {}", e);
Poll::Ready(None)
}
Poll::Pending => Poll::Pending,
}
}
}
/// Insecure certificate verifier for backend TLS connections (fallback only).
/// The production path uses the shared config from tls_handler which has the same
/// behavior but with session resumption across all outbound connections.
@@ -3052,6 +3078,7 @@ impl Default for HttpProxyService {
connection_pool: Arc::new(crate::connection_pool::ConnectionPool::new()),
protocol_cache: Arc::new(crate::protocol_cache::ProtocolCache::new()),
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
http_max_lifetime: DEFAULT_HTTP_MAX_LIFETIME,
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
quinn_client_endpoint: Arc::new(Self::create_quinn_client_endpoint()),