use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;

use dashmap::DashMap;
use serde::{Deserialize, Serialize};

use crate::throughput::{ThroughputSample, ThroughputTracker};

/// Point-in-time view of everything the metrics collector tracks.
///
/// Serialized with camelCase field names (e.g. `bytesIn`, `throughputHistory`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Metrics {
    /// Connections that are currently open.
    pub active_connections: u64,
    /// Connections opened since the collector was created.
    pub total_connections: u64,
    /// Cumulative bytes received.
    pub bytes_in: u64,
    /// Cumulative bytes sent.
    pub bytes_out: u64,
    /// Inbound rate from the latest throughput sample, in bytes per second.
    pub throughput_in_bytes_per_sec: u64,
    /// Outbound rate from the latest throughput sample, in bytes per second.
    pub throughput_out_bytes_per_sec: u64,
    /// Inbound rate over the tracker's recent window, in bytes per second.
    pub throughput_recent_in_bytes_per_sec: u64,
    /// Outbound rate over the tracker's recent window, in bytes per second.
    pub throughput_recent_out_bytes_per_sec: u64,
    /// Per-route breakdown, keyed by route id.
    pub routes: std::collections::HashMap<String, RouteMetrics>,
    /// Per-source-IP breakdown, keyed by IP string (limited to the busiest IPs).
    pub ips: std::collections::HashMap<String, IpMetrics>,
    /// Recent global throughput samples, oldest first.
    pub throughput_history: Vec<ThroughputSample>,
    /// HTTP requests recorded since the collector was created.
    pub total_http_requests: u64,
    /// HTTP request rate from the latest sample.
    pub http_requests_per_sec: u64,
    /// HTTP request rate over the tracker's recent window.
    pub http_requests_per_sec_recent: u64,
}

/// Metrics attributed to a single route.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RouteMetrics {
    /// Connections currently open on this route.
    pub active_connections: u64,
    /// Connections ever opened on this route.
    pub total_connections: u64,
    /// Cumulative bytes received on this route.
    pub bytes_in: u64,
    /// Cumulative bytes sent on this route.
    pub bytes_out: u64,
    /// Inbound rate from the latest sample, in bytes per second.
    pub throughput_in_bytes_per_sec: u64,
    /// Outbound rate from the latest sample, in bytes per second.
    pub throughput_out_bytes_per_sec: u64,
    /// Inbound rate over the recent window, in bytes per second.
    pub throughput_recent_in_bytes_per_sec: u64,
    /// Outbound rate over the recent window, in bytes per second.
    pub throughput_recent_out_bytes_per_sec: u64,
}

/// Metrics attributed to a single source IP.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct IpMetrics {
    /// Connections currently open from this IP.
    pub active_connections: u64,
    /// Connections ever opened from this IP.
    pub total_connections: u64,
    /// Cumulative bytes received from this IP.
    pub bytes_in: u64,
    /// Cumulative bytes sent to this IP.
    pub bytes_out: u64,
    /// Inbound rate from the latest sample, in bytes per second.
    pub throughput_in_bytes_per_sec: u64,
    /// Outbound rate from the latest sample, in bytes per second.
    pub throughput_out_bytes_per_sec: u64,
}

/// Summary statistics snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Statistics {
    pub active_connections: u64,
    pub total_connections: u64,
    pub routes_count: u64,
    pub listening_ports: Vec<u16>,
    pub uptime_seconds: u64,
}

/// Default retention for throughput history, in seconds (one hour).
const DEFAULT_RETENTION_SECONDS: usize = 3600;

