use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
use std::time::Duration;

use crate::throughput::{ThroughputSample, ThroughputTracker};

/// Aggregated metrics snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Metrics {
    pub active_connections: u64,
    pub total_connections: u64,
    pub bytes_in: u64,
    pub bytes_out: u64,
    pub throughput_in_bytes_per_sec: u64,
    pub throughput_out_bytes_per_sec: u64,
    pub throughput_recent_in_bytes_per_sec: u64,
    pub throughput_recent_out_bytes_per_sec: u64,
    /// Per-route metrics, keyed by route ID.
    pub routes: std::collections::HashMap<String, RouteMetrics>,
    /// Per-IP metrics, keyed by source IP (snapshot caps this at the top
    /// `MAX_IPS_IN_SNAPSHOT` entries by active connections).
    pub ips: std::collections::HashMap<String, IpMetrics>,
    /// Per-backend metrics, keyed by "host:port" (snapshot caps this at the
    /// top `MAX_BACKENDS_IN_SNAPSHOT` entries by total connections).
    pub backends: std::collections::HashMap<String, BackendMetrics>,
    /// Recent global throughput samples, oldest first.
    pub throughput_history: Vec<ThroughputSample>,
    pub total_http_requests: u64,
    pub http_requests_per_sec: u64,
    pub http_requests_per_sec_recent: u64,
}

/// Per-route metrics.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RouteMetrics {
    pub active_connections: u64,
    pub total_connections: u64,
    pub bytes_in: u64,
    pub bytes_out: u64,
    pub throughput_in_bytes_per_sec: u64,
    pub throughput_out_bytes_per_sec: u64,
    pub throughput_recent_in_bytes_per_sec: u64,
    pub throughput_recent_out_bytes_per_sec: u64,
}

/// Per-IP metrics.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct IpMetrics {
    pub active_connections: u64,
    pub total_connections: u64,
    pub bytes_in: u64,
    pub bytes_out: u64,
    pub throughput_in_bytes_per_sec: u64,
    pub throughput_out_bytes_per_sec: u64,
}

/// Per-backend metrics (keyed by "host:port").
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BackendMetrics {
    pub active_connections: u64,
    pub total_connections: u64,
    /// Protocol in use for this backend ("h1" or "h2"; "unknown" if never set).
    pub protocol: String,
    pub connect_errors: u64,
    pub handshake_errors: u64,
    pub request_errors: u64,
    /// Sum of connect durations in microseconds; divide by `connect_count`
    /// for the mean connect latency.
    pub total_connect_time_us: u64,
    pub connect_count: u64,
    pub pool_hits: u64,
    pub pool_misses: u64,
    pub h2_failures: u64,
}

/// Statistics snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Statistics {
    pub active_connections: u64,
    pub total_connections: u64,
    pub routes_count: u64,
    // NOTE(review): element type is not visible in this chunk; `u16` assumed
    // for TCP ports — confirm against the code that populates this field.
    pub listening_ports: Vec<u16>,
    pub uptime_seconds: u64,
}

/// Default retention for throughput samples (1 hour).
const DEFAULT_RETENTION_SECONDS: usize = 3600;

/// Maximum number of IPs to include in a snapshot (top by active connections).
const MAX_IPS_IN_SNAPSHOT: usize = 100;

/// Maximum number of backends to include in a snapshot (top by total connections).
const MAX_BACKENDS_IN_SNAPSHOT: usize = 100;

/// Metrics collector tracking connections and throughput.
///
/// Design: The hot path (`record_bytes`) is entirely lock-free — it only touches
/// `AtomicU64` counters. The cold path (`sample_all`, called at 1Hz) drains
/// those atomics and feeds the throughput trackers under a Mutex. This avoids
/// contention when `record_bytes` is called per-chunk in the TCP copy loop.
pub struct MetricsCollector {
    active_connections: AtomicU64,
    total_connections: AtomicU64,
    total_bytes_in: AtomicU64,
    total_bytes_out: AtomicU64,
    /// Per-route active connection counts
    route_connections: DashMap<String, AtomicU64>,
    /// Per-route total connection counts
    route_total_connections: DashMap<String, AtomicU64>,
    /// Per-route byte counters
    route_bytes_in: DashMap<String, AtomicU64>,
    route_bytes_out: DashMap<String, AtomicU64>,
    // ── Per-IP tracking ──
    ip_connections: DashMap<String, AtomicU64>,
    ip_total_connections: DashMap<String, AtomicU64>,
    ip_bytes_in: DashMap<String, AtomicU64>,
    ip_bytes_out: DashMap<String, AtomicU64>,
    /// (pending_in, pending_out) byte counters, drained by `sample_all`.
    ip_pending_tp: DashMap<String, (AtomicU64, AtomicU64)>,
    ip_throughput: DashMap<String, Mutex<ThroughputTracker>>,
    // ── Per-backend tracking (keyed by "host:port") ──
    backend_active: DashMap<String, AtomicU64>,
    backend_total: DashMap<String, AtomicU64>,
    backend_protocol: DashMap<String, String>,
    backend_connect_errors: DashMap<String, AtomicU64>,
    backend_handshake_errors: DashMap<String, AtomicU64>,
    backend_request_errors: DashMap<String, AtomicU64>,
    backend_connect_time_us: DashMap<String, AtomicU64>,
    backend_connect_count: DashMap<String, AtomicU64>,
    backend_pool_hits: DashMap<String, AtomicU64>,
    backend_pool_misses: DashMap<String, AtomicU64>,
    backend_h2_failures: DashMap<String, AtomicU64>,
    // ── HTTP request tracking ──
    total_http_requests: AtomicU64,
    pending_http_requests: AtomicU64,
    http_request_throughput: Mutex<ThroughputTracker>,
    // ── Lock-free pending throughput counters (hot path) ──
    global_pending_tp_in: AtomicU64,
    global_pending_tp_out: AtomicU64,
    route_pending_tp: DashMap<String, (AtomicU64, AtomicU64)>,
    // ── Throughput history — only locked during sampling (cold path) ──
    global_throughput: Mutex<ThroughputTracker>,
    route_throughput: DashMap<String, Mutex<ThroughputTracker>>,
    retention_seconds: usize,
}

