|
|
|
|
@@ -36,7 +36,6 @@ pub struct H3ProxyService {
|
|
|
|
|
protocol_cache: Arc<ProtocolCache>,
|
|
|
|
|
#[allow(dead_code)]
|
|
|
|
|
upstream_selector: UpstreamSelector,
|
|
|
|
|
#[allow(dead_code)]
|
|
|
|
|
backend_tls_config: Arc<rustls::ClientConfig>,
|
|
|
|
|
connect_timeout: Duration,
|
|
|
|
|
}
|
|
|
|
|
@@ -98,12 +97,14 @@ impl H3ProxyService {
|
|
|
|
|
let rm = self.route_manager.load();
|
|
|
|
|
let pool = Arc::clone(&self.connection_pool);
|
|
|
|
|
let metrics = Arc::clone(&self.metrics);
|
|
|
|
|
let backend_tls = Arc::clone(&self.backend_tls_config);
|
|
|
|
|
let connect_timeout = self.connect_timeout;
|
|
|
|
|
let client_ip = client_ip.clone();
|
|
|
|
|
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
if let Err(e) = handle_h3_request(
|
|
|
|
|
request, stream, port, &client_ip, &rm, &pool, &metrics, connect_timeout,
|
|
|
|
|
request, stream, port, &client_ip, &rm, &pool, &metrics,
|
|
|
|
|
&backend_tls, connect_timeout,
|
|
|
|
|
).await {
|
|
|
|
|
debug!("HTTP/3 request error from {}: {}", client_ip, e);
|
|
|
|
|
}
|
|
|
|
|
@@ -133,6 +134,7 @@ async fn handle_h3_request(
|
|
|
|
|
route_manager: &RouteManager,
|
|
|
|
|
_connection_pool: &ConnectionPool,
|
|
|
|
|
metrics: &MetricsCollector,
|
|
|
|
|
backend_tls_config: &Arc<rustls::ClientConfig>,
|
|
|
|
|
connect_timeout: Duration,
|
|
|
|
|
) -> anyhow::Result<()> {
|
|
|
|
|
let method = request.method().clone();
|
|
|
|
|
@@ -173,7 +175,15 @@ async fn handle_h3_request(
|
|
|
|
|
let backend_port = target.port.resolve(port);
|
|
|
|
|
let backend_addr = format!("{}:{}", backend_host, backend_port);
|
|
|
|
|
|
|
|
|
|
// Connect to backend via TCP HTTP/1.1 with timeout
|
|
|
|
|
// Determine if backend requires TLS (same logic as proxy_service.rs)
|
|
|
|
|
let mut use_tls = target.tls.is_some();
|
|
|
|
|
if let Some(ref tls) = route.action.tls {
|
|
|
|
|
if tls.mode == rustproxy_config::TlsMode::TerminateAndReencrypt {
|
|
|
|
|
use_tls = true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Connect to backend via TCP with timeout
|
|
|
|
|
let tcp_stream = tokio::time::timeout(
|
|
|
|
|
connect_timeout,
|
|
|
|
|
tokio::net::TcpStream::connect(&backend_addr),
|
|
|
|
|
@@ -183,15 +193,27 @@ async fn handle_h3_request(
|
|
|
|
|
|
|
|
|
|
let _ = tcp_stream.set_nodelay(true);
|
|
|
|
|
|
|
|
|
|
let io = hyper_util::rt::TokioIo::new(tcp_stream);
|
|
|
|
|
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await
|
|
|
|
|
.map_err(|e| anyhow::anyhow!("Backend handshake failed: {}", e))?;
|
|
|
|
|
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
if let Err(e) = conn.await {
|
|
|
|
|
debug!("Backend connection closed: {}", e);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
// Branch: wrap in TLS if backend requires it, then HTTP/1.1 handshake.
|
|
|
|
|
// hyper's SendRequest<B> is NOT generic over the IO type, so both branches
|
|
|
|
|
// produce the same type and can be unified.
|
|
|
|
|
let mut sender = if use_tls {
|
|
|
|
|
let connector = tokio_rustls::TlsConnector::from(Arc::clone(backend_tls_config));
|
|
|
|
|
let server_name = rustls::pki_types::ServerName::try_from(backend_host.to_string())
|
|
|
|
|
.map_err(|e| anyhow::anyhow!("Invalid backend SNI '{}': {}", backend_host, e))?;
|
|
|
|
|
let tls_stream = connector.connect(server_name, tcp_stream).await
|
|
|
|
|
.map_err(|e| anyhow::anyhow!("Backend TLS handshake to {} failed: {}", backend_addr, e))?;
|
|
|
|
|
let io = hyper_util::rt::TokioIo::new(tls_stream);
|
|
|
|
|
let (sender, conn) = hyper::client::conn::http1::handshake(io).await
|
|
|
|
|
.map_err(|e| anyhow::anyhow!("Backend handshake failed: {}", e))?;
|
|
|
|
|
tokio::spawn(async move { let _ = conn.await; });
|
|
|
|
|
sender
|
|
|
|
|
} else {
|
|
|
|
|
let io = hyper_util::rt::TokioIo::new(tcp_stream);
|
|
|
|
|
let (sender, conn) = hyper::client::conn::http1::handshake(io).await
|
|
|
|
|
.map_err(|e| anyhow::anyhow!("Backend handshake failed: {}", e))?;
|
|
|
|
|
tokio::spawn(async move { let _ = conn.await; });
|
|
|
|
|
sender
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Stream request body from H3 client to backend via an mpsc channel.
|
|
|
|
|
// This avoids buffering the entire request body in memory.
|
|
|
|
|
@@ -214,7 +236,7 @@ async fn handle_h3_request(
|
|
|
|
|
|
|
|
|
|
// Create a body that polls from the mpsc receiver
|
|
|
|
|
let body = H3RequestBody { receiver: body_rx };
|
|
|
|
|
let backend_req = build_backend_request(&method, &backend_addr, &path, &host, &request, body)?;
|
|
|
|
|
let backend_req = build_backend_request(&method, &backend_addr, &path, &host, &request, body, use_tls)?;
|
|
|
|
|
|
|
|
|
|
let response = sender.send_request(backend_req).await
|
|
|
|
|
.map_err(|e| anyhow::anyhow!("Backend request failed: {}", e))?;
|
|
|
|
|
@@ -237,6 +259,12 @@ async fn handle_h3_request(
|
|
|
|
|
h3_response = h3_response.header(name, value);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Extract content-length for body loop termination (must be before into_body())
|
|
|
|
|
let content_length: Option<u64> = response.headers()
|
|
|
|
|
.get(hyper::header::CONTENT_LENGTH)
|
|
|
|
|
.and_then(|v| v.to_str().ok())
|
|
|
|
|
.and_then(|s| s.parse().ok());
|
|
|
|
|
|
|
|
|
|
// Add Alt-Svc for HTTP/3 advertisement
|
|
|
|
|
let alt_svc = route.action.udp.as_ref()
|
|
|
|
|
.and_then(|u| u.quic.as_ref())
|
|
|
|
|
@@ -257,21 +285,52 @@ async fn handle_h3_request(
|
|
|
|
|
|
|
|
|
|
// Stream response body back
|
|
|
|
|
use http_body_util::BodyExt;
|
|
|
|
|
use http_body::Body as _;
|
|
|
|
|
let mut body = response.into_body();
|
|
|
|
|
let mut total_bytes_out: u64 = 0;
|
|
|
|
|
while let Some(frame) = body.frame().await {
|
|
|
|
|
match frame {
|
|
|
|
|
Ok(frame) => {
|
|
|
|
|
|
|
|
|
|
// Per-frame idle timeout: if no frame arrives within this duration, assume
|
|
|
|
|
// the body is complete (or the backend has stalled). This prevents indefinite
|
|
|
|
|
// hangs on close-delimited bodies or when hyper's internal trailers oneshot
|
|
|
|
|
// never resolves after all data has been received.
|
|
|
|
|
const FRAME_IDLE_TIMEOUT: Duration = Duration::from_secs(30);
|
|
|
|
|
|
|
|
|
|
loop {
|
|
|
|
|
// Layer 1: If the body already knows it is finished (Content-Length
|
|
|
|
|
// bodies track remaining bytes internally), break immediately to
|
|
|
|
|
// avoid blocking on hyper's internal trailers oneshot.
|
|
|
|
|
if body.is_end_stream() {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Layer 3: Per-frame idle timeout safety net
|
|
|
|
|
match tokio::time::timeout(FRAME_IDLE_TIMEOUT, body.frame()).await {
|
|
|
|
|
Ok(Some(Ok(frame))) => {
|
|
|
|
|
if let Some(data) = frame.data_ref() {
|
|
|
|
|
total_bytes_out += data.len() as u64;
|
|
|
|
|
stream.send_data(Bytes::copy_from_slice(data)).await
|
|
|
|
|
.map_err(|e| anyhow::anyhow!("Failed to send H3 data: {}", e))?;
|
|
|
|
|
|
|
|
|
|
// Layer 2: Content-Length byte count check
|
|
|
|
|
if let Some(cl) = content_length {
|
|
|
|
|
if total_bytes_out >= cl {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Err(e) => {
|
|
|
|
|
Ok(Some(Err(e))) => {
|
|
|
|
|
warn!("Backend body read error: {}", e);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
Ok(None) => break, // Body ended naturally
|
|
|
|
|
Err(_) => {
|
|
|
|
|
debug!(
|
|
|
|
|
"H3 body frame idle timeout ({:?}) after {} bytes; finishing stream",
|
|
|
|
|
FRAME_IDLE_TIMEOUT, total_bytes_out
|
|
|
|
|
);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -294,10 +353,12 @@ fn build_backend_request<B>(
|
|
|
|
|
host: &str,
|
|
|
|
|
original_request: &hyper::Request<()>,
|
|
|
|
|
body: B,
|
|
|
|
|
use_tls: bool,
|
|
|
|
|
) -> anyhow::Result<hyper::Request<B>> {
|
|
|
|
|
let scheme = if use_tls { "https" } else { "http" };
|
|
|
|
|
let mut req = hyper::Request::builder()
|
|
|
|
|
.method(method)
|
|
|
|
|
.uri(format!("http://{}{}", backend_addr, path))
|
|
|
|
|
.uri(format!("{}://{}{}", scheme, backend_addr, path))
|
|
|
|
|
.header("host", host);
|
|
|
|
|
|
|
|
|
|
// Forward non-pseudo headers
|
|
|
|
|
|