diff --git a/changelog.md b/changelog.md index 2d4157a..c36ae2f 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,13 @@ # Changelog +## 2026-03-17 - 25.11.24 - fix(rustproxy-http) +improve async static file serving, websocket handshake buffering, and shared metric metadata handling + +- convert static file serving to async filesystem operations and await directory/file checks +- preserve and forward bytes read past the WebSocket handshake header terminator to avoid dropping buffered upstream data +- reuse Arc values for route and source identifiers across counting bodies and metric reporting +- standardize backend key propagation across H1/H2 forwarding, retry, and fallback paths for consistent logging and metrics + ## 2026-03-17 - 25.11.23 - fix(rustproxy-http,rustproxy-metrics) reduce per-frame metrics overhead by batching body byte accounting diff --git a/rust/crates/rustproxy-http/src/counting_body.rs b/rust/crates/rustproxy-http/src/counting_body.rs index d3aee19..23b82b8 100644 --- a/rust/crates/rustproxy-http/src/counting_body.rs +++ b/rust/crates/rustproxy-http/src/counting_body.rs @@ -25,8 +25,8 @@ const BYTE_FLUSH_THRESHOLD: u64 = 65_536; pub struct CountingBody { inner: Pin>, metrics: Arc, - route_id: Option, - source_ip: Option, + route_id: Option>, + source_ip: Option>, /// Whether we count bytes as "in" (request body) or "out" (response body). direction: Direction, /// Accumulated bytes not yet flushed to the metrics collector. @@ -56,8 +56,8 @@ impl CountingBody { pub fn new( inner: B, metrics: Arc, - route_id: Option, - source_ip: Option, + route_id: Option>, + source_ip: Option>, direction: Direction, ) -> Self { Self { diff --git a/rust/crates/rustproxy-http/src/proxy_service.rs b/rust/crates/rustproxy-http/src/proxy_service.rs index 6e733c4..ec3d1af 100644 --- a/rust/crates/rustproxy-http/src/proxy_service.rs +++ b/rust/crates/rustproxy-http/src/proxy_service.rs @@ -502,7 +502,7 @@ impl HttpProxyService { // Check for static file serving if let Some(ref advanced) = route_match.route.action.advanced { if let Some(ref static_files) = advanced.static_files { - return Ok(Self::serve_static_file(&path, static_files)); + return Ok(Self::serve_static_file(&path, static_files).await); } } @@ -615,11 +615,10 @@ impl HttpProxyService { }; // X-Forwarded-For: append client IP to existing chain - let client_ip = peer_addr.ip().to_string(); let xff_value = if let Some(existing) = upstream_headers.get("x-forwarded-for") { - format!("{}, {}", existing.to_str().unwrap_or(""), client_ip) + format!("{}, {}", existing.to_str().unwrap_or(""), ip_str) } else { - client_ip + ip_str.clone() }; if let Ok(val) = hyper::header::HeaderValue::from_str(&xff_value) { upstream_headers.insert( @@ -691,7 +690,7 @@ impl HttpProxyService { self.metrics.set_backend_protocol(&upstream_key, "h2"); let result = self.forward_h2_pooled( sender, parts, body, upstream_headers, &upstream_path, - route_match.route, route_id, &ip_str, &pool_key, domain_str, &conn_activity, + route_match.route, route_id, &ip_str, &pool_key, domain_str, &conn_activity, &upstream_key, ).await; self.upstream_selector.connection_ended(&upstream_key); return result; @@ -844,19 +843,19 @@ impl HttpProxyService { self.forward_h2_with_fallback( io, parts, body, upstream_headers, &upstream_path, &upstream, route_match.route, route_id, &ip_str, &final_pool_key, - host.clone(), domain_str, &conn_activity, + host.clone(), domain_str, &conn_activity, &upstream_key, ).await } else { // Explicit H2 mode: hard-fail on handshake error (preserved behavior) self.forward_h2( io, parts, body, upstream_headers, &upstream_path, - &upstream, route_match.route, route_id, &ip_str, &final_pool_key, domain_str, &conn_activity, + &upstream, route_match.route, route_id, &ip_str, &final_pool_key, domain_str, &conn_activity, &upstream_key, ).await } } else { self.forward_h1( io, parts, body, upstream_headers, &upstream_path, - &upstream, route_match.route, route_id, &ip_str, &final_pool_key, domain_str, &conn_activity, + &upstream, route_match.route, route_id, &ip_str, &final_pool_key, domain_str, &conn_activity, &upstream_key, ).await }; self.upstream_selector.connection_ended(&upstream_key); @@ -880,15 +879,14 @@ impl HttpProxyService { pool_key: &crate::connection_pool::PoolKey, domain: &str, conn_activity: &ConnActivity, + backend_key: &str, ) -> Result>, hyper::Error> { - let backend_key = format!("{}:{}", pool_key.host, pool_key.port); - // Try pooled H1 connection first — avoids TCP+TLS handshake if let Some(pooled_sender) = self.connection_pool.checkout_h1(pool_key) { - self.metrics.backend_pool_hit(&backend_key); + self.metrics.backend_pool_hit(backend_key); return self.forward_h1_with_sender( pooled_sender, parts, body, upstream_headers, upstream_path, - route, route_id, source_ip, pool_key, domain, conn_activity, + route, route_id, source_ip, domain, conn_activity, backend_key, ).await; } @@ -900,7 +898,7 @@ impl HttpProxyService { Ok(h) => h, Err(e) => { error!(backend = %backend_key, domain = %domain, error = %e, "Backend H1 handshake failed"); - self.metrics.backend_handshake_error(&backend_key); + self.metrics.backend_handshake_error(backend_key); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend handshake failed")); } }; @@ -911,7 +909,7 @@ impl HttpProxyService { } }); - self.forward_h1_with_sender(sender, parts, body, upstream_headers, upstream_path, route, route_id, source_ip, pool_key, domain, conn_activity).await + self.forward_h1_with_sender(sender, parts, body, upstream_headers, upstream_path, route, route_id, source_ip, domain, conn_activity, backend_key).await } /// Common H1 forwarding logic used by both fresh and pooled paths. @@ -925,9 +923,9 @@ impl HttpProxyService { route: &rustproxy_config::RouteConfig, route_id: Option<&str>, source_ip: &str, - pool_key: &crate::connection_pool::PoolKey, domain: &str, conn_activity: &ConnActivity, + backend_key: &str, ) -> Result>, hyper::Error> { // Always use HTTP/1.1 for h1 backend connections (h2 incoming requests have version HTTP/2.0) let mut upstream_req = Request::builder() @@ -939,12 +937,16 @@ impl HttpProxyService { *headers = upstream_headers; } + // Compute Arc once for both request and response CountingBody + let rid: Option> = route_id.map(Arc::from); + let sip: Arc = Arc::from(source_ip); + // Wrap the request body in CountingBody then box it for the uniform pool type let counting_req_body = CountingBody::new( body, Arc::clone(&self.metrics), - route_id.map(|s| s.to_string()), - Some(source_ip.to_string()), + rid.clone(), + Some(Arc::clone(&sip)), Direction::In, ).with_connection_activity(Arc::clone(&conn_activity.last_activity), conn_activity.start); let boxed_body: BoxBody = BoxBody::new(counting_req_body); @@ -954,9 +956,8 @@ impl HttpProxyService { let upstream_response = match sender.send_request(upstream_req).await { Ok(resp) => resp, Err(e) => { - let bk = format!("{}:{}", pool_key.host, pool_key.port); - error!(backend = %bk, domain = %domain, error = %e, "Backend H1 request failed"); - self.metrics.backend_request_error(&bk); + error!(backend = %backend_key, domain = %domain, error = %e, "Backend H1 request failed"); + self.metrics.backend_request_error(backend_key); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend request failed")); } }; @@ -971,7 +972,7 @@ impl HttpProxyService { // of large streaming responses (e.g. 352MB Docker layers) takes priority. drop(sender); - self.build_streaming_response(upstream_response, route, route_id, source_ip, conn_activity).await + self.build_streaming_response(upstream_response, route, rid, sip, conn_activity).await } /// Forward request to backend via HTTP/2 with body streaming (fresh connection). @@ -990,8 +991,8 @@ impl HttpProxyService { pool_key: &crate::connection_pool::PoolKey, domain: &str, conn_activity: &ConnActivity, + backend_key: &str, ) -> Result>, hyper::Error> { - let backend_key = format!("{}:{}", pool_key.host, pool_key.port); let exec = hyper_util::rt::TokioExecutor::new(); let mut h2_builder = hyper::client::conn::http2::Builder::new(exec); h2_builder @@ -1007,12 +1008,12 @@ impl HttpProxyService { Ok(Ok(h)) => h, Ok(Err(e)) => { error!(backend = %backend_key, domain = %domain, error = %e, error_debug = ?e, "Backend H2 handshake failed"); - self.metrics.backend_handshake_error(&backend_key); + self.metrics.backend_handshake_error(backend_key); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H2 handshake failed")); } Err(_) => { error!(backend = %backend_key, domain = %domain, "Backend H2 handshake timeout"); - self.metrics.backend_handshake_error(&backend_key); + self.metrics.backend_handshake_error(backend_key); return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend H2 handshake timeout")); } }; @@ -1039,7 +1040,7 @@ impl HttpProxyService { } let sender_for_pool = sender.clone(); - let result = self.forward_h2_with_sender(sender, parts, body, upstream_headers, upstream_path, route, route_id, source_ip, Some(pool_key), domain, conn_activity).await; + let result = self.forward_h2_with_sender(sender, parts, body, upstream_headers, upstream_path, route, route_id, source_ip, Some(pool_key), domain, conn_activity, backend_key).await; if matches!(&result, Ok(ref resp) if resp.status() != StatusCode::BAD_GATEWAY) { let g = self.connection_pool.register_h2(pool_key.clone(), sender_for_pool); gen_holder.store(g, std::sync::atomic::Ordering::Relaxed); @@ -1063,6 +1064,7 @@ impl HttpProxyService { pool_key: &crate::connection_pool::PoolKey, domain: &str, conn_activity: &ConnActivity, + backend_key: &str, ) -> Result>, hyper::Error> { // Save retry state for bodyless requests (cheap: Method is an enum, HeaderMap clones Arc-backed Bytes) let retry_state = if body.is_end_stream() { @@ -1073,18 +1075,18 @@ impl HttpProxyService { let result = self.forward_h2_with_sender( sender, parts, body, upstream_headers, upstream_path, - route, route_id, source_ip, Some(pool_key), domain, conn_activity, + route, route_id, source_ip, Some(pool_key), domain, conn_activity, backend_key, ).await; // If the request failed (502) and we can retry with an empty body, do so let is_502 = matches!(&result, Ok(resp) if resp.status() == StatusCode::BAD_GATEWAY); if is_502 { if let Some((method, headers)) = retry_state { - warn!(backend = %format!("{}:{}", pool_key.host, pool_key.port), domain = %domain, + warn!(backend = %backend_key, domain = %domain, "Stale pooled H2 sender, retrying with fresh connection"); return self.retry_h2_with_fresh_connection( method, headers, upstream_path, - pool_key, route, route_id, source_ip, domain, conn_activity, + pool_key, route, route_id, source_ip, domain, conn_activity, backend_key, ).await; } } @@ -1104,8 +1106,8 @@ impl HttpProxyService { source_ip: &str, domain: &str, conn_activity: &ConnActivity, + backend_key: &str, ) -> Result>, hyper::Error> { - let backend_key = format!("{}:{}", pool_key.host, pool_key.port); // Establish fresh backend connection let retry_connect_start = std::time::Instant::now(); @@ -1117,12 +1119,12 @@ impl HttpProxyService { Ok(Ok(tls)) => BackendStream::Tls(tls), Ok(Err(e)) => { error!(backend = %backend_key, domain = %domain, error = %e, "H2 retry: TLS connect failed"); - self.metrics.backend_connect_error(&backend_key); + self.metrics.backend_connect_error(backend_key); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable on H2 retry")); } Err(_) => { error!(backend = %backend_key, domain = %domain, "H2 retry: TLS connect timeout"); - self.metrics.backend_connect_error(&backend_key); + self.metrics.backend_connect_error(backend_key); return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend timeout on H2 retry")); } } @@ -1137,17 +1139,17 @@ impl HttpProxyService { } Ok(Err(e)) => { error!(backend = %backend_key, domain = %domain, error = %e, "H2 retry: TCP connect failed"); - self.metrics.backend_connect_error(&backend_key); + self.metrics.backend_connect_error(backend_key); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable on H2 retry")); } Err(_) => { error!(backend = %backend_key, domain = %domain, "H2 retry: TCP connect timeout"); - self.metrics.backend_connect_error(&backend_key); + self.metrics.backend_connect_error(backend_key); return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend timeout on H2 retry")); } } }; - self.metrics.backend_connection_opened(&backend_key, retry_connect_start.elapsed()); + self.metrics.backend_connection_opened(backend_key, retry_connect_start.elapsed()); let io = TokioIo::new(backend); let exec = hyper_util::rt::TokioExecutor::new(); @@ -1165,14 +1167,14 @@ impl HttpProxyService { Ok(Ok(h)) => h, Ok(Err(e)) => { error!(backend = %backend_key, domain = %domain, error = %e, error_debug = ?e, "H2 retry: handshake failed"); - self.metrics.backend_handshake_error(&backend_key); - self.metrics.backend_connection_closed(&backend_key); + self.metrics.backend_handshake_error(backend_key); + self.metrics.backend_connection_closed(backend_key); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H2 retry handshake failed")); } Err(_) => { error!(backend = %backend_key, domain = %domain, "H2 retry: handshake timeout"); - self.metrics.backend_handshake_error(&backend_key); - self.metrics.backend_connection_closed(&backend_key); + self.metrics.backend_handshake_error(backend_key); + self.metrics.backend_connection_closed(backend_key); return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend H2 retry handshake timeout")); } }; @@ -1220,16 +1222,16 @@ impl HttpProxyService { // Register in pool only after request succeeds let g = self.connection_pool.register_h2(pool_key.clone(), sender); gen_holder.store(g, std::sync::atomic::Ordering::Relaxed); - let result = self.build_streaming_response(resp, route, route_id, source_ip, conn_activity).await; + let result = self.build_streaming_response(resp, route, route_id.map(Arc::from), Arc::from(source_ip), conn_activity).await; // Close the fresh backend connection (opened above) - self.metrics.backend_connection_closed(&backend_key); + self.metrics.backend_connection_closed(backend_key); result } Err(e) => { error!(backend = %backend_key, domain = %domain, error = %e, "H2 retry: request failed"); - self.metrics.backend_request_error(&backend_key); + self.metrics.backend_request_error(backend_key); // Close the fresh backend connection (opened above) - self.metrics.backend_connection_closed(&backend_key); + self.metrics.backend_connection_closed(backend_key); Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H2 request failed on retry")) } } @@ -1257,6 +1259,7 @@ impl HttpProxyService { requested_host: Option, domain: &str, conn_activity: &ConnActivity, + backend_key: &str, ) -> Result>, hyper::Error> { let exec = hyper_util::rt::TokioExecutor::new(); let mut h2_builder = hyper::client::conn::http2::Builder::new(exec); @@ -1274,14 +1277,13 @@ impl HttpProxyService { match handshake_result { Err(_) => { // H2 handshake timed out — fall back to H1 - let bk = format!("{}:{}", upstream.host, upstream.port); warn!( - backend = %bk, + backend = %backend_key, domain = %domain, "H2 handshake timeout, falling back to H1" ); - self.metrics.backend_h2_failure(&bk); - self.metrics.backend_handshake_error(&bk); + self.metrics.backend_h2_failure(backend_key); + self.metrics.backend_handshake_error(backend_key); let cache_key = crate::protocol_cache::ProtocolCacheKey { host: upstream.host.clone(), @@ -1290,7 +1292,7 @@ impl HttpProxyService { }; self.protocol_cache.insert(cache_key, crate::protocol_cache::DetectedProtocol::H1); - match self.reconnect_backend(upstream, domain).await { + match self.reconnect_backend(upstream, domain, backend_key).await { Some(fallback_backend) => { let h1_pool_key = crate::connection_pool::PoolKey { host: upstream.host.clone(), @@ -1301,9 +1303,9 @@ impl HttpProxyService { let fallback_io = TokioIo::new(fallback_backend); let result = self.forward_h1( fallback_io, parts, body, upstream_headers, upstream_path, - upstream, route, route_id, source_ip, &h1_pool_key, domain, conn_activity, + upstream, route, route_id, source_ip, &h1_pool_key, domain, conn_activity, backend_key, ).await; - self.metrics.backend_connection_closed(&bk); + self.metrics.backend_connection_closed(backend_key); result } None => { @@ -1354,11 +1356,13 @@ impl HttpProxyService { *headers = upstream_headers; } + let rid: Option> = route_id.map(Arc::from); + let sip: Arc = Arc::from(source_ip); let counting_req_body = CountingBody::new( body, Arc::clone(&self.metrics), - route_id.map(|s| s.to_string()), - Some(source_ip.to_string()), + rid.clone(), + Some(Arc::clone(&sip)), Direction::In, ).with_connection_activity(Arc::clone(&conn_activity.last_activity), conn_activity.start); let boxed_body: BoxBody = BoxBody::new(counting_req_body); @@ -1368,40 +1372,33 @@ impl HttpProxyService { Ok(upstream_response) => { let g = self.connection_pool.register_h2(pool_key.clone(), sender); gen_holder.store(g, std::sync::atomic::Ordering::Relaxed); - self.build_streaming_response(upstream_response, route, route_id, source_ip, conn_activity).await + self.build_streaming_response(upstream_response, route, rid, sip, conn_activity).await } Err(e) => { // H2 request failed on a stream level (e.g. RST_STREAM PROTOCOL_ERROR). // The H2 handshake succeeded, so the backend genuinely speaks H2 — don't // poison the protocol cache. Only handshake-level failures (below) should // downgrade the cache to H1. - let bk = format!("{}:{}", upstream.host, upstream.port); debug!( - backend = %bk, + backend = %backend_key, domain = %domain, error = %e, error_debug = ?e, "H2 stream error, retrying this request as H1" ); - self.metrics.backend_h2_failure(&bk); + self.metrics.backend_h2_failure(backend_key); // Retry as H1 for bodyless requests; return 502 for requests with bodies if let Some((method, headers)) = retry_state { - match self.reconnect_backend(upstream, domain).await { + match self.reconnect_backend(upstream, domain, backend_key).await { Some(fallback_backend) => { - let h1_pool_key = crate::connection_pool::PoolKey { - host: upstream.host.clone(), - port: upstream.port, - use_tls: upstream.use_tls, - h2: false, - }; let fallback_io = TokioIo::new(fallback_backend); let result = self.forward_h1_empty_body( fallback_io, method, headers, upstream_path, - route, route_id, source_ip, &h1_pool_key, domain, conn_activity, + route, route_id, source_ip, domain, conn_activity, backend_key, ).await; // Close the reconnected backend connection (opened in reconnect_backend) - self.metrics.backend_connection_closed(&bk); + self.metrics.backend_connection_closed(backend_key); result } None => { @@ -1417,15 +1414,14 @@ impl HttpProxyService { Ok(Err(e)) => { // H2 handshake truly failed — fall back to H1 // Body is NOT consumed yet, so we can retry the full request. - let bk = format!("{}:{}", upstream.host, upstream.port); warn!( - backend = %bk, + backend = %backend_key, domain = %domain, error = %e, "H2 handshake failed, falling back to H1" ); - self.metrics.backend_h2_failure(&bk); - self.metrics.backend_handshake_error(&bk); + self.metrics.backend_h2_failure(backend_key); + self.metrics.backend_handshake_error(backend_key); // Update cache to H1 so subsequent requests skip H2 let cache_key = crate::protocol_cache::ProtocolCacheKey { @@ -1436,7 +1432,7 @@ impl HttpProxyService { self.protocol_cache.insert(cache_key, crate::protocol_cache::DetectedProtocol::H1); // Reconnect for H1 (the original io was consumed by the failed h2 handshake) - match self.reconnect_backend(upstream, domain).await { + match self.reconnect_backend(upstream, domain, backend_key).await { Some(fallback_backend) => { let h1_pool_key = crate::connection_pool::PoolKey { host: upstream.host.clone(), @@ -1447,10 +1443,10 @@ impl HttpProxyService { let fallback_io = TokioIo::new(fallback_backend); let result = self.forward_h1( fallback_io, parts, body, upstream_headers, upstream_path, - upstream, route, route_id, source_ip, &h1_pool_key, domain, conn_activity, + upstream, route, route_id, source_ip, &h1_pool_key, domain, conn_activity, backend_key, ).await; // Close the reconnected backend connection (opened in reconnect_backend) - self.metrics.backend_connection_closed(&bk); + self.metrics.backend_connection_closed(backend_key); result } None => { @@ -1472,11 +1468,10 @@ impl HttpProxyService { route: &rustproxy_config::RouteConfig, route_id: Option<&str>, source_ip: &str, - pool_key: &crate::connection_pool::PoolKey, domain: &str, conn_activity: &ConnActivity, + backend_key: &str, ) -> Result>, hyper::Error> { - let backend_key = format!("{}:{}", pool_key.host, pool_key.port); let (mut sender, conn): ( hyper::client::conn::http1::SendRequest>, hyper::client::conn::http1::Connection, BoxBody>, @@ -1484,7 +1479,7 @@ impl HttpProxyService { Ok(h) => h, Err(e) => { error!(backend = %backend_key, domain = %domain, error = %e, "H1 fallback: handshake failed"); - self.metrics.backend_handshake_error(&backend_key); + self.metrics.backend_handshake_error(backend_key); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H1 fallback handshake failed")); } }; @@ -1513,7 +1508,7 @@ impl HttpProxyService { Ok(resp) => resp, Err(e) => { error!(backend = %backend_key, domain = %domain, error = %e, "H1 fallback: request failed"); - self.metrics.backend_request_error(&backend_key); + self.metrics.backend_request_error(backend_key); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H1 fallback request failed")); } }; @@ -1521,7 +1516,7 @@ impl HttpProxyService { // Don't pool the sender while response body is still streaming (same safety as forward_h1_with_sender) drop(sender); - self.build_streaming_response(upstream_response, route, route_id, source_ip, conn_activity).await + self.build_streaming_response(upstream_response, route, route_id.map(Arc::from), Arc::from(source_ip), conn_activity).await } /// Reconnect to a backend (used for H2→H1 fallback). @@ -1529,8 +1524,8 @@ impl HttpProxyService { &self, upstream: &crate::upstream_selector::UpstreamSelection, domain: &str, + backend_key: &str, ) -> Option { - let backend_key = format!("{}:{}", upstream.host, upstream.port); let reconnect_start = std::time::Instant::now(); if upstream.use_tls { match tokio::time::timeout( @@ -1538,17 +1533,17 @@ impl HttpProxyService { connect_tls_backend(&self.backend_tls_config, &upstream.host, upstream.port), ).await { Ok(Ok(tls)) => { - self.metrics.backend_connection_opened(&backend_key, reconnect_start.elapsed()); + self.metrics.backend_connection_opened(backend_key, reconnect_start.elapsed()); Some(BackendStream::Tls(tls)) } Ok(Err(e)) => { error!(backend = %backend_key, domain = %domain, error = %e, "H1 fallback: TLS reconnect failed"); - self.metrics.backend_connect_error(&backend_key); + self.metrics.backend_connect_error(backend_key); None } Err(_) => { error!(backend = %backend_key, domain = %domain, "H1 fallback: TLS reconnect timeout"); - self.metrics.backend_connect_error(&backend_key); + self.metrics.backend_connect_error(backend_key); None } } @@ -1562,17 +1557,17 @@ impl HttpProxyService { let _ = socket2::SockRef::from(&s).set_tcp_keepalive( &socket2::TcpKeepalive::new().with_time(std::time::Duration::from_secs(60)) ); - self.metrics.backend_connection_opened(&backend_key, reconnect_start.elapsed()); + self.metrics.backend_connection_opened(backend_key, reconnect_start.elapsed()); Some(BackendStream::Plain(s)) } Ok(Err(e)) => { error!(backend = %backend_key, domain = %domain, error = %e, "H1 fallback: TCP reconnect failed"); - self.metrics.backend_connect_error(&backend_key); + self.metrics.backend_connect_error(backend_key); None } Err(_) => { error!(backend = %backend_key, domain = %domain, "H1 fallback: TCP reconnect timeout"); - self.metrics.backend_connect_error(&backend_key); + self.metrics.backend_connect_error(backend_key); None } } @@ -1593,6 +1588,7 @@ impl HttpProxyService { pool_key: Option<&crate::connection_pool::PoolKey>, domain: &str, conn_activity: &ConnActivity, + backend_key: &str, ) -> Result>, hyper::Error> { // Build absolute URI for H2 pseudo-headers (:scheme, :authority) // Use the requested domain as authority (not backend address) so :authority matches Host header @@ -1614,12 +1610,16 @@ impl HttpProxyService { *headers = upstream_headers; } + // Compute Arc once for both request and response CountingBody + let rid: Option> = route_id.map(Arc::from); + let sip: Arc = Arc::from(source_ip); + // Wrap the request body in CountingBody then box it for the uniform pool type let counting_req_body = CountingBody::new( body, Arc::clone(&self.metrics), - route_id.map(|s| s.to_string()), - Some(source_ip.to_string()), + rid.clone(), + Some(Arc::clone(&sip)), Direction::In, ).with_connection_activity(Arc::clone(&conn_activity.last_activity), conn_activity.start); let boxed_body: BoxBody = BoxBody::new(counting_req_body); @@ -1631,9 +1631,8 @@ impl HttpProxyService { Err(e) => { // Evict the dead sender so subsequent requests get fresh connections if let Some(key) = pool_key { - let bk = format!("{}:{}", key.host, key.port); - error!(backend = %bk, domain = %domain, error = %e, error_debug = ?e, "Backend H2 request failed"); - self.metrics.backend_request_error(&bk); + error!(backend = %backend_key, domain = %domain, error = %e, error_debug = ?e, "Backend H2 request failed"); + self.metrics.backend_request_error(backend_key); self.connection_pool.remove_h2(key); } else { error!(domain = %domain, error = %e, error_debug = ?e, "Backend H2 request failed"); @@ -1642,7 +1641,7 @@ impl HttpProxyService { } }; - self.build_streaming_response(upstream_response, route, route_id, source_ip, conn_activity).await + self.build_streaming_response(upstream_response, route, rid, sip, conn_activity).await } /// Build the client-facing response from an upstream response, streaming the body. @@ -1653,8 +1652,8 @@ impl HttpProxyService { &self, upstream_response: Response, route: &rustproxy_config::RouteConfig, - route_id: Option<&str>, - source_ip: &str, + route_id: Option>, + source_ip: Arc, conn_activity: &ConnActivity, ) -> Result>, hyper::Error> { let (resp_parts, resp_body) = upstream_response.into_parts(); @@ -1686,8 +1685,8 @@ impl HttpProxyService { let counting_body = CountingBody::new( resp_body, Arc::clone(&self.metrics), - route_id.map(|s| s.to_string()), - Some(source_ip.to_string()), + route_id, + Some(source_ip), Direction::Out, ).with_connection_activity(Arc::clone(&conn_activity.last_activity), conn_activity.start); @@ -1906,21 +1905,26 @@ impl HttpProxyService { } let mut response_buf = Vec::with_capacity(4096); - let mut temp = [0u8; 1]; + let mut read_buf = [0u8; 4096]; + let extra_bytes: Vec; loop { - match upstream_stream.read(&mut temp).await { + match upstream_stream.read(&mut read_buf).await { Ok(0) => { error!("WebSocket: upstream closed before completing handshake"); self.upstream_selector.connection_ended(upstream_key); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend closed")); } - Ok(_) => { - response_buf.push(temp[0]); - if response_buf.len() >= 4 { - let len = response_buf.len(); - if response_buf[len-4..] == *b"\r\n\r\n" { - break; - } + Ok(n) => { + let prev_len = response_buf.len(); + response_buf.extend_from_slice(&read_buf[..n]); + // Scan for \r\n\r\n, backing up 3 bytes to handle split across reads + let search_start = prev_len.saturating_sub(3); + if let Some(pos) = response_buf[search_start..].windows(4) + .position(|w| w == b"\r\n\r\n") + { + let header_end = search_start + pos + 4; + extra_bytes = response_buf.split_off(header_end); + break; } if response_buf.len() > 8192 { error!("WebSocket: upstream response headers too large"); @@ -1995,8 +1999,8 @@ impl HttpProxyService { ); let metrics = Arc::clone(&self.metrics); - let route_id_owned = route_id.map(|s| s.to_string()); - let source_ip_owned = source_ip.to_string(); + let route_id_owned: Option> = route_id.map(Arc::from); + let source_ip_owned: Arc = Arc::from(source_ip); let upstream_selector = self.upstream_selector.clone(); let upstream_key_owned = upstream_key.to_string(); let ws_inactivity_timeout = self.ws_inactivity_timeout; @@ -2050,7 +2054,7 @@ impl HttpProxyService { break; } total += n as u64; - metrics_c2u.record_bytes(n as u64, 0, route_c2u.as_deref(), Some(&ip_c2u)); + metrics_c2u.record_bytes(n as u64, 0, route_c2u.as_deref(), Some(&*ip_c2u)); la1.store(start.elapsed().as_millis() as u64, Ordering::Relaxed); if let Some((ref ca, ca_start)) = conn_act_c2u { ca.store(ca_start.elapsed().as_millis() as u64, Ordering::Relaxed); @@ -2072,6 +2076,23 @@ impl HttpProxyService { let u2c = tokio::spawn(async move { let mut buf = vec![0u8; 65536]; let mut total = 0u64; + // Forward any bytes buffered past the HTTP header terminator during handshake + if !extra_bytes.is_empty() { + let n = extra_bytes.len(); + if cw.write_all(&extra_bytes).await.is_err() { + let _ = tokio::time::timeout( + std::time::Duration::from_secs(2), + cw.shutdown(), + ).await; + return 0u64; + } + total += n as u64; + metrics_u2c.record_bytes(0, n as u64, route_u2c.as_deref(), Some(&*ip_u2c)); + la2.store(start.elapsed().as_millis() as u64, Ordering::Relaxed); + if let Some((ref ca, ca_start)) = conn_act_u2c { + ca.store(ca_start.elapsed().as_millis() as u64, Ordering::Relaxed); + } + } loop { let n = tokio::select! { result = ur.read(&mut buf) => match result { @@ -2084,7 +2105,7 @@ impl HttpProxyService { break; } total += n as u64; - metrics_u2c.record_bytes(0, n as u64, route_u2c.as_deref(), Some(&ip_u2c)); + metrics_u2c.record_bytes(0, n as u64, route_u2c.as_deref(), Some(&*ip_u2c)); la2.store(start.elapsed().as_millis() as u64, Ordering::Relaxed); if let Some((ref ca, ca_start)) = conn_act_u2c { ca.store(ca_start.elapsed().as_millis() as u64, Ordering::Relaxed); @@ -2224,13 +2245,13 @@ impl HttpProxyService { } /// Serve a static file from the configured directory. - fn serve_static_file( + async fn serve_static_file( path: &str, config: &rustproxy_config::RouteStaticFiles, ) -> Response> { - use std::path::Path; + use std::path::PathBuf; - let root = Path::new(&config.root); + let root = PathBuf::from(&config.root); // Sanitize path to prevent directory traversal let clean_path = path.trim_start_matches('/'); @@ -2239,7 +2260,12 @@ impl HttpProxyService { let mut file_path = root.join(&clean_path); // If path points to a directory, try index files - if file_path.is_dir() || clean_path.is_empty() { + let is_dir = if clean_path.is_empty() { + true + } else { + tokio::fs::metadata(&file_path).await.map(|m| m.is_dir()).unwrap_or(false) + }; + if is_dir { let index_files = config.index_files.as_deref() .or(config.index.as_deref()) .unwrap_or(&[]); @@ -2253,7 +2279,7 @@ impl HttpProxyService { } else { file_path.join(index) }; - if candidate.is_file() { + if tokio::fs::metadata(&candidate).await.map(|m| m.is_file()).unwrap_or(false) { file_path = candidate; found = true; break; @@ -2265,11 +2291,11 @@ impl HttpProxyService { } // Ensure the resolved path is within the root (prevent traversal) - let canonical_root = match root.canonicalize() { + let canonical_root = match tokio::fs::canonicalize(&root).await { Ok(p) => p, Err(_) => return error_response(StatusCode::NOT_FOUND, "Not found"), }; - let canonical_file = match file_path.canonicalize() { + let canonical_file = match tokio::fs::canonicalize(&file_path).await { Ok(p) => p, Err(_) => return error_response(StatusCode::NOT_FOUND, "Not found"), }; @@ -2283,7 +2309,7 @@ impl HttpProxyService { } // Read the file - match std::fs::read(&file_path) { + match tokio::fs::read(&file_path).await { Ok(content) => { let content_type = guess_content_type(&file_path); let mut response = Response::builder() diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 1665845..333e702 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartproxy', - version: '25.11.23', + version: '25.11.24', description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.' }