impl MetricsCollector {
    /// Create a collector with the default retention (`DEFAULT_RETENTION_SECONDS`).
    pub fn new() -> Self {
        Self::with_retention(DEFAULT_RETENTION_SECONDS)
    }

    /// Create a MetricsCollector with a custom retention period for throughput history.
pub fn with_retention(retention_seconds: usize) -> Self {
    // Every throughput tracker shares the same retention window; every plain
    // counter starts at zero. Small closures keep the literal below readable.
    let tracker = || Mutex::new(ThroughputTracker::new(retention_seconds));
    let zero = || AtomicU64::new(0);
    Self {
        active_connections: zero(),
        total_connections: zero(),
        total_bytes_in: zero(),
        total_bytes_out: zero(),
        route_connections: DashMap::new(),
        route_total_connections: DashMap::new(),
        route_bytes_in: DashMap::new(),
        route_bytes_out: DashMap::new(),
        ip_connections: DashMap::new(),
        ip_total_connections: DashMap::new(),
        ip_bytes_in: DashMap::new(),
        ip_bytes_out: DashMap::new(),
        ip_pending_tp: DashMap::new(),
        ip_throughput: DashMap::new(),
        backend_active: DashMap::new(),
        backend_total: DashMap::new(),
        backend_protocol: DashMap::new(),
        backend_connect_errors: DashMap::new(),
        backend_handshake_errors: DashMap::new(),
        backend_request_errors: DashMap::new(),
        backend_connect_time_us: DashMap::new(),
        backend_connect_count: DashMap::new(),
        backend_pool_hits: DashMap::new(),
        backend_pool_misses: DashMap::new(),
        backend_h2_failures: DashMap::new(),
        total_http_requests: zero(),
        pending_http_requests: zero(),
        http_request_throughput: tracker(),
        global_pending_tp_in: zero(),
        global_pending_tp_out: zero(),
        route_pending_tp: DashMap::new(),
        global_throughput: tracker(),
        route_throughput: DashMap::new(),
        retention_seconds,
    }
}

/// Record a new connection.
pub fn connection_opened(&self, route_id: Option<&str>, source_ip: Option<&str>) {
    self.active_connections.fetch_add(1, Ordering::Relaxed);
    self.total_connections.fetch_add(1, Ordering::Relaxed);
    // Per-route counters are created lazily on the first connection for a route.
    if let Some(route_id) = route_id {
        self.route_connections
            .entry(route_id.to_string())
            .or_insert_with(|| AtomicU64::new(0))
            .fetch_add(1, Ordering::Relaxed);
        self.route_total_connections
            .entry(route_id.to_string())
            .or_insert_with(|| AtomicU64::new(0))
            .fetch_add(1, Ordering::Relaxed);
    }
    // Same lazy-creation pattern per source IP.
    if let Some(ip) = source_ip {
        self.ip_connections
            .entry(ip.to_string())
            .or_insert_with(|| AtomicU64::new(0))
            .fetch_add(1, Ordering::Relaxed);
        self.ip_total_connections
            .entry(ip.to_string())
            .or_insert_with(|| AtomicU64::new(0))
            .fetch_add(1, Ordering::Relaxed);
    }
}

/// Record a connection closing.
pub fn connection_closed(&self, route_id: Option<&str>, source_ip: Option<&str>) {
    self.active_connections.fetch_sub(1, Ordering::Relaxed);
    if let Some(route_id) = route_id {
        if let Some(counter) = self.route_connections.get(route_id) {
            // NOTE(review): load-then-sub is not atomic as a pair; two racing
            // closers could both pass the `> 0` check — acceptable for
            // approximate metrics, but worth confirming that's intended.
            let val = counter.load(Ordering::Relaxed);
            if val > 0 {
                counter.fetch_sub(1, Ordering::Relaxed);
            }
        }
    }
    if let Some(ip) = source_ip {
        if let Some(counter) = self.ip_connections.get(ip) {
            let val = counter.load(Ordering::Relaxed);
            if val > 0 {
                counter.fetch_sub(1, Ordering::Relaxed);
            }
            // Clean up zero-count entries to prevent memory growth
            if val <= 1 {
                // Release the map guard before removing from the same map;
                // holding a DashMap ref across `remove` can self-deadlock on
                // the shard lock.
                drop(counter);
                self.ip_connections.remove(ip);
                // Evict all per-IP tracking data for this IP
                self.ip_total_connections.remove(ip);
                self.ip_bytes_in.remove(ip);
                self.ip_bytes_out.remove(ip);
                self.ip_pending_tp.remove(ip);
                self.ip_throughput.remove(ip);
            }
        }
    }
}

/// Record bytes transferred (lock-free hot path).
///
/// Called per-chunk in the TCP copy loop. Only touches AtomicU64 counters —
/// no Mutex is taken. The throughput trackers are fed during `sample_all()`.
pub fn record_bytes(&self, bytes_in: u64, bytes_out: u64, route_id: Option<&str>, source_ip: Option<&str>) {
    self.total_bytes_in.fetch_add(bytes_in, Ordering::Relaxed);
    self.total_bytes_out.fetch_add(bytes_out, Ordering::Relaxed);
    // Accumulate into lock-free pending throughput counters
    self.global_pending_tp_in.fetch_add(bytes_in, Ordering::Relaxed);
    self.global_pending_tp_out.fetch_add(bytes_out, Ordering::Relaxed);
    // Per-route tracking: use get() first (zero-alloc fast path for existing entries),
    // fall back to entry() with to_string() only on the rare first-chunk miss.
    if let Some(route_id) = route_id {
        if let Some(counter) = self.route_bytes_in.get(route_id) {
            counter.fetch_add(bytes_in, Ordering::Relaxed);
        } else {
            self.route_bytes_in.entry(route_id.to_string())
                .or_insert_with(|| AtomicU64::new(0))
                .fetch_add(bytes_in, Ordering::Relaxed);
        }
        if let Some(counter) = self.route_bytes_out.get(route_id) {
            counter.fetch_add(bytes_out, Ordering::Relaxed);
        } else {
            self.route_bytes_out.entry(route_id.to_string())
                .or_insert_with(|| AtomicU64::new(0))
                .fetch_add(bytes_out, Ordering::Relaxed);
        }
        // Accumulate into per-route pending throughput counters (lock-free)
        if let Some(entry) = self.route_pending_tp.get(route_id) {
            entry.0.fetch_add(bytes_in, Ordering::Relaxed);
            entry.1.fetch_add(bytes_out, Ordering::Relaxed);
        } else {
            let entry = self.route_pending_tp.entry(route_id.to_string())
                .or_insert_with(|| (AtomicU64::new(0), AtomicU64::new(0)));
            entry.0.fetch_add(bytes_in, Ordering::Relaxed);
            entry.1.fetch_add(bytes_out, Ordering::Relaxed);
        }
    }
    // Per-IP tracking: same get()-first pattern to avoid String allocation on hot path.
    if let Some(ip) = source_ip {
        // Only record per-IP stats if the IP still has active connections.
        // This prevents orphaned entries when record_bytes races with
        // connection_closed (which evicts all per-IP data on last close).
        if self.ip_connections.contains_key(ip) {
            if let Some(counter) = self.ip_bytes_in.get(ip) {
                counter.fetch_add(bytes_in, Ordering::Relaxed);
            } else {
                self.ip_bytes_in.entry(ip.to_string())
                    .or_insert_with(|| AtomicU64::new(0))
                    .fetch_add(bytes_in, Ordering::Relaxed);
            }
            if let Some(counter) = self.ip_bytes_out.get(ip) {
                counter.fetch_add(bytes_out, Ordering::Relaxed);
            } else {
                self.ip_bytes_out.entry(ip.to_string())
                    .or_insert_with(|| AtomicU64::new(0))
                    .fetch_add(bytes_out, Ordering::Relaxed);
            }
            // Accumulate into per-IP pending throughput counters (lock-free)
            if let Some(entry) = self.ip_pending_tp.get(ip) {
                entry.0.fetch_add(bytes_in, Ordering::Relaxed);
                entry.1.fetch_add(bytes_out, Ordering::Relaxed);
            } else {
                let entry = self.ip_pending_tp.entry(ip.to_string())
                    .or_insert_with(|| (AtomicU64::new(0), AtomicU64::new(0)));
                entry.0.fetch_add(bytes_in, Ordering::Relaxed);
                entry.1.fetch_add(bytes_out, Ordering::Relaxed);
            }
        }
    }
}

/// Record an HTTP request (called once per request in the HTTP proxy).
pub fn record_http_request(&self) {
    // `pending_http_requests` is drained into the rate tracker by sample_all().
    self.total_http_requests.fetch_add(1, Ordering::Relaxed);
    self.pending_http_requests.fetch_add(1, Ordering::Relaxed);
}

// ── Per-backend recording methods ──

/// Record a successful backend connection with its connect duration.
pub fn backend_connection_opened(&self, key: &str, connect_time: Duration) {
    self.backend_active
        .entry(key.to_string())
        .or_insert_with(|| AtomicU64::new(0))
        .fetch_add(1, Ordering::Relaxed);
    self.backend_total
        .entry(key.to_string())
        .or_insert_with(|| AtomicU64::new(0))
        .fetch_add(1, Ordering::Relaxed);
    // Accumulate connect latency (sum + count) so a mean can be derived later.
    self.backend_connect_time_us
        .entry(key.to_string())
        .or_insert_with(|| AtomicU64::new(0))
        .fetch_add(connect_time.as_micros() as u64, Ordering::Relaxed);
    self.backend_connect_count
        .entry(key.to_string())
        .or_insert_with(|| AtomicU64::new(0))
        .fetch_add(1, Ordering::Relaxed);
}

/// Record a backend connection closing.
pub fn backend_connection_closed(&self, key: &str) {
    if let Some(counter) = self.backend_active.get(key) {
        // Clamp at zero: never underflow if a close races ahead of an open.
        if counter.load(Ordering::Relaxed) > 0 {
            counter.fetch_sub(1, Ordering::Relaxed);
        }
    }
}

/// Bump a per-backend event counter by one, creating the entry on first use.
fn bump(map: &DashMap<String, AtomicU64>, key: &str) {
    map.entry(key.to_string())
        .or_insert_with(|| AtomicU64::new(0))
        .fetch_add(1, Ordering::Relaxed);
}

/// Record a backend connect error (TCP or TLS connect failure/timeout).
pub fn backend_connect_error(&self, key: &str) {
    Self::bump(&self.backend_connect_errors, key);
}

/// Record a backend handshake error (H1 or H2 handshake failure).
pub fn backend_handshake_error(&self, key: &str) {
    Self::bump(&self.backend_handshake_errors, key);
}

/// Record a backend request error (send_request failure).
pub fn backend_request_error(&self, key: &str) {
    Self::bump(&self.backend_request_errors, key);
}

/// Record a connection pool hit for a backend.
pub fn backend_pool_hit(&self, key: &str) {
    Self::bump(&self.backend_pool_hits, key);
}

/// Record a connection pool miss for a backend.
pub fn backend_pool_miss(&self, key: &str) {
    Self::bump(&self.backend_pool_misses, key);
}

/// Record an H2 failure (h2 attempted but fell back to h1).
pub fn backend_h2_failure(&self, key: &str) {
    Self::bump(&self.backend_h2_failures, key);
}

/// Set the protocol in use for a backend ("h1" or "h2").
pub fn set_backend_protocol(&self, key: &str, protocol: &str) {
    self.backend_protocol
        .entry(key.to_string())
        .and_modify(|current| {
            // Only reallocate when the value actually changes.
            if current != protocol {
                *current = protocol.to_string();
            }
        })
        .or_insert_with(|| protocol.to_string());
}

/// Remove per-backend metrics for backends no longer in any route target.
pub fn retain_backends(&self, active_backends: &HashSet<String>) {
    // Apply the same keep-set to every per-backend map so they stay in sync;
    // anything not in `active_backends` ("host:port" keys) is dropped.
    self.backend_active.retain(|k, _| active_backends.contains(k));
    self.backend_total.retain(|k, _| active_backends.contains(k));
    self.backend_protocol.retain(|k, _| active_backends.contains(k));
    self.backend_connect_errors.retain(|k, _| active_backends.contains(k));
    self.backend_handshake_errors.retain(|k, _| active_backends.contains(k));
    self.backend_request_errors.retain(|k, _| active_backends.contains(k));
    self.backend_connect_time_us.retain(|k, _| active_backends.contains(k));
    self.backend_connect_count.retain(|k, _| active_backends.contains(k));
    self.backend_pool_hits.retain(|k, _| active_backends.contains(k));
    self.backend_pool_misses.retain(|k, _| active_backends.contains(k));
    self.backend_h2_failures.retain(|k, _| active_backends.contains(k));
}

/// Take a throughput sample on all trackers (cold path, call at 1Hz or configured interval).
///
/// Drains the lock-free pending counters and feeds the accumulated bytes
/// into the throughput trackers (under Mutex). This is the only place
/// the Mutex is locked.
pub fn sample_all(&self) { // Drain global pending bytes and feed into the tracker let global_in = self.global_pending_tp_in.swap(0, Ordering::Relaxed); let global_out = self.global_pending_tp_out.swap(0, Ordering::Relaxed); if let Ok(mut tracker) = self.global_throughput.lock() { tracker.record_bytes(global_in, global_out); tracker.sample(); } // Drain per-route pending bytes; collect into a Vec to avoid holding DashMap shards let mut route_samples: Vec<(String, u64, u64)> = Vec::new(); for entry in self.route_pending_tp.iter() { let route_id = entry.key().clone(); let pending_in = entry.value().0.swap(0, Ordering::Relaxed); let pending_out = entry.value().1.swap(0, Ordering::Relaxed); route_samples.push((route_id, pending_in, pending_out)); } // Feed pending bytes into route trackers and sample let retention = self.retention_seconds; for (route_id, pending_in, pending_out) in &route_samples { // Ensure the tracker exists self.route_throughput .entry(route_id.clone()) .or_insert_with(|| Mutex::new(ThroughputTracker::new(retention))); // Now get a separate ref and lock it if let Some(tracker_ref) = self.route_throughput.get(route_id) { if let Ok(mut tracker) = tracker_ref.value().lock() { tracker.record_bytes(*pending_in, *pending_out); tracker.sample(); } } } // Also sample any route trackers that had no new pending bytes // (to keep their sample window advancing) for entry in self.route_throughput.iter() { if !self.route_pending_tp.contains_key(entry.key()) { if let Ok(mut tracker) = entry.value().lock() { tracker.sample(); } } } // Drain per-IP pending bytes and feed into IP throughput trackers let mut ip_samples: Vec<(String, u64, u64)> = Vec::new(); for entry in self.ip_pending_tp.iter() { let ip = entry.key().clone(); let pending_in = entry.value().0.swap(0, Ordering::Relaxed); let pending_out = entry.value().1.swap(0, Ordering::Relaxed); ip_samples.push((ip, pending_in, pending_out)); } for (ip, pending_in, pending_out) in &ip_samples { self.ip_throughput 
.entry(ip.clone()) .or_insert_with(|| Mutex::new(ThroughputTracker::new(retention))); if let Some(tracker_ref) = self.ip_throughput.get(ip) { if let Ok(mut tracker) = tracker_ref.value().lock() { tracker.record_bytes(*pending_in, *pending_out); tracker.sample(); } } } // Sample idle IP trackers for entry in self.ip_throughput.iter() { if !self.ip_pending_tp.contains_key(entry.key()) { if let Ok(mut tracker) = entry.value().lock() { tracker.sample(); } } } // Drain pending HTTP request count and feed into HTTP throughput tracker let pending_reqs = self.pending_http_requests.swap(0, Ordering::Relaxed); if let Ok(mut tracker) = self.http_request_throughput.lock() { // Use bytes_in field to track request count (each request = 1 "byte") tracker.record_bytes(pending_reqs, 0); tracker.sample(); } // Safety-net: prune orphaned per-IP entries that have no corresponding // ip_connections entry. This catches any entries created by a race between // record_bytes and connection_closed. self.ip_bytes_in.retain(|k, _| self.ip_connections.contains_key(k)); self.ip_bytes_out.retain(|k, _| self.ip_connections.contains_key(k)); self.ip_pending_tp.retain(|k, _| self.ip_connections.contains_key(k)); self.ip_throughput.retain(|k, _| self.ip_connections.contains_key(k)); self.ip_total_connections.retain(|k, _| self.ip_connections.contains_key(k)); } /// Remove per-route metrics for route IDs that are no longer active. /// Call this after `update_routes()` to prune stale entries. pub fn retain_routes(&self, active_route_ids: &HashSet) { self.route_connections.retain(|k, _| active_route_ids.contains(k)); self.route_total_connections.retain(|k, _| active_route_ids.contains(k)); self.route_bytes_in.retain(|k, _| active_route_ids.contains(k)); self.route_bytes_out.retain(|k, _| active_route_ids.contains(k)); self.route_pending_tp.retain(|k, _| active_route_ids.contains(k)); self.route_throughput.retain(|k, _| active_route_ids.contains(k)); } /// Get current active connection count. 
pub fn active_connections(&self) -> u64 { self.active_connections.load(Ordering::Relaxed) } /// Get total connection count. pub fn total_connections(&self) -> u64 { self.total_connections.load(Ordering::Relaxed) } /// Get total bytes received. pub fn total_bytes_in(&self) -> u64 { self.total_bytes_in.load(Ordering::Relaxed) } /// Get total bytes sent. pub fn total_bytes_out(&self) -> u64 { self.total_bytes_out.load(Ordering::Relaxed) } /// Get a full metrics snapshot including per-route and per-IP data. pub fn snapshot(&self) -> Metrics { let mut routes = std::collections::HashMap::new(); // Get global throughput (instant = last 1 sample, recent = last 10 samples) let (global_tp_in, global_tp_out, global_recent_in, global_recent_out, throughput_history) = self.global_throughput .lock() .map(|t| { let (i_in, i_out) = t.instant(); let (r_in, r_out) = t.recent(); let history = t.history(60); (i_in, i_out, r_in, r_out, history) }) .unwrap_or((0, 0, 0, 0, Vec::new())); // Collect per-route metrics for entry in self.route_total_connections.iter() { let route_id = entry.key().clone(); let total = entry.value().load(Ordering::Relaxed); let active = self.route_connections .get(&route_id) .map(|c| c.load(Ordering::Relaxed)) .unwrap_or(0); let bytes_in = self.route_bytes_in .get(&route_id) .map(|c| c.load(Ordering::Relaxed)) .unwrap_or(0); let bytes_out = self.route_bytes_out .get(&route_id) .map(|c| c.load(Ordering::Relaxed)) .unwrap_or(0); let (route_tp_in, route_tp_out, route_recent_in, route_recent_out) = self.route_throughput .get(&route_id) .and_then(|entry| entry.value().lock().ok().map(|t| { let (i_in, i_out) = t.instant(); let (r_in, r_out) = t.recent(); (i_in, i_out, r_in, r_out) })) .unwrap_or((0, 0, 0, 0)); routes.insert(route_id, RouteMetrics { active_connections: active, total_connections: total, bytes_in, bytes_out, throughput_in_bytes_per_sec: route_tp_in, throughput_out_bytes_per_sec: route_tp_out, throughput_recent_in_bytes_per_sec: route_recent_in, 
throughput_recent_out_bytes_per_sec: route_recent_out, }); } // Collect per-IP metrics — only IPs with active connections or total > 0, // capped at top MAX_IPS_IN_SNAPSHOT sorted by active count let mut ip_entries: Vec<(String, u64, u64, u64, u64, u64, u64)> = Vec::new(); for entry in self.ip_total_connections.iter() { let ip = entry.key().clone(); let total = entry.value().load(Ordering::Relaxed); let active = self.ip_connections .get(&ip) .map(|c| c.load(Ordering::Relaxed)) .unwrap_or(0); let bytes_in = self.ip_bytes_in .get(&ip) .map(|c| c.load(Ordering::Relaxed)) .unwrap_or(0); let bytes_out = self.ip_bytes_out .get(&ip) .map(|c| c.load(Ordering::Relaxed)) .unwrap_or(0); let (tp_in, tp_out) = self.ip_throughput .get(&ip) .and_then(|entry| entry.value().lock().ok().map(|t| t.instant())) .unwrap_or((0, 0)); ip_entries.push((ip, active, total, bytes_in, bytes_out, tp_in, tp_out)); } // Sort by active connections descending, then cap ip_entries.sort_by(|a, b| b.1.cmp(&a.1)); ip_entries.truncate(MAX_IPS_IN_SNAPSHOT); let mut ips = std::collections::HashMap::new(); for (ip, active, total, bytes_in, bytes_out, tp_in, tp_out) in ip_entries { ips.insert(ip, IpMetrics { active_connections: active, total_connections: total, bytes_in, bytes_out, throughput_in_bytes_per_sec: tp_in, throughput_out_bytes_per_sec: tp_out, }); } // Collect per-backend metrics, capped at top MAX_BACKENDS_IN_SNAPSHOT by total connections let mut backend_entries: Vec<(String, BackendMetrics)> = Vec::new(); for entry in self.backend_total.iter() { let key = entry.key().clone(); let total = entry.value().load(Ordering::Relaxed); let active = self.backend_active .get(&key) .map(|c| c.load(Ordering::Relaxed)) .unwrap_or(0); let protocol = self.backend_protocol .get(&key) .map(|v| v.value().clone()) .unwrap_or_else(|| "unknown".to_string()); let connect_errors = self.backend_connect_errors .get(&key) .map(|c| c.load(Ordering::Relaxed)) .unwrap_or(0); let handshake_errors = 
self.backend_handshake_errors .get(&key) .map(|c| c.load(Ordering::Relaxed)) .unwrap_or(0); let request_errors = self.backend_request_errors .get(&key) .map(|c| c.load(Ordering::Relaxed)) .unwrap_or(0); let total_connect_time_us = self.backend_connect_time_us .get(&key) .map(|c| c.load(Ordering::Relaxed)) .unwrap_or(0); let connect_count = self.backend_connect_count .get(&key) .map(|c| c.load(Ordering::Relaxed)) .unwrap_or(0); let pool_hits = self.backend_pool_hits .get(&key) .map(|c| c.load(Ordering::Relaxed)) .unwrap_or(0); let pool_misses = self.backend_pool_misses .get(&key) .map(|c| c.load(Ordering::Relaxed)) .unwrap_or(0); let h2_failures = self.backend_h2_failures .get(&key) .map(|c| c.load(Ordering::Relaxed)) .unwrap_or(0); backend_entries.push((key, BackendMetrics { active_connections: active, total_connections: total, protocol, connect_errors, handshake_errors, request_errors, total_connect_time_us, connect_count, pool_hits, pool_misses, h2_failures, })); } // Sort by total connections descending, then cap backend_entries.sort_by(|a, b| b.1.total_connections.cmp(&a.1.total_connections)); backend_entries.truncate(MAX_BACKENDS_IN_SNAPSHOT); let backends: std::collections::HashMap = backend_entries.into_iter().collect(); // HTTP request rates let (http_rps, http_rps_recent) = self.http_request_throughput .lock() .map(|t| { let (instant, _) = t.instant(); let (recent, _) = t.recent(); (instant, recent) }) .unwrap_or((0, 0)); Metrics { active_connections: self.active_connections(), total_connections: self.total_connections(), bytes_in: self.total_bytes_in(), bytes_out: self.total_bytes_out(), throughput_in_bytes_per_sec: global_tp_in, throughput_out_bytes_per_sec: global_tp_out, throughput_recent_in_bytes_per_sec: global_recent_in, throughput_recent_out_bytes_per_sec: global_recent_out, routes, ips, backends, throughput_history, total_http_requests: self.total_http_requests.load(Ordering::Relaxed), http_requests_per_sec: http_rps, http_requests_per_sec_recent: 
http_rps_recent, } } } impl Default for MetricsCollector { fn default() -> Self { Self::new() } } #[cfg(test)] mod tests { use super::*; #[test] fn test_initial_state_zeros() { let collector = MetricsCollector::new(); assert_eq!(collector.active_connections(), 0); assert_eq!(collector.total_connections(), 0); } #[test] fn test_connection_opened_increments() { let collector = MetricsCollector::new(); collector.connection_opened(None, None); assert_eq!(collector.active_connections(), 1); assert_eq!(collector.total_connections(), 1); collector.connection_opened(None, None); assert_eq!(collector.active_connections(), 2); assert_eq!(collector.total_connections(), 2); } #[test] fn test_connection_closed_decrements() { let collector = MetricsCollector::new(); collector.connection_opened(None, None); collector.connection_opened(None, None); assert_eq!(collector.active_connections(), 2); collector.connection_closed(None, None); assert_eq!(collector.active_connections(), 1); // total_connections should stay at 2 assert_eq!(collector.total_connections(), 2); } #[test] fn test_route_specific_tracking() { let collector = MetricsCollector::new(); collector.connection_opened(Some("route-a"), None); collector.connection_opened(Some("route-a"), None); collector.connection_opened(Some("route-b"), None); assert_eq!(collector.active_connections(), 3); assert_eq!(collector.total_connections(), 3); collector.connection_closed(Some("route-a"), None); assert_eq!(collector.active_connections(), 2); } #[test] fn test_record_bytes() { let collector = MetricsCollector::new(); collector.record_bytes(100, 200, Some("route-a"), None); collector.record_bytes(50, 75, Some("route-a"), None); collector.record_bytes(25, 30, None, None); let total_in = collector.total_bytes_in.load(Ordering::Relaxed); let total_out = collector.total_bytes_out.load(Ordering::Relaxed); assert_eq!(total_in, 175); assert_eq!(total_out, 305); // Route-specific bytes let route_in = 
collector.route_bytes_in.get("route-a").unwrap(); assert_eq!(route_in.load(Ordering::Relaxed), 150); } #[test] fn test_throughput_tracking() { let collector = MetricsCollector::with_retention(60); // Open a connection so the route appears in the snapshot collector.connection_opened(Some("route-a"), None); // Record some bytes collector.record_bytes(1000, 2000, Some("route-a"), None); collector.record_bytes(500, 750, None, None); // Take a sample (simulates the 1Hz tick) collector.sample_all(); // Check global throughput let snapshot = collector.snapshot(); assert_eq!(snapshot.throughput_in_bytes_per_sec, 1500); assert_eq!(snapshot.throughput_out_bytes_per_sec, 2750); // Check per-route throughput let route_a = snapshot.routes.get("route-a").unwrap(); assert_eq!(route_a.throughput_in_bytes_per_sec, 1000); assert_eq!(route_a.throughput_out_bytes_per_sec, 2000); } #[test] fn test_throughput_zero_before_sampling() { let collector = MetricsCollector::with_retention(60); collector.record_bytes(1000, 2000, None, None); // Without sampling, throughput should be 0 let snapshot = collector.snapshot(); assert_eq!(snapshot.throughput_in_bytes_per_sec, 0); assert_eq!(snapshot.throughput_out_bytes_per_sec, 0); } #[test] fn test_per_ip_tracking() { let collector = MetricsCollector::with_retention(60); collector.connection_opened(Some("route-a"), Some("1.2.3.4")); collector.connection_opened(Some("route-a"), Some("1.2.3.4")); collector.connection_opened(Some("route-b"), Some("5.6.7.8")); // Check IP active connections (drop DashMap refs immediately to avoid deadlock) assert_eq!( collector.ip_connections.get("1.2.3.4").unwrap().load(Ordering::Relaxed), 2 ); assert_eq!( collector.ip_connections.get("5.6.7.8").unwrap().load(Ordering::Relaxed), 1 ); // Record bytes per IP collector.record_bytes(100, 200, Some("route-a"), Some("1.2.3.4")); collector.record_bytes(300, 400, Some("route-b"), Some("5.6.7.8")); collector.sample_all(); let snapshot = collector.snapshot(); 
// NOTE(review): the statements below are the tail of a per-IP metrics test whose
// setup appears earlier in this file (it opened 2 connections from "1.2.3.4" and
// recorded 100 bytes in before snapshotting) — confirm against the preceding chunk.
assert_eq!(snapshot.ips.len(), 2);
let ip1_metrics = snapshot.ips.get("1.2.3.4").unwrap();
assert_eq!(ip1_metrics.active_connections, 2);
assert_eq!(ip1_metrics.bytes_in, 100);
// Close connections
collector.connection_closed(Some("route-a"), Some("1.2.3.4"));
assert_eq!(
    collector.ip_connections.get("1.2.3.4").unwrap().load(Ordering::Relaxed),
    1
);
// Close last connection for IP — should be cleaned up
collector.connection_closed(Some("route-a"), Some("1.2.3.4"));
assert!(collector.ip_connections.get("1.2.3.4").is_none());
}

