diff --git a/changelog.md b/changelog.md index 034b3f9..137d656 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-03-16 - 25.11.8 - fix(rustproxy-http) +prevent premature idle timeouts during streamed HTTP responses and ensure TLS close_notify is sent on dropped connections + +- track active streaming response bodies so the HTTP idle watchdog does not close connections mid-transfer +- add a ShutdownOnDrop wrapper for TLS-terminated HTTP connections to send shutdown on drop and avoid improperly terminated TLS sessions +- apply the shutdown wrapper in passthrough TLS terminate and terminate+reencrypt HTTP handling + ## 2026-03-16 - 25.11.7 - fix(rustproxy) prevent TLS route reload certificate mismatches and tighten passthrough connection handling diff --git a/rust/crates/rustproxy-http/src/counting_body.rs b/rust/crates/rustproxy-http/src/counting_body.rs index 996b192..b330e68 100644 --- a/rust/crates/rustproxy-http/src/counting_body.rs +++ b/rust/crates/rustproxy-http/src/counting_body.rs @@ -27,6 +27,10 @@ pub struct CountingBody { connection_activity: Option>, /// Start instant for computing elapsed ms for connection_activity. activity_start: Option, + /// Optional active-request counter. When set, CountingBody increments on creation + /// and decrements on Drop, keeping the HTTP idle watchdog aware that a response + /// body is still streaming (even after the request handler has returned). + active_requests: Option>, } /// Which direction the bytes flow. @@ -55,6 +59,7 @@ impl CountingBody { direction, connection_activity: None, activity_start: None, + active_requests: None, } } @@ -67,6 +72,15 @@ impl CountingBody { self } + /// Set the active-request counter for the HTTP idle watchdog. + /// CountingBody increments on creation and decrements on Drop, ensuring the + /// idle watchdog sees an "active request" while the response body streams. + pub fn with_active_requests(mut self, counter: Arc) -> Self { + counter.fetch_add(1, Ordering::Relaxed); + self.active_requests = Some(counter); + self + } + /// Report a chunk of bytes immediately to the metrics collector. #[inline] fn report_chunk(&self, len: u64) { @@ -122,3 +136,13 @@ where self.inner.size_hint() } } + +impl Drop for CountingBody { + fn drop(&mut self) { + // Decrement the active-request counter so the HTTP idle watchdog + // knows this response body is no longer streaming. + if let Some(ref counter) = self.active_requests { + counter.fetch_sub(1, Ordering::Relaxed); + } + } +} diff --git a/rust/crates/rustproxy-http/src/lib.rs b/rust/crates/rustproxy-http/src/lib.rs index dddf183..f077b49 100644 --- a/rust/crates/rustproxy-http/src/lib.rs +++ b/rust/crates/rustproxy-http/src/lib.rs @@ -9,6 +9,7 @@ pub mod protocol_cache; pub mod proxy_service; pub mod request_filter; pub mod response_filter; +pub mod shutdown_on_drop; pub mod template; pub mod upstream_selector; diff --git a/rust/crates/rustproxy-http/src/proxy_service.rs b/rust/crates/rustproxy-http/src/proxy_service.rs index 32ae70a..1256216 100644 --- a/rust/crates/rustproxy-http/src/proxy_service.rs +++ b/rust/crates/rustproxy-http/src/proxy_service.rs @@ -39,6 +39,10 @@ use crate::upstream_selector::UpstreamSelector; struct ConnActivity { last_activity: Arc, start: std::time::Instant, + /// Active-request counter from handle_io's idle watchdog. When set, CountingBody + /// increments on creation and decrements on Drop, keeping the watchdog aware that + /// a response body is still streaming after the request handler has returned. + active_requests: Option>, } /// Default upstream connect timeout (30 seconds). @@ -302,7 +306,7 @@ impl HttpProxyService { let cn = cancel_inner.clone(); let la = Arc::clone(&la_inner); let st = start; - let ca = ConnActivity { last_activity: Arc::clone(&la_inner), start }; + let ca = ConnActivity { last_activity: Arc::clone(&la_inner), start, active_requests: Some(Arc::clone(&ar_inner)) }; async move { let result = svc.handle_request(req, peer, port, cn, ca).await; // Mark request end — update activity timestamp before guard drops @@ -1624,6 +1628,15 @@ impl HttpProxyService { Direction::Out, ).with_connection_activity(Arc::clone(&conn_activity.last_activity), conn_activity.start); + // Keep active_requests > 0 while the response body streams, so the idle + // watchdog doesn't kill the connection mid-transfer (e.g. during git fetch). + // CountingBody increments on creation and decrements on Drop. + let counting_body = if let Some(ref ar) = conn_activity.active_requests { + counting_body.with_active_requests(Arc::clone(ar)) + } else { + counting_body + }; + let body: BoxBody = BoxBody::new(counting_body); Ok(response.body(body).unwrap()) diff --git a/rust/crates/rustproxy-http/src/shutdown_on_drop.rs b/rust/crates/rustproxy-http/src/shutdown_on_drop.rs new file mode 100644 index 0000000..d878b27 --- /dev/null +++ b/rust/crates/rustproxy-http/src/shutdown_on_drop.rs @@ -0,0 +1,90 @@ +//! Wrapper that ensures TLS close_notify is sent when the stream is dropped. +//! +//! When hyper drops an HTTP connection (backend error, timeout, normal H2 close), +//! the underlying TLS stream is dropped WITHOUT `shutdown()`. tokio-rustls cannot +//! send `close_notify` in Drop (requires async). This wrapper tracks whether +//! `poll_shutdown` was called and, if not, spawns a background task to send it. + +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; + +/// Wraps an AsyncRead+AsyncWrite stream and ensures `shutdown()` is called when +/// dropped, even if the caller (e.g. hyper) doesn't explicitly shut down. +/// +/// This guarantees TLS `close_notify` is sent for TLS-wrapped streams, preventing +/// "GnuTLS recv error (-110): The TLS connection was non-properly terminated" errors. +pub struct ShutdownOnDrop { + inner: Option, + shutdown_called: bool, +} + +impl ShutdownOnDrop { + /// Create a new wrapper around the given stream. + pub fn new(stream: S) -> Self { + Self { + inner: Some(stream), + shutdown_called: false, + } + } +} + +impl AsyncRead for ShutdownOnDrop { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + Pin::new(self.get_mut().inner.as_mut().unwrap()).poll_read(cx, buf) + } +} + +impl AsyncWrite for ShutdownOnDrop { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(self.get_mut().inner.as_mut().unwrap()).poll_write(cx, buf) + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(self.get_mut().inner.as_mut().unwrap()).poll_flush(cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.get_mut(); + let result = Pin::new(this.inner.as_mut().unwrap()).poll_shutdown(cx); + if result.is_ready() { + this.shutdown_called = true; + } + result + } +} + +impl Drop for ShutdownOnDrop { + fn drop(&mut self) { + // If shutdown was already called (hyper closed properly), nothing to do. + // If not (hyper dropped without shutdown — e.g. H2 close, error, timeout), + // spawn a background task to send close_notify / TCP FIN. + if !self.shutdown_called { + if let Some(mut stream) = self.inner.take() { + tokio::spawn(async move { + let _ = tokio::time::timeout( + std::time::Duration::from_secs(2), + tokio::io::AsyncWriteExt::shutdown(&mut stream), + ).await; + // stream is dropped here — all resources freed + }); + } + } + } +} diff --git a/rust/crates/rustproxy-passthrough/src/tcp_listener.rs b/rust/crates/rustproxy-passthrough/src/tcp_listener.rs index c06ae22..8f0f1b3 100644 --- a/rust/crates/rustproxy-passthrough/src/tcp_listener.rs +++ b/rust/crates/rustproxy-passthrough/src/tcp_listener.rs @@ -1014,7 +1014,11 @@ impl TcpListenerManager { "TLS Terminate + HTTP: {} -> {}:{} (domain: {:?})", peer_addr, target_host, target_port, domain ); - http_proxy.handle_io(buf_stream, peer_addr, port, cancel.clone()).await; + // Wrap in ShutdownOnDrop to ensure TLS close_notify is sent + // even if hyper drops the connection without calling shutdown + // (e.g. H2 close, backend error, idle timeout drain). + let wrapped = rustproxy_http::shutdown_on_drop::ShutdownOnDrop::new(buf_stream); + http_proxy.handle_io(wrapped, peer_addr, port, cancel.clone()).await; } else { debug!( "TLS Terminate + TCP: {} -> {}:{} (domain: {:?})", @@ -1096,7 +1100,10 @@ impl TcpListenerManager { "TLS Terminate+Reencrypt + HTTP: {} (domain: {:?})", peer_addr, domain ); - http_proxy.handle_io(buf_stream, peer_addr, port, cancel.clone()).await; + // Wrap in ShutdownOnDrop to ensure TLS close_notify is sent + // even if hyper drops the connection without calling shutdown. + let wrapped = rustproxy_http::shutdown_on_drop::ShutdownOnDrop::new(buf_stream); + http_proxy.handle_io(wrapped, peer_addr, port, cancel.clone()).await; } else { // Non-HTTP: TLS-to-TLS tunnel (existing behavior for raw TCP protocols) debug!( diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 7a8f475..6db59bf 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartproxy', - version: '25.11.7', + version: '25.11.8', description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.' }