fix(rustproxy-http): reuse the shared HTTP proxy service for HTTP/3 request handling

This commit is contained in:
2026-03-20 08:57:18 +00:00
parent ea8224c400
commit fb1c59ac9a
8 changed files with 101 additions and 276 deletions

View File

@@ -36,7 +36,7 @@ use crate::upstream_selector::UpstreamSelector;
/// Per-connection context for keeping the idle watchdog alive during body streaming.
/// Passed through the forwarding chain so CountingBody can update the timestamp.
#[derive(Clone)]
struct ConnActivity {
pub struct ConnActivity {
last_activity: Arc<AtomicU64>,
start: std::time::Instant,
/// Active-request counter from handle_io's idle watchdog. When set, CountingBody
@@ -49,6 +49,19 @@ struct ConnActivity {
alt_svc_cache_key: Option<crate::protocol_cache::ProtocolCacheKey>,
}
impl ConnActivity {
/// Create a minimal ConnActivity (no idle watchdog, no Alt-Svc cache).
/// Used by H3ProxyService where the TCP idle watchdog doesn't apply.
pub fn new_standalone() -> Self {
Self {
last_activity: Arc::new(AtomicU64::new(0)),
start: std::time::Instant::now(),
active_requests: None,
alt_svc_cache_key: None,
}
}
}
/// Default upstream connect timeout (30 seconds).
const DEFAULT_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
@@ -347,6 +360,7 @@ impl HttpProxyService {
let st = start;
let ca = ConnActivity { last_activity: Arc::clone(&la_inner), start, active_requests: Some(Arc::clone(&ar_inner)), alt_svc_cache_key: None };
async move {
let req = req.map(|body| BoxBody::new(body));
let result = svc.handle_request(req, peer, port, cn, ca).await;
// Mark request end — update activity timestamp before guard drops
la.store(st.elapsed().as_millis() as u64, Ordering::Relaxed);
@@ -416,9 +430,13 @@ impl HttpProxyService {
}
/// Handle a single HTTP request.
async fn handle_request(
///
/// Accepts a generic body (`BoxBody`) so both the TCP/HTTP path (which boxes
/// `Incoming`) and the H3 path (which boxes the H3 request body stream) can
/// share the same backend forwarding logic.
pub async fn handle_request(
&self,
req: Request<Incoming>,
req: Request<BoxBody<Bytes, hyper::Error>>,
peer_addr: std::net::SocketAddr,
port: u16,
cancel: CancellationToken,
@@ -965,7 +983,7 @@ impl HttpProxyService {
&self,
io: TokioIo<BackendStream>,
parts: hyper::http::request::Parts,
body: Incoming,
body: BoxBody<Bytes, hyper::Error>,
upstream_headers: hyper::HeaderMap,
upstream_path: &str,
_upstream: &crate::upstream_selector::UpstreamSelection,
@@ -1013,7 +1031,7 @@ impl HttpProxyService {
&self,
mut sender: hyper::client::conn::http1::SendRequest<BoxBody<Bytes, hyper::Error>>,
parts: hyper::http::request::Parts,
body: Incoming,
body: BoxBody<Bytes, hyper::Error>,
upstream_headers: hyper::HeaderMap,
upstream_path: &str,
route: &rustproxy_config::RouteConfig,
@@ -1077,7 +1095,7 @@ impl HttpProxyService {
&self,
io: TokioIo<BackendStream>,
parts: hyper::http::request::Parts,
body: Incoming,
body: BoxBody<Bytes, hyper::Error>,
upstream_headers: hyper::HeaderMap,
upstream_path: &str,
_upstream: &crate::upstream_selector::UpstreamSelection,
@@ -1151,7 +1169,7 @@ impl HttpProxyService {
&self,
sender: hyper::client::conn::http2::SendRequest<BoxBody<Bytes, hyper::Error>>,
parts: hyper::http::request::Parts,
body: Incoming,
body: BoxBody<Bytes, hyper::Error>,
upstream_headers: hyper::HeaderMap,
upstream_path: &str,
route: &rustproxy_config::RouteConfig,
@@ -1344,7 +1362,7 @@ impl HttpProxyService {
&self,
io: TokioIo<BackendStream>,
parts: hyper::http::request::Parts,
body: Incoming,
body: BoxBody<Bytes, hyper::Error>,
mut upstream_headers: hyper::HeaderMap,
upstream_path: &str,
upstream: &crate::upstream_selector::UpstreamSelection,
@@ -1675,7 +1693,7 @@ impl HttpProxyService {
&self,
mut sender: hyper::client::conn::http2::SendRequest<BoxBody<Bytes, hyper::Error>>,
parts: hyper::http::request::Parts,
body: Incoming,
body: BoxBody<Bytes, hyper::Error>,
upstream_headers: hyper::HeaderMap,
upstream_path: &str,
route: &rustproxy_config::RouteConfig,
@@ -1816,7 +1834,7 @@ impl HttpProxyService {
/// Handle a WebSocket upgrade request (H1 Upgrade or H2 Extended CONNECT per RFC 8441).
async fn handle_websocket_upgrade(
&self,
req: Request<Incoming>,
req: Request<BoxBody<Bytes, hyper::Error>>,
peer_addr: std::net::SocketAddr,
upstream: &crate::upstream_selector::UpstreamSelection,
route: &rustproxy_config::RouteConfig,
@@ -2538,7 +2556,7 @@ impl HttpProxyService {
&self,
quic_conn: quinn::Connection,
parts: hyper::http::request::Parts,
body: Incoming,
body: BoxBody<Bytes, hyper::Error>,
upstream_headers: hyper::HeaderMap,
upstream_path: &str,
route: &rustproxy_config::RouteConfig,