/// Closing the last connection for an IP must evict *all* per-IP state
/// (counters, byte totals, pending throughput, tracker) — not just the
/// active-connection entry — while leaving other IPs untouched.
#[test]
fn test_per_ip_full_eviction_on_last_close() {
    let collector = MetricsCollector::with_retention(60);
    // Open connections from two IPs
    collector.connection_opened(Some("route-a"), Some("10.0.0.1"));
    collector.connection_opened(Some("route-a"), Some("10.0.0.1"));
    collector.connection_opened(Some("route-b"), Some("10.0.0.2"));
    // Record bytes to populate per-IP DashMaps
    collector.record_bytes(100, 200, Some("route-a"), Some("10.0.0.1"));
    collector.record_bytes(300, 400, Some("route-b"), Some("10.0.0.2"));
    collector.sample_all();
    // Verify per-IP data exists
    assert!(collector.ip_total_connections.get("10.0.0.1").is_some());
    assert!(collector.ip_bytes_in.get("10.0.0.1").is_some());
    assert!(collector.ip_throughput.get("10.0.0.1").is_some());
    // Close all connections for 10.0.0.1
    collector.connection_closed(Some("route-a"), Some("10.0.0.1"));
    collector.connection_closed(Some("route-a"), Some("10.0.0.1"));
    // All per-IP data for 10.0.0.1 should be evicted
    assert!(collector.ip_connections.get("10.0.0.1").is_none());
    assert!(collector.ip_total_connections.get("10.0.0.1").is_none());
    assert!(collector.ip_bytes_in.get("10.0.0.1").is_none());
    assert!(collector.ip_bytes_out.get("10.0.0.1").is_none());
    assert!(collector.ip_pending_tp.get("10.0.0.1").is_none());
    assert!(collector.ip_throughput.get("10.0.0.1").is_none());
    // 10.0.0.2 should still have data
    assert!(collector.ip_connections.get("10.0.0.2").is_some());
    assert!(collector.ip_total_connections.get("10.0.0.2").is_some());
}

