From 1ad3e61c15bf3e90f978f46f2b9cafada110b34d Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Sat, 4 Apr 2026 18:54:05 +0000 Subject: [PATCH] fix(metrics): correct frontend and backend protocol connection tracking across h1, h2, h3, and websocket traffic --- changelog.md | 8 ++ deno.lock | 8 +- rust/crates/rustproxy-http/src/h3_service.rs | 5 +- .../rustproxy-http/src/proxy_service.rs | 116 ++++++++++++++---- .../crates/rustproxy-metrics/src/collector.rs | 19 +-- ts/00_commitinfo_data.ts | 2 +- .../smart-proxy/rust-metrics-adapter.ts | 25 +--- 7 files changed, 125 insertions(+), 58 deletions(-) diff --git a/changelog.md b/changelog.md index 1c8a6f1..3054710 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,13 @@ # Changelog +## 2026-04-04 - 27.3.1 - fix(metrics) +correct frontend and backend protocol connection tracking across h1, h2, h3, and websocket traffic + +- move frontend protocol accounting from per-request to connection lifetime tracking for HTTP/1, HTTP/2, and HTTP/3 +- add backend protocol guards to connection drivers so active protocol metrics reflect live upstream connections +- prevent protocol counter underflow by using atomic saturating decrements in the metrics collector +- read backend protocol distribution directly from cached aggregate counters in the Rust metrics adapter + ## 2026-04-04 - 27.3.0 - feat(test) add end-to-end WebSocket proxy test coverage diff --git a/deno.lock b/deno.lock index 600047a..d88cf3c 100644 --- a/deno.lock +++ b/deno.lock @@ -12,9 +12,11 @@ "npm:@push.rocks/smartserve@^2.0.3": "2.0.3", "npm:@tsclass/tsclass@^9.5.0": "9.5.0", "npm:@types/node@^25.5.0": "25.5.0", + "npm:@types/ws@^8.18.1": "8.18.1", "npm:minimatch@^10.2.4": "10.2.4", "npm:typescript@^6.0.2": "6.0.2", - "npm:why-is-node-running@^3.2.2": "3.2.2" + "npm:why-is-node-running@^3.2.2": "3.2.2", + "npm:ws@^8.20.0": "8.20.0" }, "npm": { "@api.global/typedrequest-interfaces@2.0.2": { @@ -6743,9 +6745,11 @@ "npm:@push.rocks/smartserve@^2.0.3", "npm:@tsclass/tsclass@^9.5.0", "npm:@types/node@^25.5.0", + "npm:@types/ws@^8.18.1", "npm:minimatch@^10.2.4", "npm:typescript@^6.0.2", - "npm:why-is-node-running@^3.2.2" + "npm:why-is-node-running@^3.2.2", + "npm:ws@^8.20.0" ] } } diff --git a/rust/crates/rustproxy-http/src/h3_service.rs b/rust/crates/rustproxy-http/src/h3_service.rs index 0d05ea3..92371af 100644 --- a/rust/crates/rustproxy-http/src/h3_service.rs +++ b/rust/crates/rustproxy-http/src/h3_service.rs @@ -18,7 +18,7 @@ use tracing::{debug, warn}; use rustproxy_config::RouteConfig; use tokio_util::sync::CancellationToken; -use crate::proxy_service::{ConnActivity, HttpProxyService}; +use crate::proxy_service::{ConnActivity, HttpProxyService, ProtocolGuard}; /// HTTP/3 proxy service. /// @@ -48,6 +48,9 @@ impl H3ProxyService { let remote_addr = real_client_addr.unwrap_or_else(|| connection.remote_address()); debug!("HTTP/3 connection from {} on port {}", remote_addr, port); + // Track frontend H3 connection for the QUIC connection's lifetime. + let _frontend_h3_guard = ProtocolGuard::frontend(Arc::clone(self.http_proxy.metrics()), "h3"); + let mut h3_conn: h3::server::Connection = h3::server::builder() .send_grease(false) diff --git a/rust/crates/rustproxy-http/src/proxy_service.rs b/rust/crates/rustproxy-http/src/proxy_service.rs index 562aa88..c3632eb 100644 --- a/rust/crates/rustproxy-http/src/proxy_service.rs +++ b/rust/crates/rustproxy-http/src/proxy_service.rs @@ -140,6 +140,38 @@ impl Drop for ProtocolGuard { } } +/// Connection-level frontend protocol tracker. +/// +/// In `handle_io`, the HTTP protocol (h1 vs h2) is unknown until the first request +/// arrives. This struct uses `OnceLock` so the first request detects the protocol +/// and opens the counter; subsequent requests on the same connection are no-ops. +/// On Drop (when the connection ends), the counter is closed. +pub(crate) struct FrontendProtocolTracker { + metrics: Arc, + proto: std::sync::OnceLock<&'static str>, +} + +impl FrontendProtocolTracker { + fn new(metrics: Arc) -> Self { + Self { metrics, proto: std::sync::OnceLock::new() } + } + + /// Set the frontend protocol. Only the first call opens the counter. + fn set(&self, proto: &'static str) { + if self.proto.set(proto).is_ok() { + self.metrics.frontend_protocol_opened(proto); + } + } +} + +impl Drop for FrontendProtocolTracker { + fn drop(&mut self) { + if let Some(proto) = self.proto.get() { + self.metrics.frontend_protocol_closed(proto); + } + } +} + /// Backend stream that can be either plain TCP or TLS-wrapped. /// Used for `terminate-and-reencrypt` mode where the backend requires TLS. pub(crate) enum BackendStream { @@ -365,6 +397,11 @@ impl HttpProxyService { self.protocol_cache.snapshot() } + /// Access the shared metrics collector (used by H3ProxyService for protocol tracking). + pub fn metrics(&self) -> &Arc { + &self.metrics + } + /// Handle an incoming HTTP connection on a plain TCP stream. pub async fn handle_connection( self: Arc, @@ -409,10 +446,24 @@ impl HttpProxyService { let active_requests = Arc::new(AtomicU64::new(0)); let start = std::time::Instant::now(); + // Connection-level frontend protocol tracker: the first request detects + // h1 vs h2 from req.version() and opens the counter. On connection close + // (when handle_io returns), Drop closes the counter. + let frontend_tracker = Arc::new(FrontendProtocolTracker::new(Arc::clone(&self.metrics))); + let ft_inner = Arc::clone(&frontend_tracker); + let la_inner = Arc::clone(&last_activity); let ar_inner = Arc::clone(&active_requests); let cancel_inner = cancel.clone(); let service = hyper::service::service_fn(move |req: Request| { + // Detect frontend protocol from the first request on this connection. + // OnceLock ensures only the first call opens the counter. + let proto: &'static str = match req.version() { + hyper::Version::HTTP_2 => "h2", + _ => "h1", + }; + ft_inner.set(proto); + // Mark request start — RAII guard decrements on drop (panic-safe) la_inner.store(start.elapsed().as_millis() as u64, Ordering::Relaxed); let req_guard = ActiveRequestGuard::new(Arc::clone(&ar_inner)); @@ -655,17 +706,8 @@ impl HttpProxyService { .map(|p| p.as_str().eq_ignore_ascii_case("websocket")) .unwrap_or(false); - // Track frontend protocol for distribution metrics (h1/h2/h3/ws) - let frontend_proto: &'static str = if is_h1_websocket || is_h2_websocket { - "ws" - } else { - match req.version() { - hyper::Version::HTTP_2 => "h2", - hyper::Version::HTTP_3 => "h3", - _ => "h1", // HTTP/1.0, HTTP/1.1 - } - }; - let _frontend_proto_guard = ProtocolGuard::frontend(Arc::clone(&self.metrics), frontend_proto); + // Frontend protocol is tracked at the connection level (handle_io / h3_service). + // WebSocket tunnels additionally get their own "ws" guards in the spawned task. if is_h1_websocket || is_h2_websocket { let result = self.handle_websocket_upgrade( @@ -1275,13 +1317,18 @@ impl HttpProxyService { } }; - tokio::spawn(async move { - match tokio::time::timeout(std::time::Duration::from_secs(300), conn).await { - Ok(Err(e)) => debug!("Upstream connection error: {}", e), - Err(_) => debug!("H1 connection driver timed out after 300s"), - _ => {} - } - }); + { + let driver_metrics = Arc::clone(&self.metrics); + tokio::spawn(async move { + // Track backend H1 connection for the driver's lifetime + let _proto_guard = ProtocolGuard::backend(driver_metrics, "h1"); + match tokio::time::timeout(std::time::Duration::from_secs(300), conn).await { + Ok(Err(e)) => debug!("Upstream connection error: {}", e), + Err(_) => debug!("H1 connection driver timed out after 300s"), + _ => {} + } + }); + } self.forward_h1_with_sender(sender, parts, body, upstream_headers, upstream_path, route, route_id, source_ip, domain, conn_activity, backend_key).await } @@ -1402,7 +1449,10 @@ impl HttpProxyService { let pool = Arc::clone(&self.connection_pool); let key = pool_key.clone(); let gen = Arc::clone(&gen_holder); + let driver_metrics = Arc::clone(&self.metrics); tokio::spawn(async move { + // Track backend H2 connection for the driver's lifetime + let _proto_guard = ProtocolGuard::backend(driver_metrics, "h2"); if let Err(e) = conn.await { warn!("HTTP/2 upstream connection error: {} ({:?})", e, e); } @@ -1701,7 +1751,10 @@ impl HttpProxyService { let pool = Arc::clone(&self.connection_pool); let key = pool_key.clone(); let gen = Arc::clone(&gen_holder); + let driver_metrics = Arc::clone(&self.metrics); tokio::spawn(async move { + // Track backend H2 connection for the driver's lifetime + let _proto_guard = ProtocolGuard::backend(driver_metrics, "h2"); if let Err(e) = conn.await { warn!("HTTP/2 upstream connection error: {} ({:?})", e, e); } @@ -1871,13 +1924,18 @@ impl HttpProxyService { } }; - tokio::spawn(async move { - match tokio::time::timeout(std::time::Duration::from_secs(300), conn).await { - Ok(Err(e)) => debug!("H1 fallback: upstream connection error: {}", e), - Err(_) => debug!("H1 fallback: connection driver timed out after 300s"), - _ => {} - } - }); + { + let driver_metrics = Arc::clone(&self.metrics); + tokio::spawn(async move { + // Track backend H1 connection for the driver's lifetime + let _proto_guard = ProtocolGuard::backend(driver_metrics, "h1"); + match tokio::time::timeout(std::time::Duration::from_secs(300), conn).await { + Ok(Err(e)) => debug!("H1 fallback: upstream connection error: {}", e), + Err(_) => debug!("H1 fallback: connection driver timed out after 300s"), + _ => {} + } + }); + } let mut upstream_req = Request::builder() .method(method) @@ -2425,7 +2483,10 @@ impl HttpProxyService { selector: upstream_selector, key: upstream_key_owned.clone(), }; - // Track backend WebSocket connection — guard decrements on tunnel close + // Track WebSocket tunnel as "ws" on both frontend and backend. + // Frontend h1/h2 is tracked at the connection level (handle_io); this + // additional "ws" guard captures the tunnel's lifetime independently. + let _frontend_ws_guard = ProtocolGuard::frontend(Arc::clone(&metrics), "ws"); let _backend_ws_guard = ProtocolGuard::backend(Arc::clone(&metrics), "ws"); let client_upgraded = match on_client_upgrade.await { @@ -2889,7 +2950,10 @@ impl HttpProxyService { let driver_pool = Arc::clone(&self.connection_pool); let driver_pool_key = pool_key.clone(); let driver_gen = Arc::clone(&gen_holder); + let driver_metrics = Arc::clone(&self.metrics); tokio::spawn(async move { + // Track backend H3 connection for the driver's lifetime + let _proto_guard = ProtocolGuard::backend(driver_metrics, "h3"); let close_err = std::future::poll_fn(|cx| driver.poll_close(cx)).await; debug!("H3 connection driver closed: {:?}", close_err); let g = driver_gen.load(std::sync::atomic::Ordering::Relaxed); diff --git a/rust/crates/rustproxy-metrics/src/collector.rs b/rust/crates/rustproxy-metrics/src/collector.rs index eff590d..862c160 100644 --- a/rust/crates/rustproxy-metrics/src/collector.rs +++ b/rust/crates/rustproxy-metrics/src/collector.rs @@ -506,13 +506,14 @@ impl MetricsCollector { total.fetch_add(1, Ordering::Relaxed); } - /// Record a frontend request/connection closed with a given protocol. + /// Record a frontend connection closed with a given protocol. pub fn frontend_protocol_closed(&self, proto: &str) { let (active, _) = self.frontend_proto_counters(proto); - let val = active.load(Ordering::Relaxed); - if val > 0 { - active.fetch_sub(1, Ordering::Relaxed); - } + // Atomic saturating decrement — avoids TOCTOU race where concurrent + // closes could both read val=1, both subtract, wrapping to u64::MAX. + active.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| { + if v > 0 { Some(v - 1) } else { None } + }).ok(); } /// Record a backend connection opened with a given protocol. @@ -525,10 +526,10 @@ impl MetricsCollector { /// Record a backend connection closed with a given protocol. pub fn backend_protocol_closed(&self, proto: &str) { let (active, _) = self.backend_proto_counters(proto); - let val = active.load(Ordering::Relaxed); - if val > 0 { - active.fetch_sub(1, Ordering::Relaxed); - } + // Atomic saturating decrement — see frontend_protocol_closed for rationale. + active.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| { + if v > 0 { Some(v - 1) } else { None } + }).ok(); } // ── Per-backend recording methods ── diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index d8cdd98..22a2cd1 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartproxy', - version: '27.3.0', + version: '27.3.1', description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.' } diff --git a/ts/proxies/smart-proxy/rust-metrics-adapter.ts b/ts/proxies/smart-proxy/rust-metrics-adapter.ts index 5a21e82..5617583 100644 --- a/ts/proxies/smart-proxy/rust-metrics-adapter.ts +++ b/ts/proxies/smart-proxy/rust-metrics-adapter.ts @@ -106,27 +106,14 @@ export class RustMetricsAdapter implements IMetrics { }; }, backendProtocols: (): IProtocolDistribution => { - // Merge per-backend h1/h2/h3 data with aggregate ws/other counters const bp = this.cache?.backendProtocols; - let h1Active = 0, h1Total = 0; - let h2Active = 0, h2Total = 0; - let h3Active = 0, h3Total = 0; - if (this.cache?.backends) { - for (const bm of Object.values(this.cache.backends)) { - const m = bm as any; - const active = m.activeConnections ?? 0; - const total = m.totalConnections ?? 0; - switch (m.protocol) { - case 'h2': h2Active += active; h2Total += total; break; - case 'h3': h3Active += active; h3Total += total; break; - default: h1Active += active; h1Total += total; break; - } - } - } return { - h1Active, h1Total, - h2Active, h2Total, - h3Active, h3Total, + h1Active: bp?.h1Active ?? 0, + h1Total: bp?.h1Total ?? 0, + h2Active: bp?.h2Active ?? 0, + h2Total: bp?.h2Total ?? 0, + h3Active: bp?.h3Active ?? 0, + h3Total: bp?.h3Total ?? 0, wsActive: bp?.wsActive ?? 0, wsTotal: bp?.wsTotal ?? 0, otherActive: bp?.otherActive ?? 0,