Files
smartproxy/rust/crates/rustproxy-metrics/src/collector.rs

1168 lines
48 KiB
Rust

use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
use std::time::Duration;
use crate::throughput::{ThroughputSample, ThroughputTracker};
/// Aggregated metrics snapshot.
///
/// Built by `MetricsCollector::snapshot`. Field names are serialized in
/// camelCase (see the `serde` attribute below).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Metrics {
/// Connections currently open (incremented on open, decremented on close).
pub active_connections: u64,
/// Connections opened since the collector was created.
pub total_connections: u64,
/// Cumulative inbound bytes passed to `record_bytes`.
pub bytes_in: u64,
/// Cumulative outbound bytes passed to `record_bytes`.
pub bytes_out: u64,
/// Inbound rate taken from the global tracker's `instant()` reading.
pub throughput_in_bytes_per_sec: u64,
/// Outbound rate taken from the global tracker's `instant()` reading.
pub throughput_out_bytes_per_sec: u64,
/// Inbound rate taken from the global tracker's `recent()` reading.
pub throughput_recent_in_bytes_per_sec: u64,
/// Outbound rate taken from the global tracker's `recent()` reading.
pub throughput_recent_out_bytes_per_sec: u64,
/// Per-route metrics, keyed by route ID.
pub routes: std::collections::HashMap<String, RouteMetrics>,
/// Per-IP metrics, keyed by source IP; `snapshot` caps this at `MAX_IPS_IN_SNAPSHOT` entries.
pub ips: std::collections::HashMap<String, IpMetrics>,
/// Per-backend metrics, keyed by backend key; `snapshot` caps this at `MAX_BACKENDS_IN_SNAPSHOT` entries.
pub backends: std::collections::HashMap<String, BackendMetrics>,
/// Result of the global tracker's `history(60)` call at snapshot time.
pub throughput_history: Vec<ThroughputSample>,
/// HTTP requests recorded via `record_http_request` since creation.
pub total_http_requests: u64,
/// Request rate from the HTTP tracker's `instant()` reading.
pub http_requests_per_sec: u64,
/// Request rate from the HTTP tracker's `recent()` reading.
pub http_requests_per_sec_recent: u64,
}
/// Per-route metrics.
///
/// One value per route ID in `Metrics::routes`; filled in by
/// `MetricsCollector::snapshot` from the per-route counters and trackers.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RouteMetrics {
/// Connections currently open on this route (0 if no counter exists).
pub active_connections: u64,
/// Connections opened on this route.
pub total_connections: u64,
/// Cumulative inbound bytes recorded for this route.
pub bytes_in: u64,
/// Cumulative outbound bytes recorded for this route.
pub bytes_out: u64,
/// Inbound rate from the route tracker's `instant()` reading (0 if no tracker exists yet).
pub throughput_in_bytes_per_sec: u64,
/// Outbound rate from the route tracker's `instant()` reading (0 if no tracker exists yet).
pub throughput_out_bytes_per_sec: u64,
/// Inbound rate from the route tracker's `recent()` reading.
pub throughput_recent_in_bytes_per_sec: u64,
/// Outbound rate from the route tracker's `recent()` reading.
pub throughput_recent_out_bytes_per_sec: u64,
}
/// Per-IP metrics.
///
/// One value per source IP in `Metrics::ips`. The collector evicts all
/// per-IP state when an IP's last connection closes, so these figures cover
/// only the IP's current run of connections.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct IpMetrics {
/// Connections currently open from this IP.
pub active_connections: u64,
/// Connections opened from this IP since its entry was (re)created.
pub total_connections: u64,
/// Inbound bytes recorded for this IP.
pub bytes_in: u64,
/// Outbound bytes recorded for this IP.
pub bytes_out: u64,
/// Inbound rate from the IP tracker's `instant()` reading (0 if no tracker exists yet).
pub throughput_in_bytes_per_sec: u64,
/// Outbound rate from the IP tracker's `instant()` reading (0 if no tracker exists yet).
pub throughput_out_bytes_per_sec: u64,
}
/// Per-backend metrics (keyed by "host:port").
///
/// Every counter defaults to 0 in a snapshot when the collector has no entry
/// for the backend.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BackendMetrics {
/// Backend connections currently open.
pub active_connections: u64,
/// Backend connections opened successfully.
pub total_connections: u64,
/// Protocol last set via `set_backend_protocol`; `"unknown"` if never set.
pub protocol: String,
/// Connect failures (TCP or TLS connect failure/timeout).
pub connect_errors: u64,
/// Handshake failures (H1 or H2).
pub handshake_errors: u64,
/// Request failures (`send_request` errors).
pub request_errors: u64,
/// Sum of connect durations in microseconds; divide by `connect_count` for the mean.
pub total_connect_time_us: u64,
/// Number of connect durations added to `total_connect_time_us`.
pub connect_count: u64,
/// Connection pool hits.
pub pool_hits: u64,
/// Connection pool misses.
pub pool_misses: u64,
/// Times H2 was attempted but fell back to H1.
pub h2_failures: u64,
}
/// Statistics snapshot.
///
/// NOTE(review): the field notes below are inferred from the field names
/// only — confirm them against the code that builds this struct.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Statistics {
/// Presumably the number of connections open right now — TODO confirm.
pub active_connections: u64,
/// Presumably the number of connections opened so far — TODO confirm.
pub total_connections: u64,
/// Presumably the number of configured routes — TODO confirm.
pub routes_count: u64,
/// Presumably the ports the proxy is listening on — TODO confirm.
pub listening_ports: Vec<u16>,
/// Presumably seconds since the proxy started — TODO confirm.
pub uptime_seconds: u64,
}
/// How long throughput samples are kept when no explicit retention is given: one hour.
const DEFAULT_RETENTION_SECONDS: usize = 60 * 60;
/// Upper bound on per-IP entries in a snapshot; the IPs with the most active connections win.
const MAX_IPS_IN_SNAPSHOT: usize = 100;
/// Upper bound on per-backend entries in a snapshot; the backends with the most total connections win.
const MAX_BACKENDS_IN_SNAPSHOT: usize = 100;
/// Metrics collector tracking connections and throughput.
///
/// Design: The hot path (`record_bytes`) never takes a tracker `Mutex` — it
/// only performs map lookups and `AtomicU64` updates. The cold path
/// (`sample_all`, called at 1Hz) drains those atomics and feeds the
/// throughput trackers under a Mutex. This avoids contention when
/// `record_bytes` is called per-chunk in the TCP copy loop.
pub struct MetricsCollector {
/// Connections currently open, across all routes and IPs.
active_connections: AtomicU64,
/// Connections opened since creation (never decremented).
total_connections: AtomicU64,
/// Cumulative inbound bytes passed to `record_bytes`.
total_bytes_in: AtomicU64,
/// Cumulative outbound bytes passed to `record_bytes`.
total_bytes_out: AtomicU64,
/// Per-route active connection counts
route_connections: DashMap<String, AtomicU64>,
/// Per-route total connection counts
route_total_connections: DashMap<String, AtomicU64>,
/// Per-route byte counters
route_bytes_in: DashMap<String, AtomicU64>,
route_bytes_out: DashMap<String, AtomicU64>,
// ── Per-IP tracking ──
/// Per-IP active connection counts; the entry is removed when the IP's last
/// connection closes, and its presence gates all other per-IP recording.
ip_connections: DashMap<String, AtomicU64>,
/// Per-IP opened-connection counts (evicted together with `ip_connections`).
ip_total_connections: DashMap<String, AtomicU64>,
/// Per-IP cumulative inbound bytes.
ip_bytes_in: DashMap<String, AtomicU64>,
/// Per-IP cumulative outbound bytes.
ip_bytes_out: DashMap<String, AtomicU64>,
/// Per-IP `(in, out)` bytes accumulated since the last `sample_all`.
ip_pending_tp: DashMap<String, (AtomicU64, AtomicU64)>,
/// Per-IP throughput trackers, created and fed by `sample_all`.
ip_throughput: DashMap<String, Mutex<ThroughputTracker>>,
// ── Per-backend tracking (keyed by "host:port") ──
/// Backend connections currently open.
backend_active: DashMap<String, AtomicU64>,
/// Backend connections opened successfully.
backend_total: DashMap<String, AtomicU64>,
/// Protocol last reported for the backend via `set_backend_protocol`.
backend_protocol: DashMap<String, String>,
/// Connect failures per backend.
backend_connect_errors: DashMap<String, AtomicU64>,
/// Handshake failures per backend.
backend_handshake_errors: DashMap<String, AtomicU64>,
/// Request failures per backend.
backend_request_errors: DashMap<String, AtomicU64>,
/// Sum of connect durations per backend, in microseconds.
backend_connect_time_us: DashMap<String, AtomicU64>,
/// Number of connect durations summed into `backend_connect_time_us`.
backend_connect_count: DashMap<String, AtomicU64>,
/// Connection pool hits per backend.
backend_pool_hits: DashMap<String, AtomicU64>,
/// Connection pool misses per backend.
backend_pool_misses: DashMap<String, AtomicU64>,
/// H2-to-H1 fallbacks per backend.
backend_h2_failures: DashMap<String, AtomicU64>,
// ── HTTP request tracking ──
/// HTTP requests recorded since creation.
total_http_requests: AtomicU64,
/// HTTP requests recorded since the last `sample_all`.
pending_http_requests: AtomicU64,
/// Request-rate tracker; `sample_all` records the request count in its inbound-bytes slot.
http_request_throughput: Mutex<ThroughputTracker>,
// ── Lock-free pending throughput counters (hot path) ──
/// Inbound bytes accumulated since the last `sample_all`.
global_pending_tp_in: AtomicU64,
/// Outbound bytes accumulated since the last `sample_all`.
global_pending_tp_out: AtomicU64,
/// Per-route `(in, out)` bytes accumulated since the last `sample_all`.
route_pending_tp: DashMap<String, (AtomicU64, AtomicU64)>,
// ── Throughput history — only locked during sampling (cold path) ──
/// Tracker for traffic across all routes.
global_throughput: Mutex<ThroughputTracker>,
/// Per-route throughput trackers, created and fed by `sample_all`.
route_throughput: DashMap<String, Mutex<ThroughputTracker>>,
/// Retention value handed to every `ThroughputTracker::new` call.
retention_seconds: usize,
}
impl MetricsCollector {
/// Create a MetricsCollector that keeps throughput history for
/// `DEFAULT_RETENTION_SECONDS`.
pub fn new() -> Self {
    let retention = DEFAULT_RETENTION_SECONDS;
    Self::with_retention(retention)
}
/// Build an empty collector whose throughput trackers are all created with
/// `retention_seconds` as their retention argument.
///
/// Every counter starts at zero and every keyed map starts empty; per-route
/// and per-IP trackers are created later, on demand, with the same retention.
pub fn with_retention(retention_seconds: usize) -> Self {
    let zero = || AtomicU64::new(0);
    let tracker = || Mutex::new(ThroughputTracker::new(retention_seconds));

    Self {
        // Global counters
        active_connections: zero(),
        total_connections: zero(),
        total_bytes_in: zero(),
        total_bytes_out: zero(),

        // Per-route state
        route_connections: DashMap::new(),
        route_total_connections: DashMap::new(),
        route_bytes_in: DashMap::new(),
        route_bytes_out: DashMap::new(),
        route_pending_tp: DashMap::new(),
        route_throughput: DashMap::new(),

        // Per-IP state
        ip_connections: DashMap::new(),
        ip_total_connections: DashMap::new(),
        ip_bytes_in: DashMap::new(),
        ip_bytes_out: DashMap::new(),
        ip_pending_tp: DashMap::new(),
        ip_throughput: DashMap::new(),

        // Per-backend state
        backend_active: DashMap::new(),
        backend_total: DashMap::new(),
        backend_protocol: DashMap::new(),
        backend_connect_errors: DashMap::new(),
        backend_handshake_errors: DashMap::new(),
        backend_request_errors: DashMap::new(),
        backend_connect_time_us: DashMap::new(),
        backend_connect_count: DashMap::new(),
        backend_pool_hits: DashMap::new(),
        backend_pool_misses: DashMap::new(),
        backend_h2_failures: DashMap::new(),

        // HTTP request tracking
        total_http_requests: zero(),
        pending_http_requests: zero(),
        http_request_throughput: tracker(),

        // Global throughput
        global_pending_tp_in: zero(),
        global_pending_tp_out: zero(),
        global_throughput: tracker(),

        retention_seconds,
    }
}
/// Record a new connection.
pub fn connection_opened(&self, route_id: Option<&str>, source_ip: Option<&str>) {
self.active_connections.fetch_add(1, Ordering::Relaxed);
self.total_connections.fetch_add(1, Ordering::Relaxed);
if let Some(route_id) = route_id {
self.route_connections
.entry(route_id.to_string())
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(1, Ordering::Relaxed);
self.route_total_connections
.entry(route_id.to_string())
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(1, Ordering::Relaxed);
}
if let Some(ip) = source_ip {
self.ip_connections
.entry(ip.to_string())
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(1, Ordering::Relaxed);
self.ip_total_connections
.entry(ip.to_string())
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(1, Ordering::Relaxed);
}
}
/// Record a connection closing.
pub fn connection_closed(&self, route_id: Option<&str>, source_ip: Option<&str>) {
self.active_connections.fetch_sub(1, Ordering::Relaxed);
if let Some(route_id) = route_id {
if let Some(counter) = self.route_connections.get(route_id) {
let val = counter.load(Ordering::Relaxed);
if val > 0 {
counter.fetch_sub(1, Ordering::Relaxed);
}
}
}
if let Some(ip) = source_ip {
if let Some(counter) = self.ip_connections.get(ip) {
let val = counter.load(Ordering::Relaxed);
if val > 0 {
counter.fetch_sub(1, Ordering::Relaxed);
}
// Clean up zero-count entries to prevent memory growth
if val <= 1 {
drop(counter);
self.ip_connections.remove(ip);
// Evict all per-IP tracking data for this IP
self.ip_total_connections.remove(ip);
self.ip_bytes_in.remove(ip);
self.ip_bytes_out.remove(ip);
self.ip_pending_tp.remove(ip);
self.ip_throughput.remove(ip);
}
}
}
}
/// Record bytes transferred (lock-free hot path).
///
/// Called per-chunk in the TCP copy loop. Only touches AtomicU64 counters —
/// no Mutex is taken. The throughput trackers are fed during `sample_all()`.
pub fn record_bytes(&self, bytes_in: u64, bytes_out: u64, route_id: Option<&str>, source_ip: Option<&str>) {
self.total_bytes_in.fetch_add(bytes_in, Ordering::Relaxed);
self.total_bytes_out.fetch_add(bytes_out, Ordering::Relaxed);
// Accumulate into lock-free pending throughput counters
self.global_pending_tp_in.fetch_add(bytes_in, Ordering::Relaxed);
self.global_pending_tp_out.fetch_add(bytes_out, Ordering::Relaxed);
// Per-route tracking: use get() first (zero-alloc fast path for existing entries),
// fall back to entry() with to_string() only on the rare first-chunk miss.
if let Some(route_id) = route_id {
if let Some(counter) = self.route_bytes_in.get(route_id) {
counter.fetch_add(bytes_in, Ordering::Relaxed);
} else {
self.route_bytes_in.entry(route_id.to_string())
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(bytes_in, Ordering::Relaxed);
}
if let Some(counter) = self.route_bytes_out.get(route_id) {
counter.fetch_add(bytes_out, Ordering::Relaxed);
} else {
self.route_bytes_out.entry(route_id.to_string())
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(bytes_out, Ordering::Relaxed);
}
// Accumulate into per-route pending throughput counters (lock-free)
if let Some(entry) = self.route_pending_tp.get(route_id) {
entry.0.fetch_add(bytes_in, Ordering::Relaxed);
entry.1.fetch_add(bytes_out, Ordering::Relaxed);
} else {
let entry = self.route_pending_tp.entry(route_id.to_string())
.or_insert_with(|| (AtomicU64::new(0), AtomicU64::new(0)));
entry.0.fetch_add(bytes_in, Ordering::Relaxed);
entry.1.fetch_add(bytes_out, Ordering::Relaxed);
}
}
// Per-IP tracking: same get()-first pattern to avoid String allocation on hot path.
if let Some(ip) = source_ip {
// Only record per-IP stats if the IP still has active connections.
// This prevents orphaned entries when record_bytes races with
// connection_closed (which evicts all per-IP data on last close).
if self.ip_connections.contains_key(ip) {
if let Some(counter) = self.ip_bytes_in.get(ip) {
counter.fetch_add(bytes_in, Ordering::Relaxed);
} else {
self.ip_bytes_in.entry(ip.to_string())
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(bytes_in, Ordering::Relaxed);
}
if let Some(counter) = self.ip_bytes_out.get(ip) {
counter.fetch_add(bytes_out, Ordering::Relaxed);
} else {
self.ip_bytes_out.entry(ip.to_string())
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(bytes_out, Ordering::Relaxed);
}
// Accumulate into per-IP pending throughput counters (lock-free)
if let Some(entry) = self.ip_pending_tp.get(ip) {
entry.0.fetch_add(bytes_in, Ordering::Relaxed);
entry.1.fetch_add(bytes_out, Ordering::Relaxed);
} else {
let entry = self.ip_pending_tp.entry(ip.to_string())
.or_insert_with(|| (AtomicU64::new(0), AtomicU64::new(0)));
entry.0.fetch_add(bytes_in, Ordering::Relaxed);
entry.1.fetch_add(bytes_out, Ordering::Relaxed);
}
}
}
}
/// Count one HTTP request (the HTTP proxy calls this once per request).
///
/// Bumps both the lifetime total and the pending counter that `sample_all`
/// drains into the request-rate tracker.
pub fn record_http_request(&self) {
    for counter in [&self.total_http_requests, &self.pending_http_requests] {
        counter.fetch_add(1, Ordering::Relaxed);
    }
}
// ── Per-backend recording methods ──
/// Register a successful connection to backend `key`.
///
/// Bumps the backend's active and total connection counters, adds
/// `connect_time` (in microseconds) to its running connect-time sum, and
/// bumps the number of connect timings recorded.
pub fn backend_connection_opened(&self, key: &str, connect_time: Duration) {
    let elapsed_us = connect_time.as_micros() as u64;

    // (counter map, amount to add), applied in this order.
    let updates = [
        (&self.backend_active, 1),
        (&self.backend_total, 1),
        (&self.backend_connect_time_us, elapsed_us),
        (&self.backend_connect_count, 1),
    ];
    for (counters, delta) in updates {
        counters
            .entry(key.to_string())
            .or_insert_with(|| AtomicU64::new(0))
            .fetch_add(delta, Ordering::Relaxed);
    }
}
/// Record a backend connection closing.
///
/// Decrements the backend's active gauge by one, stopping at zero. The
/// check and the decrement happen as one atomic update, so concurrent
/// closes cannot push the gauge below zero and wrap it around. Unknown
/// backends are ignored.
pub fn backend_connection_closed(&self, key: &str) {
    if let Some(counter) = self.backend_active.get(key) {
        // `Err` only means the gauge was already 0; nothing to do then.
        let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
            current.checked_sub(1)
        });
    }
}
/// Count one connect failure for backend `key` (TCP or TLS connect failure/timeout).
pub fn backend_connect_error(&self, key: &str) {
    let failures = self
        .backend_connect_errors
        .entry(key.to_owned())
        .or_insert_with(AtomicU64::default);
    failures.fetch_add(1, Ordering::Relaxed);
}
/// Count one handshake failure for backend `key` (H1 or H2 handshake failure).
pub fn backend_handshake_error(&self, key: &str) {
    let failures = self
        .backend_handshake_errors
        .entry(key.to_owned())
        .or_insert_with(AtomicU64::default);
    failures.fetch_add(1, Ordering::Relaxed);
}
/// Count one request failure for backend `key` (send_request failure).
pub fn backend_request_error(&self, key: &str) {
    let failures = self
        .backend_request_errors
        .entry(key.to_owned())
        .or_insert_with(AtomicU64::default);
    failures.fetch_add(1, Ordering::Relaxed);
}
/// Count one connection-pool hit for backend `key`.
pub fn backend_pool_hit(&self, key: &str) {
    let hits = self
        .backend_pool_hits
        .entry(key.to_owned())
        .or_insert_with(AtomicU64::default);
    hits.fetch_add(1, Ordering::Relaxed);
}
/// Count one connection-pool miss for backend `key`.
pub fn backend_pool_miss(&self, key: &str) {
    let misses = self
        .backend_pool_misses
        .entry(key.to_owned())
        .or_insert_with(AtomicU64::default);
    misses.fetch_add(1, Ordering::Relaxed);
}
/// Count one H2 failure for backend `key` (h2 was attempted but fell back to h1).
pub fn backend_h2_failure(&self, key: &str) {
    let fallbacks = self
        .backend_h2_failures
        .entry(key.to_owned())
        .or_insert_with(AtomicU64::default);
    fallbacks.fetch_add(1, Ordering::Relaxed);
}
/// Remember which protocol backend `key` is using ("h1" or "h2").
///
/// An existing value is rewritten only when it differs from `protocol`;
/// a missing entry is created.
pub fn set_backend_protocol(&self, key: &str, protocol: &str) {
    self.backend_protocol
        .entry(key.to_owned())
        .and_modify(|current| {
            if current.as_str() != protocol {
                current.clear();
                current.push_str(protocol);
            }
        })
        .or_insert_with(|| protocol.to_owned());
}
/// Drop every per-backend entry whose key is not in `active_backends`,
/// i.e. backends that no route targets any more.
pub fn retain_backends(&self, active_backends: &HashSet<String>) {
    let still_targeted = |key: &String| active_backends.contains(key);

    for counters in [&self.backend_active, &self.backend_total] {
        counters.retain(|key, _| still_targeted(key));
    }

    // The protocol map stores strings rather than counters, so it is pruned on its own.
    self.backend_protocol.retain(|key, _| still_targeted(key));

    for counters in [
        &self.backend_connect_errors,
        &self.backend_handshake_errors,
        &self.backend_request_errors,
        &self.backend_connect_time_us,
        &self.backend_connect_count,
        &self.backend_pool_hits,
        &self.backend_pool_misses,
        &self.backend_h2_failures,
    ] {
        counters.retain(|key, _| still_targeted(key));
    }
}
/// Take a throughput sample on all trackers (cold path, call at 1Hz or configured interval).
///
/// Drains the lock-free pending counters and feeds the accumulated bytes
/// into the throughput trackers (under Mutex). This is the only place
/// the Mutex is locked.
pub fn sample_all(&self) {
// Drain global pending bytes and feed into the tracker
let global_in = self.global_pending_tp_in.swap(0, Ordering::Relaxed);
let global_out = self.global_pending_tp_out.swap(0, Ordering::Relaxed);
if let Ok(mut tracker) = self.global_throughput.lock() {
tracker.record_bytes(global_in, global_out);
tracker.sample();
}
// Drain per-route pending bytes; collect into a Vec to avoid holding DashMap shards
let mut route_samples: Vec<(String, u64, u64)> = Vec::new();
for entry in self.route_pending_tp.iter() {
let route_id = entry.key().clone();
let pending_in = entry.value().0.swap(0, Ordering::Relaxed);
let pending_out = entry.value().1.swap(0, Ordering::Relaxed);
route_samples.push((route_id, pending_in, pending_out));
}
// Feed pending bytes into route trackers and sample
let retention = self.retention_seconds;
for (route_id, pending_in, pending_out) in &route_samples {
// Ensure the tracker exists
self.route_throughput
.entry(route_id.clone())
.or_insert_with(|| Mutex::new(ThroughputTracker::new(retention)));
// Now get a separate ref and lock it
if let Some(tracker_ref) = self.route_throughput.get(route_id) {
if let Ok(mut tracker) = tracker_ref.value().lock() {
tracker.record_bytes(*pending_in, *pending_out);
tracker.sample();
}
}
}
// Also sample any route trackers that had no new pending bytes
// (to keep their sample window advancing)
for entry in self.route_throughput.iter() {
if !self.route_pending_tp.contains_key(entry.key()) {
if let Ok(mut tracker) = entry.value().lock() {
tracker.sample();
}
}
}
// Drain per-IP pending bytes and feed into IP throughput trackers
let mut ip_samples: Vec<(String, u64, u64)> = Vec::new();
for entry in self.ip_pending_tp.iter() {
let ip = entry.key().clone();
let pending_in = entry.value().0.swap(0, Ordering::Relaxed);
let pending_out = entry.value().1.swap(0, Ordering::Relaxed);
ip_samples.push((ip, pending_in, pending_out));
}
for (ip, pending_in, pending_out) in &ip_samples {
self.ip_throughput
.entry(ip.clone())
.or_insert_with(|| Mutex::new(ThroughputTracker::new(retention)));
if let Some(tracker_ref) = self.ip_throughput.get(ip) {
if let Ok(mut tracker) = tracker_ref.value().lock() {
tracker.record_bytes(*pending_in, *pending_out);
tracker.sample();
}
}
}
// Sample idle IP trackers
for entry in self.ip_throughput.iter() {
if !self.ip_pending_tp.contains_key(entry.key()) {
if let Ok(mut tracker) = entry.value().lock() {
tracker.sample();
}
}
}
// Drain pending HTTP request count and feed into HTTP throughput tracker
let pending_reqs = self.pending_http_requests.swap(0, Ordering::Relaxed);
if let Ok(mut tracker) = self.http_request_throughput.lock() {
// Use bytes_in field to track request count (each request = 1 "byte")
tracker.record_bytes(pending_reqs, 0);
tracker.sample();
}
// Safety-net: prune orphaned per-IP entries that have no corresponding
// ip_connections entry. This catches any entries created by a race between
// record_bytes and connection_closed.
self.ip_bytes_in.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_bytes_out.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_pending_tp.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_throughput.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_total_connections.retain(|k, _| self.ip_connections.contains_key(k));
}
/// Drop every per-route entry whose route ID is not in `active_route_ids`.
/// Meant to be called after `update_routes()` so stale routes do not linger.
pub fn retain_routes(&self, active_route_ids: &HashSet<String>) {
    let is_active = |route_id: &String| active_route_ids.contains(route_id);

    for counters in [
        &self.route_connections,
        &self.route_total_connections,
        &self.route_bytes_in,
        &self.route_bytes_out,
    ] {
        counters.retain(|route_id, _| is_active(route_id));
    }
    self.route_pending_tp.retain(|route_id, _| is_active(route_id));
    self.route_throughput.retain(|route_id, _| is_active(route_id));
}
/// Number of connections currently open.
pub fn active_connections(&self) -> u64 {
    let gauge = &self.active_connections;
    gauge.load(Ordering::Relaxed)
}
/// Number of connections opened since the collector was created.
pub fn total_connections(&self) -> u64 {
    let counter = &self.total_connections;
    counter.load(Ordering::Relaxed)
}
/// Cumulative inbound bytes recorded so far.
pub fn total_bytes_in(&self) -> u64 {
    let counter = &self.total_bytes_in;
    counter.load(Ordering::Relaxed)
}
/// Cumulative outbound bytes recorded so far.
pub fn total_bytes_out(&self) -> u64 {
    let counter = &self.total_bytes_out;
    counter.load(Ordering::Relaxed)
}
/// Get a full metrics snapshot including per-route and per-IP data.
pub fn snapshot(&self) -> Metrics {
let mut routes = std::collections::HashMap::new();
// Get global throughput (instant = last 1 sample, recent = last 10 samples)
let (global_tp_in, global_tp_out, global_recent_in, global_recent_out, throughput_history) =
self.global_throughput
.lock()
.map(|t| {
let (i_in, i_out) = t.instant();
let (r_in, r_out) = t.recent();
let history = t.history(60);
(i_in, i_out, r_in, r_out, history)
})
.unwrap_or((0, 0, 0, 0, Vec::new()));
// Collect per-route metrics
for entry in self.route_total_connections.iter() {
let route_id = entry.key().clone();
let total = entry.value().load(Ordering::Relaxed);
let active = self.route_connections
.get(&route_id)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
let bytes_in = self.route_bytes_in
.get(&route_id)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
let bytes_out = self.route_bytes_out
.get(&route_id)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
let (route_tp_in, route_tp_out, route_recent_in, route_recent_out) = self.route_throughput
.get(&route_id)
.and_then(|entry| entry.value().lock().ok().map(|t| {
let (i_in, i_out) = t.instant();
let (r_in, r_out) = t.recent();
(i_in, i_out, r_in, r_out)
}))
.unwrap_or((0, 0, 0, 0));
routes.insert(route_id, RouteMetrics {
active_connections: active,
total_connections: total,
bytes_in,
bytes_out,
throughput_in_bytes_per_sec: route_tp_in,
throughput_out_bytes_per_sec: route_tp_out,
throughput_recent_in_bytes_per_sec: route_recent_in,
throughput_recent_out_bytes_per_sec: route_recent_out,
});
}
// Collect per-IP metrics — only IPs with active connections or total > 0,
// capped at top MAX_IPS_IN_SNAPSHOT sorted by active count
let mut ip_entries: Vec<(String, u64, u64, u64, u64, u64, u64)> = Vec::new();
for entry in self.ip_total_connections.iter() {
let ip = entry.key().clone();
let total = entry.value().load(Ordering::Relaxed);
let active = self.ip_connections
.get(&ip)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
let bytes_in = self.ip_bytes_in
.get(&ip)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
let bytes_out = self.ip_bytes_out
.get(&ip)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
let (tp_in, tp_out) = self.ip_throughput
.get(&ip)
.and_then(|entry| entry.value().lock().ok().map(|t| t.instant()))
.unwrap_or((0, 0));
ip_entries.push((ip, active, total, bytes_in, bytes_out, tp_in, tp_out));
}
// Sort by active connections descending, then cap
ip_entries.sort_by(|a, b| b.1.cmp(&a.1));
ip_entries.truncate(MAX_IPS_IN_SNAPSHOT);
let mut ips = std::collections::HashMap::new();
for (ip, active, total, bytes_in, bytes_out, tp_in, tp_out) in ip_entries {
ips.insert(ip, IpMetrics {
active_connections: active,
total_connections: total,
bytes_in,
bytes_out,
throughput_in_bytes_per_sec: tp_in,
throughput_out_bytes_per_sec: tp_out,
});
}
// Collect per-backend metrics, capped at top MAX_BACKENDS_IN_SNAPSHOT by total connections
let mut backend_entries: Vec<(String, BackendMetrics)> = Vec::new();
for entry in self.backend_total.iter() {
let key = entry.key().clone();
let total = entry.value().load(Ordering::Relaxed);
let active = self.backend_active
.get(&key)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
let protocol = self.backend_protocol
.get(&key)
.map(|v| v.value().clone())
.unwrap_or_else(|| "unknown".to_string());
let connect_errors = self.backend_connect_errors
.get(&key)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
let handshake_errors = self.backend_handshake_errors
.get(&key)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
let request_errors = self.backend_request_errors
.get(&key)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
let total_connect_time_us = self.backend_connect_time_us
.get(&key)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
let connect_count = self.backend_connect_count
.get(&key)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
let pool_hits = self.backend_pool_hits
.get(&key)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
let pool_misses = self.backend_pool_misses
.get(&key)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
let h2_failures = self.backend_h2_failures
.get(&key)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
backend_entries.push((key, BackendMetrics {
active_connections: active,
total_connections: total,
protocol,
connect_errors,
handshake_errors,
request_errors,
total_connect_time_us,
connect_count,
pool_hits,
pool_misses,
h2_failures,
}));
}
// Sort by total connections descending, then cap
backend_entries.sort_by(|a, b| b.1.total_connections.cmp(&a.1.total_connections));
backend_entries.truncate(MAX_BACKENDS_IN_SNAPSHOT);
let backends: std::collections::HashMap<String, BackendMetrics> = backend_entries.into_iter().collect();
// HTTP request rates
let (http_rps, http_rps_recent) = self.http_request_throughput
.lock()
.map(|t| {
let (instant, _) = t.instant();
let (recent, _) = t.recent();
(instant, recent)
})
.unwrap_or((0, 0));
Metrics {
active_connections: self.active_connections(),
total_connections: self.total_connections(),
bytes_in: self.total_bytes_in(),
bytes_out: self.total_bytes_out(),
throughput_in_bytes_per_sec: global_tp_in,
throughput_out_bytes_per_sec: global_tp_out,
throughput_recent_in_bytes_per_sec: global_recent_in,
throughput_recent_out_bytes_per_sec: global_recent_out,
routes,
ips,
backends,
throughput_history,
total_http_requests: self.total_http_requests.load(Ordering::Relaxed),
http_requests_per_sec: http_rps,
http_requests_per_sec_recent: http_rps_recent,
}
}
}
impl Default for MetricsCollector {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_initial_state_zeros() {
    // A freshly built collector has not seen any connection yet.
    let metrics = MetricsCollector::new();
    assert_eq!(
        (metrics.active_connections(), metrics.total_connections()),
        (0, 0)
    );
}
#[test]
fn test_connection_opened_increments() {
    let metrics = MetricsCollector::new();
    // Each open raises both the active gauge and the lifetime total by one.
    for expected in 1..=2u64 {
        metrics.connection_opened(None, None);
        assert_eq!(metrics.active_connections(), expected);
        assert_eq!(metrics.total_connections(), expected);
    }
}
#[test]
fn test_connection_closed_decrements() {
    let metrics = MetricsCollector::new();
    for _ in 0..2 {
        metrics.connection_opened(None, None);
    }
    assert_eq!(metrics.active_connections(), 2);

    metrics.connection_closed(None, None);
    // Closing lowers the active gauge only; the lifetime total is untouched.
    assert_eq!(metrics.active_connections(), 1);
    assert_eq!(metrics.total_connections(), 2);
}
#[test]
fn test_route_specific_tracking() {
    let collector = MetricsCollector::new();
    // Read a per-route counter; the map guard is released before returning.
    let active_on = |route: &str| {
        collector
            .route_connections
            .get(route)
            .map(|c| c.load(Ordering::Relaxed))
    };
    let total_on = |route: &str| {
        collector
            .route_total_connections
            .get(route)
            .map(|c| c.load(Ordering::Relaxed))
    };

    collector.connection_opened(Some("route-a"), None);
    collector.connection_opened(Some("route-a"), None);
    collector.connection_opened(Some("route-b"), None);
    assert_eq!(collector.active_connections(), 3);
    assert_eq!(collector.total_connections(), 3);
    // Each route keeps its own active and total counters.
    assert_eq!(active_on("route-a"), Some(2));
    assert_eq!(active_on("route-b"), Some(1));
    assert_eq!(total_on("route-a"), Some(2));
    assert_eq!(total_on("route-b"), Some(1));

    collector.connection_closed(Some("route-a"), None);
    assert_eq!(collector.active_connections(), 2);
    // Only route-a's active gauge drops; totals and route-b are unaffected.
    assert_eq!(active_on("route-a"), Some(1));
    assert_eq!(active_on("route-b"), Some(1));
    assert_eq!(total_on("route-a"), Some(2));
    assert_eq!(total_on("route-b"), Some(1));
}
#[test]
fn test_record_bytes() {
    let metrics = MetricsCollector::new();
    // (bytes in, bytes out, route) for each recorded chunk.
    let chunks: [(u64, u64, Option<&str>); 3] = [
        (100, 200, Some("route-a")),
        (50, 75, Some("route-a")),
        (25, 30, None),
    ];
    for (bytes_in, bytes_out, route) in chunks {
        metrics.record_bytes(bytes_in, bytes_out, route, None);
    }

    // Global totals include the chunk that had no route.
    assert_eq!(metrics.total_bytes_in.load(Ordering::Relaxed), 175);
    assert_eq!(metrics.total_bytes_out.load(Ordering::Relaxed), 305);

    // The per-route counter only sees the two route-a chunks.
    let route_a_in = metrics
        .route_bytes_in
        .get("route-a")
        .map(|counter| counter.load(Ordering::Relaxed));
    assert_eq!(route_a_in, Some(150));
}
#[test]
fn test_throughput_tracking() {
    let metrics = MetricsCollector::with_retention(60);
    // A route is only listed in the snapshot once it has had a connection.
    metrics.connection_opened(Some("route-a"), None);
    metrics.record_bytes(1000, 2000, Some("route-a"), None);
    metrics.record_bytes(500, 750, None, None);
    // One manual tick stands in for the periodic 1Hz sampler.
    metrics.sample_all();

    let snap = metrics.snapshot();
    // Global rates cover both chunks.
    assert_eq!(snap.throughput_in_bytes_per_sec, 1500);
    assert_eq!(snap.throughput_out_bytes_per_sec, 2750);
    // The route's rates cover only the chunk tagged with it.
    let route_a = &snap.routes["route-a"];
    assert_eq!(route_a.throughput_in_bytes_per_sec, 1000);
    assert_eq!(route_a.throughput_out_bytes_per_sec, 2000);
}
#[test]
fn test_throughput_zero_before_sampling() {
    let metrics = MetricsCollector::with_retention(60);
    metrics.record_bytes(1000, 2000, None, None);
    // No `sample_all` call has happened, so the trackers have nothing to report.
    let snap = metrics.snapshot();
    assert_eq!(
        (snap.throughput_in_bytes_per_sec, snap.throughput_out_bytes_per_sec),
        (0, 0)
    );
}
#[test]
fn test_per_ip_tracking() {
    let metrics = MetricsCollector::with_retention(60);
    // Reads one IP's active gauge; the DashMap guard is released before
    // returning, so later collector calls cannot deadlock on it.
    let active_for = |ip: &str| {
        metrics
            .ip_connections
            .get(ip)
            .map(|counter| counter.load(Ordering::Relaxed))
    };

    metrics.connection_opened(Some("route-a"), Some("1.2.3.4"));
    metrics.connection_opened(Some("route-a"), Some("1.2.3.4"));
    metrics.connection_opened(Some("route-b"), Some("5.6.7.8"));
    assert_eq!(active_for("1.2.3.4"), Some(2));
    assert_eq!(active_for("5.6.7.8"), Some(1));

    // Traffic attributed to each IP.
    metrics.record_bytes(100, 200, Some("route-a"), Some("1.2.3.4"));
    metrics.record_bytes(300, 400, Some("route-b"), Some("5.6.7.8"));
    metrics.sample_all();

    let snap = metrics.snapshot();
    assert_eq!(snap.ips.len(), 2);
    let first_ip = &snap.ips["1.2.3.4"];
    assert_eq!(first_ip.active_connections, 2);
    assert_eq!(first_ip.bytes_in, 100);

    // First close: the IP still has one connection left.
    metrics.connection_closed(Some("route-a"), Some("1.2.3.4"));
    assert_eq!(active_for("1.2.3.4"), Some(1));

    // Last close: the IP's entry is removed altogether.
    metrics.connection_closed(Some("route-a"), Some("1.2.3.4"));
    assert_eq!(active_for("1.2.3.4"), None);
}
#[test]
fn test_per_ip_full_eviction_on_last_close() {
    let metrics = MetricsCollector::with_retention(60);
    let (evicted_ip, kept_ip) = ("10.0.0.1", "10.0.0.2");

    // Two connections from the first IP, one from the second.
    metrics.connection_opened(Some("route-a"), Some(evicted_ip));
    metrics.connection_opened(Some("route-a"), Some(evicted_ip));
    metrics.connection_opened(Some("route-b"), Some(kept_ip));

    // Traffic plus one sampling tick populates every per-IP map.
    metrics.record_bytes(100, 200, Some("route-a"), Some(evicted_ip));
    metrics.record_bytes(300, 400, Some("route-b"), Some(kept_ip));
    metrics.sample_all();

    assert!(metrics.ip_total_connections.contains_key(evicted_ip));
    assert!(metrics.ip_bytes_in.contains_key(evicted_ip));
    assert!(metrics.ip_throughput.contains_key(evicted_ip));

    // Close both connections of the first IP.
    metrics.connection_closed(Some("route-a"), Some(evicted_ip));
    metrics.connection_closed(Some("route-a"), Some(evicted_ip));

    // Nothing about the first IP may remain in any per-IP map.
    assert!(!metrics.ip_connections.contains_key(evicted_ip));
    assert!(!metrics.ip_total_connections.contains_key(evicted_ip));
    assert!(!metrics.ip_bytes_in.contains_key(evicted_ip));
    assert!(!metrics.ip_bytes_out.contains_key(evicted_ip));
    assert!(!metrics.ip_pending_tp.contains_key(evicted_ip));
    assert!(!metrics.ip_throughput.contains_key(evicted_ip));

    // The second IP is still connected and keeps its data.
    assert!(metrics.ip_connections.contains_key(kept_ip));
    assert!(metrics.ip_total_connections.contains_key(kept_ip));
}
#[test]
fn test_http_request_tracking() {
    let metrics = MetricsCollector::with_retention(60);
    for _ in 0..3 {
        metrics.record_http_request();
    }
    assert_eq!(metrics.total_http_requests.load(Ordering::Relaxed), 3);

    // One sampling tick turns the three pending requests into a rate.
    metrics.sample_all();
    let snap = metrics.snapshot();
    assert_eq!(snap.total_http_requests, 3);
    assert_eq!(snap.http_requests_per_sec, 3);
}
#[test]
fn test_retain_routes_prunes_stale() {
    let metrics = MetricsCollector::with_retention(60);

    // Give three routes a connection, some traffic and one sampling tick.
    let all_routes = ["route-a", "route-b", "route-c"];
    for route in all_routes {
        metrics.connection_opened(Some(route), None);
    }
    for route in all_routes {
        metrics.record_bytes(100, 200, Some(route), None);
    }
    metrics.sample_all();

    // "route-b" has been dropped from the configuration.
    let still_configured: HashSet<String> = ["route-a", "route-c"]
        .iter()
        .map(|route| route.to_string())
        .collect();
    metrics.retain_routes(&still_configured);

    // Every per-route map must have forgotten route-b ...
    assert!(!metrics.route_connections.contains_key("route-b"));
    assert!(!metrics.route_total_connections.contains_key("route-b"));
    assert!(!metrics.route_bytes_in.contains_key("route-b"));
    assert!(!metrics.route_bytes_out.contains_key("route-b"));
    assert!(!metrics.route_throughput.contains_key("route-b"));

    // ... while the surviving routes are untouched.
    assert!(metrics.route_total_connections.contains_key("route-a"));
    assert!(metrics.route_total_connections.contains_key("route-c"));
}
#[test]
fn test_record_bytes_after_close_no_orphan() {
    // A `record_bytes` call that arrives after the IP's last connection was
    // closed must not resurrect per-IP entries, but must still be counted in
    // the global totals.
    let route = Some("route-a");
    let ip = "10.0.0.1";
    let metrics = MetricsCollector::with_retention(60);

    // Normal lifecycle: open, transfer, close.
    metrics.connection_opened(route, Some(ip));
    metrics.record_bytes(100, 200, route, Some(ip));
    metrics.connection_closed(route, Some(ip));

    // The close evicted the IP.
    assert!(metrics.ip_connections.get(ip).is_none());

    // Late byte report (simulates the race) — no per-IP entry may reappear.
    metrics.record_bytes(50, 75, route, Some(ip));
    assert!(metrics.ip_bytes_in.get(ip).is_none());
    assert!(metrics.ip_bytes_out.get(ip).is_none());
    assert!(metrics.ip_pending_tp.get(ip).is_none());

    // Global counters include both reports: 100 + 50 in, 200 + 75 out.
    let global_in = metrics.total_bytes_in.load(Ordering::Relaxed);
    let global_out = metrics.total_bytes_out.load(Ordering::Relaxed);
    assert_eq!(global_in, 150);
    assert_eq!(global_out, 275);
}
#[test]
fn test_sample_all_prunes_orphaned_ip_entries() {
    // Per-IP byte/pending entries without a matching `ip_connections` entry
    // must be removed by `sample_all`.
    let orphan = "orphan-ip";
    let metrics = MetricsCollector::with_retention(60);

    // Insert the orphaned entries by hand (simulates the race before the guard).
    metrics
        .ip_bytes_in
        .insert(orphan.to_string(), AtomicU64::new(100));
    metrics
        .ip_bytes_out
        .insert(orphan.to_string(), AtomicU64::new(200));
    let pending = (AtomicU64::new(0), AtomicU64::new(0));
    metrics.ip_pending_tp.insert(orphan.to_string(), pending);

    // Precondition: the IP has no connection entry.
    assert!(metrics.ip_connections.get(orphan).is_none());

    // One sampling pass cleans the orphans up.
    metrics.sample_all();
    assert!(metrics.ip_bytes_in.get(orphan).is_none());
    assert!(metrics.ip_bytes_out.get(orphan).is_none());
    assert!(metrics.ip_pending_tp.get(orphan).is_none());
}
#[test]
fn test_throughput_history_in_snapshot() {
    // Five record/sample rounds must yield five history samples in the
    // snapshot, ordered oldest first.
    let metrics = MetricsCollector::with_retention(60);

    for round in 1..=5 {
        let (inbound, outbound) = (round * 100, round * 200);
        metrics.record_bytes(inbound, outbound, None, None);
        metrics.sample_all();
    }

    let snap = metrics.snapshot();
    let history = &snap.throughput_history;
    assert_eq!(history.len(), 5);

    // Chronological order: first round at the front, last round at the back.
    let oldest = &history[0];
    let newest = &history[4];
    assert_eq!(oldest.bytes_in, 100);
    assert_eq!(newest.bytes_in, 500);
}
#[test]
fn test_backend_metrics_basic() {
    // Exercises every per-backend counter and the protocol label for a
    // single backend key.
    let metrics = MetricsCollector::new();
    let key = "backend1:8080";

    // Reads the current value stored for `key` in one per-backend counter map.
    macro_rules! counter {
        ($map:ident) => {
            metrics.$map.get(key).unwrap().load(Ordering::Relaxed)
        };
    }

    // Two connections opened, with 15 ms and 25 ms connect times.
    for connect_ms in [15, 25] {
        metrics.backend_connection_opened(key, Duration::from_millis(connect_ms));
    }
    assert_eq!(counter!(backend_active), 2);
    assert_eq!(counter!(backend_total), 2);
    assert_eq!(counter!(backend_connect_count), 2);
    // 15ms + 25ms = 40ms = 40_000us
    assert_eq!(counter!(backend_connect_time_us), 40_000);

    // Closing one connection lowers the active count but not the total.
    metrics.backend_connection_closed(key);
    assert_eq!(counter!(backend_active), 1);
    assert_eq!(counter!(backend_total), 2);

    // One of each error kind, two pool hits, one pool miss.
    metrics.backend_connect_error(key);
    metrics.backend_handshake_error(key);
    metrics.backend_request_error(key);
    metrics.backend_h2_failure(key);
    metrics.backend_pool_hit(key);
    metrics.backend_pool_hit(key);
    metrics.backend_pool_miss(key);
    assert_eq!(counter!(backend_connect_errors), 1);
    assert_eq!(counter!(backend_handshake_errors), 1);
    assert_eq!(counter!(backend_request_errors), 1);
    assert_eq!(counter!(backend_h2_failures), 1);
    assert_eq!(counter!(backend_pool_hits), 2);
    assert_eq!(counter!(backend_pool_misses), 1);

    // The protocol label reflects the most recent value that was set.
    for proto in ["h1", "h2"] {
        metrics.set_backend_protocol(key, proto);
        assert_eq!(metrics.backend_protocol.get(key).unwrap().value(), proto);
    }
}
#[test]
fn test_backend_metrics_in_snapshot() {
    // Per-backend counters and protocol labels must be exposed through
    // `snapshot().backends`, keyed by the backend key.
    let metrics = MetricsCollector::new();

    // (backend key, connect time in ms, protocol label)
    let backends = [("b1:443", 10, "h2"), ("b2:8080", 20, "h1")];
    for (backend, connect_ms, _) in backends {
        metrics.backend_connection_opened(backend, Duration::from_millis(connect_ms));
    }
    for (backend, _, proto) in backends {
        metrics.set_backend_protocol(backend, proto);
    }
    // Only the first backend records a connect error.
    metrics.backend_connect_error("b1:443");

    let snap = metrics.snapshot();
    assert_eq!(snap.backends.len(), 2);

    let first = snap.backends.get("b1:443").unwrap();
    assert_eq!(first.active_connections, 1);
    assert_eq!(first.total_connections, 1);
    assert_eq!(first.protocol, "h2");
    assert_eq!(first.connect_errors, 1);
    assert_eq!(first.total_connect_time_us, 10_000);
    assert_eq!(first.connect_count, 1);

    let second = snap.backends.get("b2:8080").unwrap();
    assert_eq!(second.protocol, "h1");
    assert_eq!(second.connect_errors, 0);
}
#[test]
fn test_retain_backends_prunes_stale() {
    // `retain_backends` must remove all entries of backends missing from the
    // active set and keep those of backends that are still listed.
    const ACTIVE: &str = "active:443";
    const STALE: &str = "stale:8080";
    let metrics = MetricsCollector::new();

    // True when the given per-backend map holds an entry for the backend.
    macro_rules! tracked {
        ($map:ident, $backend:expr) => {
            metrics.$map.get($backend).is_some()
        };
    }

    metrics.backend_connection_opened(ACTIVE, Duration::from_millis(5));
    metrics.backend_connection_opened(STALE, Duration::from_millis(10));
    metrics.set_backend_protocol(ACTIVE, "h1");
    metrics.set_backend_protocol(STALE, "h2");
    metrics.backend_connect_error(STALE);

    let mut active = HashSet::new();
    active.insert(ACTIVE.to_string());
    metrics.retain_backends(&active);

    // The listed backend keeps its data.
    assert!(tracked!(backend_total, ACTIVE));
    assert!(tracked!(backend_protocol, ACTIVE));

    // The unlisted backend is gone from every checked map.
    assert!(!tracked!(backend_active, STALE));
    assert!(!tracked!(backend_total, STALE));
    assert!(!tracked!(backend_protocol, STALE));
    assert!(!tracked!(backend_connect_errors, STALE));
    assert!(!tracked!(backend_connect_time_us, STALE));
    assert!(!tracked!(backend_connect_count, STALE));
    assert!(!tracked!(backend_pool_hits, STALE));
    assert!(!tracked!(backend_pool_misses, STALE));
    assert!(!tracked!(backend_h2_failures, STALE));
}
#[test]
fn test_backend_connection_closed_saturates() {
    // Surplus close calls must neither create an entry nor push the active
    // counter below zero.
    let backend = "b:80";
    let metrics = MetricsCollector::new();

    // A close for a backend that was never opened leaves no entry behind.
    metrics.backend_connection_closed(backend);
    assert!(metrics.backend_active.get(backend).is_none());

    // One open followed by two closes: the counter stops at 0.
    metrics.backend_connection_opened(backend, Duration::from_millis(1));
    for _ in 0..2 {
        metrics.backend_connection_closed(backend);
    }
    let active_now = metrics
        .backend_active
        .get(backend)
        .unwrap()
        .load(Ordering::Relaxed);
    assert_eq!(active_now, 0);
}
}