Compare commits

..

4 Commits

Author SHA1 Message Date
246b44913e v25.11.8
Some checks failed
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-16 08:58:11 +00:00
b3d4949225 fix(rustproxy-http): prevent premature idle timeouts during streamed HTTP responses and ensure TLS close_notify is sent on dropped connections 2026-03-16 08:58:11 +00:00
0475e6b442 v25.11.7
Some checks failed
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-16 03:01:16 +00:00
8cdb95a853 fix(rustproxy): prevent TLS route reload certificate mismatches and tighten passthrough connection handling 2026-03-16 03:01:16 +00:00
9 changed files with 192 additions and 28 deletions

View File

@@ -1,5 +1,19 @@
# Changelog
## 2026-03-16 - 25.11.8 - fix(rustproxy-http)
prevent premature idle timeouts during streamed HTTP responses and ensure TLS close_notify is sent on dropped connections
- track active streaming response bodies so the HTTP idle watchdog does not close connections mid-transfer
- add a ShutdownOnDrop wrapper for TLS-terminated HTTP connections to send shutdown on drop and avoid improperly terminated TLS sessions
- apply the shutdown wrapper in passthrough TLS terminate and terminate+reencrypt HTTP handling
## 2026-03-16 - 25.11.7 - fix(rustproxy)
prevent TLS route reload certificate mismatches and tighten passthrough connection handling
- Load updated TLS configs before swapping the route manager so newly visible routes always have their certificates available.
- Add timeouts when peeking initial decrypted data after TLS handshake to avoid leaked idle connections.
- Raise dropped, blocked, unmatched, and errored passthrough connection events from debug to warn for better operational visibility.
## 2026-03-16 - 25.11.6 - fix(rustproxy-http,rustproxy-passthrough)
improve upstream connection cleanup and graceful tunnel shutdown

View File

@@ -1,6 +1,6 @@
{
"name": "@push.rocks/smartproxy",
"version": "25.11.6",
"version": "25.11.8",
"private": false,
"description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
"main": "dist_ts/index.js",

View File

@@ -27,6 +27,10 @@ pub struct CountingBody<B> {
connection_activity: Option<Arc<AtomicU64>>,
/// Start instant for computing elapsed ms for connection_activity.
activity_start: Option<std::time::Instant>,
/// Optional active-request counter. When set, CountingBody increments on creation
/// and decrements on Drop, keeping the HTTP idle watchdog aware that a response
/// body is still streaming (even after the request handler has returned).
active_requests: Option<Arc<AtomicU64>>,
}
/// Which direction the bytes flow.
@@ -55,6 +59,7 @@ impl<B> CountingBody<B> {
direction,
connection_activity: None,
activity_start: None,
active_requests: None,
}
}
@@ -67,6 +72,15 @@ impl<B> CountingBody<B> {
self
}
/// Set the active-request counter for the HTTP idle watchdog.
/// CountingBody increments on creation and decrements on Drop, ensuring the
/// idle watchdog sees an "active request" while the response body streams.
pub fn with_active_requests(mut self, counter: Arc<AtomicU64>) -> Self {
counter.fetch_add(1, Ordering::Relaxed);
self.active_requests = Some(counter);
self
}
/// Report a chunk of bytes immediately to the metrics collector.
#[inline]
fn report_chunk(&self, len: u64) {
@@ -122,3 +136,13 @@ where
self.inner.size_hint()
}
}
impl<B> Drop for CountingBody<B> {
fn drop(&mut self) {
// Decrement the active-request counter so the HTTP idle watchdog
// knows this response body is no longer streaming.
if let Some(ref counter) = self.active_requests {
counter.fetch_sub(1, Ordering::Relaxed);
}
}
}

View File

@@ -9,6 +9,7 @@ pub mod protocol_cache;
pub mod proxy_service;
pub mod request_filter;
pub mod response_filter;
pub mod shutdown_on_drop;
pub mod template;
pub mod upstream_selector;

