diff --git a/changelog.md b/changelog.md index 7afb776..1a4183f 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,14 @@ # Changelog +## 2026-03-26 - 26.2.4 - fix(rustproxy-http) +improve HTTP/3 connection reuse and clean up stale proxy state + +- Reuse pooled HTTP/3 SendRequest handles to skip repeated SETTINGS handshakes and reduce request overhead on QUIC pool hits +- Add periodic cleanup for per-route rate limiters and orphaned backend metrics to prevent unbounded memory growth after traffic or backend errors stop +- Enforce HTTP max connection lifetime alongside idle timeouts and apply configured lifetime values from the TCP listener +- Reduce HTTP/3 body copying by using owned Bytes paths for request and response streaming, and replace the custom response body adapter with a stream-based implementation +- Harden auxiliary proxy components by capping datagram handler buffer growth and removing duplicate RustProxy exit listeners + ## 2026-03-25 - 26.2.3 - fix(repo) no changes to commit diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 4bdeb1d..79663c5 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1270,6 +1270,7 @@ dependencies = [ "arc-swap", "bytes", "dashmap", + "futures", "h3", "h3-quinn", "http-body", diff --git a/rust/crates/rustproxy-http/Cargo.toml b/rust/crates/rustproxy-http/Cargo.toml index 4596f55..041630b 100644 --- a/rust/crates/rustproxy-http/Cargo.toml +++ b/rust/crates/rustproxy-http/Cargo.toml @@ -30,3 +30,4 @@ socket2 = { workspace = true } quinn = { workspace = true } h3 = { workspace = true } h3-quinn = { workspace = true } +futures = { version = "0.3", default-features = false, features = ["std"] } diff --git a/rust/crates/rustproxy-http/src/connection_pool.rs b/rust/crates/rustproxy-http/src/connection_pool.rs index 8efaca3..afa4033 100644 --- a/rust/crates/rustproxy-http/src/connection_pool.rs +++ b/rust/crates/rustproxy-http/src/connection_pool.rs @@ -56,7 +56,11 @@ struct PooledH2 { } /// A pooled QUIC/HTTP/3 connection (multiplexed like H2). +/// Stores the h3 `SendRequest` handle so pool hits skip the h3 SETTINGS handshake. pub struct PooledH3 { + /// Multiplexed h3 request handle — clone to open a new stream. + pub send_request: h3::client::SendRequest, + /// Raw QUIC connection — kept for liveness probing (close_reason) only. pub connection: quinn::Connection, pub created_at: Instant, pub generation: u64, @@ -197,7 +201,10 @@ impl ConnectionPool { /// Try to get a pooled QUIC connection for the given key. /// QUIC connections are multiplexed — the connection is shared, not removed. - pub fn checkout_h3(&self, key: &PoolKey) -> Option<(quinn::Connection, Duration)> { + pub fn checkout_h3( + &self, + key: &PoolKey, + ) -> Option<(h3::client::SendRequest, quinn::Connection, Duration)> { let entry = self.h3_pool.get(key)?; let pooled = entry.value(); let age = pooled.created_at.elapsed(); @@ -215,13 +222,20 @@ impl ConnectionPool { return None; } - Some((pooled.connection.clone(), age)) + Some((pooled.send_request.clone(), pooled.connection.clone(), age)) } - /// Register a QUIC connection in the pool. Returns the generation ID. - pub fn register_h3(&self, key: PoolKey, connection: quinn::Connection) -> u64 { + /// Register a QUIC connection and its h3 SendRequest handle in the pool. + /// Returns the generation ID. + pub fn register_h3( + &self, + key: PoolKey, + connection: quinn::Connection, + send_request: h3::client::SendRequest, + ) -> u64 { let gen = self.h2_generation.fetch_add(1, Ordering::Relaxed); self.h3_pool.insert(key, PooledH3 { + send_request, connection, created_at: Instant::now(), generation: gen, diff --git a/rust/crates/rustproxy-http/src/h3_service.rs b/rust/crates/rustproxy-http/src/h3_service.rs index e1fd612..0d05ea3 100644 --- a/rust/crates/rustproxy-http/src/h3_service.rs +++ b/rust/crates/rustproxy-http/src/h3_service.rs @@ -116,7 +116,7 @@ async fn handle_h3_request( cancel: CancellationToken, ) -> anyhow::Result<()> { // Stream request body from H3 client via an mpsc channel. - let (body_tx, body_rx) = tokio::sync::mpsc::channel::(4); + let (body_tx, body_rx) = tokio::sync::mpsc::channel::(32); // Spawn the H3 body reader task with cancellation let body_cancel = cancel.clone(); @@ -132,8 +132,7 @@ async fn handle_h3_request( } }; let mut chunk = chunk; - let data = Bytes::copy_from_slice(chunk.chunk()); - chunk.advance(chunk.remaining()); + let data = chunk.copy_to_bytes(chunk.remaining()); if body_tx.send(data).await.is_err() { break; } @@ -179,8 +178,8 @@ async fn handle_h3_request( while let Some(frame) = resp_body.frame().await { match frame { Ok(frame) => { - if let Some(data) = frame.data_ref() { - stream.send_data(Bytes::copy_from_slice(data)).await + if let Ok(data) = frame.into_data() { + stream.send_data(data).await .map_err(|e| anyhow::anyhow!("Failed to send H3 data: {}", e))?; } } diff --git a/rust/crates/rustproxy-http/src/proxy_service.rs b/rust/crates/rustproxy-http/src/proxy_service.rs index 9795981..64d7b3e 100644 --- a/rust/crates/rustproxy-http/src/proxy_service.rs +++ b/rust/crates/rustproxy-http/src/proxy_service.rs @@ -72,15 +72,16 @@ const DEFAULT_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_s /// If no new request arrives within this duration, the connection is closed. const DEFAULT_HTTP_IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60); +/// Default HTTP max connection lifetime (1 hour). +/// HTTP connections are forcefully closed after this duration regardless of activity. +const DEFAULT_HTTP_MAX_LIFETIME: std::time::Duration = std::time::Duration::from_secs(3600); + /// Default WebSocket inactivity timeout (1 hour). const DEFAULT_WS_INACTIVITY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3600); /// Default WebSocket max lifetime (24 hours). const DEFAULT_WS_MAX_LIFETIME: std::time::Duration = std::time::Duration::from_secs(86400); -/// Timeout for QUIC (H3) backend connections. Short because UDP is often firewalled. -const QUIC_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3); - /// Protocol decision for backend connection. #[derive(Debug)] enum ProtocolDecision { @@ -222,6 +223,8 @@ pub struct HttpProxyService { protocol_cache: Arc, /// HTTP keep-alive idle timeout: close connection if no new request arrives within this duration. http_idle_timeout: std::time::Duration, + /// HTTP max connection lifetime: forcefully close connection after this duration regardless of activity. + http_max_lifetime: std::time::Duration, /// WebSocket inactivity timeout (no data in either direction). ws_inactivity_timeout: std::time::Duration, /// WebSocket maximum connection lifetime. @@ -248,6 +251,7 @@ impl HttpProxyService { connection_pool: Arc::new(crate::connection_pool::ConnectionPool::new()), protocol_cache: Arc::new(crate::protocol_cache::ProtocolCache::new()), http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT, + http_max_lifetime: DEFAULT_HTTP_MAX_LIFETIME, ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT, ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME, quinn_client_endpoint: Arc::new(Self::create_quinn_client_endpoint()), @@ -275,21 +279,24 @@ impl HttpProxyService { connection_pool: Arc::new(crate::connection_pool::ConnectionPool::new()), protocol_cache: Arc::new(crate::protocol_cache::ProtocolCache::new()), http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT, + http_max_lifetime: DEFAULT_HTTP_MAX_LIFETIME, ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT, ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME, quinn_client_endpoint: Arc::new(Self::create_quinn_client_endpoint()), } } - /// Set the HTTP keep-alive idle timeout, WebSocket inactivity timeout, and - /// WebSocket max lifetime from connection config values. + /// Set the HTTP keep-alive idle timeout, HTTP max lifetime, WebSocket inactivity + /// timeout, and WebSocket max lifetime from connection config values. pub fn set_connection_timeouts( &mut self, http_idle_timeout: std::time::Duration, + http_max_lifetime: std::time::Duration, ws_inactivity_timeout: std::time::Duration, ws_max_lifetime: std::time::Duration, ) { self.http_idle_timeout = http_idle_timeout; + self.http_max_lifetime = http_max_lifetime; self.ws_inactivity_timeout = ws_inactivity_timeout; self.ws_max_lifetime = ws_max_lifetime; } @@ -314,6 +321,15 @@ impl HttpProxyService { self.protocol_cache.clear(); } + /// Clean up expired entries in all per-route rate limiters. + /// Called from the background sampling task to prevent unbounded growth + /// when traffic stops after a burst of unique IPs. + pub fn cleanup_all_rate_limiters(&self) { + for entry in self.route_rate_limiters.iter() { + entry.value().cleanup(); + } + } + /// Snapshot the protocol cache for metrics/UI display. pub fn protocol_cache_snapshot(&self) -> Vec { self.protocol_cache.snapshot() @@ -354,6 +370,7 @@ impl HttpProxyService { // Capture timeouts before `self` is moved into the service closure. let idle_timeout = self.http_idle_timeout; + let max_lifetime = self.http_max_lifetime; // Activity tracker: updated at the START and END of each request. // The idle watchdog checks this to determine if the connection is idle @@ -412,15 +429,23 @@ impl HttpProxyService { } } _ = async { - // Idle watchdog: check every 5s whether the connection has been idle - // (no active requests AND no activity for idle_timeout). - // This avoids killing long-running requests or upgraded connections. + // Idle + lifetime watchdog: check every 5s whether the connection has been + // idle (no active requests AND no activity for idle_timeout) or exceeded + // the max connection lifetime. let check_interval = std::time::Duration::from_secs(5); let mut last_seen = 0u64; loop { tokio::time::sleep(check_interval).await; - // Never close while a request is in progress + // Check max connection lifetime (unconditional — even active connections + // must eventually be recycled to prevent resource accumulation). + if start.elapsed() >= max_lifetime { + debug!("HTTP connection exceeded max lifetime ({}s) from {}", + max_lifetime.as_secs(), peer_addr); + return; + } + + // Never close for idleness while a request is in progress if active_requests.load(Ordering::Relaxed) > 0 { last_seen = last_activity.load(Ordering::Relaxed); continue; @@ -437,7 +462,7 @@ impl HttpProxyService { last_seen = current; } } => { - debug!("HTTP connection idle timeout ({}s) from {}", idle_timeout.as_secs(), peer_addr); + debug!("HTTP connection timeout from {}", peer_addr); conn.as_mut().graceful_shutdown(); // Give any in-flight work 5s to drain after graceful shutdown let _ = tokio::time::timeout(std::time::Duration::from_secs(5), conn).await; @@ -791,10 +816,10 @@ impl HttpProxyService { }; // Try H3 pool checkout first - if let Some((quic_conn, _age)) = self.connection_pool.checkout_h3(&h3_pool_key) { + if let Some((pooled_sr, quic_conn, _age)) = self.connection_pool.checkout_h3(&h3_pool_key) { self.metrics.backend_pool_hit(&upstream_key); let result = self.forward_h3( - quic_conn, parts, body, upstream_headers, &upstream_path, + quic_conn, Some(pooled_sr), parts, body, upstream_headers, &upstream_path, route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key, ).await; self.upstream_selector.connection_ended(&upstream_key); @@ -807,7 +832,7 @@ impl HttpProxyService { self.metrics.backend_pool_miss(&upstream_key); self.metrics.backend_connection_opened(&upstream_key, std::time::Instant::now().elapsed()); let result = self.forward_h3( - quic_conn, parts, body, upstream_headers, &upstream_path, + quic_conn, None, parts, body, upstream_headers, &upstream_path, route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key, ).await; self.upstream_selector.connection_ended(&upstream_key); @@ -966,7 +991,7 @@ impl HttpProxyService { protocol: crate::connection_pool::PoolProtocol::H3, }; let result = self.forward_h3( - quic_conn, parts, body, upstream_headers, &upstream_path, + quic_conn, None, parts, body, upstream_headers, &upstream_path, route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key, ).await; self.upstream_selector.connection_ended(&upstream_key); @@ -1009,7 +1034,7 @@ impl HttpProxyService { protocol: crate::connection_pool::PoolProtocol::H3, }; let result = self.forward_h3( - quic_conn, parts, body, upstream_headers, &upstream_path, + quic_conn, None, parts, body, upstream_headers, &upstream_path, route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key, ).await; self.upstream_selector.connection_ended(&upstream_key); @@ -1068,7 +1093,7 @@ impl HttpProxyService { protocol: crate::connection_pool::PoolProtocol::H3, }; let result = self.forward_h3( - quic_conn, parts, body, upstream_headers, &upstream_path, + quic_conn, None, parts, body, upstream_headers, &upstream_path, route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key, ).await; self.upstream_selector.connection_ended(&upstream_key); @@ -1111,7 +1136,7 @@ impl HttpProxyService { protocol: crate::connection_pool::PoolProtocol::H3, }; let result = self.forward_h3( - quic_conn, parts, body, upstream_headers, &upstream_path, + quic_conn, None, parts, body, upstream_headers, &upstream_path, route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key, ).await; self.upstream_selector.connection_ended(&upstream_key); @@ -2744,7 +2769,12 @@ impl HttpProxyService { let quic_crypto = quinn::crypto::rustls::QuicClientConfig::try_from(tls_config) .expect("Failed to create QUIC client crypto config"); - let client_config = quinn::ClientConfig::new(Arc::new(quic_crypto)); + + // Tune QUIC transport to match H2 flow-control: 2 MB per-stream receive window. + let mut transport = quinn::TransportConfig::default(); + transport.stream_receive_window(quinn::VarInt::from_u32(2 * 1024 * 1024)); + let mut client_config = quinn::ClientConfig::new(Arc::new(quic_crypto)); + client_config.transport_config(Arc::new(transport)); let mut endpoint = quinn::Endpoint::client("0.0.0.0:0".parse().unwrap()) .expect("Failed to create QUIC client endpoint"); @@ -2766,8 +2796,8 @@ impl HttpProxyService { let server_name = host.to_string(); let connecting = self.quinn_client_endpoint.connect(addr, &server_name)?; - let connection = tokio::time::timeout(QUIC_CONNECT_TIMEOUT, connecting).await - .map_err(|_| format!("QUIC connect timeout (3s) for {}", host))??; + let connection = tokio::time::timeout(self.connect_timeout, connecting).await + .map_err(|_| format!("QUIC connect timeout ({:?}) for {}", self.connect_timeout, host))??; debug!("QUIC backend connection established to {}:{}", host, port); Ok(connection) @@ -2777,6 +2807,7 @@ impl HttpProxyService { async fn forward_h3( &self, quic_conn: quinn::Connection, + pooled_sender: Option>, parts: hyper::http::request::Parts, body: BoxBody, upstream_headers: hyper::HeaderMap, @@ -2789,33 +2820,42 @@ impl HttpProxyService { conn_activity: &ConnActivity, backend_key: &str, ) -> Result>, hyper::Error> { - let h3_quinn_conn = h3_quinn::Connection::new(quic_conn.clone()); - let (mut driver, mut send_request) = match h3::client::builder() - .send_grease(false) - .build(h3_quinn_conn) - .await - { - Ok(pair) => pair, - Err(e) => { - error!(backend = %backend_key, domain = %domain, error = %e, "H3 client handshake failed"); - self.metrics.backend_handshake_error(backend_key); - return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 handshake failed")); - } - }; + // Obtain the h3 SendRequest handle: skip handshake + driver on pool hit. + let (mut send_request, gen_holder) = if let Some(sr) = pooled_sender { + // Pool hit — reuse existing h3 session, no SETTINGS round-trip + (sr, None) + } else { + // Fresh QUIC connection — full h3 handshake + driver spawn + let h3_quinn_conn = h3_quinn::Connection::new(quic_conn.clone()); + let (mut driver, sr) = match h3::client::builder() + .send_grease(false) + .build(h3_quinn_conn) + .await + { + Ok(pair) => pair, + Err(e) => { + error!(backend = %backend_key, domain = %domain, error = %e, "H3 client handshake failed"); + self.metrics.backend_handshake_error(backend_key); + return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 handshake failed")); + } + }; - // Spawn the h3 connection driver - let driver_pool = Arc::clone(&self.connection_pool); - let driver_pool_key = pool_key.clone(); - let gen_holder = Arc::new(std::sync::atomic::AtomicU64::new(u64::MAX)); - let driver_gen = Arc::clone(&gen_holder); - tokio::spawn(async move { - let close_err = std::future::poll_fn(|cx| driver.poll_close(cx)).await; - debug!("H3 connection driver closed: {:?}", close_err); - let g = driver_gen.load(std::sync::atomic::Ordering::Relaxed); - if g != u64::MAX { - driver_pool.remove_h3_if_generation(&driver_pool_key, g); + let gen_holder = Arc::new(std::sync::atomic::AtomicU64::new(u64::MAX)); + { + let driver_pool = Arc::clone(&self.connection_pool); + let driver_pool_key = pool_key.clone(); + let driver_gen = Arc::clone(&gen_holder); + tokio::spawn(async move { + let close_err = std::future::poll_fn(|cx| driver.poll_close(cx)).await; + debug!("H3 connection driver closed: {:?}", close_err); + let g = driver_gen.load(std::sync::atomic::Ordering::Relaxed); + if g != u64::MAX { + driver_pool.remove_h3_if_generation(&driver_pool_key, g); + } + }); } - }); + (sr, Some(gen_holder)) + }; // Build the H3 request let uri = hyper::Uri::builder() @@ -2845,7 +2885,7 @@ impl HttpProxyService { } }; - // Stream request body + // Stream request body (zero-copy: into_data yields owned Bytes) let rid: Option> = route_id.map(Arc::from); let sip: Arc = Arc::from(source_ip); @@ -2855,9 +2895,9 @@ impl HttpProxyService { while let Some(frame) = body.frame().await { match frame { Ok(frame) => { - if let Some(data) = frame.data_ref() { + if let Ok(data) = frame.into_data() { self.metrics.record_bytes(data.len() as u64, 0, rid.as_deref(), Some(&sip)); - if let Err(e) = stream.send_data(Bytes::copy_from_slice(data)).await { + if let Err(e) = stream.send_data(data).await { error!(backend = %backend_key, error = %e, "H3 send_data failed"); return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 body send failed")); } @@ -2899,8 +2939,23 @@ impl HttpProxyService { ResponseFilter::apply_headers(route, headers, None); } - // Stream response body back via an adapter - let h3_body = H3ClientResponseBody { stream }; + // Stream response body back via unfold — correctly preserves waker across polls + let body_stream = futures::stream::unfold(stream, |mut s| async move { + match s.recv_data().await { + Ok(Some(mut buf)) => { + use bytes::Buf; + let data = buf.copy_to_bytes(buf.remaining()); + Some((Ok::<_, hyper::Error>(http_body::Frame::data(data)), s)) + } + Ok(None) => None, + Err(e) => { + warn!("H3 response body recv error: {}", e); + None + } + } + }); + let h3_body = http_body_util::StreamBody::new(body_stream); + let counting_body = CountingBody::new( h3_body, Arc::clone(&self.metrics), @@ -2917,10 +2972,16 @@ impl HttpProxyService { let body: BoxBody = BoxBody::new(counting_body); - // Register connection in pool on success + // Register connection in pool on success (fresh connections only) if status != StatusCode::BAD_GATEWAY { - let g = self.connection_pool.register_h3(pool_key.clone(), quic_conn); - gen_holder.store(g, std::sync::atomic::Ordering::Relaxed); + if let Some(gh) = gen_holder { + let g = self.connection_pool.register_h3( + pool_key.clone(), + quic_conn, + send_request, + ); + gh.store(g, std::sync::atomic::Ordering::Relaxed); + } } self.metrics.set_backend_protocol(backend_key, "h3"); @@ -2949,41 +3010,6 @@ fn parse_alt_svc_h3_port(header_value: &str) -> Option { None } -/// Response body adapter for H3 client responses. -/// Reads data from the h3 `RequestStream` recv side and presents it as an `http_body::Body`. -struct H3ClientResponseBody { - stream: h3::client::RequestStream, Bytes>, -} - -impl http_body::Body for H3ClientResponseBody { - type Data = Bytes; - type Error = hyper::Error; - - fn poll_frame( - mut self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll, Self::Error>>> { - // h3's recv_data is async, so we need to poll it manually. - // Use a small future to poll the recv_data call. - use std::future::Future; - let mut fut = Box::pin(self.stream.recv_data()); - match fut.as_mut().poll(_cx) { - Poll::Ready(Ok(Some(mut buf))) => { - use bytes::Buf; - let data = Bytes::copy_from_slice(buf.chunk()); - buf.advance(buf.remaining()); - Poll::Ready(Some(Ok(http_body::Frame::data(data)))) - } - Poll::Ready(Ok(None)) => Poll::Ready(None), - Poll::Ready(Err(e)) => { - warn!("H3 response body recv error: {}", e); - Poll::Ready(None) - } - Poll::Pending => Poll::Pending, - } - } -} - /// Insecure certificate verifier for backend TLS connections (fallback only). /// The production path uses the shared config from tls_handler which has the same /// behavior but with session resumption across all outbound connections. @@ -3052,6 +3078,7 @@ impl Default for HttpProxyService { connection_pool: Arc::new(crate::connection_pool::ConnectionPool::new()), protocol_cache: Arc::new(crate::protocol_cache::ProtocolCache::new()), http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT, + http_max_lifetime: DEFAULT_HTTP_MAX_LIFETIME, ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT, ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME, quinn_client_endpoint: Arc::new(Self::create_quinn_client_endpoint()), diff --git a/rust/crates/rustproxy-metrics/src/collector.rs b/rust/crates/rustproxy-metrics/src/collector.rs index 7522bde..8837e6b 100644 --- a/rust/crates/rustproxy-metrics/src/collector.rs +++ b/rust/crates/rustproxy-metrics/src/collector.rs @@ -624,6 +624,24 @@ impl MetricsCollector { self.ip_pending_tp.retain(|k, _| self.ip_connections.contains_key(k)); self.ip_throughput.retain(|k, _| self.ip_connections.contains_key(k)); self.ip_total_connections.retain(|k, _| self.ip_connections.contains_key(k)); + + // Safety-net: prune orphaned backend error/stats entries for backends + // that have no active or total connections (error-only backends). + // These accumulate when backend_connect_error/backend_handshake_error + // create entries but backend_connection_opened is never called. + let known_backends: HashSet = self.backend_active.iter() + .map(|e| e.key().clone()) + .chain(self.backend_total.iter().map(|e| e.key().clone())) + .collect(); + self.backend_connect_errors.retain(|k, _| known_backends.contains(k)); + self.backend_handshake_errors.retain(|k, _| known_backends.contains(k)); + self.backend_request_errors.retain(|k, _| known_backends.contains(k)); + self.backend_connect_time_us.retain(|k, _| known_backends.contains(k)); + self.backend_connect_count.retain(|k, _| known_backends.contains(k)); + self.backend_pool_hits.retain(|k, _| known_backends.contains(k)); + self.backend_pool_misses.retain(|k, _| known_backends.contains(k)); + self.backend_h2_failures.retain(|k, _| known_backends.contains(k)); + self.backend_protocol.retain(|k, _| known_backends.contains(k)); } /// Remove per-route metrics for route IDs that are no longer active. diff --git a/rust/crates/rustproxy-passthrough/src/lib.rs b/rust/crates/rustproxy-passthrough/src/lib.rs index c843aba..28424a7 100644 --- a/rust/crates/rustproxy-passthrough/src/lib.rs +++ b/rust/crates/rustproxy-passthrough/src/lib.rs @@ -10,7 +10,6 @@ pub mod forwarder; pub mod proxy_protocol; pub mod tls_handler; pub mod connection_tracker; -pub mod socket_relay; pub mod socket_opts; pub mod udp_session; pub mod udp_listener; @@ -22,7 +21,6 @@ pub use forwarder::*; pub use proxy_protocol::*; pub use tls_handler::*; pub use connection_tracker::*; -pub use socket_relay::*; pub use socket_opts::*; pub use udp_session::*; pub use udp_listener::*; diff --git a/rust/crates/rustproxy-passthrough/src/socket_relay.rs b/rust/crates/rustproxy-passthrough/src/socket_relay.rs index 671d353..753d677 100644 --- a/rust/crates/rustproxy-passthrough/src/socket_relay.rs +++ b/rust/crates/rustproxy-passthrough/src/socket_relay.rs @@ -1,126 +1,4 @@ -//! Socket handler relay for connecting client connections to a TypeScript handler -//! via a Unix domain socket. +//! Socket handler relay module. //! -//! Protocol: Send a JSON metadata line terminated by `\n`, then bidirectional relay. - -use tokio::net::UnixStream; -use tokio::io::{AsyncWriteExt, AsyncReadExt}; -use tokio::net::TcpStream; -use serde::Serialize; -use tracing::debug; - -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -struct RelayMetadata { - connection_id: u64, - remote_ip: String, - remote_port: u16, - local_port: u16, - sni: Option, - route_name: String, - initial_data_base64: Option, -} - -/// Relay a client connection to a TypeScript handler via Unix domain socket. -/// -/// Protocol: Send a JSON metadata line terminated by `\n`, then bidirectional relay. -pub async fn relay_to_handler( - client: TcpStream, - relay_socket_path: &str, - connection_id: u64, - remote_ip: String, - remote_port: u16, - local_port: u16, - sni: Option, - route_name: String, - initial_data: Option<&[u8]>, -) -> std::io::Result<()> { - debug!( - "Relaying connection {} to handler socket {}", - connection_id, relay_socket_path - ); - - // Connect to TypeScript handler Unix socket - let mut handler = UnixStream::connect(relay_socket_path).await?; - - // Build and send metadata header - let initial_data_base64 = initial_data.map(base64_encode); - - let metadata = RelayMetadata { - connection_id, - remote_ip, - remote_port, - local_port, - sni, - route_name, - initial_data_base64, - }; - - let metadata_json = serde_json::to_string(&metadata) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; - - handler.write_all(metadata_json.as_bytes()).await?; - handler.write_all(b"\n").await?; - - // Bidirectional relay between client and handler - let (mut client_read, mut client_write) = client.into_split(); - let (mut handler_read, mut handler_write) = handler.into_split(); - - let c2h = tokio::spawn(async move { - let mut buf = vec![0u8; 65536]; - loop { - let n = match client_read.read(&mut buf).await { - Ok(0) | Err(_) => break, - Ok(n) => n, - }; - if handler_write.write_all(&buf[..n]).await.is_err() { - break; - } - } - let _ = handler_write.shutdown().await; - }); - - let h2c = tokio::spawn(async move { - let mut buf = vec![0u8; 65536]; - loop { - let n = match handler_read.read(&mut buf).await { - Ok(0) | Err(_) => break, - Ok(n) => n, - }; - if client_write.write_all(&buf[..n]).await.is_err() { - break; - } - } - let _ = client_write.shutdown().await; - }); - - let _ = tokio::join!(c2h, h2c); - - debug!("Relay connection {} completed", connection_id); - Ok(()) -} - -/// Simple base64 encoding without external dependency. -fn base64_encode(data: &[u8]) -> String { - const CHARS: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; - let mut result = String::new(); - for chunk in data.chunks(3) { - let b0 = chunk[0] as u32; - let b1 = if chunk.len() > 1 { chunk[1] as u32 } else { 0 }; - let b2 = if chunk.len() > 2 { chunk[2] as u32 } else { 0 }; - let n = (b0 << 16) | (b1 << 8) | b2; - result.push(CHARS[((n >> 18) & 0x3F) as usize] as char); - result.push(CHARS[((n >> 12) & 0x3F) as usize] as char); - if chunk.len() > 1 { - result.push(CHARS[((n >> 6) & 0x3F) as usize] as char); - } else { - result.push('='); - } - if chunk.len() > 2 { - result.push(CHARS[(n & 0x3F) as usize] as char); - } else { - result.push('='); - } - } - result -} +//! Note: The actual relay logic lives in `tcp_listener::relay_to_socket_handler()` +//! which has proper timeouts, cancellation, and metrics integration. diff --git a/rust/crates/rustproxy-passthrough/src/tcp_listener.rs b/rust/crates/rustproxy-passthrough/src/tcp_listener.rs index 6a67a1a..1eee236 100644 --- a/rust/crates/rustproxy-passthrough/src/tcp_listener.rs +++ b/rust/crates/rustproxy-passthrough/src/tcp_listener.rs @@ -182,6 +182,7 @@ impl TcpListenerManager { http_proxy_svc.set_backend_tls_config_alpn(tls_handler::shared_backend_tls_config_alpn()); http_proxy_svc.set_connection_timeouts( std::time::Duration::from_millis(conn_config.socket_timeout_ms), + std::time::Duration::from_millis(conn_config.max_connection_lifetime_ms), std::time::Duration::from_millis(conn_config.socket_timeout_ms), std::time::Duration::from_millis(conn_config.max_connection_lifetime_ms), ); @@ -220,6 +221,7 @@ impl TcpListenerManager { http_proxy_svc.set_backend_tls_config_alpn(tls_handler::shared_backend_tls_config_alpn()); http_proxy_svc.set_connection_timeouts( std::time::Duration::from_millis(conn_config.socket_timeout_ms), + std::time::Duration::from_millis(conn_config.max_connection_lifetime_ms), std::time::Duration::from_millis(conn_config.socket_timeout_ms), std::time::Duration::from_millis(conn_config.max_connection_lifetime_ms), ); @@ -263,6 +265,7 @@ impl TcpListenerManager { http_proxy_svc.set_backend_tls_config_alpn(tls_handler::shared_backend_tls_config_alpn()); http_proxy_svc.set_connection_timeouts( std::time::Duration::from_millis(config.socket_timeout_ms), + std::time::Duration::from_millis(config.max_connection_lifetime_ms), std::time::Duration::from_millis(config.socket_timeout_ms), std::time::Duration::from_millis(config.max_connection_lifetime_ms), ); diff --git a/rust/crates/rustproxy/src/lib.rs b/rust/crates/rustproxy/src/lib.rs index e48fdc9..e0b9a8a 100644 --- a/rust/crates/rustproxy/src/lib.rs +++ b/rust/crates/rustproxy/src/lib.rs @@ -363,6 +363,7 @@ impl RustProxy { // Start the throughput sampling task with cooperative cancellation let metrics = Arc::clone(&self.metrics); let conn_tracker = self.listener_manager.as_ref().unwrap().conn_tracker().clone(); + let http_proxy = self.listener_manager.as_ref().unwrap().http_proxy().clone(); let interval_ms = self.options.metrics.as_ref() .and_then(|m| m.sample_interval_ms) .unwrap_or(1000); @@ -378,6 +379,9 @@ impl RustProxy { metrics.sample_all(); // Periodically clean up stale rate-limit timestamp entries conn_tracker.cleanup_stale_timestamps(); + // Clean up expired rate limiter entries to prevent unbounded + // growth from unique IPs after traffic stops + http_proxy.cleanup_all_rate_limiters(); } } } diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 7cedbcb..16924af 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartproxy', - version: '26.2.3', + version: '26.2.4', description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.' } diff --git a/ts/proxies/smart-proxy/datagram-handler-server.ts b/ts/proxies/smart-proxy/datagram-handler-server.ts index 1596652..627029e 100644 --- a/ts/proxies/smart-proxy/datagram-handler-server.ts +++ b/ts/proxies/smart-proxy/datagram-handler-server.ts @@ -26,6 +26,8 @@ interface IDatagramRelayMessage { * - TS→Rust: { type: "reply", sourceIp, sourcePort, destPort, payloadBase64 } */ export class DatagramHandlerServer { + private static readonly MAX_BUFFER_SIZE = 50 * 1024 * 1024; // 50 MB + private server: plugins.net.Server | null = null; private connection: plugins.net.Socket | null = null; private socketPath: string; @@ -100,6 +102,11 @@ export class DatagramHandlerServer { socket.on('data', (chunk: Buffer) => { this.readBuffer = Buffer.concat([this.readBuffer, chunk]); + if (this.readBuffer.length > DatagramHandlerServer.MAX_BUFFER_SIZE) { + logger.log('error', `DatagramHandlerServer: buffer exceeded ${DatagramHandlerServer.MAX_BUFFER_SIZE} bytes, resetting`); + this.readBuffer = Buffer.alloc(0); + return; + } this.processFrames(); }); diff --git a/ts/proxies/smart-proxy/smart-proxy.ts b/ts/proxies/smart-proxy/smart-proxy.ts index cfba498..d122bdc 100644 --- a/ts/proxies/smart-proxy/smart-proxy.ts +++ b/ts/proxies/smart-proxy/smart-proxy.ts @@ -128,6 +128,7 @@ export class SmartProxy extends plugins.EventEmitter { } // Handle unexpected exit (only emits error if not intentionally stopping) + this.bridge.removeAllListeners('exit'); this.bridge.on('exit', (code: number | null, signal: string | null) => { if (this.stopping) return; logger.log('error', `RustProxy exited unexpectedly (code=${code}, signal=${signal})`, { component: 'smart-proxy' });