From fb0c0dcc31046d05df3f6b547b20684b95a8ed2a Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Thu, 12 Mar 2026 21:41:54 +0000 Subject: [PATCH] fix(rustproxy-http): stabilize upstream HTTP/2 forwarding and fallback behavior --- changelog.md | 9 ++ .../rustproxy-http/src/proxy_service.rs | 148 ++++++++++++++---- ts/00_commitinfo_data.ts | 2 +- 3 files changed, 130 insertions(+), 29 deletions(-) diff --git a/changelog.md b/changelog.md index 01bd690..0304a19 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,14 @@ # Changelog +## 2026-03-12 - 25.10.4 - fix(rustproxy-http) +stabilize upstream HTTP/2 forwarding and fallback behavior + +- Remove hop-by-hop headers before forwarding requests to HTTP/2 backends to comply with RFC 9113. +- Use ALPN-enabled TLS configuration whenever HTTP/2 is possible, including explicit H2 connections and retries. +- Add HTTP/2 handshake timeouts, tuned connection settings, and fallback to HTTP/1 when H2 negotiation times out or fails. +- Register pooled HTTP/2 senders only after a successful first request to avoid reusing broken connections. +- Build absolute URIs for HTTP/2 upstream requests so pseudo-headers such as scheme and authority are derived correctly. + ## 2026-03-12 - 25.10.3 - fix(rustproxy-http) include request domain in backend proxy error and protocol detection logs diff --git a/rust/crates/rustproxy-http/src/proxy_service.rs b/rust/crates/rustproxy-http/src/proxy_service.rs index 5602147..4f4a10d 100644 --- a/rust/crates/rustproxy-http/src/proxy_service.rs +++ b/rust/crates/rustproxy-http/src/proxy_service.rs @@ -539,6 +539,15 @@ impl HttpProxyService { } } + // Remove hop-by-hop headers (RFC 9113 ยง8.2.2 forbids connection-specific headers in H2) + upstream_headers.remove("connection"); + upstream_headers.remove("keep-alive"); + upstream_headers.remove("proxy-connection"); + upstream_headers.remove("transfer-encoding"); + upstream_headers.remove("te"); + upstream_headers.remove("trailer"); + upstream_headers.remove("upgrade"); + // Add standard reverse-proxy headers (X-Forwarded-*) { let original_host = host.as_deref().unwrap_or(""); @@ -634,8 +643,8 @@ impl HttpProxyService { // --- Fresh connection path --- self.metrics.backend_pool_miss(&upstream_key); - // Choose TLS config: use ALPN config for auto-detect probe, plain config otherwise - let tls_config = if needs_alpn_probe { + // Choose TLS config: use ALPN config when H2 is possible (auto-detect probe OR explicit H2) + let tls_config = if needs_alpn_probe || use_h2 { &self.backend_tls_config_alpn } else { &self.backend_tls_config @@ -901,17 +910,27 @@ impl HttpProxyService { ) -> Result>, hyper::Error> { let backend_key = format!("{}:{}", pool_key.host, pool_key.port); let exec = hyper_util::rt::TokioExecutor::new(); - // Explicitly type the handshake with BoxBody for uniform pool type + let mut h2_builder = hyper::client::conn::http2::Builder::new(exec); + h2_builder + .keep_alive_interval(std::time::Duration::from_secs(10)) + .keep_alive_timeout(std::time::Duration::from_secs(5)) + .adaptive_window(true) + .initial_stream_window_size(2 * 1024 * 1024); let (sender, conn): ( hyper::client::conn::http2::SendRequest>, hyper::client::conn::http2::Connection, BoxBody, hyper_util::rt::TokioExecutor>, - ) = match hyper::client::conn::http2::handshake(exec, io).await { - Ok(h) => h, - Err(e) => { + ) = match tokio::time::timeout(self.connect_timeout, h2_builder.handshake(io)).await { + Ok(Ok(h)) => h, + Ok(Err(e)) => { error!(backend = %backend_key, domain = %domain, error = %e, "Backend H2 handshake failed"); self.metrics.backend_handshake_error(&backend_key); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H2 handshake failed")); } + Err(_) => { + error!(backend = %backend_key, domain = %domain, "Backend H2 handshake timeout"); + self.metrics.backend_handshake_error(&backend_key); + return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend H2 handshake timeout")); + } }; tokio::spawn(async move { @@ -920,10 +939,13 @@ impl HttpProxyService { } }); - // Register for multiplexed reuse - self.connection_pool.register_h2(pool_key.clone(), sender.clone()); - - self.forward_h2_with_sender(sender, parts, body, upstream_headers, upstream_path, route, route_id, source_ip, Some(pool_key), domain).await + // Clone sender for potential pool registration; register only after first request succeeds + let sender_for_pool = sender.clone(); + let result = self.forward_h2_with_sender(sender, parts, body, upstream_headers, upstream_path, route, route_id, source_ip, Some(pool_key), domain).await; + if matches!(&result, Ok(ref resp) if resp.status() != StatusCode::BAD_GATEWAY) { + self.connection_pool.register_h2(pool_key.clone(), sender_for_pool); + } + result } /// Forward request using an existing (pooled) HTTP/2 sender. @@ -989,7 +1011,7 @@ impl HttpProxyService { let backend = if pool_key.use_tls { match tokio::time::timeout( self.connect_timeout, - connect_tls_backend(&self.backend_tls_config, &pool_key.host, pool_key.port), + connect_tls_backend(&self.backend_tls_config_alpn, &pool_key.host, pool_key.port), ).await { Ok(Ok(tls)) => BackendStream::Tls(tls), Ok(Err(e)) => { @@ -1028,16 +1050,29 @@ impl HttpProxyService { let io = TokioIo::new(backend); let exec = hyper_util::rt::TokioExecutor::new(); + let mut h2_builder = hyper::client::conn::http2::Builder::new(exec); + h2_builder + .keep_alive_interval(std::time::Duration::from_secs(10)) + .keep_alive_timeout(std::time::Duration::from_secs(5)) + .adaptive_window(true) + .initial_stream_window_size(2 * 1024 * 1024); let (mut sender, conn): ( hyper::client::conn::http2::SendRequest>, hyper::client::conn::http2::Connection, BoxBody, hyper_util::rt::TokioExecutor>, - ) = match hyper::client::conn::http2::handshake(exec, io).await { - Ok(h) => h, - Err(e) => { + ) = match tokio::time::timeout(self.connect_timeout, h2_builder.handshake(io)).await { + Ok(Ok(h)) => h, + Ok(Err(e)) => { error!(backend = %backend_key, domain = %domain, error = %e, "H2 retry: handshake failed"); self.metrics.backend_handshake_error(&backend_key); + self.metrics.backend_connection_closed(&backend_key); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H2 retry handshake failed")); } + Err(_) => { + error!(backend = %backend_key, domain = %domain, "H2 retry: handshake timeout"); + self.metrics.backend_handshake_error(&backend_key); + self.metrics.backend_connection_closed(&backend_key); + return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend H2 retry handshake timeout")); + } }; tokio::spawn(async move { @@ -1046,13 +1081,13 @@ impl HttpProxyService { } }); - // Register fresh sender in pool for future requests - self.connection_pool.register_h2(pool_key.clone(), sender.clone()); - - // Build request with empty body + // Build request with empty body using absolute URI for H2 pseudo-headers + let h2_uri = format!("{}://{}:{}{}", + if pool_key.use_tls { "https" } else { "http" }, + pool_key.host, pool_key.port, upstream_path); let mut upstream_req = Request::builder() .method(method) - .uri(upstream_path); + .uri(&h2_uri); if let Some(headers) = upstream_req.headers_mut() { *headers = upstream_headers; @@ -1065,6 +1100,8 @@ impl HttpProxyService { match sender.send_request(upstream_req).await { Ok(resp) => { + // Register in pool only after request succeeds + self.connection_pool.register_h2(pool_key.clone(), sender); let result = self.build_streaming_response(resp, route, route_id, source_ip).await; // Close the fresh backend connection (opened above) self.metrics.backend_connection_closed(&backend_key); @@ -1073,7 +1110,6 @@ impl HttpProxyService { Err(e) => { error!(backend = %backend_key, domain = %domain, error = %e, "H2 retry: request failed"); self.metrics.backend_request_error(&backend_key); - self.connection_pool.remove_h2(pool_key); // Close the fresh backend connection (opened above) self.metrics.backend_connection_closed(&backend_key); Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H2 request failed on retry")) @@ -1104,13 +1140,58 @@ impl HttpProxyService { domain: &str, ) -> Result>, hyper::Error> { let exec = hyper_util::rt::TokioExecutor::new(); - let handshake_result: Result<( - hyper::client::conn::http2::SendRequest>, - hyper::client::conn::http2::Connection, BoxBody, hyper_util::rt::TokioExecutor>, - ), hyper::Error> = hyper::client::conn::http2::handshake(exec, io).await; + let mut h2_builder = hyper::client::conn::http2::Builder::new(exec); + h2_builder + .keep_alive_interval(std::time::Duration::from_secs(10)) + .keep_alive_timeout(std::time::Duration::from_secs(5)) + .adaptive_window(true) + .initial_stream_window_size(2 * 1024 * 1024); + let handshake_result = tokio::time::timeout( + self.connect_timeout, + h2_builder.handshake(io), + ).await; match handshake_result { - Ok((mut sender, conn)) => { + Err(_) => { + // H2 handshake timed out โ€” fall back to H1 + let bk = format!("{}:{}", upstream.host, upstream.port); + warn!( + backend = %bk, + domain = %domain, + "H2 handshake timeout, falling back to H1" + ); + self.metrics.backend_h2_failure(&bk); + self.metrics.backend_handshake_error(&bk); + + let cache_key = crate::protocol_cache::ProtocolCacheKey { + host: upstream.host.clone(), + port: upstream.port, + requested_host: requested_host.clone(), + }; + self.protocol_cache.insert(cache_key, crate::protocol_cache::DetectedProtocol::H1); + + match self.reconnect_backend(upstream, domain).await { + Some(fallback_backend) => { + let h1_pool_key = crate::connection_pool::PoolKey { + host: upstream.host.clone(), + port: upstream.port, + use_tls: upstream.use_tls, + h2: false, + }; + let fallback_io = TokioIo::new(fallback_backend); + let result = self.forward_h1( + fallback_io, parts, body, upstream_headers, upstream_path, + upstream, route, route_id, source_ip, &h1_pool_key, domain, + ).await; + self.metrics.backend_connection_closed(&bk); + result + } + None => { + Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable after H2 timeout fallback")) + } + } + } + Ok(Ok((mut sender, conn))) => { tokio::spawn(async move { if let Err(e) = conn.await { debug!("HTTP/2 upstream connection error: {}", e); @@ -1127,9 +1208,12 @@ impl HttpProxyService { // Build and send the h2 request inline (don't register in pool yet โ€” // we need to verify the request actually succeeds first, because some // backends advertise h2 via ALPN but don't speak the h2 binary protocol). + let h2_uri = format!("{}://{}:{}{}", + if upstream.use_tls { "https" } else { "http" }, + upstream.host, upstream.port, upstream_path); let mut upstream_req = Request::builder() .method(parts.method) - .uri(upstream_path); + .uri(&h2_uri); if let Some(headers) = upstream_req.headers_mut() { *headers = upstream_headers; @@ -1198,7 +1282,7 @@ impl HttpProxyService { } } } - Err(e) => { + Ok(Err(e)) => { // H2 handshake truly failed โ€” fall back to H1 // Body is NOT consumed yet, so we can retry the full request. let bk = format!("{}:{}", upstream.host, upstream.port); @@ -1376,9 +1460,17 @@ impl HttpProxyService { pool_key: Option<&crate::connection_pool::PoolKey>, domain: &str, ) -> Result>, hyper::Error> { + // Build absolute URI for H2 pseudo-headers (:scheme, :authority) + let h2_uri = if let Some(pk) = pool_key { + format!("{}://{}:{}{}", + if pk.use_tls { "https" } else { "http" }, + pk.host, pk.port, upstream_path) + } else { + upstream_path.to_string() + }; let mut upstream_req = Request::builder() .method(parts.method) - .uri(upstream_path); + .uri(&h2_uri); if let Some(headers) = upstream_req.headers_mut() { *headers = upstream_headers; diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index f3a1ff4..fb505f7 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartproxy', - version: '25.10.3', + version: '25.10.4', description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.' }