diff --git a/changelog.md b/changelog.md index c531374..e5f8a3e 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,13 @@ # Changelog +## 2026-03-16 - 25.11.6 - fix(rustproxy-http,rustproxy-passthrough) +improve upstream connection cleanup and graceful tunnel shutdown + +- Evict pooled HTTP/2 connections when their driver exits and shorten the maximum pooled H2 age to reduce reuse of stale upstream connections. +- Strip hop-by-hop headers from backend responses before forwarding to HTTP/2 clients to avoid invalid H2 response handling. +- Replace immediate task aborts in WebSocket and TCP tunnel watchdogs with cancellation-driven graceful shutdown plus timed fallback aborts. +- Use non-blocking semaphore acquisition in the TCP listener so connection limits do not stall the accept loop for the entire port. + ## 2026-03-16 - 25.11.5 - fix(repo) no changes to commit diff --git a/rust/crates/rustproxy-http/src/connection_pool.rs b/rust/crates/rustproxy-http/src/connection_pool.rs index cb47ff4..95c3f3a 100644 --- a/rust/crates/rustproxy-http/src/connection_pool.rs +++ b/rust/crates/rustproxy-http/src/connection_pool.rs @@ -20,7 +20,8 @@ const IDLE_TIMEOUT: Duration = Duration::from_secs(90); const EVICTION_INTERVAL: Duration = Duration::from_secs(30); /// Maximum age for pooled HTTP/2 connections before proactive eviction. /// Prevents staleness from backends that close idle connections (e.g. nginx GOAWAY). -const MAX_H2_AGE: Duration = Duration::from_secs(300); +/// 120s is well within typical server GOAWAY windows (nginx: ~60s idle, envoy: ~60s). +const MAX_H2_AGE: Duration = Duration::from_secs(120); /// Identifies a unique backend endpoint. #[derive(Clone, Debug, Hash, Eq, PartialEq)] diff --git a/rust/crates/rustproxy-http/src/proxy_service.rs b/rust/crates/rustproxy-http/src/proxy_service.rs index 5ba5cbb..32ae70a 100644 --- a/rust/crates/rustproxy-http/src/proxy_service.rs +++ b/rust/crates/rustproxy-http/src/proxy_service.rs @@ -966,11 +966,18 @@ impl HttpProxyService { } }; - tokio::spawn(async move { - if let Err(e) = conn.await { - debug!("HTTP/2 upstream connection error: {}", e); - } - }); + // Spawn the H2 connection driver; proactively evict from pool on exit + // so the next request gets a fresh connection instead of a dead sender. + { + let pool = Arc::clone(&self.connection_pool); + let key = pool_key.clone(); + tokio::spawn(async move { + if let Err(e) = conn.await { + debug!("HTTP/2 upstream connection error: {}", e); + } + pool.remove_h2(&key); + }); + } // Clone sender for potential pool registration; register only after first request succeeds let sender_for_pool = sender.clone(); @@ -1111,11 +1118,17 @@ impl HttpProxyService { } }; - tokio::spawn(async move { - if let Err(e) = conn.await { - debug!("H2 retry: upstream connection error: {}", e); - } - }); + // Spawn the H2 connection driver; proactively evict from pool on exit. + { + let pool = Arc::clone(&self.connection_pool); + let key = pool_key.clone(); + tokio::spawn(async move { + if let Err(e) = conn.await { + debug!("H2 retry: upstream connection error: {}", e); + } + pool.remove_h2(&key); + }); + } // Build request with empty body using absolute URI for H2 pseudo-headers let scheme = if pool_key.use_tls { "https" } else { "http" }; @@ -1234,11 +1247,17 @@ impl HttpProxyService { } } Ok(Ok((mut sender, conn))) => { - tokio::spawn(async move { - if let Err(e) = conn.await { - debug!("HTTP/2 upstream connection error: {}", e); - } - }); + // Spawn the H2 connection driver; proactively evict from pool on exit. + { + let pool = Arc::clone(&self.connection_pool); + let key = pool_key.clone(); + tokio::spawn(async move { + if let Err(e) = conn.await { + debug!("HTTP/2 upstream connection error: {}", e); + } + pool.remove_h2(&key); + }); + } // Save retry state before consuming parts/body (for bodyless requests like GET) // Clone BEFORE removing Host — H1 fallback needs Host header @@ -1578,6 +1597,19 @@ impl HttpProxyService { if let Some(headers) = response.headers_mut() { *headers = resp_parts.headers; + + // Strip hop-by-hop headers from the backend response. + // RFC 9113 §8.2.2 forbids connection-specific headers in HTTP/2 responses; + // forwarding them from an H1 backend can cause H2 stream resets. + // Mirrors the request-path stripping at the forward methods above. + headers.remove("connection"); + headers.remove("keep-alive"); + headers.remove("proxy-connection"); + headers.remove("transfer-encoding"); + headers.remove("te"); + headers.remove("trailer"); + // Note: "upgrade" is intentionally kept — needed for WebSocket 101 responses. + ResponseFilter::apply_headers(route, headers, None); } @@ -1913,6 +1945,10 @@ impl HttpProxyService { let last_activity = Arc::new(AtomicU64::new(0)); let start = std::time::Instant::now(); + // Per-connection cancellation token: the watchdog cancels this instead of + // aborting tasks, so the copy loops can shut down gracefully (TLS close_notify). + let ws_cancel = CancellationToken::new(); + // For H2 WebSocket: also update the connection-level activity tracker // to prevent the idle watchdog from killing the H2 connection let conn_act_c2u = conn_activity.as_ref().map(|ca| (Arc::clone(&ca.last_activity), ca.start)); @@ -1922,13 +1958,17 @@ impl HttpProxyService { let metrics_c2u = Arc::clone(&metrics); let route_c2u = route_id_owned.clone(); let ip_c2u = source_ip_owned.clone(); + let wsc1 = ws_cancel.clone(); let c2u = tokio::spawn(async move { let mut buf = vec![0u8; 65536]; let mut total = 0u64; loop { - let n = match cr.read(&mut buf).await { - Ok(0) | Err(_) => break, - Ok(n) => n, + let n = tokio::select! { + result = cr.read(&mut buf) => match result { + Ok(0) | Err(_) => break, + Ok(n) => n, + }, + _ = wsc1.cancelled() => break, }; if uw.write_all(&buf[..n]).await.is_err() { break; @@ -1940,7 +1980,11 @@ impl HttpProxyService { ca.store(ca_start.elapsed().as_millis() as u64, Ordering::Relaxed); } } - let _ = uw.shutdown().await; + // Graceful shutdown with timeout (sends TLS close_notify / TCP FIN) + let _ = tokio::time::timeout( + std::time::Duration::from_secs(2), + uw.shutdown(), + ).await; total }); @@ -1948,13 +1992,17 @@ impl HttpProxyService { let metrics_u2c = Arc::clone(&metrics); let route_u2c = route_id_owned.clone(); let ip_u2c = source_ip_owned.clone(); + let wsc2 = ws_cancel.clone(); let u2c = tokio::spawn(async move { let mut buf = vec![0u8; 65536]; let mut total = 0u64; loop { - let n = match ur.read(&mut buf).await { - Ok(0) | Err(_) => break, - Ok(n) => n, + let n = tokio::select! { + result = ur.read(&mut buf) => match result { + Ok(0) | Err(_) => break, + Ok(n) => n, + }, + _ = wsc2.cancelled() => break, }; if cw.write_all(&buf[..n]).await.is_err() { break; @@ -1966,14 +2014,20 @@ impl HttpProxyService { ca.store(ca_start.elapsed().as_millis() as u64, Ordering::Relaxed); } } - let _ = cw.shutdown().await; + // Graceful shutdown with timeout (sends TLS close_notify / TCP FIN) + let _ = tokio::time::timeout( + std::time::Duration::from_secs(2), + cw.shutdown(), + ).await; total }); - // Watchdog: monitors inactivity, max lifetime, and cancellation + // Watchdog: monitors inactivity, max lifetime, and cancellation. + // First cancels the per-connection token for graceful shutdown (close_notify/FIN), + // then falls back to abort if the tasks are stuck (e.g. on a blocked write_all). let la_watch = Arc::clone(&last_activity); - let c2u_handle = c2u.abort_handle(); - let u2c_handle = u2c.abort_handle(); + let c2u_abort = c2u.abort_handle(); + let u2c_abort = u2c.abort_handle(); let inactivity_timeout = ws_inactivity_timeout; let max_lifetime = ws_max_lifetime; @@ -1985,8 +2039,6 @@ impl HttpProxyService { _ = tokio::time::sleep(check_interval) => {} _ = cancel.cancelled() => { debug!("WebSocket tunnel cancelled by shutdown"); - c2u_handle.abort(); - u2c_handle.abort(); break; } } @@ -1994,8 +2046,6 @@ impl HttpProxyService { // Check max lifetime if start.elapsed() >= max_lifetime { debug!("WebSocket tunnel exceeded max lifetime, closing"); - c2u_handle.abort(); - u2c_handle.abort(); break; } @@ -2005,13 +2055,18 @@ impl HttpProxyService { let elapsed_since_activity = start.elapsed().as_millis() as u64 - current; if elapsed_since_activity >= inactivity_timeout.as_millis() as u64 { debug!("WebSocket tunnel inactive for {}ms, closing", elapsed_since_activity); - c2u_handle.abort(); - u2c_handle.abort(); break; } } last_seen = current; } + // Phase 1: Signal copy loops to exit gracefully (allows close_notify/FIN) + ws_cancel.cancel(); + // Phase 2: Wait for graceful shutdown (2s shutdown timeout + 2s margin) + tokio::time::sleep(std::time::Duration::from_secs(4)).await; + // Phase 3: Force-abort if still stuck (e.g. blocked on write_all) + c2u_abort.abort(); + u2c_abort.abort(); }); let bytes_in = c2u.await.unwrap_or(0); diff --git a/rust/crates/rustproxy-passthrough/src/forwarder.rs b/rust/crates/rustproxy-passthrough/src/forwarder.rs index e92d628..447afb4 100644 --- a/rust/crates/rustproxy-passthrough/src/forwarder.rs +++ b/rust/crates/rustproxy-passthrough/src/forwarder.rs @@ -97,16 +97,25 @@ pub async fn forward_bidirectional_with_timeouts( let last_activity = Arc::new(AtomicU64::new(0)); let start = std::time::Instant::now(); + // Per-connection cancellation token: the watchdog cancels this instead of + // aborting tasks, so the copy loops can shut down gracefully (TCP FIN instead + // of RST, TLS close_notify if the stream is TLS-wrapped). + let conn_cancel = CancellationToken::new(); + let la1 = Arc::clone(&last_activity); let initial_len = initial_data.map_or(0u64, |d| d.len() as u64); let metrics_c2b = metrics.clone(); + let cc1 = conn_cancel.clone(); let c2b = tokio::spawn(async move { let mut buf = vec![0u8; 65536]; let mut total = initial_len; loop { - let n = match client_read.read(&mut buf).await { - Ok(0) | Err(_) => break, - Ok(n) => n, + let n = tokio::select! { + result = client_read.read(&mut buf) => match result { + Ok(0) | Err(_) => break, + Ok(n) => n, + }, + _ = cc1.cancelled() => break, }; if backend_write.write_all(&buf[..n]).await.is_err() { break; @@ -117,19 +126,27 @@ pub async fn forward_bidirectional_with_timeouts( ctx.collector.record_bytes(n as u64, 0, ctx.route_id.as_deref(), ctx.source_ip.as_deref()); } } - let _ = backend_write.shutdown().await; + // Graceful shutdown with timeout (sends TCP FIN / TLS close_notify) + let _ = tokio::time::timeout( + std::time::Duration::from_secs(2), + backend_write.shutdown(), + ).await; total }); let la2 = Arc::clone(&last_activity); let metrics_b2c = metrics; + let cc2 = conn_cancel.clone(); let b2c = tokio::spawn(async move { let mut buf = vec![0u8; 65536]; let mut total = 0u64; loop { - let n = match backend_read.read(&mut buf).await { - Ok(0) | Err(_) => break, - Ok(n) => n, + let n = tokio::select! { + result = backend_read.read(&mut buf) => match result { + Ok(0) | Err(_) => break, + Ok(n) => n, + }, + _ = cc2.cancelled() => break, }; if client_write.write_all(&buf[..n]).await.is_err() { break; @@ -140,14 +157,20 @@ pub async fn forward_bidirectional_with_timeouts( ctx.collector.record_bytes(0, n as u64, ctx.route_id.as_deref(), ctx.source_ip.as_deref()); } } - let _ = client_write.shutdown().await; + // Graceful shutdown with timeout (sends TCP FIN / TLS close_notify) + let _ = tokio::time::timeout( + std::time::Duration::from_secs(2), + client_write.shutdown(), + ).await; total }); - // Watchdog: inactivity, max lifetime, and cancellation + // Watchdog: inactivity, max lifetime, and cancellation. + // First cancels the per-connection token for graceful shutdown (FIN/close_notify), + // then falls back to abort if the tasks are stuck (e.g. on a blocked write_all). let la_watch = Arc::clone(&last_activity); - let c2b_handle = c2b.abort_handle(); - let b2c_handle = b2c.abort_handle(); + let c2b_abort = c2b.abort_handle(); + let b2c_abort = b2c.abort_handle(); let watchdog = tokio::spawn(async move { let check_interval = std::time::Duration::from_secs(5); let mut last_seen = 0u64; @@ -155,16 +178,12 @@ pub async fn forward_bidirectional_with_timeouts( tokio::select! { _ = cancel.cancelled() => { debug!("Connection cancelled by shutdown"); - c2b_handle.abort(); - b2c_handle.abort(); break; } _ = tokio::time::sleep(check_interval) => { // Check max lifetime if start.elapsed() >= max_lifetime { debug!("Connection exceeded max lifetime, closing"); - c2b_handle.abort(); - b2c_handle.abort(); break; } @@ -174,8 +193,6 @@ pub async fn forward_bidirectional_with_timeouts( let elapsed_since_activity = start.elapsed().as_millis() as u64 - current; if elapsed_since_activity >= inactivity_timeout.as_millis() as u64 { debug!("Connection inactive for {}ms, closing", elapsed_since_activity); - c2b_handle.abort(); - b2c_handle.abort(); break; } } @@ -183,6 +200,13 @@ pub async fn forward_bidirectional_with_timeouts( } } } + // Phase 1: Signal copy loops to exit gracefully (allows FIN/close_notify) + conn_cancel.cancel(); + // Phase 2: Wait for graceful shutdown (2s shutdown timeout + 2s margin) + tokio::time::sleep(std::time::Duration::from_secs(4)).await; + // Phase 3: Force-abort if still stuck (e.g. blocked on write_all) + c2b_abort.abort(); + b2c_abort.abort(); }); let bytes_in = c2b.await.unwrap_or(0); diff --git a/rust/crates/rustproxy-passthrough/src/tcp_listener.rs b/rust/crates/rustproxy-passthrough/src/tcp_listener.rs index a08d659..094b63e 100644 --- a/rust/crates/rustproxy-passthrough/src/tcp_listener.rs +++ b/rust/crates/rustproxy-passthrough/src/tcp_listener.rs @@ -465,21 +465,19 @@ impl TcpListenerManager { Ok((stream, peer_addr)) => { let ip = peer_addr.ip(); - // Global connection limit — acquire semaphore permit with timeout - let permit = match tokio::time::timeout( - std::time::Duration::from_secs(5), - conn_semaphore.clone().acquire_owned(), - ).await { - Ok(Ok(permit)) => permit, - Ok(Err(_)) => { - // Semaphore closed — shouldn't happen, but be safe - debug!("Connection semaphore closed, dropping connection from {}", peer_addr); + // Global connection limit — non-blocking check. + // MUST NOT block the accept loop: a blocking acquire would stall + // ALL connections to this port (not just the one over limit), because + // listener.accept() is not polled while we await the semaphore. + let permit = match conn_semaphore.clone().try_acquire_owned() { + Ok(permit) => permit, + Err(tokio::sync::TryAcquireError::NoPermits) => { + debug!("Global connection limit reached, dropping connection from {}", peer_addr); drop(stream); continue; } - Err(_) => { - // Timeout — global limit reached - debug!("Global connection limit reached, dropping connection from {}", peer_addr); + Err(tokio::sync::TryAcquireError::Closed) => { + debug!("Connection semaphore closed, dropping connection from {}", peer_addr); drop(stream); continue; } @@ -1396,15 +1394,24 @@ impl TcpListenerManager { let last_activity = Arc::new(AtomicU64::new(0)); let start = std::time::Instant::now(); + // Per-connection cancellation token: the watchdog cancels this instead of + // aborting tasks, so the copy loops can shut down gracefully (TLS close_notify + // for terminate/reencrypt mode, TCP FIN for passthrough mode). + let conn_cancel = CancellationToken::new(); + let la1 = Arc::clone(&last_activity); let metrics_c2b = metrics.clone(); + let cc1 = conn_cancel.clone(); let c2b = tokio::spawn(async move { let mut buf = vec![0u8; 65536]; let mut total = 0u64; loop { - let n = match client_read.read(&mut buf).await { - Ok(0) | Err(_) => break, - Ok(n) => n, + let n = tokio::select! { + result = client_read.read(&mut buf) => match result { + Ok(0) | Err(_) => break, + Ok(n) => n, + }, + _ = cc1.cancelled() => break, }; if backend_write.write_all(&buf[..n]).await.is_err() { break; @@ -1418,19 +1425,27 @@ impl TcpListenerManager { ctx.collector.record_bytes(n as u64, 0, ctx.route_id.as_deref(), ctx.source_ip.as_deref()); } } - let _ = backend_write.shutdown().await; + // Graceful shutdown with timeout (sends TLS close_notify / TCP FIN) + let _ = tokio::time::timeout( + std::time::Duration::from_secs(2), + backend_write.shutdown(), + ).await; total }); let la2 = Arc::clone(&last_activity); let metrics_b2c = metrics; + let cc2 = conn_cancel.clone(); let b2c = tokio::spawn(async move { let mut buf = vec![0u8; 65536]; let mut total = 0u64; loop { - let n = match backend_read.read(&mut buf).await { - Ok(0) | Err(_) => break, - Ok(n) => n, + let n = tokio::select! { + result = backend_read.read(&mut buf) => match result { + Ok(0) | Err(_) => break, + Ok(n) => n, + }, + _ = cc2.cancelled() => break, }; if client_write.write_all(&buf[..n]).await.is_err() { break; @@ -1444,14 +1459,20 @@ impl TcpListenerManager { ctx.collector.record_bytes(0, n as u64, ctx.route_id.as_deref(), ctx.source_ip.as_deref()); } } - let _ = client_write.shutdown().await; + // Graceful shutdown with timeout (sends TLS close_notify / TCP FIN) + let _ = tokio::time::timeout( + std::time::Duration::from_secs(2), + client_write.shutdown(), + ).await; total }); - // Watchdog task: check for inactivity, max lifetime, and cancellation + // Watchdog task: check for inactivity, max lifetime, and cancellation. + // First cancels the per-connection token for graceful shutdown (close_notify/FIN), + // then falls back to abort if the tasks are stuck (e.g. on a blocked write_all). let la_watch = Arc::clone(&last_activity); - let c2b_handle = c2b.abort_handle(); - let b2c_handle = b2c.abort_handle(); + let c2b_abort = c2b.abort_handle(); + let b2c_abort = b2c.abort_handle(); let watchdog = tokio::spawn(async move { let check_interval = std::time::Duration::from_secs(5); let mut last_seen = 0u64; @@ -1459,16 +1480,12 @@ impl TcpListenerManager { tokio::select! { _ = cancel.cancelled() => { debug!("Split-stream connection cancelled by shutdown"); - c2b_handle.abort(); - b2c_handle.abort(); break; } _ = tokio::time::sleep(check_interval) => { // Check max lifetime if start.elapsed() >= max_lifetime { debug!("Connection exceeded max lifetime, closing"); - c2b_handle.abort(); - b2c_handle.abort(); break; } @@ -1479,8 +1496,6 @@ impl TcpListenerManager { let elapsed_since_activity = start.elapsed().as_millis() as u64 - current; if elapsed_since_activity >= inactivity_timeout.as_millis() as u64 { debug!("Connection inactive for {}ms, closing", elapsed_since_activity); - c2b_handle.abort(); - b2c_handle.abort(); break; } } @@ -1488,6 +1503,13 @@ impl TcpListenerManager { } } } + // Phase 1: Signal copy loops to exit gracefully (allows close_notify/FIN) + conn_cancel.cancel(); + // Phase 2: Wait for graceful shutdown (2s shutdown timeout + 2s margin) + tokio::time::sleep(std::time::Duration::from_secs(4)).await; + // Phase 3: Force-abort if still stuck (e.g. blocked on write_all) + c2b_abort.abort(); + b2c_abort.abort(); }); let bytes_in = c2b.await.unwrap_or(0); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 6de431f..27cf000 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartproxy', - version: '25.11.5', + version: '25.11.6', description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.' }