224 lines
8.0 KiB
Rust
224 lines
8.0 KiB
Rust
//! HTTP/3 proxy service.
|
|
//!
|
|
//! Accepts QUIC connections via quinn, runs h3 server to handle HTTP/3 requests,
|
|
//! and delegates backend forwarding to the shared `HttpProxyService` — same
|
|
//! route matching, connection pool, and protocol auto-detection as TCP/HTTP.
|
|
|
|
use std::net::SocketAddr;
|
|
use std::pin::Pin;
|
|
use std::sync::Arc;
|
|
use std::task::{Context, Poll};
|
|
|
|
use bytes::{Buf, Bytes};
|
|
use http_body::Frame;
|
|
use http_body_util::BodyExt;
|
|
use http_body_util::combinators::BoxBody;
|
|
use tracing::{debug, warn};
|
|
|
|
use rustproxy_config::RouteConfig;
|
|
use tokio_util::sync::CancellationToken;
|
|
|
|
use crate::proxy_service::{ConnActivity, HttpProxyService};
|
|
|
|
/// HTTP/3 proxy service.
|
|
///
|
|
/// Accepts QUIC connections, parses HTTP/3 requests, and delegates backend
|
|
/// forwarding to the shared `HttpProxyService`.
|
|
pub struct H3ProxyService {
|
|
http_proxy: Arc<HttpProxyService>,
|
|
}
|
|
|
|
impl H3ProxyService {
|
|
pub fn new(http_proxy: Arc<HttpProxyService>) -> Self {
|
|
Self { http_proxy }
|
|
}
|
|
|
|
/// Handle an accepted QUIC connection as HTTP/3.
|
|
///
|
|
/// If `real_client_addr` is provided (from PROXY protocol), it overrides
|
|
/// `connection.remote_address()` for client IP attribution.
|
|
pub async fn handle_connection(
|
|
&self,
|
|
connection: quinn::Connection,
|
|
_fallback_route: &RouteConfig,
|
|
port: u16,
|
|
real_client_addr: Option<SocketAddr>,
|
|
parent_cancel: &CancellationToken,
|
|
) -> anyhow::Result<()> {
|
|
let remote_addr = real_client_addr.unwrap_or_else(|| connection.remote_address());
|
|
debug!("HTTP/3 connection from {} on port {}", remote_addr, port);
|
|
|
|
let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> =
|
|
h3::server::builder()
|
|
.send_grease(false)
|
|
.build(h3_quinn::Connection::new(connection))
|
|
.await
|
|
.map_err(|e| anyhow::anyhow!("H3 connection setup failed: {}", e))?;
|
|
|
|
loop {
|
|
let resolver = tokio::select! {
|
|
_ = parent_cancel.cancelled() => {
|
|
debug!("HTTP/3 connection from {} cancelled by parent", remote_addr);
|
|
break;
|
|
}
|
|
result = h3_conn.accept() => {
|
|
match result {
|
|
Ok(Some(resolver)) => resolver,
|
|
Ok(None) => {
|
|
debug!("HTTP/3 connection from {} closed", remote_addr);
|
|
break;
|
|
}
|
|
Err(e) => {
|
|
debug!("HTTP/3 accept error from {}: {}", remote_addr, e);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
};
|
|
|
|
let (request, stream) = match resolver.resolve_request().await {
|
|
Ok(pair) => pair,
|
|
Err(e) => {
|
|
debug!("HTTP/3 request resolve error: {}", e);
|
|
continue;
|
|
}
|
|
};
|
|
|
|
let http_proxy = Arc::clone(&self.http_proxy);
|
|
let request_cancel = parent_cancel.child_token();
|
|
|
|
tokio::spawn(async move {
|
|
if let Err(e) = handle_h3_request(
|
|
request, stream, port, remote_addr, &http_proxy, request_cancel,
|
|
).await {
|
|
debug!("HTTP/3 request error from {}: {}", remote_addr, e);
|
|
}
|
|
});
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
/// Handle a single HTTP/3 request by delegating to HttpProxyService.
|
|
///
|
|
/// 1. Read the H3 request body via an mpsc channel (streaming, not buffered)
|
|
/// 2. Build a `hyper::Request<BoxBody>` that HttpProxyService can handle
|
|
/// 3. Call `HttpProxyService::handle_request` — same route matching, connection
|
|
/// pool, ALPN protocol detection (H1/H2/H3) as the TCP/HTTP path
|
|
/// 4. Stream the response back over the H3 stream
|
|
async fn handle_h3_request(
|
|
request: hyper::Request<()>,
|
|
mut stream: h3::server::RequestStream<h3_quinn::BidiStream<Bytes>, Bytes>,
|
|
port: u16,
|
|
peer_addr: SocketAddr,
|
|
http_proxy: &HttpProxyService,
|
|
cancel: CancellationToken,
|
|
) -> anyhow::Result<()> {
|
|
// Stream request body from H3 client via an mpsc channel.
|
|
let (body_tx, body_rx) = tokio::sync::mpsc::channel::<Bytes>(4);
|
|
|
|
// Spawn the H3 body reader task with cancellation
|
|
let body_cancel = cancel.clone();
|
|
let body_reader = tokio::spawn(async move {
|
|
loop {
|
|
let chunk = tokio::select! {
|
|
_ = body_cancel.cancelled() => break,
|
|
result = stream.recv_data() => {
|
|
match result {
|
|
Ok(Some(chunk)) => chunk,
|
|
_ => break,
|
|
}
|
|
}
|
|
};
|
|
let mut chunk = chunk;
|
|
let data = Bytes::copy_from_slice(chunk.chunk());
|
|
chunk.advance(chunk.remaining());
|
|
if body_tx.send(data).await.is_err() {
|
|
break;
|
|
}
|
|
}
|
|
stream
|
|
});
|
|
|
|
// Build a hyper::Request<BoxBody> from the H3 request + streaming body.
|
|
// The URI already has scheme + authority + path set by the h3 crate.
|
|
let body = H3RequestBody { receiver: body_rx };
|
|
let (parts, _) = request.into_parts();
|
|
let boxed_body: BoxBody<Bytes, hyper::Error> = BoxBody::new(body);
|
|
let req = hyper::Request::from_parts(parts, boxed_body);
|
|
|
|
// Delegate to HttpProxyService — same backend path as TCP/HTTP:
|
|
// route matching, ALPN protocol detection, connection pool, H1/H2/H3 auto.
|
|
let conn_activity = ConnActivity::new_standalone();
|
|
let response = http_proxy.handle_request(req, peer_addr, port, cancel, conn_activity).await
|
|
.map_err(|e| anyhow::anyhow!("Backend request failed: {}", e))?;
|
|
|
|
// Await the body reader to get the H3 stream back
|
|
let mut stream = body_reader.await
|
|
.map_err(|e| anyhow::anyhow!("Body reader task failed: {}", e))?;
|
|
|
|
// Send response headers over H3 (skip hop-by-hop headers)
|
|
let (resp_parts, resp_body) = response.into_parts();
|
|
let mut h3_response = hyper::Response::builder().status(resp_parts.status);
|
|
for (name, value) in &resp_parts.headers {
|
|
let n = name.as_str();
|
|
if n == "transfer-encoding" || n == "connection" || n == "keep-alive" || n == "upgrade" {
|
|
continue;
|
|
}
|
|
h3_response = h3_response.header(name, value);
|
|
}
|
|
let h3_response = h3_response.body(())
|
|
.map_err(|e| anyhow::anyhow!("Failed to build H3 response: {}", e))?;
|
|
|
|
stream.send_response(h3_response).await
|
|
.map_err(|e| anyhow::anyhow!("Failed to send H3 response: {}", e))?;
|
|
|
|
// Stream response body back over H3
|
|
let mut resp_body = resp_body;
|
|
while let Some(frame) = resp_body.frame().await {
|
|
match frame {
|
|
Ok(frame) => {
|
|
if let Some(data) = frame.data_ref() {
|
|
stream.send_data(Bytes::copy_from_slice(data)).await
|
|
.map_err(|e| anyhow::anyhow!("Failed to send H3 data: {}", e))?;
|
|
}
|
|
}
|
|
Err(e) => {
|
|
warn!("Response body read error: {}", e);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Finish the H3 stream (send QUIC FIN)
|
|
stream.finish().await
|
|
.map_err(|e| anyhow::anyhow!("Failed to finish H3 stream: {}", e))?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// A streaming request body backed by an mpsc channel receiver.
|
|
///
|
|
/// Implements `http_body::Body` so hyper can poll chunks as they arrive
|
|
/// from the H3 client, avoiding buffering the entire request body in memory.
|
|
struct H3RequestBody {
|
|
receiver: tokio::sync::mpsc::Receiver<Bytes>,
|
|
}
|
|
|
|
impl http_body::Body for H3RequestBody {
|
|
type Data = Bytes;
|
|
type Error = hyper::Error;
|
|
|
|
fn poll_frame(
|
|
mut self: Pin<&mut Self>,
|
|
cx: &mut Context<'_>,
|
|
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
|
|
match self.receiver.poll_recv(cx) {
|
|
Poll::Ready(Some(data)) => Poll::Ready(Some(Ok(Frame::data(data)))),
|
|
Poll::Ready(None) => Poll::Ready(None),
|
|
Poll::Pending => Poll::Pending,
|
|
}
|
|
}
|
|
}
|