From 02b4ed801839670837fb966550ce7f5f3b84c613 Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Mon, 16 Feb 2026 14:35:26 +0000 Subject: [PATCH] fix(metrics): centralize connection-closed reporting via ConnectionGuard and remove duplicate explicit metrics.connection_closed calls --- changelog.md | 8 +++++ .../rustproxy-http/src/proxy_service.rs | 35 +------------------ .../rustproxy-passthrough/src/tcp_listener.rs | 17 +-------- ts/00_commitinfo_data.ts | 2 +- 4 files changed, 11 insertions(+), 51 deletions(-) diff --git a/changelog.md b/changelog.md index e302037..f1251ed 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,13 @@ # Changelog +## 2026-02-16 - 25.7.3 - fix(metrics) +centralize connection-closed reporting via ConnectionGuard and remove duplicate explicit metrics.connection_closed calls + +- Removed numerous explicit metrics.connection_closed calls from rust/crates/rustproxy-http/src/proxy_service.rs so connection teardown and byte counting are handled by the connection guard / counting body instead of ad-hoc calls. +- Simplified ConnectionGuard in rust/crates/rustproxy-passthrough/src/tcp_listener.rs: removed the disarm flag and disarm() method so Drop always reports connection_closed. +- Stopped disarming the TCP-level guard when handing connections off to HTTP proxy paths (HTTP/WebSocket/streaming flows) to avoid missing or double-reporting metrics. +- Fixes incorrect/duplicate connection-closed metric emission and ensures consistent byte/connection accounting during streaming and WebSocket upgrades. + ## 2026-02-16 - 25.7.2 - fix(rustproxy-http) preserve original Host header when proxying and add X-Forwarded-* headers; add TLS WebSocket echo backend helper and integration test for terminate-and-reencrypt websocket diff --git a/rust/crates/rustproxy-http/src/proxy_service.rs b/rust/crates/rustproxy-http/src/proxy_service.rs index 5ffa5aa..4bc5b38 100644 --- a/rust/crates/rustproxy-http/src/proxy_service.rs +++ b/rust/crates/rustproxy-http/src/proxy_service.rs @@ -309,12 +309,10 @@ impl HttpProxyService { let route_id = route_match.route.id.as_deref(); let ip_str = peer_addr.ip().to_string(); self.metrics.record_http_request(); - self.metrics.connection_opened(route_id, Some(&ip_str)); // Apply request filters (IP check, rate limiting, auth) if let Some(ref security) = route_match.route.security { if let Some(response) = RequestFilter::apply(security, &req, &peer_addr) { - self.metrics.connection_closed(route_id, Some(&ip_str)); return Ok(response); } } @@ -322,7 +320,6 @@ impl HttpProxyService { // Check for test response (returns immediately, no upstream needed) if let Some(ref advanced) = route_match.route.action.advanced { if let Some(ref test_response) = advanced.test_response { - self.metrics.connection_closed(route_id, Some(&ip_str)); return Ok(Self::build_test_response(test_response)); } } @@ -330,7 +327,6 @@ impl HttpProxyService { // Check for static file serving if let Some(ref advanced) = route_match.route.action.advanced { if let Some(ref static_files) = advanced.static_files { - self.metrics.connection_closed(route_id, Some(&ip_str)); return Ok(Self::serve_static_file(&path, static_files)); } } @@ -339,7 +335,6 @@ impl HttpProxyService { let target = match route_match.target { Some(t) => t, None => { - self.metrics.connection_closed(route_id, Some(&ip_str)); return Ok(error_response(StatusCode::BAD_GATEWAY, "No target available")); } }; @@ -459,13 +454,11 @@ impl HttpProxyService { Ok(Err(e)) => { error!("Failed TLS connect to upstream {}:{}: {}", upstream.host, upstream.port, e); self.upstream_selector.connection_ended(&upstream_key); - self.metrics.connection_closed(route_id, Some(&ip_str)); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend TLS unavailable")); } Err(_) => { error!("Upstream TLS connect timeout for {}:{}", upstream.host, upstream.port); self.upstream_selector.connection_ended(&upstream_key); - self.metrics.connection_closed(route_id, Some(&ip_str)); return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend TLS connect timeout")); } } @@ -481,13 +474,11 @@ impl HttpProxyService { Ok(Err(e)) => { error!("Failed to connect to upstream {}:{}: {}", upstream.host, upstream.port, e); self.upstream_selector.connection_ended(&upstream_key); - self.metrics.connection_closed(route_id, Some(&ip_str)); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable")); } Err(_) => { error!("Upstream connect timeout for {}:{}", upstream.host, upstream.port); self.upstream_selector.connection_ended(&upstream_key); - self.metrics.connection_closed(route_id, Some(&ip_str)); return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend connect timeout")); } } @@ -523,7 +514,6 @@ impl HttpProxyService { Ok(h) => h, Err(e) => { error!("Upstream handshake failed: {}", e); - self.metrics.connection_closed(route_id, Some(source_ip)); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend handshake failed")); } }; @@ -559,7 +549,6 @@ impl HttpProxyService { Ok(resp) => resp, Err(e) => { error!("Upstream request failed: {}", e); - self.metrics.connection_closed(route_id, Some(source_ip)); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend request failed")); } }; @@ -585,7 +574,6 @@ impl HttpProxyService { Ok(h) => h, Err(e) => { error!("HTTP/2 upstream handshake failed: {}", e); - self.metrics.connection_closed(route_id, Some(source_ip)); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H2 handshake failed")); } }; @@ -620,7 +608,6 @@ impl HttpProxyService { Ok(resp) => resp, Err(e) => { error!("HTTP/2 upstream request failed: {}", e); - self.metrics.connection_closed(route_id, Some(source_ip)); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H2 request failed")); } }; @@ -631,8 +618,7 @@ impl HttpProxyService { /// Build the client-facing response from an upstream response, streaming the body. /// /// The response body is wrapped in a `CountingBody` that counts bytes as they - /// stream from upstream to client. When the body is fully consumed (or dropped), - /// it reports byte counts to the metrics collector and calls `connection_closed`. + /// stream from upstream to client. async fn build_streaming_response( &self, upstream_response: Response, @@ -661,11 +647,6 @@ impl HttpProxyService { Direction::Out, ); - // Close the connection metric now — the HTTP request/response cycle is done - // from the proxy's perspective once we hand the streaming body to hyper. - // Bytes will still be counted as they flow. - self.metrics.connection_closed(route_id, Some(source_ip)); - let body: BoxBody = BoxBody::new(counting_body); Ok(response.body(body).unwrap()) @@ -697,7 +678,6 @@ impl HttpProxyService { .unwrap_or(""); if !allowed_origins.is_empty() && !allowed_origins.iter().any(|o| o == "*" || o == origin) { self.upstream_selector.connection_ended(upstream_key); - self.metrics.connection_closed(route_id, Some(source_ip)); return Ok(error_response(StatusCode::FORBIDDEN, "Origin not allowed")); } } @@ -715,13 +695,11 @@ impl HttpProxyService { Ok(Err(e)) => { error!("WebSocket: failed TLS connect upstream {}:{}: {}", upstream.host, upstream.port, e); self.upstream_selector.connection_ended(upstream_key); - self.metrics.connection_closed(route_id, Some(source_ip)); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend TLS unavailable")); } Err(_) => { error!("WebSocket: upstream TLS connect timeout for {}:{}", upstream.host, upstream.port); self.upstream_selector.connection_ended(upstream_key); - self.metrics.connection_closed(route_id, Some(source_ip)); return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend TLS connect timeout")); } } @@ -737,13 +715,11 @@ impl HttpProxyService { Ok(Err(e)) => { error!("WebSocket: failed to connect upstream {}:{}: {}", upstream.host, upstream.port, e); self.upstream_selector.connection_ended(upstream_key); - self.metrics.connection_closed(route_id, Some(source_ip)); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable")); } Err(_) => { error!("WebSocket: upstream connect timeout for {}:{}", upstream.host, upstream.port); self.upstream_selector.connection_ended(upstream_key); - self.metrics.connection_closed(route_id, Some(source_ip)); return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend connect timeout")); } } @@ -836,7 +812,6 @@ impl HttpProxyService { if let Err(e) = upstream_stream.write_all(raw_request.as_bytes()).await { error!("WebSocket: failed to send upgrade request to upstream: {}", e); self.upstream_selector.connection_ended(upstream_key); - self.metrics.connection_closed(route_id, Some(source_ip)); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend write failed")); } @@ -847,7 +822,6 @@ impl HttpProxyService { Ok(0) => { error!("WebSocket: upstream closed before completing handshake"); self.upstream_selector.connection_ended(upstream_key); - self.metrics.connection_closed(route_id, Some(source_ip)); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend closed")); } Ok(_) => { @@ -861,14 +835,12 @@ impl HttpProxyService { if response_buf.len() > 8192 { error!("WebSocket: upstream response headers too large"); self.upstream_selector.connection_ended(upstream_key); - self.metrics.connection_closed(route_id, Some(source_ip)); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend response too large")); } } Err(e) => { error!("WebSocket: failed to read upstream response: {}", e); self.upstream_selector.connection_ended(upstream_key); - self.metrics.connection_closed(route_id, Some(source_ip)); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend read failed")); } } @@ -886,7 +858,6 @@ impl HttpProxyService { if status_code != 101 { debug!("WebSocket: upstream rejected upgrade with status {}", status_code); self.upstream_selector.connection_ended(upstream_key); - self.metrics.connection_closed(route_id, Some(source_ip)); return Ok(error_response( StatusCode::from_u16(status_code).unwrap_or(StatusCode::BAD_GATEWAY), "WebSocket upgrade rejected by backend", @@ -930,9 +901,6 @@ impl HttpProxyService { Err(e) => { debug!("WebSocket: client upgrade failed: {}", e); upstream_selector.connection_ended(&upstream_key_owned); - if let Some(ref rid) = route_id_owned { - metrics.connection_closed(Some(rid.as_str()), Some(&source_ip_owned)); - } return; } }; @@ -1037,7 +1005,6 @@ impl HttpProxyService { upstream_selector.connection_ended(&upstream_key_owned); if let Some(ref rid) = route_id_owned { metrics.record_bytes(bytes_in, bytes_out, Some(rid.as_str()), Some(&source_ip_owned)); - metrics.connection_closed(Some(rid.as_str()), Some(&source_ip_owned)); } }); diff --git a/rust/crates/rustproxy-passthrough/src/tcp_listener.rs b/rust/crates/rustproxy-passthrough/src/tcp_listener.rs index f168a97..7b98ad1 100644 --- a/rust/crates/rustproxy-passthrough/src/tcp_listener.rs +++ b/rust/crates/rustproxy-passthrough/src/tcp_listener.rs @@ -22,7 +22,6 @@ struct ConnectionGuard { metrics: Arc, route_id: Option, source_ip: Option, - disarmed: bool, } impl ConnectionGuard { @@ -31,22 +30,13 @@ impl ConnectionGuard { metrics, route_id: route_id.map(|s| s.to_string()), source_ip: source_ip.map(|s| s.to_string()), - disarmed: false, } } - - /// Disarm the guard — prevents the Drop from running. - /// Use when handing off to a path that manages its own cleanup (e.g., HTTP proxy). - fn disarm(mut self) { - self.disarmed = true; - } } impl Drop for ConnectionGuard { fn drop(&mut self) { - if !self.disarmed { - self.metrics.connection_closed(self.route_id.as_deref(), self.source_ip.as_deref()); - } + self.metrics.connection_closed(self.route_id.as_deref(), self.source_ip.as_deref()); } } @@ -844,8 +834,6 @@ impl TcpListenerManager { "TLS Terminate + HTTP: {} -> {}:{} (domain: {:?})", peer_addr, target_host, target_port, domain ); - // HTTP proxy manages its own per-request metrics — disarm TCP-level guard - _conn_guard.disarm(); http_proxy.handle_io(buf_stream, peer_addr, port, cancel.clone()).await; } else { debug!( @@ -917,7 +905,6 @@ impl TcpListenerManager { "TLS Terminate+Reencrypt + HTTP: {} (domain: {:?})", peer_addr, domain ); - _conn_guard.disarm(); http_proxy.handle_io(buf_stream, peer_addr, port, cancel.clone()).await; } else { // Non-HTTP: TLS-to-TLS tunnel (existing behavior for raw TCP protocols) @@ -937,8 +924,6 @@ impl TcpListenerManager { if is_http { // Plain HTTP - use HTTP proxy for request-level routing debug!("HTTP proxy: {} on port {}", peer_addr, port); - // HTTP proxy manages its own per-request metrics — disarm TCP-level guard - _conn_guard.disarm(); http_proxy.handle_connection(stream, peer_addr, port, cancel.clone()).await; Ok(()) } else { diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 0e8c242..d90222e 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartproxy', - version: '25.7.2', + version: '25.7.3', description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.' }