/// HTTP request counting: the atomic total is visible immediately, and after
/// one `sample_all()` tick the snapshot reports both the total and the
/// per-second rate for that tick.
#[test]
fn test_http_request_tracking() {
    let collector = MetricsCollector::with_retention(60);
    collector.record_http_request();
    collector.record_http_request();
    collector.record_http_request();
    assert_eq!(collector.total_http_requests.load(Ordering::Relaxed), 3);
    collector.sample_all();
    let snapshot = collector.snapshot();
    assert_eq!(snapshot.total_http_requests, 3);
    assert_eq!(snapshot.http_requests_per_sec, 3);
}

/// `retain_routes` must drop every per-route map entry for routes no longer
/// in the active set, and keep entries for routes that remain.
#[test]
fn test_retain_routes_prunes_stale() {
    let collector = MetricsCollector::with_retention(60);
    // Create metrics for 3 routes
    collector.connection_opened(Some("route-a"), None);
    collector.connection_opened(Some("route-b"), None);
    collector.connection_opened(Some("route-c"), None);
    collector.record_bytes(100, 200, Some("route-a"), None);
    collector.record_bytes(100, 200, Some("route-b"), None);
    collector.record_bytes(100, 200, Some("route-c"), None);
    collector.sample_all();
    // Now "route-b" is removed from config
    let active = HashSet::from(["route-a".to_string(), "route-c".to_string()]);
    collector.retain_routes(&active);
    // route-b entries should be gone
    assert!(collector.route_connections.get("route-b").is_none());
    assert!(collector.route_total_connections.get("route-b").is_none());
    assert!(collector.route_bytes_in.get("route-b").is_none());
    assert!(collector.route_bytes_out.get("route-b").is_none());
    assert!(collector.route_throughput.get("route-b").is_none());
    // route-a and route-c should still exist
    assert!(collector.route_total_connections.get("route-a").is_some());
    assert!(collector.route_total_connections.get("route-c").is_some());
}