/// Maximum number of IPs to include in a snapshot (top by active connections).
const MAX_IPS_IN_SNAPSHOT: usize = 100; /// Metrics collector tracking connections and throughput. /// /// Design: The hot path (`record_bytes`) is entirely lock-free — it only touches /// `AtomicU64` counters. The cold path (`sample_all`, called at 1Hz) drains /// those atomics and feeds the throughput trackers under a Mutex. This avoids /// contention when `record_bytes` is called per-chunk in the TCP copy loop. pub struct MetricsCollector { active_connections: AtomicU64, total_connections: AtomicU64, total_bytes_in: AtomicU64, total_bytes_out: AtomicU64, /// Per-route active connection counts route_connections: DashMap, /// Per-route total connection counts route_total_connections: DashMap, /// Per-route byte counters route_bytes_in: DashMap, route_bytes_out: DashMap, // ── Per-IP tracking ── ip_connections: DashMap, ip_total_connections: DashMap, ip_bytes_in: DashMap, ip_bytes_out: DashMap, ip_pending_tp: DashMap, ip_throughput: DashMap>, // ── HTTP request tracking ── total_http_requests: AtomicU64, pending_http_requests: AtomicU64, http_request_throughput: Mutex, // ── Lock-free pending throughput counters (hot path) ── global_pending_tp_in: AtomicU64, global_pending_tp_out: AtomicU64, route_pending_tp: DashMap, // ── Throughput history — only locked during sampling (cold path) ── global_throughput: Mutex, route_throughput: DashMap>, retention_seconds: usize, } impl MetricsCollector { pub fn new() -> Self { Self::with_retention(DEFAULT_RETENTION_SECONDS) } /// Create a MetricsCollector with a custom retention period for throughput history. 
pub fn with_retention(retention_seconds: usize) -> Self { Self { active_connections: AtomicU64::new(0), total_connections: AtomicU64::new(0), total_bytes_in: AtomicU64::new(0), total_bytes_out: AtomicU64::new(0), route_connections: DashMap::new(), route_total_connections: DashMap::new(), route_bytes_in: DashMap::new(), route_bytes_out: DashMap::new(), ip_connections: DashMap::new(), ip_total_connections: DashMap::new(), ip_bytes_in: DashMap::new(), ip_bytes_out: DashMap::new(), ip_pending_tp: DashMap::new(), ip_throughput: DashMap::new(), total_http_requests: AtomicU64::new(0), pending_http_requests: AtomicU64::new(0), http_request_throughput: Mutex::new(ThroughputTracker::new(retention_seconds)), global_pending_tp_in: AtomicU64::new(0), global_pending_tp_out: AtomicU64::new(0), route_pending_tp: DashMap::new(), global_throughput: Mutex::new(ThroughputTracker::new(retention_seconds)), route_throughput: DashMap::new(), retention_seconds, } } /// Record a new connection. pub fn connection_opened(&self, route_id: Option<&str>, source_ip: Option<&str>) { self.active_connections.fetch_add(1, Ordering::Relaxed); self.total_connections.fetch_add(1, Ordering::Relaxed); if let Some(route_id) = route_id { self.route_connections .entry(route_id.to_string()) .or_insert_with(|| AtomicU64::new(0)) .fetch_add(1, Ordering::Relaxed); self.route_total_connections .entry(route_id.to_string()) .or_insert_with(|| AtomicU64::new(0)) .fetch_add(1, Ordering::Relaxed); } if let Some(ip) = source_ip { self.ip_connections .entry(ip.to_string()) .or_insert_with(|| AtomicU64::new(0)) .fetch_add(1, Ordering::Relaxed); self.ip_total_connections .entry(ip.to_string()) .or_insert_with(|| AtomicU64::new(0)) .fetch_add(1, Ordering::Relaxed); } } /// Record a connection closing. 
pub fn connection_closed(&self, route_id: Option<&str>, source_ip: Option<&str>) { self.active_connections.fetch_sub(1, Ordering::Relaxed); if let Some(route_id) = route_id { if let Some(counter) = self.route_connections.get(route_id) { let val = counter.load(Ordering::Relaxed); if val > 0 { counter.fetch_sub(1, Ordering::Relaxed); } } } if let Some(ip) = source_ip { if let Some(counter) = self.ip_connections.get(ip) { let val = counter.load(Ordering::Relaxed); if val > 0 { counter.fetch_sub(1, Ordering::Relaxed); } // Clean up zero-count entries to prevent memory growth if val <= 1 { drop(counter); self.ip_connections.remove(ip); } } } } /// Record bytes transferred (lock-free hot path). /// /// Called per-chunk in the TCP copy loop. Only touches AtomicU64 counters — /// no Mutex is taken. The throughput trackers are fed during `sample_all()`. pub fn record_bytes(&self, bytes_in: u64, bytes_out: u64, route_id: Option<&str>, source_ip: Option<&str>) { self.total_bytes_in.fetch_add(bytes_in, Ordering::Relaxed); self.total_bytes_out.fetch_add(bytes_out, Ordering::Relaxed); // Accumulate into lock-free pending throughput counters self.global_pending_tp_in.fetch_add(bytes_in, Ordering::Relaxed); self.global_pending_tp_out.fetch_add(bytes_out, Ordering::Relaxed); if let Some(route_id) = route_id { self.route_bytes_in .entry(route_id.to_string()) .or_insert_with(|| AtomicU64::new(0)) .fetch_add(bytes_in, Ordering::Relaxed); self.route_bytes_out .entry(route_id.to_string()) .or_insert_with(|| AtomicU64::new(0)) .fetch_add(bytes_out, Ordering::Relaxed); // Accumulate into per-route pending throughput counters (lock-free) let entry = self.route_pending_tp .entry(route_id.to_string()) .or_insert_with(|| (AtomicU64::new(0), AtomicU64::new(0))); entry.0.fetch_add(bytes_in, Ordering::Relaxed); entry.1.fetch_add(bytes_out, Ordering::Relaxed); } if let Some(ip) = source_ip { self.ip_bytes_in .entry(ip.to_string()) .or_insert_with(|| AtomicU64::new(0)) .fetch_add(bytes_in, 
Ordering::Relaxed); self.ip_bytes_out .entry(ip.to_string()) .or_insert_with(|| AtomicU64::new(0)) .fetch_add(bytes_out, Ordering::Relaxed); // Accumulate into per-IP pending throughput counters (lock-free) let entry = self.ip_pending_tp .entry(ip.to_string()) .or_insert_with(|| (AtomicU64::new(0), AtomicU64::new(0))); entry.0.fetch_add(bytes_in, Ordering::Relaxed); entry.1.fetch_add(bytes_out, Ordering::Relaxed); } } /// Record an HTTP request (called once per request in the HTTP proxy). pub fn record_http_request(&self) { self.total_http_requests.fetch_add(1, Ordering::Relaxed); self.pending_http_requests.fetch_add(1, Ordering::Relaxed); } /// Take a throughput sample on all trackers (cold path, call at 1Hz or configured interval). /// /// Drains the lock-free pending counters and feeds the accumulated bytes /// into the throughput trackers (under Mutex). This is the only place /// the Mutex is locked. pub fn sample_all(&self) { // Drain global pending bytes and feed into the tracker let global_in = self.global_pending_tp_in.swap(0, Ordering::Relaxed); let global_out = self.global_pending_tp_out.swap(0, Ordering::Relaxed); if let Ok(mut tracker) = self.global_throughput.lock() { tracker.record_bytes(global_in, global_out); tracker.sample(); } // Drain per-route pending bytes; collect into a Vec to avoid holding DashMap shards let mut route_samples: Vec<(String, u64, u64)> = Vec::new(); for entry in self.route_pending_tp.iter() { let route_id = entry.key().clone(); let pending_in = entry.value().0.swap(0, Ordering::Relaxed); let pending_out = entry.value().1.swap(0, Ordering::Relaxed); route_samples.push((route_id, pending_in, pending_out)); } // Feed pending bytes into route trackers and sample let retention = self.retention_seconds; for (route_id, pending_in, pending_out) in &route_samples { // Ensure the tracker exists self.route_throughput .entry(route_id.clone()) .or_insert_with(|| Mutex::new(ThroughputTracker::new(retention))); // Now get a separate ref and 
lock it if let Some(tracker_ref) = self.route_throughput.get(route_id) { if let Ok(mut tracker) = tracker_ref.value().lock() { tracker.record_bytes(*pending_in, *pending_out); tracker.sample(); } } } // Also sample any route trackers that had no new pending bytes // (to keep their sample window advancing) for entry in self.route_throughput.iter() { if !self.route_pending_tp.contains_key(entry.key()) { if let Ok(mut tracker) = entry.value().lock() { tracker.sample(); } } } // Drain per-IP pending bytes and feed into IP throughput trackers let mut ip_samples: Vec<(String, u64, u64)> = Vec::new(); for entry in self.ip_pending_tp.iter() { let ip = entry.key().clone(); let pending_in = entry.value().0.swap(0, Ordering::Relaxed); let pending_out = entry.value().1.swap(0, Ordering::Relaxed); ip_samples.push((ip, pending_in, pending_out)); } for (ip, pending_in, pending_out) in &ip_samples { self.ip_throughput .entry(ip.clone()) .or_insert_with(|| Mutex::new(ThroughputTracker::new(retention))); if let Some(tracker_ref) = self.ip_throughput.get(ip) { if let Ok(mut tracker) = tracker_ref.value().lock() { tracker.record_bytes(*pending_in, *pending_out); tracker.sample(); } } } // Sample idle IP trackers for entry in self.ip_throughput.iter() { if !self.ip_pending_tp.contains_key(entry.key()) { if let Ok(mut tracker) = entry.value().lock() { tracker.sample(); } } } // Drain pending HTTP request count and feed into HTTP throughput tracker let pending_reqs = self.pending_http_requests.swap(0, Ordering::Relaxed); if let Ok(mut tracker) = self.http_request_throughput.lock() { // Use bytes_in field to track request count (each request = 1 "byte") tracker.record_bytes(pending_reqs, 0); tracker.sample(); } } /// Get current active connection count. pub fn active_connections(&self) -> u64 { self.active_connections.load(Ordering::Relaxed) } /// Get total connection count. 
pub fn total_connections(&self) -> u64 { self.total_connections.load(Ordering::Relaxed) } /// Get total bytes received. pub fn total_bytes_in(&self) -> u64 { self.total_bytes_in.load(Ordering::Relaxed) } /// Get total bytes sent. pub fn total_bytes_out(&self) -> u64 { self.total_bytes_out.load(Ordering::Relaxed) } /// Get a full metrics snapshot including per-route and per-IP data. pub fn snapshot(&self) -> Metrics { let mut routes = std::collections::HashMap::new(); // Get global throughput (instant = last 1 sample, recent = last 10 samples) let (global_tp_in, global_tp_out, global_recent_in, global_recent_out, throughput_history) = self.global_throughput .lock() .map(|t| { let (i_in, i_out) = t.instant(); let (r_in, r_out) = t.recent(); let history = t.history(60); (i_in, i_out, r_in, r_out, history) }) .unwrap_or((0, 0, 0, 0, Vec::new())); // Collect per-route metrics for entry in self.route_total_connections.iter() { let route_id = entry.key().clone(); let total = entry.value().load(Ordering::Relaxed); let active = self.route_connections .get(&route_id) .map(|c| c.load(Ordering::Relaxed)) .unwrap_or(0); let bytes_in = self.route_bytes_in .get(&route_id) .map(|c| c.load(Ordering::Relaxed)) .unwrap_or(0); let bytes_out = self.route_bytes_out .get(&route_id) .map(|c| c.load(Ordering::Relaxed)) .unwrap_or(0); let (route_tp_in, route_tp_out, route_recent_in, route_recent_out) = self.route_throughput .get(&route_id) .and_then(|entry| entry.value().lock().ok().map(|t| { let (i_in, i_out) = t.instant(); let (r_in, r_out) = t.recent(); (i_in, i_out, r_in, r_out) })) .unwrap_or((0, 0, 0, 0)); routes.insert(route_id, RouteMetrics { active_connections: active, total_connections: total, bytes_in, bytes_out, throughput_in_bytes_per_sec: route_tp_in, throughput_out_bytes_per_sec: route_tp_out, throughput_recent_in_bytes_per_sec: route_recent_in, throughput_recent_out_bytes_per_sec: route_recent_out, }); } // Collect per-IP metrics — only IPs with active connections or total > 
0, // capped at top MAX_IPS_IN_SNAPSHOT sorted by active count let mut ip_entries: Vec<(String, u64, u64, u64, u64, u64, u64)> = Vec::new(); for entry in self.ip_total_connections.iter() { let ip = entry.key().clone(); let total = entry.value().load(Ordering::Relaxed); let active = self.ip_connections .get(&ip) .map(|c| c.load(Ordering::Relaxed)) .unwrap_or(0); let bytes_in = self.ip_bytes_in .get(&ip) .map(|c| c.load(Ordering::Relaxed)) .unwrap_or(0); let bytes_out = self.ip_bytes_out .get(&ip) .map(|c| c.load(Ordering::Relaxed)) .unwrap_or(0); let (tp_in, tp_out) = self.ip_throughput .get(&ip) .and_then(|entry| entry.value().lock().ok().map(|t| t.instant())) .unwrap_or((0, 0)); ip_entries.push((ip, active, total, bytes_in, bytes_out, tp_in, tp_out)); } // Sort by active connections descending, then cap ip_entries.sort_by(|a, b| b.1.cmp(&a.1)); ip_entries.truncate(MAX_IPS_IN_SNAPSHOT); let mut ips = std::collections::HashMap::new(); for (ip, active, total, bytes_in, bytes_out, tp_in, tp_out) in ip_entries { ips.insert(ip, IpMetrics { active_connections: active, total_connections: total, bytes_in, bytes_out, throughput_in_bytes_per_sec: tp_in, throughput_out_bytes_per_sec: tp_out, }); } // HTTP request rates let (http_rps, http_rps_recent) = self.http_request_throughput .lock() .map(|t| { let (instant, _) = t.instant(); let (recent, _) = t.recent(); (instant, recent) }) .unwrap_or((0, 0)); Metrics { active_connections: self.active_connections(), total_connections: self.total_connections(), bytes_in: self.total_bytes_in(), bytes_out: self.total_bytes_out(), throughput_in_bytes_per_sec: global_tp_in, throughput_out_bytes_per_sec: global_tp_out, throughput_recent_in_bytes_per_sec: global_recent_in, throughput_recent_out_bytes_per_sec: global_recent_out, routes, ips, throughput_history, total_http_requests: self.total_http_requests.load(Ordering::Relaxed), http_requests_per_sec: http_rps, http_requests_per_sec_recent: http_rps_recent, } } } impl Default for 
MetricsCollector { fn default() -> Self { Self::new() } } #[cfg(test)] mod tests { use super::*; #[test] fn test_initial_state_zeros() { let collector = MetricsCollector::new(); assert_eq!(collector.active_connections(), 0); assert_eq!(collector.total_connections(), 0); } #[test] fn test_connection_opened_increments() { let collector = MetricsCollector::new(); collector.connection_opened(None, None); assert_eq!(collector.active_connections(), 1); assert_eq!(collector.total_connections(), 1); collector.connection_opened(None, None); assert_eq!(collector.active_connections(), 2); assert_eq!(collector.total_connections(), 2); } #[test] fn test_connection_closed_decrements() { let collector = MetricsCollector::new(); collector.connection_opened(None, None); collector.connection_opened(None, None); assert_eq!(collector.active_connections(), 2); collector.connection_closed(None, None); assert_eq!(collector.active_connections(), 1); // total_connections should stay at 2 assert_eq!(collector.total_connections(), 2); } #[test] fn test_route_specific_tracking() { let collector = MetricsCollector::new(); collector.connection_opened(Some("route-a"), None); collector.connection_opened(Some("route-a"), None); collector.connection_opened(Some("route-b"), None); assert_eq!(collector.active_connections(), 3); assert_eq!(collector.total_connections(), 3); collector.connection_closed(Some("route-a"), None); assert_eq!(collector.active_connections(), 2); } #[test] fn test_record_bytes() { let collector = MetricsCollector::new(); collector.record_bytes(100, 200, Some("route-a"), None); collector.record_bytes(50, 75, Some("route-a"), None); collector.record_bytes(25, 30, None, None); let total_in = collector.total_bytes_in.load(Ordering::Relaxed); let total_out = collector.total_bytes_out.load(Ordering::Relaxed); assert_eq!(total_in, 175); assert_eq!(total_out, 305); // Route-specific bytes let route_in = collector.route_bytes_in.get("route-a").unwrap(); 
assert_eq!(route_in.load(Ordering::Relaxed), 150); } #[test] fn test_throughput_tracking() { let collector = MetricsCollector::with_retention(60); // Open a connection so the route appears in the snapshot collector.connection_opened(Some("route-a"), None); // Record some bytes collector.record_bytes(1000, 2000, Some("route-a"), None); collector.record_bytes(500, 750, None, None); // Take a sample (simulates the 1Hz tick) collector.sample_all(); // Check global throughput let snapshot = collector.snapshot(); assert_eq!(snapshot.throughput_in_bytes_per_sec, 1500); assert_eq!(snapshot.throughput_out_bytes_per_sec, 2750); // Check per-route throughput let route_a = snapshot.routes.get("route-a").unwrap(); assert_eq!(route_a.throughput_in_bytes_per_sec, 1000); assert_eq!(route_a.throughput_out_bytes_per_sec, 2000); } #[test] fn test_throughput_zero_before_sampling() { let collector = MetricsCollector::with_retention(60); collector.record_bytes(1000, 2000, None, None); // Without sampling, throughput should be 0 let snapshot = collector.snapshot(); assert_eq!(snapshot.throughput_in_bytes_per_sec, 0); assert_eq!(snapshot.throughput_out_bytes_per_sec, 0); } #[test] fn test_per_ip_tracking() { let collector = MetricsCollector::with_retention(60); collector.connection_opened(Some("route-a"), Some("1.2.3.4")); collector.connection_opened(Some("route-a"), Some("1.2.3.4")); collector.connection_opened(Some("route-b"), Some("5.6.7.8")); // Check IP active connections (drop DashMap refs immediately to avoid deadlock) assert_eq!( collector.ip_connections.get("1.2.3.4").unwrap().load(Ordering::Relaxed), 2 ); assert_eq!( collector.ip_connections.get("5.6.7.8").unwrap().load(Ordering::Relaxed), 1 ); // Record bytes per IP collector.record_bytes(100, 200, Some("route-a"), Some("1.2.3.4")); collector.record_bytes(300, 400, Some("route-b"), Some("5.6.7.8")); collector.sample_all(); let snapshot = collector.snapshot(); assert_eq!(snapshot.ips.len(), 2); let ip1_metrics = 
snapshot.ips.get("1.2.3.4").unwrap(); assert_eq!(ip1_metrics.active_connections, 2); assert_eq!(ip1_metrics.bytes_in, 100); // Close connections collector.connection_closed(Some("route-a"), Some("1.2.3.4")); assert_eq!( collector.ip_connections.get("1.2.3.4").unwrap().load(Ordering::Relaxed), 1 ); // Close last connection for IP — should be cleaned up collector.connection_closed(Some("route-a"), Some("1.2.3.4")); assert!(collector.ip_connections.get("1.2.3.4").is_none()); } #[test] fn test_http_request_tracking() { let collector = MetricsCollector::with_retention(60); collector.record_http_request(); collector.record_http_request(); collector.record_http_request(); assert_eq!(collector.total_http_requests.load(Ordering::Relaxed), 3); collector.sample_all(); let snapshot = collector.snapshot(); assert_eq!(snapshot.total_http_requests, 3); assert_eq!(snapshot.http_requests_per_sec, 3); } #[test] fn test_throughput_history_in_snapshot() { let collector = MetricsCollector::with_retention(60); for i in 1..=5 { collector.record_bytes(i * 100, i * 200, None, None); collector.sample_all(); } let snapshot = collector.snapshot(); assert_eq!(snapshot.throughput_history.len(), 5); // History should be chronological (oldest first) assert_eq!(snapshot.throughput_history[0].bytes_in, 100); assert_eq!(snapshot.throughput_history[4].bytes_in, 500); } }