From b04eb0ab17f6b9fbc687fd15130f6c115968c2ac Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Sat, 4 Apr 2026 16:52:25 +0000 Subject: [PATCH] feat(metrics): add frontend and backend protocol distribution metrics --- changelog.md | 7 + .../rustproxy-http/src/proxy_service.rs | 44 ++++++ .../crates/rustproxy-metrics/src/collector.rs | 144 ++++++++++++++++++ .../rustproxy-passthrough/src/tcp_listener.rs | 36 +++++ ts/00_commitinfo_data.ts | 2 +- .../smart-proxy/models/metrics-types.ts | 19 +++ .../smart-proxy/rust-metrics-adapter.ts | 45 +++++- 7 files changed, 295 insertions(+), 2 deletions(-) diff --git a/changelog.md b/changelog.md index ff13ff2..587c1db 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-04-04 - 27.2.0 - feat(metrics) +add frontend and backend protocol distribution metrics + +- track active and total frontend protocol counts for h1, h2, h3, websocket, and other traffic +- add backend protocol counters with RAII guards to ensure metrics are decremented on all exit paths +- expose protocol distribution through the TypeScript metrics interfaces and Rust metrics adapter + ## 2026-03-27 - 27.1.0 - feat(rustproxy-passthrough) add selective connection recycling for route, security, and certificate updates diff --git a/rust/crates/rustproxy-http/src/proxy_service.rs b/rust/crates/rustproxy-http/src/proxy_service.rs index 64d7b3e..562aa88 100644 --- a/rust/crates/rustproxy-http/src/proxy_service.rs +++ b/rust/crates/rustproxy-http/src/proxy_service.rs @@ -110,6 +110,36 @@ impl Drop for ActiveRequestGuard { } } +/// RAII guard that calls frontend_protocol_closed or backend_protocol_closed on drop. +/// Ensures active protocol counters are decremented on all exit paths. +pub(crate) struct ProtocolGuard { + metrics: Arc, + version: &'static str, + is_frontend: bool, +} + +impl ProtocolGuard { + pub fn frontend(metrics: Arc, version: &'static str) -> Self { + metrics.frontend_protocol_opened(version); + Self { metrics, version, is_frontend: true } + } + + pub fn backend(metrics: Arc, version: &'static str) -> Self { + metrics.backend_protocol_opened(version); + Self { metrics, version, is_frontend: false } + } +} + +impl Drop for ProtocolGuard { + fn drop(&mut self) { + if self.is_frontend { + self.metrics.frontend_protocol_closed(self.version); + } else { + self.metrics.backend_protocol_closed(self.version); + } + } +} + /// Backend stream that can be either plain TCP or TLS-wrapped. /// Used for `terminate-and-reencrypt` mode where the backend requires TLS. pub(crate) enum BackendStream { @@ -625,6 +655,18 @@ impl HttpProxyService { .map(|p| p.as_str().eq_ignore_ascii_case("websocket")) .unwrap_or(false); + // Track frontend protocol for distribution metrics (h1/h2/h3/ws) + let frontend_proto: &'static str = if is_h1_websocket || is_h2_websocket { + "ws" + } else { + match req.version() { + hyper::Version::HTTP_2 => "h2", + hyper::Version::HTTP_3 => "h3", + _ => "h1", // HTTP/1.0, HTTP/1.1 + } + }; + let _frontend_proto_guard = ProtocolGuard::frontend(Arc::clone(&self.metrics), frontend_proto); + if is_h1_websocket || is_h2_websocket { let result = self.handle_websocket_upgrade( req, peer_addr, &upstream, route_match.route, route_id, &upstream_key, cancel, &ip_str, is_h2_websocket, @@ -2383,6 +2425,8 @@ impl HttpProxyService { selector: upstream_selector, key: upstream_key_owned.clone(), }; + // Track backend WebSocket connection — guard decrements on tunnel close + let _backend_ws_guard = ProtocolGuard::backend(Arc::clone(&metrics), "ws"); let client_upgraded = match on_client_upgrade.await { Ok(upgraded) => upgraded, diff --git a/rust/crates/rustproxy-metrics/src/collector.rs b/rust/crates/rustproxy-metrics/src/collector.rs index 8837e6b..eff590d 100644 --- a/rust/crates/rustproxy-metrics/src/collector.rs +++ b/rust/crates/rustproxy-metrics/src/collector.rs @@ -33,6 +33,9 @@ pub struct Metrics { pub total_datagrams_out: u64, // Protocol detection cache snapshot (populated by RustProxy from HttpProxyService) pub detected_protocols: Vec, + // Protocol distribution for frontend (client→proxy) and backend (proxy→upstream) + pub frontend_protocols: ProtocolMetrics, + pub backend_protocols: ProtocolMetrics, } /// Per-route metrics. @@ -99,6 +102,23 @@ pub struct ProtocolCacheEntryMetric { pub h3_consecutive_failures: Option, } +/// Protocol distribution metrics for frontend (client→proxy) and backend (proxy→upstream). +/// Tracks active and total counts for each protocol category. +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +#[serde(rename_all = "camelCase")] +pub struct ProtocolMetrics { + pub h1_active: u64, + pub h1_total: u64, + pub h2_active: u64, + pub h2_total: u64, + pub h3_active: u64, + pub h3_total: u64, + pub ws_active: u64, + pub ws_total: u64, + pub other_active: u64, + pub other_total: u64, +} + /// Statistics snapshot. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] @@ -170,6 +190,30 @@ pub struct MetricsCollector { total_datagrams_in: AtomicU64, total_datagrams_out: AtomicU64, + // ── Frontend protocol tracking (h1/h2/h3/ws/other) ── + frontend_h1_active: AtomicU64, + frontend_h1_total: AtomicU64, + frontend_h2_active: AtomicU64, + frontend_h2_total: AtomicU64, + frontend_h3_active: AtomicU64, + frontend_h3_total: AtomicU64, + frontend_ws_active: AtomicU64, + frontend_ws_total: AtomicU64, + frontend_other_active: AtomicU64, + frontend_other_total: AtomicU64, + + // ── Backend protocol tracking (h1/h2/h3/ws/other) ── + backend_h1_active: AtomicU64, + backend_h1_total: AtomicU64, + backend_h2_active: AtomicU64, + backend_h2_total: AtomicU64, + backend_h3_active: AtomicU64, + backend_h3_total: AtomicU64, + backend_ws_active: AtomicU64, + backend_ws_total: AtomicU64, + backend_other_active: AtomicU64, + backend_other_total: AtomicU64, + // ── Lock-free pending throughput counters (hot path) ── global_pending_tp_in: AtomicU64, global_pending_tp_out: AtomicU64, @@ -221,6 +265,26 @@ impl MetricsCollector { total_http_requests: AtomicU64::new(0), pending_http_requests: AtomicU64::new(0), http_request_throughput: Mutex::new(ThroughputTracker::new(retention_seconds)), + frontend_h1_active: AtomicU64::new(0), + frontend_h1_total: AtomicU64::new(0), + frontend_h2_active: AtomicU64::new(0), + frontend_h2_total: AtomicU64::new(0), + frontend_h3_active: AtomicU64::new(0), + frontend_h3_total: AtomicU64::new(0), + frontend_ws_active: AtomicU64::new(0), + frontend_ws_total: AtomicU64::new(0), + frontend_other_active: AtomicU64::new(0), + frontend_other_total: AtomicU64::new(0), + backend_h1_active: AtomicU64::new(0), + backend_h1_total: AtomicU64::new(0), + backend_h2_active: AtomicU64::new(0), + backend_h2_total: AtomicU64::new(0), + backend_h3_active: AtomicU64::new(0), + backend_h3_total: AtomicU64::new(0), + backend_ws_active: AtomicU64::new(0), + backend_ws_total: AtomicU64::new(0), + backend_other_active: AtomicU64::new(0), + backend_other_total: AtomicU64::new(0), global_pending_tp_in: AtomicU64::new(0), global_pending_tp_out: AtomicU64::new(0), route_pending_tp: DashMap::new(), @@ -411,6 +475,62 @@ impl MetricsCollector { self.total_datagrams_out.fetch_add(1, Ordering::Relaxed); } + // ── Frontend/backend protocol distribution tracking ── + + /// Get the (active, total) counter pair for a frontend protocol. + fn frontend_proto_counters(&self, proto: &str) -> (&AtomicU64, &AtomicU64) { + match proto { + "h2" => (&self.frontend_h2_active, &self.frontend_h2_total), + "h3" => (&self.frontend_h3_active, &self.frontend_h3_total), + "ws" => (&self.frontend_ws_active, &self.frontend_ws_total), + "other" => (&self.frontend_other_active, &self.frontend_other_total), + _ => (&self.frontend_h1_active, &self.frontend_h1_total), // h1 + default + } + } + + /// Get the (active, total) counter pair for a backend protocol. + fn backend_proto_counters(&self, proto: &str) -> (&AtomicU64, &AtomicU64) { + match proto { + "h2" => (&self.backend_h2_active, &self.backend_h2_total), + "h3" => (&self.backend_h3_active, &self.backend_h3_total), + "ws" => (&self.backend_ws_active, &self.backend_ws_total), + "other" => (&self.backend_other_active, &self.backend_other_total), + _ => (&self.backend_h1_active, &self.backend_h1_total), // h1 + default + } + } + + /// Record a frontend request/connection opened with a given protocol. + pub fn frontend_protocol_opened(&self, proto: &str) { + let (active, total) = self.frontend_proto_counters(proto); + active.fetch_add(1, Ordering::Relaxed); + total.fetch_add(1, Ordering::Relaxed); + } + + /// Record a frontend request/connection closed with a given protocol. + pub fn frontend_protocol_closed(&self, proto: &str) { + let (active, _) = self.frontend_proto_counters(proto); + let val = active.load(Ordering::Relaxed); + if val > 0 { + active.fetch_sub(1, Ordering::Relaxed); + } + } + + /// Record a backend connection opened with a given protocol. + pub fn backend_protocol_opened(&self, proto: &str) { + let (active, total) = self.backend_proto_counters(proto); + active.fetch_add(1, Ordering::Relaxed); + total.fetch_add(1, Ordering::Relaxed); + } + + /// Record a backend connection closed with a given protocol. + pub fn backend_protocol_closed(&self, proto: &str) { + let (active, _) = self.backend_proto_counters(proto); + let val = active.load(Ordering::Relaxed); + if val > 0 { + active.fetch_sub(1, Ordering::Relaxed); + } + } + // ── Per-backend recording methods ── /// Record a successful backend connection with its connect duration. @@ -866,6 +986,30 @@ impl MetricsCollector { total_datagrams_in: self.total_datagrams_in.load(Ordering::Relaxed), total_datagrams_out: self.total_datagrams_out.load(Ordering::Relaxed), detected_protocols: vec![], + frontend_protocols: ProtocolMetrics { + h1_active: self.frontend_h1_active.load(Ordering::Relaxed), + h1_total: self.frontend_h1_total.load(Ordering::Relaxed), + h2_active: self.frontend_h2_active.load(Ordering::Relaxed), + h2_total: self.frontend_h2_total.load(Ordering::Relaxed), + h3_active: self.frontend_h3_active.load(Ordering::Relaxed), + h3_total: self.frontend_h3_total.load(Ordering::Relaxed), + ws_active: self.frontend_ws_active.load(Ordering::Relaxed), + ws_total: self.frontend_ws_total.load(Ordering::Relaxed), + other_active: self.frontend_other_active.load(Ordering::Relaxed), + other_total: self.frontend_other_total.load(Ordering::Relaxed), + }, + backend_protocols: ProtocolMetrics { + h1_active: self.backend_h1_active.load(Ordering::Relaxed), + h1_total: self.backend_h1_total.load(Ordering::Relaxed), + h2_active: self.backend_h2_active.load(Ordering::Relaxed), + h2_total: self.backend_h2_total.load(Ordering::Relaxed), + h3_active: self.backend_h3_active.load(Ordering::Relaxed), + h3_total: self.backend_h3_total.load(Ordering::Relaxed), + ws_active: self.backend_ws_active.load(Ordering::Relaxed), + ws_total: self.backend_ws_total.load(Ordering::Relaxed), + other_active: self.backend_other_active.load(Ordering::Relaxed), + other_total: self.backend_other_total.load(Ordering::Relaxed), + }, } } } diff --git a/rust/crates/rustproxy-passthrough/src/tcp_listener.rs b/rust/crates/rustproxy-passthrough/src/tcp_listener.rs index 26f82d1..a59f9e3 100644 --- a/rust/crates/rustproxy-passthrough/src/tcp_listener.rs +++ b/rust/crates/rustproxy-passthrough/src/tcp_listener.rs @@ -43,6 +43,33 @@ impl Drop for ConnectionGuard { } } +/// RAII guard for frontend+backend protocol distribution tracking. +/// Calls the appropriate _closed methods on drop for both frontend and backend. +struct ProtocolGuard { + metrics: Arc, + frontend_proto: Option<&'static str>, + backend_proto: Option<&'static str>, +} + +impl ProtocolGuard { + fn new(metrics: Arc, frontend: &'static str, backend: &'static str) -> Self { + metrics.frontend_protocol_opened(frontend); + metrics.backend_protocol_opened(backend); + Self { metrics, frontend_proto: Some(frontend), backend_proto: Some(backend) } + } +} + +impl Drop for ProtocolGuard { + fn drop(&mut self) { + if let Some(proto) = self.frontend_proto { + self.metrics.frontend_protocol_closed(proto); + } + if let Some(proto) = self.backend_proto { + self.metrics.backend_protocol_closed(proto); + } + } +} + /// RAII guard that calls ConnectionTracker::connection_closed on drop. /// Ensures per-IP tracking is cleaned up on ALL exit paths — normal, error, or panic. struct ConnectionTrackerGuard { @@ -1024,6 +1051,9 @@ impl TcpListenerManager { peer_addr, target_host, target_port, domain ); + // Track as "other" protocol (non-HTTP passthrough) + let _proto_guard = ProtocolGuard::new(Arc::clone(&metrics), "other", "other"); + let mut actual_buf = vec![0u8; n]; stream.read_exact(&mut actual_buf).await?; @@ -1090,6 +1120,8 @@ impl TcpListenerManager { "TLS Terminate + TCP: {} -> {}:{} (domain: {:?})", peer_addr, target_host, target_port, domain ); + // Track as "other" protocol (TLS-terminated non-HTTP) + let _proto_guard = ProtocolGuard::new(Arc::clone(&metrics), "other", "other"); // Raw TCP forwarding of decrypted stream let backend = match tokio::time::timeout( connect_timeout, @@ -1176,6 +1208,8 @@ impl TcpListenerManager { "TLS Terminate+Reencrypt + TCP: {} -> {}:{}", peer_addr, target_host, target_port ); + // Track as "other" protocol (TLS-terminated non-HTTP, re-encrypted) + let _proto_guard = ProtocolGuard::new(Arc::clone(&metrics), "other", "other"); Self::handle_tls_reencrypt_tunnel( buf_stream, &target_host, target_port, peer_addr, Arc::clone(&metrics), route_id, @@ -1192,6 +1226,8 @@ impl TcpListenerManager { Ok(()) } else { // Plain TCP forwarding (non-HTTP) + // Track as "other" protocol (plain TCP, non-HTTP) + let _proto_guard = ProtocolGuard::new(Arc::clone(&metrics), "other", "other"); let mut backend = match tokio::time::timeout( connect_timeout, tokio::net::TcpStream::connect(format!("{}:{}", target_host, target_port)), diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 93d97af..a3c3ac4 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartproxy', - version: '27.1.0', + version: '27.2.0', description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.' } diff --git a/ts/proxies/smart-proxy/models/metrics-types.ts b/ts/proxies/smart-proxy/models/metrics-types.ts index 57a278d..dabd510 100644 --- a/ts/proxies/smart-proxy/models/metrics-types.ts +++ b/ts/proxies/smart-proxy/models/metrics-types.ts @@ -32,6 +32,23 @@ export interface IThroughputHistoryPoint { /** * Main metrics interface with clean, grouped API */ +/** + * Protocol distribution for frontend (client→proxy) or backend (proxy→upstream). + * Tracks active and total counts for h1/h2/h3/ws/other. + */ +export interface IProtocolDistribution { + h1Active: number; + h1Total: number; + h2Active: number; + h2Total: number; + h3Active: number; + h3Total: number; + wsActive: number; + wsTotal: number; + otherActive: number; + otherTotal: number; +} + export interface IMetrics { // Connection metrics connections: { @@ -40,6 +57,8 @@ export interface IMetrics { byRoute(): Map; byIP(): Map; topIPs(limit?: number): Array<{ ip: string; count: number }>; + frontendProtocols(): IProtocolDistribution; + backendProtocols(): IProtocolDistribution; }; // Throughput metrics (bytes per second) diff --git a/ts/proxies/smart-proxy/rust-metrics-adapter.ts b/ts/proxies/smart-proxy/rust-metrics-adapter.ts index f493a4d..5a21e82 100644 --- a/ts/proxies/smart-proxy/rust-metrics-adapter.ts +++ b/ts/proxies/smart-proxy/rust-metrics-adapter.ts @@ -1,4 +1,4 @@ -import type { IMetrics, IBackendMetrics, IProtocolCacheEntry, IThroughputData, IThroughputHistoryPoint } from './models/metrics-types.js'; +import type { IMetrics, IBackendMetrics, IProtocolCacheEntry, IProtocolDistribution, IThroughputData, IThroughputHistoryPoint } from './models/metrics-types.js'; import type { RustProxyBridge } from './rust-proxy-bridge.js'; /** @@ -90,6 +90,49 @@ export class RustMetricsAdapter implements IMetrics { result.sort((a, b) => b.count - a.count); return result.slice(0, limit); }, + frontendProtocols: (): IProtocolDistribution => { + const fp = this.cache?.frontendProtocols; + return { + h1Active: fp?.h1Active ?? 0, + h1Total: fp?.h1Total ?? 0, + h2Active: fp?.h2Active ?? 0, + h2Total: fp?.h2Total ?? 0, + h3Active: fp?.h3Active ?? 0, + h3Total: fp?.h3Total ?? 0, + wsActive: fp?.wsActive ?? 0, + wsTotal: fp?.wsTotal ?? 0, + otherActive: fp?.otherActive ?? 0, + otherTotal: fp?.otherTotal ?? 0, + }; + }, + backendProtocols: (): IProtocolDistribution => { + // Merge per-backend h1/h2/h3 data with aggregate ws/other counters + const bp = this.cache?.backendProtocols; + let h1Active = 0, h1Total = 0; + let h2Active = 0, h2Total = 0; + let h3Active = 0, h3Total = 0; + if (this.cache?.backends) { + for (const bm of Object.values(this.cache.backends)) { + const m = bm as any; + const active = m.activeConnections ?? 0; + const total = m.totalConnections ?? 0; + switch (m.protocol) { + case 'h2': h2Active += active; h2Total += total; break; + case 'h3': h3Active += active; h3Total += total; break; + default: h1Active += active; h1Total += total; break; + } + } + } + return { + h1Active, h1Total, + h2Active, h2Total, + h3Active, h3Total, + wsActive: bp?.wsActive ?? 0, + wsTotal: bp?.wsTotal ?? 0, + otherActive: bp?.otherActive ?? 0, + otherTotal: bp?.otherTotal ?? 0, + }; + }, }; public throughput = {