/// A `record_bytes` that lands after the IP was evicted (close/record race)
/// must not resurrect per-IP entries, but the global byte totals still count it.
#[test]
fn test_record_bytes_after_close_no_orphan() {
    let collector = MetricsCollector::with_retention(60);
    // Open a connection, record bytes, then close
    collector.connection_opened(Some("route-a"), Some("10.0.0.1"));
    collector.record_bytes(100, 200, Some("route-a"), Some("10.0.0.1"));
    collector.connection_closed(Some("route-a"), Some("10.0.0.1"));
    // IP should be fully evicted
    assert!(collector.ip_connections.get("10.0.0.1").is_none());
    // Now record_bytes arrives late (simulates race) — should NOT re-create entries
    collector.record_bytes(50, 75, Some("route-a"), Some("10.0.0.1"));
    assert!(collector.ip_bytes_in.get("10.0.0.1").is_none());
    assert!(collector.ip_bytes_out.get("10.0.0.1").is_none());
    assert!(collector.ip_pending_tp.get("10.0.0.1").is_none());
    // Global bytes should still be counted
    assert_eq!(collector.total_bytes_in.load(Ordering::Relaxed), 150);
    assert_eq!(collector.total_bytes_out.load(Ordering::Relaxed), 275);
}

/// If per-IP byte/pending entries exist without a matching `ip_connections`
/// entry (the race window before the re-creation guard), `sample_all` must
/// prune them so they cannot leak.
#[test]
fn test_sample_all_prunes_orphaned_ip_entries() {
    let collector = MetricsCollector::with_retention(60);
    // Manually insert orphaned entries (simulates the race before the guard)
    collector.ip_bytes_in.insert("orphan-ip".to_string(), AtomicU64::new(100));
    collector.ip_bytes_out.insert("orphan-ip".to_string(), AtomicU64::new(200));
    collector.ip_pending_tp.insert("orphan-ip".to_string(), (AtomicU64::new(0), AtomicU64::new(0)));
    // No ip_connections entry for "orphan-ip"
    assert!(collector.ip_connections.get("orphan-ip").is_none());
    // sample_all should prune the orphans
    collector.sample_all();
    assert!(collector.ip_bytes_in.get("orphan-ip").is_none());
    assert!(collector.ip_bytes_out.get("orphan-ip").is_none());
    assert!(collector.ip_pending_tp.get("orphan-ip").is_none());
}

