diff --git a/changelog.md b/changelog.md index a8252f3..60e06b9 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-03-12 - 25.10.0 - feat(metrics) +add per-backend connection, error, protocol, and pool metrics with stale backend pruning + +- tracks backend connection lifecycle, connect timing, protocol detection, pool hit/miss rates, handshake/request errors, and h2 fallback failures in Rust metrics +- exposes backend metrics through the TypeScript metrics adapter with backend listings, protocol lookup, and top error summaries +- prunes backend metrics for backends no longer referenced by active routes, including preserved-port targets expanded across listening ports + ## 2026-03-11 - 25.9.3 - fix(rustproxy-http) Evict stale HTTP/2 pooled senders and retry bodyless requests with fresh backend connections to avoid 502s diff --git a/rust/crates/rustproxy-http/src/proxy_service.rs b/rust/crates/rustproxy-http/src/proxy_service.rs index 2cc1035..134ddb5 100644 --- a/rust/crates/rustproxy-http/src/proxy_service.rs +++ b/rust/crates/rustproxy-http/src/proxy_service.rs @@ -618,6 +618,8 @@ impl HttpProxyService { // H2 pool checkout (H2 senders are Clone and multiplexed) if use_h2 { if let Some(sender) = self.connection_pool.checkout_h2(&pool_key) { + self.metrics.backend_pool_hit(&upstream_key); + self.metrics.set_backend_protocol(&upstream_key, "h2"); let result = self.forward_h2_pooled( sender, parts, body, upstream_headers, &upstream_path, route_match.route, route_id, &ip_str, &pool_key, @@ -629,6 +631,8 @@ impl HttpProxyService { } // --- Fresh connection path --- + self.metrics.backend_pool_miss(&upstream_key); + // Choose TLS config: use ALPN config for auto-detect probe, plain config otherwise let tls_config = if needs_alpn_probe { &self.backend_tls_config_alpn @@ -637,6 +641,7 @@ impl HttpProxyService { }; // Establish backend connection + let connect_start = std::time::Instant::now(); let (backend, detected_h2) = if upstream.use_tls { match tokio::time::timeout( self.connect_timeout, @@ -661,25 +666,39 @@ impl HttpProxyService { }; self.protocol_cache.insert(cache_key, detected); - debug!( - "Auto-detected {} for backend {}:{}", - if is_h2 { "HTTP/2" } else { "HTTP/1.1" }, - upstream.host, upstream.port + info!( + backend = %upstream_key, + protocol = if is_h2 { "h2" } else { "h1" }, + connect_time_ms = %connect_start.elapsed().as_millis(), + "Backend protocol detected via ALPN" ); is_h2 } else { use_h2 }; + self.metrics.backend_connection_opened(&upstream_key, connect_start.elapsed()); + self.metrics.set_backend_protocol(&upstream_key, if final_h2 { "h2" } else { "h1" }); (BackendStream::Tls(tls), final_h2) } Ok(Err(e)) => { - error!("Failed TLS connect to upstream {}:{}: {}", upstream.host, upstream.port, e); + error!( + backend = %upstream_key, + connect_time_ms = %connect_start.elapsed().as_millis(), + error = %e, + "Backend TLS connect failed" + ); + self.metrics.backend_connect_error(&upstream_key); self.upstream_selector.connection_ended(&upstream_key); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend TLS unavailable")); } Err(_) => { - error!("Upstream TLS connect timeout for {}:{}", upstream.host, upstream.port); + error!( + backend = %upstream_key, + connect_time_ms = %connect_start.elapsed().as_millis(), + "Backend TLS connect timeout" + ); + self.metrics.backend_connect_error(&upstream_key); self.upstream_selector.connection_ended(&upstream_key); return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend TLS connect timeout")); } @@ -694,15 +713,28 @@ impl HttpProxyService { let _ = socket2::SockRef::from(&s).set_tcp_keepalive( &socket2::TcpKeepalive::new().with_time(std::time::Duration::from_secs(60)) ); + self.metrics.backend_connection_opened(&upstream_key, connect_start.elapsed()); + self.metrics.set_backend_protocol(&upstream_key, if use_h2 { "h2" } else { "h1" }); (BackendStream::Plain(s), use_h2) } Ok(Err(e)) => { - error!("Failed to connect to upstream {}:{}: {}", upstream.host, upstream.port, e); + error!( + backend = %upstream_key, + connect_time_ms = %connect_start.elapsed().as_millis(), + error = %e, + "Backend TCP connect failed" + ); + self.metrics.backend_connect_error(&upstream_key); self.upstream_selector.connection_ended(&upstream_key); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable")); } Err(_) => { - error!("Upstream connect timeout for {}:{}", upstream.host, upstream.port); + error!( + backend = %upstream_key, + connect_time_ms = %connect_start.elapsed().as_millis(), + "Backend TCP connect timeout" + ); + self.metrics.backend_connect_error(&upstream_key); self.upstream_selector.connection_ended(&upstream_key); return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend connect timeout")); } @@ -740,6 +772,7 @@ impl HttpProxyService { ).await }; self.upstream_selector.connection_ended(&upstream_key); + self.metrics.backend_connection_closed(&upstream_key); result } @@ -758,8 +791,11 @@ impl HttpProxyService { source_ip: &str, pool_key: &crate::connection_pool::PoolKey, ) -> Result>, hyper::Error> { + let backend_key = format!("{}:{}", pool_key.host, pool_key.port); + // Try pooled H1 connection first — avoids TCP+TLS handshake if let Some(pooled_sender) = self.connection_pool.checkout_h1(pool_key) { + self.metrics.backend_pool_hit(&backend_key); return self.forward_h1_with_sender( pooled_sender, parts, body, upstream_headers, upstream_path, route, route_id, source_ip, pool_key, @@ -773,7 +809,8 @@ impl HttpProxyService { ) = match hyper::client::conn::http1::handshake(io).await { Ok(h) => h, Err(e) => { - error!("Upstream handshake failed: {}", e); + error!(backend = %backend_key, error = %e, "Backend H1 handshake failed"); + self.metrics.backend_handshake_error(&backend_key); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend handshake failed")); } }; @@ -825,7 +862,9 @@ impl HttpProxyService { let upstream_response = match sender.send_request(upstream_req).await { Ok(resp) => resp, Err(e) => { - error!("Upstream request failed: {}", e); + let bk = format!("{}:{}", pool_key.host, pool_key.port); + error!(backend = %bk, error = %e, "Backend H1 request failed"); + self.metrics.backend_request_error(&bk); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend request failed")); } }; @@ -851,6 +890,7 @@ impl HttpProxyService { source_ip: &str, pool_key: &crate::connection_pool::PoolKey, ) -> Result>, hyper::Error> { + let backend_key = format!("{}:{}", pool_key.host, pool_key.port); let exec = hyper_util::rt::TokioExecutor::new(); // Explicitly type the handshake with BoxBody for uniform pool type let (sender, conn): ( @@ -859,7 +899,8 @@ impl HttpProxyService { ) = match hyper::client::conn::http2::handshake(exec, io).await { Ok(h) => h, Err(e) => { - error!("HTTP/2 upstream handshake failed: {}", e); + error!(backend = %backend_key, error = %e, "Backend H2 handshake failed"); + self.metrics.backend_handshake_error(&backend_key); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H2 handshake failed")); } }; @@ -930,7 +971,10 @@ impl HttpProxyService { route_id: Option<&str>, source_ip: &str, ) -> Result>, hyper::Error> { + let backend_key = format!("{}:{}", pool_key.host, pool_key.port); + // Establish fresh backend connection + let retry_connect_start = std::time::Instant::now(); let backend = if pool_key.use_tls { match tokio::time::timeout( self.connect_timeout, @@ -938,11 +982,13 @@ impl HttpProxyService { ).await { Ok(Ok(tls)) => BackendStream::Tls(tls), Ok(Err(e)) => { - error!("H2 retry: TLS connect failed for {}:{}: {}", pool_key.host, pool_key.port, e); + error!(backend = %backend_key, error = %e, "H2 retry: TLS connect failed"); + self.metrics.backend_connect_error(&backend_key); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable on H2 retry")); } Err(_) => { - error!("H2 retry: TLS connect timeout for {}:{}", pool_key.host, pool_key.port); + error!(backend = %backend_key, "H2 retry: TLS connect timeout"); + self.metrics.backend_connect_error(&backend_key); return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend timeout on H2 retry")); } } @@ -956,15 +1002,18 @@ impl HttpProxyService { BackendStream::Plain(s) } Ok(Err(e)) => { - error!("H2 retry: connect failed for {}:{}: {}", pool_key.host, pool_key.port, e); + error!(backend = %backend_key, error = %e, "H2 retry: TCP connect failed"); + self.metrics.backend_connect_error(&backend_key); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable on H2 retry")); } Err(_) => { - error!("H2 retry: connect timeout for {}:{}", pool_key.host, pool_key.port); + error!(backend = %backend_key, "H2 retry: TCP connect timeout"); + self.metrics.backend_connect_error(&backend_key); return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend timeout on H2 retry")); } } }; + self.metrics.backend_connection_opened(&backend_key, retry_connect_start.elapsed()); let io = TokioIo::new(backend); let exec = hyper_util::rt::TokioExecutor::new(); @@ -974,7 +1023,8 @@ impl HttpProxyService { ) = match hyper::client::conn::http2::handshake(exec, io).await { Ok(h) => h, Err(e) => { - error!("H2 retry: handshake failed for {}:{}: {}", pool_key.host, pool_key.port, e); + error!(backend = %backend_key, error = %e, "H2 retry: handshake failed"); + self.metrics.backend_handshake_error(&backend_key); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H2 retry handshake failed")); } }; @@ -1004,11 +1054,17 @@ impl HttpProxyService { match sender.send_request(upstream_req).await { Ok(resp) => { - self.build_streaming_response(resp, route, route_id, source_ip).await + let result = self.build_streaming_response(resp, route, route_id, source_ip).await; + // Close the fresh backend connection (opened at line 1016 above) + self.metrics.backend_connection_closed(&backend_key); + result } Err(e) => { - error!("H2 retry: request failed for {}:{}: {}", pool_key.host, pool_key.port, e); + error!(backend = %backend_key, error = %e, "H2 retry: request failed"); + self.metrics.backend_request_error(&backend_key); self.connection_pool.remove_h2(pool_key); + // Close the fresh backend connection (opened at line 1016 above) + self.metrics.backend_connection_closed(&backend_key); Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H2 request failed on retry")) } } @@ -1086,10 +1142,13 @@ impl HttpProxyService { Err(e) => { // H2 request failed — backend advertises h2 via ALPN but doesn't // actually speak it. Update cache so future requests use H1. + let bk = format!("{}:{}", upstream.host, upstream.port); warn!( - "Auto-detect: H2 request failed for {}:{}, falling back to H1: {}", - upstream.host, upstream.port, e + backend = %bk, + error = %e, + "Auto-detect: H2 request failed, falling back to H1" ); + self.metrics.backend_h2_failure(&bk); let cache_key = crate::protocol_cache::ProtocolCacheKey { host: upstream.host.clone(), port: upstream.port, @@ -1108,10 +1167,13 @@ impl HttpProxyService { h2: false, }; let fallback_io = TokioIo::new(fallback_backend); - self.forward_h1_empty_body( + let result = self.forward_h1_empty_body( fallback_io, method, headers, upstream_path, route, route_id, source_ip, &h1_pool_key, - ).await + ).await; + // Close the reconnected backend connection (opened in reconnect_backend) + self.metrics.backend_connection_closed(&bk); + result } None => { Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable after H2 fallback")) @@ -1126,10 +1188,14 @@ impl HttpProxyService { Err(e) => { // H2 handshake truly failed — fall back to H1 // Body is NOT consumed yet, so we can retry the full request. + let bk = format!("{}:{}", upstream.host, upstream.port); warn!( - "H2 handshake failed for {}:{}, falling back to H1: {}", - upstream.host, upstream.port, e + backend = %bk, + error = %e, + "H2 handshake failed, falling back to H1" ); + self.metrics.backend_h2_failure(&bk); + self.metrics.backend_handshake_error(&bk); // Update cache to H1 so subsequent requests skip H2 let cache_key = crate::protocol_cache::ProtocolCacheKey { @@ -1149,10 +1215,13 @@ impl HttpProxyService { h2: false, }; let fallback_io = TokioIo::new(fallback_backend); - self.forward_h1( + let result = self.forward_h1( fallback_io, parts, body, upstream_headers, upstream_path, upstream, route, route_id, source_ip, &h1_pool_key, - ).await + ).await; + // Close the reconnected backend connection (opened in reconnect_backend) + self.metrics.backend_connection_closed(&bk); + result } None => { Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable after H2 fallback")) @@ -1175,13 +1244,15 @@ impl HttpProxyService { source_ip: &str, pool_key: &crate::connection_pool::PoolKey, ) -> Result>, hyper::Error> { + let backend_key = format!("{}:{}", pool_key.host, pool_key.port); let (mut sender, conn): ( hyper::client::conn::http1::SendRequest>, hyper::client::conn::http1::Connection, BoxBody>, ) = match hyper::client::conn::http1::handshake(io).await { Ok(h) => h, Err(e) => { - error!("H1 fallback: handshake failed: {}", e); + error!(backend = %backend_key, error = %e, "H1 fallback: handshake failed"); + self.metrics.backend_handshake_error(&backend_key); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H1 fallback handshake failed")); } }; @@ -1209,7 +1280,8 @@ impl HttpProxyService { let upstream_response = match sender.send_request(upstream_req).await { Ok(resp) => resp, Err(e) => { - error!("H1 fallback: request failed: {}", e); + error!(backend = %backend_key, error = %e, "H1 fallback: request failed"); + self.metrics.backend_request_error(&backend_key); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H1 fallback request failed")); } }; @@ -1225,18 +1297,25 @@ impl HttpProxyService { &self, upstream: &crate::upstream_selector::UpstreamSelection, ) -> Option { + let backend_key = format!("{}:{}", upstream.host, upstream.port); + let reconnect_start = std::time::Instant::now(); if upstream.use_tls { match tokio::time::timeout( self.connect_timeout, connect_tls_backend(&self.backend_tls_config, &upstream.host, upstream.port), ).await { - Ok(Ok(tls)) => Some(BackendStream::Tls(tls)), + Ok(Ok(tls)) => { + self.metrics.backend_connection_opened(&backend_key, reconnect_start.elapsed()); + Some(BackendStream::Tls(tls)) + } Ok(Err(e)) => { - error!("H1 fallback: TLS reconnect failed for {}:{}: {}", upstream.host, upstream.port, e); + error!(backend = %backend_key, error = %e, "H1 fallback: TLS reconnect failed"); + self.metrics.backend_connect_error(&backend_key); None } Err(_) => { - error!("H1 fallback: TLS reconnect timeout for {}:{}", upstream.host, upstream.port); + error!(backend = %backend_key, "H1 fallback: TLS reconnect timeout"); + self.metrics.backend_connect_error(&backend_key); None } } @@ -1250,14 +1329,17 @@ impl HttpProxyService { let _ = socket2::SockRef::from(&s).set_tcp_keepalive( &socket2::TcpKeepalive::new().with_time(std::time::Duration::from_secs(60)) ); + self.metrics.backend_connection_opened(&backend_key, reconnect_start.elapsed()); Some(BackendStream::Plain(s)) } Ok(Err(e)) => { - error!("H1 fallback: reconnect failed for {}:{}: {}", upstream.host, upstream.port, e); + error!(backend = %backend_key, error = %e, "H1 fallback: TCP reconnect failed"); + self.metrics.backend_connect_error(&backend_key); None } Err(_) => { - error!("H1 fallback: reconnect timeout for {}:{}", upstream.host, upstream.port); + error!(backend = %backend_key, "H1 fallback: TCP reconnect timeout"); + self.metrics.backend_connect_error(&backend_key); None } } @@ -1300,10 +1382,14 @@ impl HttpProxyService { let upstream_response = match sender.send_request(upstream_req).await { Ok(resp) => resp, Err(e) => { - error!("HTTP/2 upstream request failed: {}", e); // Evict the dead sender so subsequent requests get fresh connections if let Some(key) = pool_key { + let bk = format!("{}:{}", key.host, key.port); + error!(backend = %bk, error = %e, "Backend H2 request failed"); + self.metrics.backend_request_error(&bk); self.connection_pool.remove_h2(key); + } else { + error!(error = %e, "Backend H2 request failed"); } return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H2 request failed")); } diff --git a/rust/crates/rustproxy-metrics/src/collector.rs b/rust/crates/rustproxy-metrics/src/collector.rs index 4bc6215..5a0c97f 100644 --- a/rust/crates/rustproxy-metrics/src/collector.rs +++ b/rust/crates/rustproxy-metrics/src/collector.rs @@ -3,6 +3,7 @@ use serde::{Deserialize, Serialize}; use std::collections::HashSet; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Mutex; +use std::time::Duration; use crate::throughput::{ThroughputSample, ThroughputTracker}; @@ -20,6 +21,7 @@ pub struct Metrics { pub throughput_recent_out_bytes_per_sec: u64, pub routes: std::collections::HashMap, pub ips: std::collections::HashMap, + pub backends: std::collections::HashMap, pub throughput_history: Vec, pub total_http_requests: u64, pub http_requests_per_sec: u64, @@ -52,6 +54,23 @@ pub struct IpMetrics { pub throughput_out_bytes_per_sec: u64, } +/// Per-backend metrics (keyed by "host:port"). +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct BackendMetrics { + pub active_connections: u64, + pub total_connections: u64, + pub protocol: String, + pub connect_errors: u64, + pub handshake_errors: u64, + pub request_errors: u64, + pub total_connect_time_us: u64, + pub connect_count: u64, + pub pool_hits: u64, + pub pool_misses: u64, + pub h2_failures: u64, +} + /// Statistics snapshot. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] @@ -69,6 +88,9 @@ const DEFAULT_RETENTION_SECONDS: usize = 3600; /// Maximum number of IPs to include in a snapshot (top by active connections). const MAX_IPS_IN_SNAPSHOT: usize = 100; +/// Maximum number of backends to include in a snapshot (top by total connections). +const MAX_BACKENDS_IN_SNAPSHOT: usize = 100; + /// Metrics collector tracking connections and throughput. /// /// Design: The hot path (`record_bytes`) is entirely lock-free — it only touches @@ -96,6 +118,19 @@ pub struct MetricsCollector { ip_pending_tp: DashMap, ip_throughput: DashMap>, + // ── Per-backend tracking (keyed by "host:port") ── + backend_active: DashMap, + backend_total: DashMap, + backend_protocol: DashMap, + backend_connect_errors: DashMap, + backend_handshake_errors: DashMap, + backend_request_errors: DashMap, + backend_connect_time_us: DashMap, + backend_connect_count: DashMap, + backend_pool_hits: DashMap, + backend_pool_misses: DashMap, + backend_h2_failures: DashMap, + // ── HTTP request tracking ── total_http_requests: AtomicU64, pending_http_requests: AtomicU64, @@ -134,6 +169,17 @@ impl MetricsCollector { ip_bytes_out: DashMap::new(), ip_pending_tp: DashMap::new(), ip_throughput: DashMap::new(), + backend_active: DashMap::new(), + backend_total: DashMap::new(), + backend_protocol: DashMap::new(), + backend_connect_errors: DashMap::new(), + backend_handshake_errors: DashMap::new(), + backend_request_errors: DashMap::new(), + backend_connect_time_us: DashMap::new(), + backend_connect_count: DashMap::new(), + backend_pool_hits: DashMap::new(), + backend_pool_misses: DashMap::new(), + backend_h2_failures: DashMap::new(), total_http_requests: AtomicU64::new(0), pending_http_requests: AtomicU64::new(0), http_request_throughput: Mutex::new(ThroughputTracker::new(retention_seconds)), @@ -268,6 +314,113 @@ impl MetricsCollector { self.pending_http_requests.fetch_add(1, Ordering::Relaxed); } + // ── Per-backend recording methods ── + + /// Record a successful backend connection with its connect duration. + pub fn backend_connection_opened(&self, key: &str, connect_time: Duration) { + self.backend_active + .entry(key.to_string()) + .or_insert_with(|| AtomicU64::new(0)) + .fetch_add(1, Ordering::Relaxed); + self.backend_total + .entry(key.to_string()) + .or_insert_with(|| AtomicU64::new(0)) + .fetch_add(1, Ordering::Relaxed); + self.backend_connect_time_us + .entry(key.to_string()) + .or_insert_with(|| AtomicU64::new(0)) + .fetch_add(connect_time.as_micros() as u64, Ordering::Relaxed); + self.backend_connect_count + .entry(key.to_string()) + .or_insert_with(|| AtomicU64::new(0)) + .fetch_add(1, Ordering::Relaxed); + } + + /// Record a backend connection closing. + pub fn backend_connection_closed(&self, key: &str) { + if let Some(counter) = self.backend_active.get(key) { + let val = counter.load(Ordering::Relaxed); + if val > 0 { + counter.fetch_sub(1, Ordering::Relaxed); + } + } + } + + /// Record a backend connect error (TCP or TLS connect failure/timeout). + pub fn backend_connect_error(&self, key: &str) { + self.backend_connect_errors + .entry(key.to_string()) + .or_insert_with(|| AtomicU64::new(0)) + .fetch_add(1, Ordering::Relaxed); + } + + /// Record a backend handshake error (H1 or H2 handshake failure). + pub fn backend_handshake_error(&self, key: &str) { + self.backend_handshake_errors + .entry(key.to_string()) + .or_insert_with(|| AtomicU64::new(0)) + .fetch_add(1, Ordering::Relaxed); + } + + /// Record a backend request error (send_request failure). + pub fn backend_request_error(&self, key: &str) { + self.backend_request_errors + .entry(key.to_string()) + .or_insert_with(|| AtomicU64::new(0)) + .fetch_add(1, Ordering::Relaxed); + } + + /// Record a connection pool hit for a backend. + pub fn backend_pool_hit(&self, key: &str) { + self.backend_pool_hits + .entry(key.to_string()) + .or_insert_with(|| AtomicU64::new(0)) + .fetch_add(1, Ordering::Relaxed); + } + + /// Record a connection pool miss for a backend. + pub fn backend_pool_miss(&self, key: &str) { + self.backend_pool_misses + .entry(key.to_string()) + .or_insert_with(|| AtomicU64::new(0)) + .fetch_add(1, Ordering::Relaxed); + } + + /// Record an H2 failure (h2 attempted but fell back to h1). + pub fn backend_h2_failure(&self, key: &str) { + self.backend_h2_failures + .entry(key.to_string()) + .or_insert_with(|| AtomicU64::new(0)) + .fetch_add(1, Ordering::Relaxed); + } + + /// Set the protocol in use for a backend ("h1" or "h2"). + pub fn set_backend_protocol(&self, key: &str, protocol: &str) { + self.backend_protocol + .entry(key.to_string()) + .and_modify(|v| { + if v != protocol { + *v = protocol.to_string(); + } + }) + .or_insert_with(|| protocol.to_string()); + } + + /// Remove per-backend metrics for backends no longer in any route target. + pub fn retain_backends(&self, active_backends: &HashSet) { + self.backend_active.retain(|k, _| active_backends.contains(k)); + self.backend_total.retain(|k, _| active_backends.contains(k)); + self.backend_protocol.retain(|k, _| active_backends.contains(k)); + self.backend_connect_errors.retain(|k, _| active_backends.contains(k)); + self.backend_handshake_errors.retain(|k, _| active_backends.contains(k)); + self.backend_request_errors.retain(|k, _| active_backends.contains(k)); + self.backend_connect_time_us.retain(|k, _| active_backends.contains(k)); + self.backend_connect_count.retain(|k, _| active_backends.contains(k)); + self.backend_pool_hits.retain(|k, _| active_backends.contains(k)); + self.backend_pool_misses.retain(|k, _| active_backends.contains(k)); + self.backend_h2_failures.retain(|k, _| active_backends.contains(k)); + } + /// Take a throughput sample on all trackers (cold path, call at 1Hz or configured interval). /// /// Drains the lock-free pending counters and feeds the accumulated bytes @@ -488,6 +641,72 @@ impl MetricsCollector { }); } + // Collect per-backend metrics, capped at top MAX_BACKENDS_IN_SNAPSHOT by total connections + let mut backend_entries: Vec<(String, BackendMetrics)> = Vec::new(); + for entry in self.backend_total.iter() { + let key = entry.key().clone(); + let total = entry.value().load(Ordering::Relaxed); + let active = self.backend_active + .get(&key) + .map(|c| c.load(Ordering::Relaxed)) + .unwrap_or(0); + let protocol = self.backend_protocol + .get(&key) + .map(|v| v.value().clone()) + .unwrap_or_else(|| "unknown".to_string()); + let connect_errors = self.backend_connect_errors + .get(&key) + .map(|c| c.load(Ordering::Relaxed)) + .unwrap_or(0); + let handshake_errors = self.backend_handshake_errors + .get(&key) + .map(|c| c.load(Ordering::Relaxed)) + .unwrap_or(0); + let request_errors = self.backend_request_errors + .get(&key) + .map(|c| c.load(Ordering::Relaxed)) + .unwrap_or(0); + let total_connect_time_us = self.backend_connect_time_us + .get(&key) + .map(|c| c.load(Ordering::Relaxed)) + .unwrap_or(0); + let connect_count = self.backend_connect_count + .get(&key) + .map(|c| c.load(Ordering::Relaxed)) + .unwrap_or(0); + let pool_hits = self.backend_pool_hits + .get(&key) + .map(|c| c.load(Ordering::Relaxed)) + .unwrap_or(0); + let pool_misses = self.backend_pool_misses + .get(&key) + .map(|c| c.load(Ordering::Relaxed)) + .unwrap_or(0); + let h2_failures = self.backend_h2_failures + .get(&key) + .map(|c| c.load(Ordering::Relaxed)) + .unwrap_or(0); + + backend_entries.push((key, BackendMetrics { + active_connections: active, + total_connections: total, + protocol, + connect_errors, + handshake_errors, + request_errors, + total_connect_time_us, + connect_count, + pool_hits, + pool_misses, + h2_failures, + })); + } + // Sort by total connections descending, then cap + backend_entries.sort_by(|a, b| b.1.total_connections.cmp(&a.1.total_connections)); + backend_entries.truncate(MAX_BACKENDS_IN_SNAPSHOT); + + let backends: std::collections::HashMap = backend_entries.into_iter().collect(); + // HTTP request rates let (http_rps, http_rps_recent) = self.http_request_throughput .lock() @@ -509,6 +728,7 @@ impl MetricsCollector { throughput_recent_out_bytes_per_sec: global_recent_out, routes, ips, + backends, throughput_history, total_http_requests: self.total_http_requests.load(Ordering::Relaxed), http_requests_per_sec: http_rps, @@ -805,4 +1025,120 @@ mod tests { assert_eq!(snapshot.throughput_history[0].bytes_in, 100); assert_eq!(snapshot.throughput_history[4].bytes_in, 500); } + + #[test] + fn test_backend_metrics_basic() { + let collector = MetricsCollector::new(); + let key = "backend1:8080"; + + // Open connections with timing + collector.backend_connection_opened(key, Duration::from_millis(15)); + collector.backend_connection_opened(key, Duration::from_millis(25)); + + assert_eq!(collector.backend_active.get(key).unwrap().load(Ordering::Relaxed), 2); + assert_eq!(collector.backend_total.get(key).unwrap().load(Ordering::Relaxed), 2); + assert_eq!(collector.backend_connect_count.get(key).unwrap().load(Ordering::Relaxed), 2); + // 15ms + 25ms = 40ms = 40_000us + assert_eq!(collector.backend_connect_time_us.get(key).unwrap().load(Ordering::Relaxed), 40_000); + + // Close one + collector.backend_connection_closed(key); + assert_eq!(collector.backend_active.get(key).unwrap().load(Ordering::Relaxed), 1); + // total stays + assert_eq!(collector.backend_total.get(key).unwrap().load(Ordering::Relaxed), 2); + + // Record errors + collector.backend_connect_error(key); + collector.backend_handshake_error(key); + collector.backend_request_error(key); + collector.backend_h2_failure(key); + collector.backend_pool_hit(key); + collector.backend_pool_hit(key); + collector.backend_pool_miss(key); + + assert_eq!(collector.backend_connect_errors.get(key).unwrap().load(Ordering::Relaxed), 1); + assert_eq!(collector.backend_handshake_errors.get(key).unwrap().load(Ordering::Relaxed), 1); + assert_eq!(collector.backend_request_errors.get(key).unwrap().load(Ordering::Relaxed), 1); + assert_eq!(collector.backend_h2_failures.get(key).unwrap().load(Ordering::Relaxed), 1); + assert_eq!(collector.backend_pool_hits.get(key).unwrap().load(Ordering::Relaxed), 2); + assert_eq!(collector.backend_pool_misses.get(key).unwrap().load(Ordering::Relaxed), 1); + + // Protocol + collector.set_backend_protocol(key, "h1"); + assert_eq!(collector.backend_protocol.get(key).unwrap().value(), "h1"); + collector.set_backend_protocol(key, "h2"); + assert_eq!(collector.backend_protocol.get(key).unwrap().value(), "h2"); + } + + #[test] + fn test_backend_metrics_in_snapshot() { + let collector = MetricsCollector::new(); + + collector.backend_connection_opened("b1:443", Duration::from_millis(10)); + collector.backend_connection_opened("b2:8080", Duration::from_millis(20)); + collector.set_backend_protocol("b1:443", "h2"); + collector.set_backend_protocol("b2:8080", "h1"); + collector.backend_connect_error("b1:443"); + + let snapshot = collector.snapshot(); + assert_eq!(snapshot.backends.len(), 2); + + let b1 = snapshot.backends.get("b1:443").unwrap(); + assert_eq!(b1.active_connections, 1); + assert_eq!(b1.total_connections, 1); + assert_eq!(b1.protocol, "h2"); + assert_eq!(b1.connect_errors, 1); + assert_eq!(b1.total_connect_time_us, 10_000); + assert_eq!(b1.connect_count, 1); + + let b2 = snapshot.backends.get("b2:8080").unwrap(); + assert_eq!(b2.protocol, "h1"); + assert_eq!(b2.connect_errors, 0); + } + + #[test] + fn test_retain_backends_prunes_stale() { + let collector = MetricsCollector::new(); + + collector.backend_connection_opened("active:443", Duration::from_millis(5)); + collector.backend_connection_opened("stale:8080", Duration::from_millis(10)); + collector.set_backend_protocol("active:443", "h1"); + collector.set_backend_protocol("stale:8080", "h2"); + collector.backend_connect_error("stale:8080"); + + let active = HashSet::from(["active:443".to_string()]); + collector.retain_backends(&active); + + // active:443 should still exist + assert!(collector.backend_total.get("active:443").is_some()); + assert!(collector.backend_protocol.get("active:443").is_some()); + + // stale:8080 should be fully removed + assert!(collector.backend_active.get("stale:8080").is_none()); + assert!(collector.backend_total.get("stale:8080").is_none()); + assert!(collector.backend_protocol.get("stale:8080").is_none()); + assert!(collector.backend_connect_errors.get("stale:8080").is_none()); + assert!(collector.backend_connect_time_us.get("stale:8080").is_none()); + assert!(collector.backend_connect_count.get("stale:8080").is_none()); + assert!(collector.backend_pool_hits.get("stale:8080").is_none()); + assert!(collector.backend_pool_misses.get("stale:8080").is_none()); + assert!(collector.backend_h2_failures.get("stale:8080").is_none()); + } + + #[test] + fn test_backend_connection_closed_saturates() { + let collector = MetricsCollector::new(); + let key = "b:80"; + + // Close without opening — should not underflow + collector.backend_connection_closed(key); + // No entry created + assert!(collector.backend_active.get(key).is_none()); + + // Open one, close two — should saturate at 0 + collector.backend_connection_opened(key, Duration::from_millis(1)); + collector.backend_connection_closed(key); + collector.backend_connection_closed(key); + assert_eq!(collector.backend_active.get(key).unwrap().load(Ordering::Relaxed), 0); + } } diff --git a/rust/crates/rustproxy/src/lib.rs b/rust/crates/rustproxy/src/lib.rs index 852557a..fcdb226 100644 --- a/rust/crates/rustproxy/src/lib.rs +++ b/rust/crates/rustproxy/src/lib.rs @@ -603,6 +603,31 @@ impl RustProxy { .collect(); self.metrics.retain_routes(&active_route_ids); + // Prune per-backend metrics for backends no longer in any route target. + // For PortSpec::Preserve routes, expand across all listening ports since + // the actual runtime port depends on the incoming connection. + let listening_ports = self.get_listening_ports(); + let active_backends: HashSet = routes.iter() + .filter_map(|r| r.action.targets.as_ref()) + .flat_map(|targets| targets.iter()) + .flat_map(|target| { + let hosts: Vec = target.host.to_vec().into_iter().map(|s| s.to_string()).collect(); + match &target.port { + rustproxy_config::PortSpec::Fixed(p) => { + hosts.into_iter().map(|h| format!("{}:{}", h, p)).collect::>() + } + _ => { + // Preserve/special: expand across all listening ports + let lp = &listening_ports; + hosts.into_iter() + .flat_map(|h| lp.iter().map(move |p| format!("{}:{}", h, *p))) + .collect::>() + } + } + }) + .collect(); + self.metrics.retain_backends(&active_backends); + // Atomically swap the route table let new_manager = Arc::new(new_manager); self.route_table.store(Arc::clone(&new_manager)); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index f77a18d..65a15e0 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartproxy', - version: '25.9.3', + version: '25.10.0', description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.' } diff --git a/ts/proxies/smart-proxy/models/metrics-types.ts b/ts/proxies/smart-proxy/models/metrics-types.ts index d958ca1..5b16765 100644 --- a/ts/proxies/smart-proxy/models/metrics-types.ts +++ b/ts/proxies/smart-proxy/models/metrics-types.ts @@ -67,6 +67,13 @@ export interface IMetrics { connections(): number; }; + // Backend metrics + backends: { + byBackend(): Map; + protocols(): Map; + topByErrors(limit?: number): Array<{ backend: string; errors: number }>; + }; + // Performance metrics percentiles: { connectionDuration(): { p50: number; p95: number; p99: number }; @@ -98,6 +105,21 @@ export interface IMetricsConfig { prometheusPrefix: string; // Default: smartproxy_ } +/** + * Per-backend metrics + */ +export interface IBackendMetrics { + protocol: string; + activeConnections: number; + totalConnections: number; + connectErrors: number; + handshakeErrors: number; + requestErrors: number; + avgConnectTimeMs: number; + poolHitRate: number; + h2Failures: number; +} + /** * Internal interface for connection byte tracking */ diff --git a/ts/proxies/smart-proxy/rust-metrics-adapter.ts b/ts/proxies/smart-proxy/rust-metrics-adapter.ts index 38f4108..0ce6edd 100644 --- a/ts/proxies/smart-proxy/rust-metrics-adapter.ts +++ b/ts/proxies/smart-proxy/rust-metrics-adapter.ts @@ -1,4 +1,4 @@ -import type { IMetrics, IThroughputData, IThroughputHistoryPoint } from './models/metrics-types.js'; +import type { IMetrics, IBackendMetrics, IThroughputData, IThroughputHistoryPoint } from './models/metrics-types.js'; import type { RustProxyBridge } from './rust-proxy-bridge.js'; /** @@ -169,6 +169,55 @@ export class RustMetricsAdapter implements IMetrics { }, }; + public backends = { + byBackend: (): Map => { + const result = new Map(); + if (this.cache?.backends) { + for (const [key, bm] of Object.entries(this.cache.backends)) { + const m = bm as any; + const totalTimeUs = m.totalConnectTimeUs ?? 0; + const count = m.connectCount ?? 0; + const poolHits = m.poolHits ?? 0; + const poolMisses = m.poolMisses ?? 0; + const poolTotal = poolHits + poolMisses; + result.set(key, { + protocol: m.protocol ?? 'unknown', + activeConnections: m.activeConnections ?? 0, + totalConnections: m.totalConnections ?? 0, + connectErrors: m.connectErrors ?? 0, + handshakeErrors: m.handshakeErrors ?? 0, + requestErrors: m.requestErrors ?? 0, + avgConnectTimeMs: count > 0 ? (totalTimeUs / count) / 1000 : 0, + poolHitRate: poolTotal > 0 ? poolHits / poolTotal : 0, + h2Failures: m.h2Failures ?? 0, + }); + } + } + return result; + }, + protocols: (): Map => { + const result = new Map(); + if (this.cache?.backends) { + for (const [key, bm] of Object.entries(this.cache.backends)) { + result.set(key, (bm as any).protocol ?? 'unknown'); + } + } + return result; + }, + topByErrors: (limit: number = 10): Array<{ backend: string; errors: number }> => { + const result: Array<{ backend: string; errors: number }> = []; + if (this.cache?.backends) { + for (const [key, bm] of Object.entries(this.cache.backends)) { + const m = bm as any; + const errors = (m.connectErrors ?? 0) + (m.handshakeErrors ?? 0) + (m.requestErrors ?? 0); + if (errors > 0) result.push({ backend: key, errors }); + } + } + result.sort((a, b) => b.errors - a.errors); + return result.slice(0, limit); + }, + }; + public percentiles = { connectionDuration: (): { p50: number; p95: number; p99: number } => { return { p50: 0, p95: 0, p99: 0 };