View File

@@ -39,6 +39,10 @@ use crate::upstream_selector::UpstreamSelector;
struct ConnActivity {
last_activity: Arc<AtomicU64>,
start: std::time::Instant,
/// Active-request counter from handle_io's idle watchdog. When set, CountingBody
/// increments on creation and decrements on Drop, keeping the watchdog aware that
/// a response body is still streaming after the request handler has returned.
active_requests: Option<Arc<AtomicU64>>,
}
/// Default upstream connect timeout (30 seconds).
@@ -302,7 +306,7 @@ impl HttpProxyService {
let cn = cancel_inner.clone();
let la = Arc::clone(&la_inner);
let st = start;
let ca = ConnActivity { last_activity: Arc::clone(&la_inner), start };
let ca = ConnActivity { last_activity: Arc::clone(&la_inner), start, active_requests: Some(Arc::clone(&ar_inner)) };
async move {
let result = svc.handle_request(req, peer, port, cn, ca).await;
// Mark request end — update activity timestamp before guard drops
@@ -1624,6 +1628,15 @@ impl HttpProxyService {
Direction::Out,
).with_connection_activity(Arc::clone(&conn_activity.last_activity), conn_activity.start);
// Keep active_requests > 0 while the response body streams, so the idle
// watchdog doesn't kill the connection mid-transfer (e.g. during git fetch).
// CountingBody increments on creation and decrements on Drop.
let counting_body = if let Some(ref ar) = conn_activity.active_requests {
counting_body.with_active_requests(Arc::clone(ar))
} else {
counting_body
};
let body: BoxBody<Bytes, hyper::Error> = BoxBody::new(counting_body);
Ok(response.body(body).unwrap())

View File

@@ -0,0 +1,90 @@
//! Wrapper that ensures TLS close_notify is sent when the stream is dropped.
//!
//! When hyper drops an HTTP connection (backend error, timeout, normal H2 close),
//! the underlying TLS stream is dropped WITHOUT `shutdown()`. tokio-rustls cannot
//! send `close_notify` in Drop (requires async). This wrapper tracks whether
//! `poll_shutdown` was called and, if not, spawns a background task to send it.
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
/// Wraps an AsyncRead+AsyncWrite stream and ensures `shutdown()` is called when
/// dropped, even if the caller (e.g. hyper) doesn't explicitly shut down.
///
/// This guarantees TLS `close_notify` is sent for TLS-wrapped streams, preventing
/// "GnuTLS recv error (-110): The TLS connection was non-properly terminated" errors.
pub struct ShutdownOnDrop<S: AsyncRead + AsyncWrite + Unpin + Send + 'static> {
inner: Option<S>,
shutdown_called: bool,
}
impl<S: AsyncRead + AsyncWrite + Unpin + Send + 'static> ShutdownOnDrop<S> {
/// Create a new wrapper around the given stream.
pub fn new(stream: S) -> Self {
Self {
inner: Some(stream),
shutdown_called: false,
}
}
}
impl<S: AsyncRead + AsyncWrite + Unpin + Send + 'static> AsyncRead for ShutdownOnDrop<S> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(self.get_mut().inner.as_mut().unwrap()).poll_read(cx, buf)
}
}
impl<S: AsyncRead + AsyncWrite + Unpin + Send + 'static> AsyncWrite for ShutdownOnDrop<S> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(self.get_mut().inner.as_mut().unwrap()).poll_write(cx, buf)
}
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
Pin::new(self.get_mut().inner.as_mut().unwrap()).poll_flush(cx)
}
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
let this = self.get_mut();
let result = Pin::new(this.inner.as_mut().unwrap()).poll_shutdown(cx);
if result.is_ready() {
this.shutdown_called = true;
}
result
}
}
impl<S: AsyncRead + AsyncWrite + Unpin + Send + 'static> Drop for ShutdownOnDrop<S> {
fn drop(&mut self) {
// If shutdown was already called (hyper closed properly), nothing to do.
// If not (hyper dropped without shutdown — e.g. H2 close, error, timeout),
// spawn a background task to send close_notify / TCP FIN.
if !self.shutdown_called {
if let Some(mut stream) = self.inner.take() {
tokio::spawn(async move {
let _ = tokio::time::timeout(
std::time::Duration::from_secs(2),
tokio::io::AsyncWriteExt::shutdown(&mut stream),
).await;
// stream is dropped here — all resources freed
});
}
}
}
}