/// Each `sample_all` tick appends one history sample; the snapshot returns
/// them oldest-first.
#[test]
fn test_throughput_history_in_snapshot() {
    let collector = MetricsCollector::with_retention(60);
    for i in 1..=5 {
        collector.record_bytes(i * 100, i * 200, None, None);
        collector.sample_all();
    }
    let snapshot = collector.snapshot();
    assert_eq!(snapshot.throughput_history.len(), 5);
    // History should be chronological (oldest first)
    assert_eq!(snapshot.throughput_history[0].bytes_in, 100);
    assert_eq!(snapshot.throughput_history[4].bytes_in, 500);
}

/// Exercises every per-backend counter: open/close with connect timing,
/// each error class, pool hit/miss, and protocol set/overwrite.
#[test]
fn test_backend_metrics_basic() {
    let collector = MetricsCollector::new();
    let key = "backend1:8080";
    // Open connections with timing
    collector.backend_connection_opened(key, Duration::from_millis(15));
    collector.backend_connection_opened(key, Duration::from_millis(25));
    assert_eq!(collector.backend_active.get(key).unwrap().load(Ordering::Relaxed), 2);
    assert_eq!(collector.backend_total.get(key).unwrap().load(Ordering::Relaxed), 2);
    assert_eq!(collector.backend_connect_count.get(key).unwrap().load(Ordering::Relaxed), 2);
    // 15ms + 25ms = 40ms = 40_000us
    assert_eq!(collector.backend_connect_time_us.get(key).unwrap().load(Ordering::Relaxed), 40_000);
    // Close one
    collector.backend_connection_closed(key);
    assert_eq!(collector.backend_active.get(key).unwrap().load(Ordering::Relaxed), 1);
    // total stays
    assert_eq!(collector.backend_total.get(key).unwrap().load(Ordering::Relaxed), 2);
    // Record errors
    collector.backend_connect_error(key);
    collector.backend_handshake_error(key);
    collector.backend_request_error(key);
    collector.backend_h2_failure(key);
    collector.backend_pool_hit(key);
    collector.backend_pool_hit(key);
    collector.backend_pool_miss(key);
    assert_eq!(collector.backend_connect_errors.get(key).unwrap().load(Ordering::Relaxed), 1);
    assert_eq!(collector.backend_handshake_errors.get(key).unwrap().load(Ordering::Relaxed), 1);
    assert_eq!(collector.backend_request_errors.get(key).unwrap().load(Ordering::Relaxed), 1);
    assert_eq!(collector.backend_h2_failures.get(key).unwrap().load(Ordering::Relaxed), 1);
    assert_eq!(collector.backend_pool_hits.get(key).unwrap().load(Ordering::Relaxed), 2);
    assert_eq!(collector.backend_pool_misses.get(key).unwrap().load(Ordering::Relaxed), 1);
    // Protocol
    collector.set_backend_protocol(key, "h1");
    assert_eq!(collector.backend_protocol.get(key).unwrap().value(), "h1");
    // Setting again overwrites the previous value
    collector.set_backend_protocol(key, "h2");
    assert_eq!(collector.backend_protocol.get(key).unwrap().value(), "h2");
}

