diff --git a/changelog.md b/changelog.md
index b04d5cb..a58a4b4 100644
--- a/changelog.md
+++ b/changelog.md
@@ -1,5 +1,15 @@
 # Changelog
 
+## 2026-02-13 - 24.0.1 - fix(proxy)
+improve proxy robustness: add connect timeouts, graceful shutdown, WebSocket watchdog, and metrics guard
+
+- Add tokio-util CancellationToken to HTTP handlers to support graceful shutdown (stop accepting new requests while letting in-flight requests finish).
+- Introduce configurable upstream connect timeout (DEFAULT_CONNECT_TIMEOUT) and return 504 Gateway Timeout on connect timeouts to avoid hanging connections.
+- Add WebSocket watchdog with inactivity and max-lifetime checks, activity tracking via AtomicU64, and cancellation-driven tunnel aborts.
+- Add ConnectionGuard RAII in passthrough listener to ensure metrics.connection_closed() is called on all exit paths and disarm the guard when handing off to the HTTP proxy.
+- Expose HttpProxyService::with_connect_timeout and wire connection timeout from ConnectionConfig into listeners.
+- Add tokio-util workspace dependency (CancellationToken) and related code changes across rustproxy-http and rustproxy-passthrough.
+
 ## 2026-02-13 - 24.0.0 - BREAKING CHANGE(smart-proxy)
 move certificate persistence to an in-memory store and introduce consumer-managed certStore API; add default self-signed fallback cert and change ACME account handling
diff --git a/rust/Cargo.lock b/rust/Cargo.lock
index 3914aa8..2a3c79a 100644
--- a/rust/Cargo.lock
+++ b/rust/Cargo.lock
@@ -971,6 +971,7 @@ dependencies = [
  "rustproxy-security",
  "thiserror 2.0.18",
  "tokio",
+ "tokio-util",
  "tracing",
 ]
diff --git a/rust/crates/rustproxy-http/Cargo.toml b/rust/crates/rustproxy-http/Cargo.toml
index f3c7ba1..85415ae 100644
--- a/rust/crates/rustproxy-http/Cargo.toml
+++ b/rust/crates/rustproxy-http/Cargo.toml
@@ -22,3 +22,4 @@ thiserror = { workspace = true }
 anyhow = { workspace = true }
 arc-swap = { workspace = true }
 dashmap = { workspace = true }
+tokio-util = { workspace = true }
diff --git a/rust/crates/rustproxy-http/src/proxy_service.rs b/rust/crates/rustproxy-http/src/proxy_service.rs
index 935ca29..f9ef1b1 100644
--- a/rust/crates/rustproxy-http/src/proxy_service.rs
+++ b/rust/crates/rustproxy-http/src/proxy_service.rs
@@ -6,6 +6,7 @@
 use std::collections::HashMap;
 use std::sync::Arc;
+use std::sync::atomic::{AtomicU64, Ordering};
 
 use bytes::Bytes;
 use http_body_util::{BodyExt, Full, combinators::BoxBody};
@@ -14,6 +15,7 @@ use hyper::{Request, Response, StatusCode};
 use hyper_util::rt::TokioIo;
 use regex::Regex;
 use tokio::net::TcpStream;
+use tokio_util::sync::CancellationToken;
 use tracing::{debug, error, info, warn};
 
 use rustproxy_routing::RouteManager;
@@ -23,11 +25,22 @@ use crate::request_filter::RequestFilter;
 use crate::response_filter::ResponseFilter;
 use crate::upstream_selector::UpstreamSelector;
 
+/// Default upstream connect timeout (30 seconds).
+const DEFAULT_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
+
+/// Default WebSocket inactivity timeout (1 hour).
+const DEFAULT_WS_INACTIVITY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3600);
+
+/// Default WebSocket max lifetime (24 hours).
+const DEFAULT_WS_MAX_LIFETIME: std::time::Duration = std::time::Duration::from_secs(86400);
+
 /// HTTP proxy service that processes HTTP traffic.
 pub struct HttpProxyService {
     route_manager: Arc<RouteManager>,
     metrics: Arc<MetricsCollector>,
     upstream_selector: UpstreamSelector,
+    /// Timeout for connecting to upstream backends.
+    connect_timeout: std::time::Duration,
 }
 
 impl HttpProxyService {
@@ -36,6 +49,21 @@ impl HttpProxyService {
             route_manager,
             metrics,
             upstream_selector: UpstreamSelector::new(),
+            connect_timeout: DEFAULT_CONNECT_TIMEOUT,
+        }
+    }
+
+    /// Create with a custom connect timeout.
+    pub fn with_connect_timeout(
+        route_manager: Arc<RouteManager>,
+        metrics: Arc<MetricsCollector>,
+        connect_timeout: std::time::Duration,
+    ) -> Self {
+        Self {
+            route_manager,
+            metrics,
+            upstream_selector: UpstreamSelector::new(),
+            connect_timeout,
         }
     }
 
@@ -45,41 +73,59 @@ impl HttpProxyService {
         stream: TcpStream,
         peer_addr: std::net::SocketAddr,
         port: u16,
+        cancel: CancellationToken,
     ) {
-        self.handle_io(stream, peer_addr, port).await;
+        self.handle_io(stream, peer_addr, port, cancel).await;
     }
 
     /// Handle an incoming HTTP connection on any IO type (plain TCP or TLS-terminated).
     ///
-    /// Uses HTTP/1.1 with upgrade support. For clients that negotiate HTTP/2,
-    /// use `handle_io_auto` instead.
+    /// Uses HTTP/1.1 with upgrade support. Responds to graceful shutdown via the
+    /// cancel token — in-flight requests complete, but no new requests are accepted.
     pub async fn handle_io<I>(
         self: Arc<Self>,
         stream: I,
         peer_addr: std::net::SocketAddr,
         port: u16,
+        cancel: CancellationToken,
     ) where
         I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
     {
         let io = TokioIo::new(stream);
 
+        let cancel_inner = cancel.clone();
         let service = hyper::service::service_fn(move |req: Request<hyper::body::Incoming>| {
             let svc = Arc::clone(&self);
             let peer = peer_addr;
+            let cn = cancel_inner.clone();
             async move {
-                svc.handle_request(req, peer, port).await
+                svc.handle_request(req, peer, port, cn).await
             }
         });
 
         // Use http1::Builder with upgrades for WebSocket support
-        let conn = hyper::server::conn::http1::Builder::new()
+        let mut conn = hyper::server::conn::http1::Builder::new()
             .keep_alive(true)
             .serve_connection(io, service)
             .with_upgrades();
 
-        if let Err(e) = conn.await {
-            debug!("HTTP connection error from {}: {}", peer_addr, e);
+        // Use select to support graceful shutdown via cancellation token
+        let conn_pin = std::pin::Pin::new(&mut conn);
+        tokio::select! {
+            result = conn_pin => {
+                if let Err(e) = result {
+                    debug!("HTTP connection error from {}: {}", peer_addr, e);
+                }
+            }
+            _ = cancel.cancelled() => {
+                // Graceful shutdown: let in-flight request finish, stop accepting new ones
+                let conn_pin = std::pin::Pin::new(&mut conn);
+                conn_pin.graceful_shutdown();
+                if let Err(e) = conn.await {
+                    debug!("HTTP connection error during shutdown from {}: {}", peer_addr, e);
+                }
+            }
         }
     }
 
@@ -89,6 +135,7 @@ impl HttpProxyService {
         req: Request<hyper::body::Incoming>,
         peer_addr: std::net::SocketAddr,
         port: u16,
+        cancel: CancellationToken,
     ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
         let host = req.headers()
             .get("host")
@@ -184,7 +231,7 @@ impl HttpProxyService {
         if is_websocket {
             let result = self.handle_websocket_upgrade(
-                req, peer_addr, &upstream, route_match.route, route_id, &upstream_key,
+                req, peer_addr, &upstream, route_match.route, route_id, &upstream_key, cancel,
             ).await;
             // Note: for WebSocket, connection_ended is called inside
             // the spawned tunnel task when the connection closes.
@@ -223,15 +270,24 @@ impl HttpProxyService {
             }
         }
 
-        // Connect to upstream
-        let upstream_stream = match TcpStream::connect(format!("{}:{}", upstream.host, upstream.port)).await {
-            Ok(s) => s,
-            Err(e) => {
+        // Connect to upstream with timeout
+        let upstream_stream = match tokio::time::timeout(
+            self.connect_timeout,
+            TcpStream::connect(format!("{}:{}", upstream.host, upstream.port)),
+        ).await {
+            Ok(Ok(s)) => s,
+            Ok(Err(e)) => {
                 error!("Failed to connect to upstream {}:{}: {}", upstream.host, upstream.port, e);
                 self.upstream_selector.connection_ended(&upstream_key);
                 self.metrics.connection_closed(route_id);
                 return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable"));
             }
+            Err(_) => {
+                error!("Upstream connect timeout for {}:{}", upstream.host, upstream.port);
+                self.upstream_selector.connection_ended(&upstream_key);
+                self.metrics.connection_closed(route_id);
+                return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend connect timeout"));
+            }
         };
 
         upstream_stream.set_nodelay(true).ok();
@@ -394,6 +450,7 @@ impl HttpProxyService {
         route: &rustproxy_config::RouteConfig,
         route_id: Option<&str>,
         upstream_key: &str,
+        cancel: CancellationToken,
     ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
         use tokio::io::{AsyncReadExt, AsyncWriteExt};
 
@@ -417,16 +474,24 @@ impl HttpProxyService {
         info!("WebSocket upgrade from {} -> {}:{}", peer_addr, upstream.host, upstream.port);
 
-        let mut upstream_stream = match TcpStream::connect(
-            format!("{}:{}", upstream.host, upstream.port)
+        // Connect to upstream with timeout
+        let mut upstream_stream = match tokio::time::timeout(
+            self.connect_timeout,
+            TcpStream::connect(format!("{}:{}", upstream.host, upstream.port)),
         ).await {
-            Ok(s) => s,
-            Err(e) => {
+            Ok(Ok(s)) => s,
+            Ok(Err(e)) => {
                 error!("WebSocket: failed to connect upstream {}:{}: {}", upstream.host, upstream.port, e);
                 self.upstream_selector.connection_ended(upstream_key);
                 self.metrics.connection_closed(route_id);
                 return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable"));
             }
+            Err(_) => {
+                error!("WebSocket: upstream connect timeout for {}:{}", upstream.host, upstream.port);
+                self.upstream_selector.connection_ended(upstream_key);
+                self.metrics.connection_closed(route_id);
+                return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend connect timeout"));
+            }
         };
 
         upstream_stream.set_nodelay(true).ok();
@@ -591,6 +656,11 @@ impl HttpProxyService {
         let (mut cr, mut cw) = tokio::io::split(client_io);
         let (mut ur, mut uw) = tokio::io::split(upstream_stream);
 
+        // Shared activity tracker for the watchdog
+        let last_activity = Arc::new(AtomicU64::new(0));
+        let start = std::time::Instant::now();
+
+        let la1 = Arc::clone(&last_activity);
         let c2u = tokio::spawn(async move {
             let mut buf = vec![0u8; 65536];
             let mut total = 0u64;
@@ -603,11 +673,13 @@ impl HttpProxyService {
                     break;
                 }
                 total += n as u64;
+                la1.store(start.elapsed().as_millis() as u64, Ordering::Relaxed);
             }
             let _ = uw.shutdown().await;
             total
         });
 
+        let la2 = Arc::clone(&last_activity);
         let u2c = tokio::spawn(async move {
             let mut buf = vec![0u8; 65536];
             let mut total = 0u64;
@@ -620,13 +692,59 @@ impl HttpProxyService {
                     break;
                 }
                 total += n as u64;
+                la2.store(start.elapsed().as_millis() as u64, Ordering::Relaxed);
             }
             let _ = cw.shutdown().await;
             total
         });
 
+        // Watchdog: monitors inactivity, max lifetime, and cancellation
+        let la_watch = Arc::clone(&last_activity);
+        let c2u_handle = c2u.abort_handle();
+        let u2c_handle = u2c.abort_handle();
+        let inactivity_timeout = DEFAULT_WS_INACTIVITY_TIMEOUT;
+        let max_lifetime = DEFAULT_WS_MAX_LIFETIME;
+
+        let watchdog = tokio::spawn(async move {
+            let check_interval = std::time::Duration::from_secs(5);
+            let mut last_seen = 0u64;
+            loop {
+                tokio::select! {
+                    _ = tokio::time::sleep(check_interval) => {}
+                    _ = cancel.cancelled() => {
+                        debug!("WebSocket tunnel cancelled by shutdown");
+                        c2u_handle.abort();
+                        u2c_handle.abort();
+                        break;
+                    }
+                }
+
+                // Check max lifetime
+                if start.elapsed() >= max_lifetime {
+                    debug!("WebSocket tunnel exceeded max lifetime, closing");
+                    c2u_handle.abort();
+                    u2c_handle.abort();
+                    break;
+                }
+
+                // Check inactivity
+                let current = la_watch.load(Ordering::Relaxed);
+                if current == last_seen {
+                    let elapsed_since_activity = start.elapsed().as_millis() as u64 - current;
+                    if elapsed_since_activity >= inactivity_timeout.as_millis() as u64 {
+                        debug!("WebSocket tunnel inactive for {}ms, closing", elapsed_since_activity);
+                        c2u_handle.abort();
+                        u2c_handle.abort();
+                        break;
+                    }
+                }
+                last_seen = current;
+            }
+        });
+
         let bytes_in = c2u.await.unwrap_or(0);
         let bytes_out = u2c.await.unwrap_or(0);
+        watchdog.abort();
 
         debug!("WebSocket tunnel closed: {} bytes in, {} bytes out", bytes_in, bytes_out);
@@ -812,6 +930,7 @@ impl Default for HttpProxyService {
             route_manager: Arc::new(RouteManager::new(vec![])),
             metrics: Arc::new(MetricsCollector::new()),
             upstream_selector: UpstreamSelector::new(),
+            connect_timeout: DEFAULT_CONNECT_TIMEOUT,
         }
     }
 }
diff --git a/rust/crates/rustproxy-passthrough/src/tcp_listener.rs b/rust/crates/rustproxy-passthrough/src/tcp_listener.rs
index bd5cda1..94baee2 100644
--- a/rust/crates/rustproxy-passthrough/src/tcp_listener.rs
+++ b/rust/crates/rustproxy-passthrough/src/tcp_listener.rs
@@ -15,6 +15,38 @@ use crate::forwarder;
 use crate::tls_handler;
 use crate::connection_tracker::ConnectionTracker;
 
+/// RAII guard that decrements the active connection metric on drop.
+/// Ensures connection_closed is called on ALL exit paths — normal, error, or panic.
+struct ConnectionGuard {
+    metrics: Arc<MetricsCollector>,
+    route_id: Option<String>,
+    disarmed: bool,
+}
+
+impl ConnectionGuard {
+    fn new(metrics: Arc<MetricsCollector>, route_id: Option<&str>) -> Self {
+        Self {
+            metrics,
+            route_id: route_id.map(|s| s.to_string()),
+            disarmed: false,
+        }
+    }
+
+    /// Disarm the guard — prevents the Drop from running.
+    /// Use when handing off to a path that manages its own cleanup (e.g., HTTP proxy).
+    fn disarm(mut self) {
+        self.disarmed = true;
+    }
+}
+
+impl Drop for ConnectionGuard {
+    fn drop(&mut self) {
+        if !self.disarmed {
+            self.metrics.connection_closed(self.route_id.as_deref());
+        }
+    }
+}
+
 #[derive(Debug, Error)]
 pub enum ListenerError {
     #[error("Failed to bind port {port}: {source}")]
@@ -105,11 +137,12 @@ pub struct TcpListenerManager {
 impl TcpListenerManager {
     pub fn new(route_manager: Arc<RouteManager>) -> Self {
         let metrics = Arc::new(MetricsCollector::new());
-        let http_proxy = Arc::new(HttpProxyService::new(
+        let conn_config = ConnectionConfig::default();
+        let http_proxy = Arc::new(HttpProxyService::with_connect_timeout(
             Arc::clone(&route_manager),
             Arc::clone(&metrics),
+            std::time::Duration::from_millis(conn_config.connection_timeout_ms),
         ));
-        let conn_config = ConnectionConfig::default();
         let conn_tracker = Arc::new(ConnectionTracker::new(
             conn_config.max_connections_per_ip,
             conn_config.connection_rate_limit_per_minute,
@@ -129,11 +162,12 @@ impl TcpListenerManager {
     /// Create with a metrics collector.
     pub fn with_metrics(route_manager: Arc<RouteManager>, metrics: Arc<MetricsCollector>) -> Self {
-        let http_proxy = Arc::new(HttpProxyService::new(
+        let conn_config = ConnectionConfig::default();
+        let http_proxy = Arc::new(HttpProxyService::with_connect_timeout(
             Arc::clone(&route_manager),
             Arc::clone(&metrics),
+            std::time::Duration::from_millis(conn_config.connection_timeout_ms),
         ));
-        let conn_config = ConnectionConfig::default();
         let conn_tracker = Arc::new(ConnectionTracker::new(
             conn_config.max_connections_per_ip,
             conn_config.connection_rate_limit_per_minute,
@@ -427,6 +461,7 @@ impl TcpListenerManager {
         }
 
         metrics.connection_opened(route_id);
+        let _fast_guard = ConnectionGuard::new(Arc::clone(&metrics), route_id);
 
         let connect_timeout = std::time::Duration::from_millis(conn_config.connection_timeout_ms);
         let inactivity_timeout = std::time::Duration::from_millis(conn_config.socket_timeout_ms);
@@ -442,14 +477,8 @@ impl TcpListenerManager {
             tokio::net::TcpStream::connect(format!("{}:{}", target_host, target_port)),
         ).await {
             Ok(Ok(s)) => s,
-            Ok(Err(e)) => {
-                metrics.connection_closed(route_id);
-                return Err(e.into());
-            }
-            Err(_) => {
-                metrics.connection_closed(route_id);
-                return Err("Backend connection timeout".into());
-            }
+            Ok(Err(e)) => return Err(e.into()),
+            Err(_) => return Err("Backend connection timeout".into()),
         };
 
         backend.set_nodelay(true)?;
@@ -480,7 +509,6 @@ impl TcpListenerManager {
             metrics.record_bytes(bytes_in, bytes_out, route_id);
         }
 
-        metrics.connection_closed(route_id);
         return Ok(());
     }
 }
@@ -617,8 +645,9 @@ impl TcpListenerManager {
         }
 
-        // Track connection in metrics
+        // Track connection in metrics — guard ensures connection_closed on all exit paths
         metrics.connection_opened(route_id);
+        let _conn_guard = ConnectionGuard::new(Arc::clone(&metrics), route_id);
 
         // Check if this is a socket-handler route that should be relayed to TypeScript
         if route_match.route.action.action_type == RouteActionType::SocketHandler {
@@ -628,16 +657,13 @@ impl TcpListenerManager {
             };
 
             if let Some(relay_socket_path) = relay_path {
-                let result = Self::relay_to_socket_handler(
+                return Self::relay_to_socket_handler(
                     stream, n, port, peer_addr, &route_match,
                     domain.as_deref(), is_tls, &relay_socket_path,
                 ).await;
-                metrics.connection_closed(route_id);
-                return result;
             } else {
                 debug!("Socket-handler route matched but no relay path configured");
-                metrics.connection_closed(route_id);
                 return Ok(());
             }
         }
@@ -646,7 +672,6 @@ impl TcpListenerManager {
             Some(t) => t,
             None => {
                 debug!("Route matched but no target available");
-                metrics.connection_closed(route_id);
                 return Ok(());
             }
         };
@@ -765,7 +790,9 @@ impl TcpListenerManager {
                     "TLS Terminate + HTTP: {} -> {}:{} (domain: {:?})",
                     peer_addr, target_host, target_port, domain
                 );
-                http_proxy.handle_io(buf_stream, peer_addr, port).await;
+                // HTTP proxy manages its own per-request metrics — disarm TCP-level guard
+                _conn_guard.disarm();
+                http_proxy.handle_io(buf_stream, peer_addr, port, cancel.clone()).await;
             } else {
                 debug!(
                     "TLS Terminate + TCP: {} -> {}:{} (domain: {:?})",
@@ -805,7 +832,9 @@ impl TcpListenerManager {
             if is_http {
                 // Plain HTTP - use HTTP proxy for request-level routing
                 debug!("HTTP proxy: {} on port {}", peer_addr, port);
-                http_proxy.handle_connection(stream, peer_addr, port).await;
+                // HTTP proxy manages its own per-request metrics — disarm TCP-level guard
+                _conn_guard.disarm();
+                http_proxy.handle_connection(stream, peer_addr, port, cancel.clone()).await;
                 Ok(())
             } else {
                 // Plain TCP forwarding (non-HTTP)
@@ -843,7 +872,7 @@ impl TcpListenerManager {
             }
         };
 
-        metrics.connection_closed(route_id);
+        // ConnectionGuard handles metrics.connection_closed() on drop
 
         result
     }
diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts
index 4665216..181df75 100644
--- a/ts/00_commitinfo_data.ts
+++ b/ts/00_commitinfo_data.ts
@@ -3,6 +3,6 @@
  */
 export const commitinfo = {
   name: '@push.rocks/smartproxy',
-  version: '24.0.0',
+  version: '24.0.1',
   description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
 }
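
Usage sketch (not part of the diff): a minimal example of how a consumer might wire the new pieces together. The `run_listener` function, the `rustproxy_http::HttpProxyService` import path, and the port value are hypothetical illustrations; `HttpProxyService::with_connect_timeout`, `handle_connection`, and tokio-util's `CancellationToken` are the APIs introduced above. The proxy itself would be built with something like `HttpProxyService::with_connect_timeout(route_manager, metrics, timeout)`.

```rust
use std::sync::Arc;

use tokio::net::TcpListener;
use tokio_util::sync::CancellationToken;

use rustproxy_http::HttpProxyService; // assumed export path

/// Hypothetical accept loop: each connection gets a child token, so
/// cancelling the parent `shutdown` token stops accepting new connections
/// while handle_io's select! lets in-flight requests drain gracefully.
async fn run_listener(
    proxy: Arc<HttpProxyService>,
    listener: TcpListener,
    shutdown: CancellationToken,
) {
    loop {
        tokio::select! {
            // Stop accepting new connections once shutdown is requested.
            _ = shutdown.cancelled() => break,
            accepted = listener.accept() => {
                let (stream, peer_addr) = match accepted {
                    Ok(pair) => pair,
                    Err(_) => continue, // transient accept error; keep serving
                };
                let proxy = Arc::clone(&proxy);
                let conn_token = shutdown.child_token();
                tokio::spawn(async move {
                    // 8080 is an illustrative listener port.
                    proxy.handle_connection(stream, peer_addr, 8080, conn_token).await;
                });
            }
        }
    }
}
```

Cancelling `shutdown` (for example, from a SIGTERM handler) breaks the accept loop immediately, while each spawned connection observes its child token and finishes its current request before closing.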