View File

@@ -472,12 +472,12 @@ impl TcpListenerManager {
let permit = match conn_semaphore.clone().try_acquire_owned() {
Ok(permit) => permit,
Err(tokio::sync::TryAcquireError::NoPermits) => {
debug!("Global connection limit reached, dropping connection from {}", peer_addr);
warn!("Global connection limit reached, dropping connection from {}", peer_addr);
drop(stream);
continue;
}
Err(tokio::sync::TryAcquireError::Closed) => {
debug!("Connection semaphore closed, dropping connection from {}", peer_addr);
warn!("Connection semaphore closed, dropping connection from {}", peer_addr);
drop(stream);
continue;
}
@@ -485,7 +485,7 @@ impl TcpListenerManager {
// Check per-IP limits and rate limiting
if !conn_tracker.try_accept(&ip) {
debug!("Rejected connection from {} (per-IP limit or rate limit)", peer_addr);
warn!("Rejected connection from {} (per-IP limit or rate limit)", peer_addr);
drop(stream);
drop(permit);
continue;
@@ -517,7 +517,7 @@ impl TcpListenerManager {
stream, port, peer_addr, rm, m, tc, sa, hp, cc, cn, sr, rc,
).await;
if let Err(e) = result {
debug!("Connection error from {}: {}", peer_addr, e);
warn!("Connection error from {}: {}", peer_addr, e);
}
});
}
@@ -662,7 +662,7 @@ impl TcpListenerManager {
if !rustproxy_http::request_filter::RequestFilter::check_ip_security(
security, &peer_addr.ip(),
) {
debug!("Connection from {} blocked by route security", peer_addr);
warn!("Connection from {} blocked by route security", peer_addr);
return Ok(());
}
}
@@ -808,7 +808,7 @@ impl TcpListenerManager {
let route_match = match route_match {
Some(rm) => rm,
None => {
debug!("No route matched for port {} domain {:?}", port, domain);
warn!("No route matched for port {} domain {:?} from {}", port, domain, peer_addr);
if is_http {
// Send a proper HTTP error instead of dropping the connection
use tokio::io::AsyncWriteExt;
@@ -842,7 +842,7 @@ impl TcpListenerManager {
security,
&peer_addr.ip(),
) {
debug!("Connection from {} blocked by route security", peer_addr);
warn!("Connection from {} blocked by route security", peer_addr);
return Ok(());
}
}
@@ -985,13 +985,18 @@ impl TcpListenerManager {
Err(_) => return Err("TLS handshake timeout".into()),
};
// Peek at decrypted data to determine if HTTP
// Peek at decrypted data to determine if HTTP.
// Timeout prevents connection leak if client completes TLS
// but never sends application data (scanners, health probes, slow-loris).
let mut buf_stream = tokio::io::BufReader::new(tls_stream);
let peeked = {
use tokio::io::AsyncBufReadExt;
match buf_stream.fill_buf().await {
Ok(data) => sni_parser::is_http(data),
Err(_) => false,
match tokio::time::timeout(
std::time::Duration::from_millis(conn_config.initial_data_timeout_ms),
buf_stream.fill_buf(),
).await {
Ok(Ok(data)) => sni_parser::is_http(data),
Ok(Err(_)) | Err(_) => false,
}
};
@@ -1009,7 +1014,11 @@ impl TcpListenerManager {
"TLS Terminate + HTTP: {} -> {}:{} (domain: {:?})",
peer_addr, target_host, target_port, domain
);
http_proxy.handle_io(buf_stream, peer_addr, port, cancel.clone()).await;
// Wrap in ShutdownOnDrop to ensure TLS close_notify is sent
// even if hyper drops the connection without calling shutdown
// (e.g. H2 close, backend error, idle timeout drain).
let wrapped = rustproxy_http::shutdown_on_drop::ShutdownOnDrop::new(buf_stream);
http_proxy.handle_io(wrapped, peer_addr, port, cancel.clone()).await;
} else {
debug!(
"TLS Terminate + TCP: {} -> {}:{} (domain: {:?})",
@@ -1060,13 +1069,18 @@ impl TcpListenerManager {
Err(_) => return Err("TLS handshake timeout".into()),
};
// Peek at decrypted data to detect protocol
// Peek at decrypted data to detect protocol.
// Timeout prevents connection leak if client completes TLS
// but never sends application data (scanners, health probes, slow-loris).
let mut buf_stream = tokio::io::BufReader::new(tls_stream);
let is_http_data = {
use tokio::io::AsyncBufReadExt;
match buf_stream.fill_buf().await {
Ok(data) => sni_parser::is_http(data),
Err(_) => false,
match tokio::time::timeout(
std::time::Duration::from_millis(conn_config.initial_data_timeout_ms),
buf_stream.fill_buf(),
).await {
Ok(Ok(data)) => sni_parser::is_http(data),
Ok(Err(_)) | Err(_) => false,
}
};
@@ -1086,7 +1100,10 @@ impl TcpListenerManager {
"TLS Terminate+Reencrypt + HTTP: {} (domain: {:?})",
peer_addr, domain
);
http_proxy.handle_io(buf_stream, peer_addr, port, cancel.clone()).await;
// Wrap in ShutdownOnDrop to ensure TLS close_notify is sent
// even if hyper drops the connection without calling shutdown.
let wrapped = rustproxy_http::shutdown_on_drop::ShutdownOnDrop::new(buf_stream);
http_proxy.handle_io(wrapped, peer_addr, port, cancel.clone()).await;
} else {
// Non-HTTP: TLS-to-TLS tunnel (existing behavior for raw TCP protocols)
debug!(

View File

@@ -632,15 +632,13 @@ impl RustProxy {
let new_manager = Arc::new(new_manager);
self.route_table.store(Arc::clone(&new_manager));
// Update listener manager
// Update listener manager.
// IMPORTANT: TLS configs must be swapped BEFORE the route manager so that
// new routes only become visible after their certs are loaded. The reverse
// order (routes first) creates a window where connections match new routes
// but get the old TLS acceptor, causing cert mismatches.
if let Some(ref mut listener) = self.listener_manager {
listener.update_route_manager(Arc::clone(&new_manager));
// Cancel connections on routes that were removed or disabled
listener.invalidate_removed_routes(&active_route_ids);
// Prune HTTP proxy caches (rate limiters, regex cache, round-robin counters)
listener.prune_http_proxy_caches(&active_route_ids);
// Update TLS configs
// 1. Update TLS configs first (so new certs are available before new routes)
let mut tls_configs = Self::extract_tls_configs(&routes);
if let Some(ref cm_arc) = self.cert_manager {
let cm = cm_arc.lock().await;
@@ -661,6 +659,13 @@ impl RustProxy {
}
listener.set_tls_configs(tls_configs);
// 2. Now swap the route manager (new routes become visible with certs already loaded)
listener.update_route_manager(Arc::clone(&new_manager));
// Cancel connections on routes that were removed or disabled
listener.invalidate_removed_routes(&active_route_ids);
// Prune HTTP proxy caches (rate limiters, regex cache, round-robin counters)
listener.prune_http_proxy_caches(&active_route_ids);
// Add new ports
for port in &new_ports {
if !old_ports.contains(port) {

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartproxy',
version: '25.11.6',
version: '25.11.8',
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
}