//! HTTP/3 proxy service. //! //! Accepts QUIC connections via quinn, runs h3 server to handle HTTP/3 requests, //! and delegates backend forwarding to the shared `HttpProxyService` — same //! route matching, connection pool, and protocol auto-detection as TCP/HTTP. use std::net::SocketAddr; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use bytes::{Buf, Bytes}; use http_body::Frame; use http_body_util::BodyExt; use http_body_util::combinators::BoxBody; use tracing::{debug, warn}; use rustproxy_config::RouteConfig; use tokio_util::sync::CancellationToken; use crate::proxy_service::{ConnActivity, HttpProxyService}; /// HTTP/3 proxy service. /// /// Accepts QUIC connections, parses HTTP/3 requests, and delegates backend /// forwarding to the shared `HttpProxyService`. pub struct H3ProxyService { http_proxy: Arc, } impl H3ProxyService { pub fn new(http_proxy: Arc) -> Self { Self { http_proxy } } /// Handle an accepted QUIC connection as HTTP/3. /// /// If `real_client_addr` is provided (from PROXY protocol), it overrides /// `connection.remote_address()` for client IP attribution. pub async fn handle_connection( &self, connection: quinn::Connection, _fallback_route: &RouteConfig, port: u16, real_client_addr: Option, parent_cancel: &CancellationToken, ) -> anyhow::Result<()> { let remote_addr = real_client_addr.unwrap_or_else(|| connection.remote_address()); debug!("HTTP/3 connection from {} on port {}", remote_addr, port); let mut h3_conn: h3::server::Connection = h3::server::builder() .send_grease(false) .build(h3_quinn::Connection::new(connection)) .await .map_err(|e| anyhow::anyhow!("H3 connection setup failed: {}", e))?; loop { let resolver = tokio::select! { _ = parent_cancel.cancelled() => { debug!("HTTP/3 connection from {} cancelled by parent", remote_addr); break; } result = h3_conn.accept() => { match result { Ok(Some(resolver)) => resolver, Ok(None) => { debug!("HTTP/3 connection from {} closed", remote_addr); break; } Err(e) => { debug!("HTTP/3 accept error from {}: {}", remote_addr, e); break; } } } }; let (request, stream) = match resolver.resolve_request().await { Ok(pair) => pair, Err(e) => { debug!("HTTP/3 request resolve error: {}", e); continue; } }; let http_proxy = Arc::clone(&self.http_proxy); let request_cancel = parent_cancel.child_token(); tokio::spawn(async move { if let Err(e) = handle_h3_request( request, stream, port, remote_addr, &http_proxy, request_cancel, ).await { debug!("HTTP/3 request error from {}: {}", remote_addr, e); } }); } Ok(()) } } /// Handle a single HTTP/3 request by delegating to HttpProxyService. /// /// 1. Read the H3 request body via an mpsc channel (streaming, not buffered) /// 2. Build a `hyper::Request` that HttpProxyService can handle /// 3. Call `HttpProxyService::handle_request` — same route matching, connection /// pool, ALPN protocol detection (H1/H2/H3) as the TCP/HTTP path /// 4. Stream the response back over the H3 stream async fn handle_h3_request( request: hyper::Request<()>, mut stream: h3::server::RequestStream, Bytes>, port: u16, peer_addr: SocketAddr, http_proxy: &HttpProxyService, cancel: CancellationToken, ) -> anyhow::Result<()> { // Stream request body from H3 client via an mpsc channel. let (body_tx, body_rx) = tokio::sync::mpsc::channel::(4); // Spawn the H3 body reader task with cancellation let body_cancel = cancel.clone(); let body_reader = tokio::spawn(async move { loop { let chunk = tokio::select! { _ = body_cancel.cancelled() => break, result = stream.recv_data() => { match result { Ok(Some(chunk)) => chunk, _ => break, } } }; let mut chunk = chunk; let data = Bytes::copy_from_slice(chunk.chunk()); chunk.advance(chunk.remaining()); if body_tx.send(data).await.is_err() { break; } } stream }); // Build a hyper::Request from the H3 request + streaming body. // The URI already has scheme + authority + path set by the h3 crate. let body = H3RequestBody { receiver: body_rx }; let (parts, _) = request.into_parts(); let boxed_body: BoxBody = BoxBody::new(body); let req = hyper::Request::from_parts(parts, boxed_body); // Delegate to HttpProxyService — same backend path as TCP/HTTP: // route matching, ALPN protocol detection, connection pool, H1/H2/H3 auto. let conn_activity = ConnActivity::new_standalone(); let response = http_proxy.handle_request(req, peer_addr, port, cancel, conn_activity).await .map_err(|e| anyhow::anyhow!("Backend request failed: {}", e))?; // Await the body reader to get the H3 stream back let mut stream = body_reader.await .map_err(|e| anyhow::anyhow!("Body reader task failed: {}", e))?; // Send response headers over H3 (skip hop-by-hop headers) let (resp_parts, resp_body) = response.into_parts(); let mut h3_response = hyper::Response::builder().status(resp_parts.status); for (name, value) in &resp_parts.headers { let n = name.as_str(); if n == "transfer-encoding" || n == "connection" || n == "keep-alive" || n == "upgrade" { continue; } h3_response = h3_response.header(name, value); } let h3_response = h3_response.body(()) .map_err(|e| anyhow::anyhow!("Failed to build H3 response: {}", e))?; stream.send_response(h3_response).await .map_err(|e| anyhow::anyhow!("Failed to send H3 response: {}", e))?; // Stream response body back over H3 let mut resp_body = resp_body; while let Some(frame) = resp_body.frame().await { match frame { Ok(frame) => { if let Some(data) = frame.data_ref() { stream.send_data(Bytes::copy_from_slice(data)).await .map_err(|e| anyhow::anyhow!("Failed to send H3 data: {}", e))?; } } Err(e) => { warn!("Response body read error: {}", e); break; } } } // Finish the H3 stream (send QUIC FIN) stream.finish().await .map_err(|e| anyhow::anyhow!("Failed to finish H3 stream: {}", e))?; Ok(()) } /// A streaming request body backed by an mpsc channel receiver. /// /// Implements `http_body::Body` so hyper can poll chunks as they arrive /// from the H3 client, avoiding buffering the entire request body in memory. struct H3RequestBody { receiver: tokio::sync::mpsc::Receiver, } impl http_body::Body for H3RequestBody { type Data = Bytes; type Error = hyper::Error; fn poll_frame( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll, Self::Error>>> { match self.receiver.poll_recv(cx) { Poll::Ready(Some(data)) => Poll::Ready(Some(Ok(Frame::data(data)))), Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, } } }