/// Backend counters surface in `snapshot().backends`, keyed by "host:port",
/// with zeroed fields for error classes that were never recorded.
#[test]
fn test_backend_metrics_in_snapshot() {
    let collector = MetricsCollector::new();
    collector.backend_connection_opened("b1:443", Duration::from_millis(10));
    collector.backend_connection_opened("b2:8080", Duration::from_millis(20));
    collector.set_backend_protocol("b1:443", "h2");
    collector.set_backend_protocol("b2:8080", "h1");
    collector.backend_connect_error("b1:443");
    let snapshot = collector.snapshot();
    assert_eq!(snapshot.backends.len(), 2);
    let b1 = snapshot.backends.get("b1:443").unwrap();
    assert_eq!(b1.active_connections, 1);
    assert_eq!(b1.total_connections, 1);
    assert_eq!(b1.protocol, "h2");
    assert_eq!(b1.connect_errors, 1);
    assert_eq!(b1.total_connect_time_us, 10_000);
    assert_eq!(b1.connect_count, 1);
    let b2 = snapshot.backends.get("b2:8080").unwrap();
    assert_eq!(b2.protocol, "h1");
    assert_eq!(b2.connect_errors, 0);
}

/// `retain_backends` must remove every per-backend map entry for backends not
/// in the active set, and keep entries for backends that remain.
#[test]
fn test_retain_backends_prunes_stale() {
    let collector = MetricsCollector::new();
    collector.backend_connection_opened("active:443", Duration::from_millis(5));
    collector.backend_connection_opened("stale:8080", Duration::from_millis(10));
    collector.set_backend_protocol("active:443", "h1");
    collector.set_backend_protocol("stale:8080", "h2");
    collector.backend_connect_error("stale:8080");
    let active = HashSet::from(["active:443".to_string()]);
    collector.retain_backends(&active);
    // active:443 should still exist
    assert!(collector.backend_total.get("active:443").is_some());
    assert!(collector.backend_protocol.get("active:443").is_some());
    // stale:8080 should be fully removed
    assert!(collector.backend_active.get("stale:8080").is_none());
    assert!(collector.backend_total.get("stale:8080").is_none());
    assert!(collector.backend_protocol.get("stale:8080").is_none());
    assert!(collector.backend_connect_errors.get("stale:8080").is_none());
    assert!(collector.backend_connect_time_us.get("stale:8080").is_none());
    assert!(collector.backend_connect_count.get("stale:8080").is_none());
    assert!(collector.backend_pool_hits.get("stale:8080").is_none());
    assert!(collector.backend_pool_misses.get("stale:8080").is_none());
    assert!(collector.backend_h2_failures.get("stale:8080").is_none());
}

/// Closing a backend connection that was never opened must not create an
/// entry or underflow; extra closes saturate the active count at 0.
#[test]
fn test_backend_connection_closed_saturates() {
    let collector = MetricsCollector::new();
    let key = "b:80";
    // Close without opening — should not underflow
    collector.backend_connection_closed(key);
    // No entry created
    assert!(collector.backend_active.get(key).is_none());
    // Open one, close two — should saturate at 0
    collector.backend_connection_opened(key, Duration::from_millis(1));
    collector.backend_connection_closed(key);
    collector.backend_connection_closed(key);
    assert_eq!(collector.backend_active.get(key).unwrap().load(Ordering::Relaxed), 0);
}
}