diff --git a/changelog.md b/changelog.md index 33362e8..0fc68c3 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-03-15 - 25.11.1 - fix(rustproxy-http) +keep connection idle tracking alive during streaming and tune HTTP/2 connection lifetimes + +- Propagate connection activity tracking through HTTP/1, HTTP/2, and WebSocket forwarding so active request and response body streams do not trigger the idle watchdog. +- Update CountingBody to refresh connection activity timestamps while data frames are polled during uploads and downloads. +- Increase pooled HTTP/2 max age and set explicit HTTP/2 connection window sizes to improve long-lived streaming behavior. + ## 2026-03-15 - 25.11.0 - feat(rustproxy-http) add HTTP/2 Extended CONNECT WebSocket proxy support diff --git a/rust/crates/rustproxy-http/src/connection_pool.rs b/rust/crates/rustproxy-http/src/connection_pool.rs index 0f466ba..cb47ff4 100644 --- a/rust/crates/rustproxy-http/src/connection_pool.rs +++ b/rust/crates/rustproxy-http/src/connection_pool.rs @@ -20,7 +20,7 @@ const IDLE_TIMEOUT: Duration = Duration::from_secs(90); const EVICTION_INTERVAL: Duration = Duration::from_secs(30); /// Maximum age for pooled HTTP/2 connections before proactive eviction. /// Prevents staleness from backends that close idle connections (e.g. nginx GOAWAY). -const MAX_H2_AGE: Duration = Duration::from_secs(120); +const MAX_H2_AGE: Duration = Duration::from_secs(300); /// Identifies a unique backend endpoint. #[derive(Clone, Debug, Hash, Eq, PartialEq)] diff --git a/rust/crates/rustproxy-http/src/counting_body.rs b/rust/crates/rustproxy-http/src/counting_body.rs index 5d3fb87..8882289 100644 --- a/rust/crates/rustproxy-http/src/counting_body.rs +++ b/rust/crates/rustproxy-http/src/counting_body.rs @@ -25,6 +25,11 @@ pub struct CountingBody { direction: Direction, /// Whether we've already reported the bytes (to avoid double-reporting on drop). reported: bool, + /// Optional connection-level activity tracker. When set, poll_frame updates this + /// to keep the idle watchdog alive during active body streaming (uploads/downloads). + connection_activity: Option>, + /// Start instant for computing elapsed ms for connection_activity. + activity_start: Option, } /// Which direction the bytes flow. @@ -53,9 +58,20 @@ impl CountingBody { source_ip, direction, reported: false, + connection_activity: None, + activity_start: None, } } + /// Set the connection-level activity tracker. When set, each data frame + /// updates this timestamp to prevent the idle watchdog from killing the + /// connection during active body streaming. + pub fn with_connection_activity(mut self, activity: Arc, start: std::time::Instant) -> Self { + self.connection_activity = Some(activity); + self.activity_start = Some(start); + self + } + /// Report accumulated bytes to the metrics collector. fn report(&mut self) { if self.reported { @@ -103,6 +119,10 @@ where Poll::Ready(Some(Ok(frame))) => { if let Some(data) = frame.data_ref() { this.counted_bytes.fetch_add(data.len() as u64, Ordering::Relaxed); + // Keep the connection-level idle watchdog alive during body streaming + if let (Some(activity), Some(start)) = (&this.connection_activity, &this.activity_start) { + activity.store(start.elapsed().as_millis() as u64, Ordering::Relaxed); + } } Poll::Ready(Some(Ok(frame))) } diff --git a/rust/crates/rustproxy-http/src/proxy_service.rs b/rust/crates/rustproxy-http/src/proxy_service.rs index cda6fca..7b9c236 100644 --- a/rust/crates/rustproxy-http/src/proxy_service.rs +++ b/rust/crates/rustproxy-http/src/proxy_service.rs @@ -33,6 +33,14 @@ use crate::request_filter::RequestFilter; use crate::response_filter::ResponseFilter; use crate::upstream_selector::UpstreamSelector; +/// Per-connection context for keeping the idle watchdog alive during body streaming. +/// Passed through the forwarding chain so CountingBody can update the timestamp. +#[derive(Clone)] +struct ConnActivity { + last_activity: Arc, + start: std::time::Instant, +} + /// Default upstream connect timeout (30 seconds). const DEFAULT_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30); @@ -294,8 +302,9 @@ impl HttpProxyService { let cn = cancel_inner.clone(); let la = Arc::clone(&la_inner); let st = start; + let ca = ConnActivity { last_activity: Arc::clone(&la_inner), start }; async move { - let result = svc.handle_request(req, peer, port, cn).await; + let result = svc.handle_request(req, peer, port, cn, ca).await; // Mark request end — update activity timestamp before guard drops la.store(st.elapsed().as_millis() as u64, Ordering::Relaxed); drop(req_guard); // Explicitly drop to decrement active_requests @@ -306,8 +315,11 @@ impl HttpProxyService { // Auto-detect h1 vs h2 based on ALPN / connection preface. // serve_connection_with_upgrades supports h1 Upgrade (WebSocket) and h2 Extended CONNECT (RFC 8441). let mut builder = hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new()); - // Advertise Extended CONNECT support so H2 clients can initiate WebSocket connections - builder.http2().enable_connect_protocol(); + // Configure H2 server settings: Extended CONNECT for WebSocket + flow control tuning + builder.http2() + .enable_connect_protocol() + .initial_stream_window_size(2 * 1024 * 1024) // 2MB per stream (vs default 64KB) + .initial_connection_window_size(8 * 1024 * 1024); // 8MB per client connection let conn = builder.serve_connection_with_upgrades(io, service); // Pin on the heap — auto::UpgradeableConnection is !Unpin let mut conn = Box::pin(conn); @@ -367,6 +379,7 @@ impl HttpProxyService { peer_addr: std::net::SocketAddr, port: u16, cancel: CancellationToken, + conn_activity: ConnActivity, ) -> Result>, hyper::Error> { let host = req.headers() .get("host") @@ -500,6 +513,7 @@ impl HttpProxyService { if is_h1_websocket || is_h2_websocket { let result = self.handle_websocket_upgrade( req, peer_addr, &upstream, route_match.route, route_id, &upstream_key, cancel, &ip_str, is_h2_websocket, + if is_h2_websocket { Some(conn_activity.clone()) } else { None }, ).await; // Note: for WebSocket, connection_ended is called inside // the spawned tunnel task when the connection closes. @@ -640,7 +654,7 @@ impl HttpProxyService { self.metrics.set_backend_protocol(&upstream_key, "h2"); let result = self.forward_h2_pooled( sender, parts, body, upstream_headers, &upstream_path, - route_match.route, route_id, &ip_str, &pool_key, domain_str, + route_match.route, route_id, &ip_str, &pool_key, domain_str, &conn_activity, ).await; self.upstream_selector.connection_ended(&upstream_key); return result; @@ -779,19 +793,19 @@ impl HttpProxyService { self.forward_h2_with_fallback( io, parts, body, upstream_headers, &upstream_path, &upstream, route_match.route, route_id, &ip_str, &final_pool_key, - host.clone(), domain_str, + host.clone(), domain_str, &conn_activity, ).await } else { // Explicit H2 mode: hard-fail on handshake error (preserved behavior) self.forward_h2( io, parts, body, upstream_headers, &upstream_path, - &upstream, route_match.route, route_id, &ip_str, &final_pool_key, domain_str, + &upstream, route_match.route, route_id, &ip_str, &final_pool_key, domain_str, &conn_activity, ).await } } else { self.forward_h1( io, parts, body, upstream_headers, &upstream_path, - &upstream, route_match.route, route_id, &ip_str, &final_pool_key, domain_str, + &upstream, route_match.route, route_id, &ip_str, &final_pool_key, domain_str, &conn_activity, ).await }; self.upstream_selector.connection_ended(&upstream_key); @@ -814,6 +828,7 @@ impl HttpProxyService { source_ip: &str, pool_key: &crate::connection_pool::PoolKey, domain: &str, + conn_activity: &ConnActivity, ) -> Result>, hyper::Error> { let backend_key = format!("{}:{}", pool_key.host, pool_key.port); @@ -822,7 +837,7 @@ impl HttpProxyService { self.metrics.backend_pool_hit(&backend_key); return self.forward_h1_with_sender( pooled_sender, parts, body, upstream_headers, upstream_path, - route, route_id, source_ip, pool_key, domain, + route, route_id, source_ip, pool_key, domain, conn_activity, ).await; } @@ -845,7 +860,7 @@ impl HttpProxyService { } }); - self.forward_h1_with_sender(sender, parts, body, upstream_headers, upstream_path, route, route_id, source_ip, pool_key, domain).await + self.forward_h1_with_sender(sender, parts, body, upstream_headers, upstream_path, route, route_id, source_ip, pool_key, domain, conn_activity).await } /// Common H1 forwarding logic used by both fresh and pooled paths. @@ -861,6 +876,7 @@ impl HttpProxyService { source_ip: &str, pool_key: &crate::connection_pool::PoolKey, domain: &str, + conn_activity: &ConnActivity, ) -> Result>, hyper::Error> { // Always use HTTP/1.1 for h1 backend connections (h2 incoming requests have version HTTP/2.0) let mut upstream_req = Request::builder() @@ -879,7 +895,7 @@ impl HttpProxyService { route_id.map(|s| s.to_string()), Some(source_ip.to_string()), Direction::In, - ); + ).with_connection_activity(Arc::clone(&conn_activity.last_activity), conn_activity.start); let boxed_body: BoxBody = BoxBody::new(counting_req_body); let upstream_req = upstream_req.body(boxed_body).unwrap(); @@ -897,7 +913,7 @@ impl HttpProxyService { // Return sender to pool (body streams lazily, sender is reusable once response head is received) self.connection_pool.checkin_h1(pool_key.clone(), sender); - self.build_streaming_response(upstream_response, route, route_id, source_ip).await + self.build_streaming_response(upstream_response, route, route_id, source_ip, conn_activity).await } /// Forward request to backend via HTTP/2 with body streaming (fresh connection). @@ -915,6 +931,7 @@ impl HttpProxyService { source_ip: &str, pool_key: &crate::connection_pool::PoolKey, domain: &str, + conn_activity: &ConnActivity, ) -> Result>, hyper::Error> { let backend_key = format!("{}:{}", pool_key.host, pool_key.port); let exec = hyper_util::rt::TokioExecutor::new(); @@ -923,8 +940,8 @@ impl HttpProxyService { .timer(hyper_util::rt::TokioTimer::new()) .keep_alive_interval(std::time::Duration::from_secs(10)) .keep_alive_timeout(std::time::Duration::from_secs(5)) - .adaptive_window(true) - .initial_stream_window_size(2 * 1024 * 1024); + .initial_stream_window_size(2 * 1024 * 1024) + .initial_connection_window_size(16 * 1024 * 1024); let (sender, conn): ( hyper::client::conn::http2::SendRequest>, hyper::client::conn::http2::Connection, BoxBody, hyper_util::rt::TokioExecutor>, @@ -950,7 +967,7 @@ impl HttpProxyService { // Clone sender for potential pool registration; register only after first request succeeds let sender_for_pool = sender.clone(); - let result = self.forward_h2_with_sender(sender, parts, body, upstream_headers, upstream_path, route, route_id, source_ip, Some(pool_key), domain).await; + let result = self.forward_h2_with_sender(sender, parts, body, upstream_headers, upstream_path, route, route_id, source_ip, Some(pool_key), domain, conn_activity).await; if matches!(&result, Ok(ref resp) if resp.status() != StatusCode::BAD_GATEWAY) { self.connection_pool.register_h2(pool_key.clone(), sender_for_pool); } @@ -972,6 +989,7 @@ impl HttpProxyService { source_ip: &str, pool_key: &crate::connection_pool::PoolKey, domain: &str, + conn_activity: &ConnActivity, ) -> Result>, hyper::Error> { // Save retry state for bodyless requests (cheap: Method is an enum, HeaderMap clones Arc-backed Bytes) let retry_state = if body.is_end_stream() { @@ -982,7 +1000,7 @@ impl HttpProxyService { let result = self.forward_h2_with_sender( sender, parts, body, upstream_headers, upstream_path, - route, route_id, source_ip, Some(pool_key), domain, + route, route_id, source_ip, Some(pool_key), domain, conn_activity, ).await; // If the request failed (502) and we can retry with an empty body, do so @@ -993,7 +1011,7 @@ impl HttpProxyService { "Stale pooled H2 sender, retrying with fresh connection"); return self.retry_h2_with_fresh_connection( method, headers, upstream_path, - pool_key, route, route_id, source_ip, domain, + pool_key, route, route_id, source_ip, domain, conn_activity, ).await; } } @@ -1012,6 +1030,7 @@ impl HttpProxyService { route_id: Option<&str>, source_ip: &str, domain: &str, + conn_activity: &ConnActivity, ) -> Result>, hyper::Error> { let backend_key = format!("{}:{}", pool_key.host, pool_key.port); @@ -1064,8 +1083,8 @@ impl HttpProxyService { .timer(hyper_util::rt::TokioTimer::new()) .keep_alive_interval(std::time::Duration::from_secs(10)) .keep_alive_timeout(std::time::Duration::from_secs(5)) - .adaptive_window(true) - .initial_stream_window_size(2 * 1024 * 1024); + .initial_stream_window_size(2 * 1024 * 1024) + .initial_connection_window_size(16 * 1024 * 1024); let (mut sender, conn): ( hyper::client::conn::http2::SendRequest>, hyper::client::conn::http2::Connection, BoxBody, hyper_util::rt::TokioExecutor>, @@ -1116,7 +1135,7 @@ impl HttpProxyService { Ok(resp) => { // Register in pool only after request succeeds self.connection_pool.register_h2(pool_key.clone(), sender); - let result = self.build_streaming_response(resp, route, route_id, source_ip).await; + let result = self.build_streaming_response(resp, route, route_id, source_ip, conn_activity).await; // Close the fresh backend connection (opened above) self.metrics.backend_connection_closed(&backend_key); result @@ -1152,6 +1171,7 @@ impl HttpProxyService { pool_key: &crate::connection_pool::PoolKey, requested_host: Option, domain: &str, + conn_activity: &ConnActivity, ) -> Result>, hyper::Error> { let exec = hyper_util::rt::TokioExecutor::new(); let mut h2_builder = hyper::client::conn::http2::Builder::new(exec); @@ -1159,8 +1179,8 @@ impl HttpProxyService { .timer(hyper_util::rt::TokioTimer::new()) .keep_alive_interval(std::time::Duration::from_secs(10)) .keep_alive_timeout(std::time::Duration::from_secs(5)) - .adaptive_window(true) - .initial_stream_window_size(2 * 1024 * 1024); + .initial_stream_window_size(2 * 1024 * 1024) + .initial_connection_window_size(16 * 1024 * 1024); let handshake_result = tokio::time::timeout( self.connect_timeout, h2_builder.handshake(io), @@ -1196,7 +1216,7 @@ impl HttpProxyService { let fallback_io = TokioIo::new(fallback_backend); let result = self.forward_h1( fallback_io, parts, body, upstream_headers, upstream_path, - upstream, route, route_id, source_ip, &h1_pool_key, domain, + upstream, route, route_id, source_ip, &h1_pool_key, domain, conn_activity, ).await; self.metrics.backend_connection_closed(&bk); result @@ -1244,7 +1264,7 @@ impl HttpProxyService { route_id.map(|s| s.to_string()), Some(source_ip.to_string()), Direction::In, - ); + ).with_connection_activity(Arc::clone(&conn_activity.last_activity), conn_activity.start); let boxed_body: BoxBody = BoxBody::new(counting_req_body); let upstream_req = upstream_req.body(boxed_body).unwrap(); @@ -1252,7 +1272,7 @@ impl HttpProxyService { Ok(upstream_response) => { // H2 works! Register sender in pool for multiplexed reuse self.connection_pool.register_h2(pool_key.clone(), sender); - self.build_streaming_response(upstream_response, route, route_id, source_ip).await + self.build_streaming_response(upstream_response, route, route_id, source_ip, conn_activity).await } Err(e) => { // H2 request failed — backend advertises h2 via ALPN but doesn't @@ -1285,7 +1305,7 @@ impl HttpProxyService { let fallback_io = TokioIo::new(fallback_backend); let result = self.forward_h1_empty_body( fallback_io, method, headers, upstream_path, - route, route_id, source_ip, &h1_pool_key, domain, + route, route_id, source_ip, &h1_pool_key, domain, conn_activity, ).await; // Close the reconnected backend connection (opened in reconnect_backend) self.metrics.backend_connection_closed(&bk); @@ -1334,7 +1354,7 @@ impl HttpProxyService { let fallback_io = TokioIo::new(fallback_backend); let result = self.forward_h1( fallback_io, parts, body, upstream_headers, upstream_path, - upstream, route, route_id, source_ip, &h1_pool_key, domain, + upstream, route, route_id, source_ip, &h1_pool_key, domain, conn_activity, ).await; // Close the reconnected backend connection (opened in reconnect_backend) self.metrics.backend_connection_closed(&bk); @@ -1361,6 +1381,7 @@ impl HttpProxyService { source_ip: &str, pool_key: &crate::connection_pool::PoolKey, domain: &str, + conn_activity: &ConnActivity, ) -> Result>, hyper::Error> { let backend_key = format!("{}:{}", pool_key.host, pool_key.port); let (mut sender, conn): ( @@ -1407,7 +1428,7 @@ impl HttpProxyService { // Return sender to pool for keep-alive reuse self.connection_pool.checkin_h1(pool_key.clone(), sender); - self.build_streaming_response(upstream_response, route, route_id, source_ip).await + self.build_streaming_response(upstream_response, route, route_id, source_ip, conn_activity).await } /// Reconnect to a backend (used for H2→H1 fallback). @@ -1478,6 +1499,7 @@ impl HttpProxyService { source_ip: &str, pool_key: Option<&crate::connection_pool::PoolKey>, domain: &str, + conn_activity: &ConnActivity, ) -> Result>, hyper::Error> { // Build absolute URI for H2 pseudo-headers (:scheme, :authority) // Use the requested domain as authority (not backend address) so :authority matches Host header @@ -1506,7 +1528,7 @@ impl HttpProxyService { route_id.map(|s| s.to_string()), Some(source_ip.to_string()), Direction::In, - ); + ).with_connection_activity(Arc::clone(&conn_activity.last_activity), conn_activity.start); let boxed_body: BoxBody = BoxBody::new(counting_req_body); let upstream_req = upstream_req.body(boxed_body).unwrap(); @@ -1527,7 +1549,7 @@ impl HttpProxyService { } }; - self.build_streaming_response(upstream_response, route, route_id, source_ip).await + self.build_streaming_response(upstream_response, route, route_id, source_ip, conn_activity).await } /// Build the client-facing response from an upstream response, streaming the body. @@ -1540,6 +1562,7 @@ impl HttpProxyService { route: &rustproxy_config::RouteConfig, route_id: Option<&str>, source_ip: &str, + conn_activity: &ConnActivity, ) -> Result>, hyper::Error> { let (resp_parts, resp_body) = upstream_response.into_parts(); @@ -1560,7 +1583,7 @@ impl HttpProxyService { route_id.map(|s| s.to_string()), Some(source_ip.to_string()), Direction::Out, - ); + ).with_connection_activity(Arc::clone(&conn_activity.last_activity), conn_activity.start); let body: BoxBody = BoxBody::new(counting_body); @@ -1579,6 +1602,7 @@ impl HttpProxyService { cancel: CancellationToken, source_ip: &str, is_h2: bool, + conn_activity: Option, ) -> Result>, hyper::Error> { use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -1882,6 +1906,11 @@ impl HttpProxyService { let last_activity = Arc::new(AtomicU64::new(0)); let start = std::time::Instant::now(); + // For H2 WebSocket: also update the connection-level activity tracker + // to prevent the idle watchdog from killing the H2 connection + let conn_act_c2u = conn_activity.as_ref().map(|ca| (Arc::clone(&ca.last_activity), ca.start)); + let conn_act_u2c = conn_activity.as_ref().map(|ca| (Arc::clone(&ca.last_activity), ca.start)); + let la1 = Arc::clone(&last_activity); let c2u = tokio::spawn(async move { let mut buf = vec![0u8; 65536]; @@ -1896,6 +1925,9 @@ impl HttpProxyService { } total += n as u64; la1.store(start.elapsed().as_millis() as u64, Ordering::Relaxed); + if let Some((ref ca, ca_start)) = conn_act_c2u { + ca.store(ca_start.elapsed().as_millis() as u64, Ordering::Relaxed); + } } let _ = uw.shutdown().await; total @@ -1915,6 +1947,9 @@ impl HttpProxyService { } total += n as u64; la2.store(start.elapsed().as_millis() as u64, Ordering::Relaxed); + if let Some((ref ca, ca_start)) = conn_act_u2c { + ca.store(ca_start.elapsed().as_millis() as u64, Ordering::Relaxed); + } } let _ = cw.shutdown().await; total diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index dff3502..faffce7 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartproxy', - version: '25.11.0', + version: '25.11.1', description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.' }