fix(rustproxy-metrics): retain inactive per-IP metric buckets briefly to capture final throughput before pruning

This commit is contained in:
2026-04-26 11:25:24 +00:00
parent fdb5ec59bc
commit 088ef6ab09
4 changed files with 197 additions and 36 deletions
+185 -33
View File
@@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
use std::time::Duration;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use crate::throughput::{RequestRateTracker, ThroughputSample, ThroughputTracker};
@@ -147,6 +147,12 @@ const DEFAULT_RETENTION_SECONDS: usize = 3600;
/// Maximum number of IPs to include in a snapshot.
const MAX_IPS_IN_SNAPSHOT: usize = 100;
/// How long, in milliseconds, to retain inactive IP metric buckets after the
/// last connection closes. A prune pass evicts a bucket once its recorded
/// close time is at or before `now - INACTIVE_IP_RETENTION_MS`.
const INACTIVE_IP_RETENTION_MS: u64 = 15_000;
/// Hard cap for inactive IP metric buckets retained between sampler passes.
/// Only buckets still inside the retention window count toward the cap; when a
/// prune pass finds more than this many, the earliest-closed ones are evicted
/// first until the cap is met.
const MAX_INACTIVE_IPS_RETAINED: usize = MAX_IPS_IN_SNAPSHOT * 10;
/// Maximum number of backends to include in a snapshot (top by total connections).
const MAX_BACKENDS_IN_SNAPSHOT: usize = 100;
@@ -165,6 +171,13 @@ fn canonicalize_domain_key(domain: &str) -> Option<String> {
}
}
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0` instead of an error, and a
/// millisecond count that does not fit in `u64` saturates at `u64::MAX`
/// rather than silently wrapping.
///
/// The value comes from `SystemTime`, which is not monotonic: if the system
/// clock is adjusted, a later call may return a smaller value than an earlier
/// one.
fn current_time_ms() -> u64 {
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    // `as_millis` returns `u128`; clamp first so the narrowing cast is lossless.
    millis.min(u128::from(u64::MAX)) as u64
}
/// Metrics collector tracking connections and throughput.
///
/// Design: The hot path (`record_bytes`) is entirely lock-free — it only touches
@@ -186,6 +199,7 @@ pub struct MetricsCollector {
// ── Per-IP tracking ──
ip_connections: DashMap<String, AtomicU64>,
ip_closed_at_ms: DashMap<String, AtomicU64>,
ip_total_connections: DashMap<String, AtomicU64>,
ip_bytes_in: DashMap<String, AtomicU64>,
ip_bytes_out: DashMap<String, AtomicU64>,
@@ -273,6 +287,7 @@ impl MetricsCollector {
route_bytes_in: DashMap::new(),
route_bytes_out: DashMap::new(),
ip_connections: DashMap::new(),
ip_closed_at_ms: DashMap::new(),
ip_total_connections: DashMap::new(),
ip_bytes_in: DashMap::new(),
ip_bytes_out: DashMap::new(),
@@ -348,6 +363,7 @@ impl MetricsCollector {
.entry(ip.to_string())
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(1, Ordering::Relaxed);
self.ip_closed_at_ms.remove(ip);
self.ip_total_connections
.entry(ip.to_string())
.or_insert_with(|| AtomicU64::new(0))
@@ -392,22 +408,88 @@ impl MetricsCollector {
}
})
.ok();
// Clean up zero-count entries to prevent memory growth
// Keep inactive IP buckets briefly so pending bytes can still
// be sampled into per-IP throughput after short-lived transfers.
if matches!(prev, Some(v) if v <= 1) {
drop(counter);
self.ip_connections.remove(ip);
// Evict all per-IP tracking data for this IP
self.ip_total_connections.remove(ip);
self.ip_bytes_in.remove(ip);
self.ip_bytes_out.remove(ip);
self.ip_pending_tp.remove(ip);
self.ip_throughput.remove(ip);
self.ip_domain_requests.remove(ip);
self.ip_closed_at_ms
.entry(ip.to_string())
.or_insert_with(|| AtomicU64::new(0))
.store(current_time_ms(), Ordering::Relaxed);
}
}
}
}
/// Evicts every per-IP tracking entry for `ip`, provided the IP is still idle.
///
/// If the connection counter for `ip` reads non-zero, the IP has become
/// active again: only its closed-at marker is cleared and all metric buckets
/// are left in place. A missing counter is treated as zero connections.
fn remove_inactive_ip_tracking(&self, ip: &str) {
    // Read the counter in its own statement so the map guard is released
    // before any of the removals below touch the same map.
    let open_connections = self
        .ip_connections
        .get(ip)
        .map_or(0, |counter| counter.load(Ordering::Relaxed));

    if open_connections != 0 {
        self.ip_closed_at_ms.remove(ip);
        return;
    }

    // Idle: drop the counter, the marker, and every metric bucket for this IP.
    self.ip_connections.remove(ip);
    self.ip_closed_at_ms.remove(ip);
    self.ip_total_connections.remove(ip);
    self.ip_bytes_in.remove(ip);
    self.ip_bytes_out.remove(ip);
    self.ip_pending_tp.remove(ip);
    self.ip_throughput.remove(ip);
    self.ip_domain_requests.remove(ip);
}
/// Prunes closed-IP buckets relative to `now_ms`.
///
/// Steps, in order:
/// 1. Closed-at markers whose IP has a non-zero connection count are dropped
///    (the IP is active again); its metric buckets are untouched.
/// 2. Idle IPs whose close time is at or before
///    `now_ms - INACTIVE_IP_RETENTION_MS` are selected for eviction.
/// 3. If more than `MAX_INACTIVE_IPS_RETAINED` idle IPs would remain, the
///    earliest-closed of those (ties broken by IP string) are also selected
///    until the cap is met.
/// 4. Every selected IP is passed to `remove_inactive_ip_tracking`.
fn prune_inactive_ip_tracking(&self, now_ms: u64) {
    let cutoff_ms = now_ms.saturating_sub(INACTIVE_IP_RETENTION_MS);

    // Classify every marker as belonging to an active or an idle IP.
    let mut stale_markers: Vec<String> = Vec::new();
    let mut idle_ips: Vec<(String, u64)> = Vec::new();
    for marker in self.ip_closed_at_ms.iter() {
        let open_connections = self
            .ip_connections
            .get(marker.key())
            .map_or(0, |counter| counter.load(Ordering::Relaxed));
        if open_connections == 0 {
            idle_ips.push((marker.key().clone(), marker.value().load(Ordering::Relaxed)));
        } else {
            stale_markers.push(marker.key().clone());
        }
    }

    // Markers are removed only after the iteration above has finished.
    for ip in &stale_markers {
        self.ip_closed_at_ms.remove(ip);
    }

    // TTL pass: everything closed at or before the cutoff goes.
    let mut evictions: HashSet<String> = HashSet::new();
    let mut within_ttl: usize = 0;
    for (ip, closed_at_ms) in &idle_ips {
        if *closed_at_ms <= cutoff_ms {
            evictions.insert(ip.clone());
        } else {
            within_ttl += 1;
        }
    }

    // Cap pass: trim the earliest-closed survivors down to the hard cap.
    if within_ttl > MAX_INACTIVE_IPS_RETAINED {
        let overflow = within_ttl - MAX_INACTIVE_IPS_RETAINED;
        // Map keys are unique, so this ordering is total and an unstable sort
        // produces the same sequence as a stable one.
        idle_ips.sort_unstable_by(|a, b| (a.1, &a.0).cmp(&(b.1, &b.0)));
        evictions.extend(
            idle_ips
                .into_iter()
                .filter(|(_, closed_at_ms)| *closed_at_ms > cutoff_ms)
                .take(overflow)
                .map(|(ip, _)| ip),
        );
    }

    for ip in &evictions {
        self.remove_inactive_ip_tracking(ip);
    }
}
/// Record bytes transferred (lock-free hot path).
///
/// Called per-chunk in the TCP copy loop. Only touches AtomicU64 counters —
@@ -481,9 +563,8 @@ impl MetricsCollector {
// Per-IP tracking: same get()-first pattern to avoid String allocation on hot path.
if let Some(ip) = source_ip {
// Only record per-IP stats if the IP still has active connections.
// This prevents orphaned entries when record_bytes races with
// connection_closed (which evicts all per-IP data on last close).
// Only record per-IP stats if the IP is active or still within the
// bounded inactive retention window after its last connection closed.
if self.ip_connections.contains_key(ip) {
if bytes_in > 0 {
if let Some(counter) = self.ip_bytes_in.get(ip) {
@@ -918,9 +999,13 @@ impl MetricsCollector {
self.http_domain_request_rates.remove(&domain);
}
// Keep closed IP buckets only for a short, bounded window so the sampler
// can attribute short-lived transfers without leaking per-IP maps.
self.prune_inactive_ip_tracking(current_time_ms());
// Safety-net: prune orphaned per-IP entries that have no corresponding
// ip_connections entry. This catches any entries created by a race between
// record_bytes and connection_closed.
// ip_connections entry. This catches any entries created by older races or
// by code paths that manually inserted partial per-IP state.
self.ip_bytes_in
.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_bytes_out
@@ -933,6 +1018,8 @@ impl MetricsCollector {
.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_domain_requests
.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_closed_at_ms
.retain(|k, _| self.ip_connections.contains_key(k));
// Safety-net: prune orphaned backend error/stats entries for backends
// that have no active or total connections (error-only backends).
@@ -1492,9 +1579,18 @@ mod tests {
1
);
// Close last connection for IP — should be cleaned up
// Close last connection for IP — active count should drop to zero,
// while the inactive bucket is retained briefly for final sampling.
collector.connection_closed(Some("route-a"), Some("1.2.3.4"));
assert!(collector.ip_connections.get("1.2.3.4").is_none());
assert_eq!(
collector
.ip_connections
.get("1.2.3.4")
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0),
0
);
assert!(collector.ip_closed_at_ms.get("1.2.3.4").is_some());
}
#[test]
@@ -1544,13 +1640,36 @@ mod tests {
collector.connection_closed(Some("route-a"), Some("10.0.0.1"));
collector.connection_closed(Some("route-a"), Some("10.0.0.1"));
// All per-IP data for 10.0.0.1 should be evicted
// Per-IP data for 10.0.0.1 should be retained until the inactive TTL expires.
assert_eq!(
collector
.ip_connections
.get("10.0.0.1")
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0),
0
);
assert!(collector.ip_total_connections.get("10.0.0.1").is_some());
assert!(collector.ip_bytes_in.get("10.0.0.1").is_some());
assert!(collector.ip_bytes_out.get("10.0.0.1").is_some());
assert!(collector.ip_pending_tp.get("10.0.0.1").is_some());
assert!(collector.ip_throughput.get("10.0.0.1").is_some());
collector
.ip_closed_at_ms
.get("10.0.0.1")
.unwrap()
.store(0, Ordering::Relaxed);
collector.sample_all();
// Expired inactive buckets are fully evicted to prevent leaks.
assert!(collector.ip_connections.get("10.0.0.1").is_none());
assert!(collector.ip_total_connections.get("10.0.0.1").is_none());
assert!(collector.ip_bytes_in.get("10.0.0.1").is_none());
assert!(collector.ip_bytes_out.get("10.0.0.1").is_none());
assert!(collector.ip_pending_tp.get("10.0.0.1").is_none());
assert!(collector.ip_throughput.get("10.0.0.1").is_none());
assert!(collector.ip_closed_at_ms.get("10.0.0.1").is_none());
// 10.0.0.2 should still have data
assert!(collector.ip_connections.get("10.0.0.2").is_some());
@@ -1577,7 +1696,14 @@ mod tests {
.unwrap_or(0),
0
);
assert!(collector.ip_connections.get("10.0.0.1").is_none());
assert_eq!(
collector
.ip_connections
.get("10.0.0.1")
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0),
0
);
}
#[test]
@@ -1718,26 +1844,52 @@ mod tests {
}
#[test]
fn test_record_bytes_after_close_no_orphan() {
fn test_closed_ip_keeps_pending_throughput_until_sample() {
let collector = MetricsCollector::with_retention(60);
// Open a connection, record bytes, then close
collector.connection_opened(Some("route-a"), Some("10.0.0.1"));
collector.record_bytes(100, 200, Some("route-a"), Some("10.0.0.1"));
collector.record_bytes(100, 2000, Some("route-a"), Some("10.0.0.1"));
collector.connection_closed(Some("route-a"), Some("10.0.0.1"));
// IP should be fully evicted
assert!(collector.ip_connections.get("10.0.0.1").is_none());
collector.sample_all();
let snapshot = collector.snapshot();
let ip_metrics = snapshot.ips.get("10.0.0.1").unwrap();
// Now record_bytes arrives late (simulates race) — should NOT re-create entries
collector.record_bytes(50, 75, Some("route-a"), Some("10.0.0.1"));
assert!(collector.ip_bytes_in.get("10.0.0.1").is_none());
assert!(collector.ip_bytes_out.get("10.0.0.1").is_none());
assert!(collector.ip_pending_tp.get("10.0.0.1").is_none());
assert_eq!(ip_metrics.active_connections, 0);
assert_eq!(ip_metrics.bytes_in, 100);
assert_eq!(ip_metrics.bytes_out, 2000);
assert_eq!(ip_metrics.throughput_in_bytes_per_sec, 100);
assert_eq!(ip_metrics.throughput_out_bytes_per_sec, 2000);
assert_eq!(snapshot.throughput_out_bytes_per_sec, 2000);
assert_eq!(
snapshot
.routes
.get("route-a")
.unwrap()
.throughput_out_bytes_per_sec,
2000
);
}
// Global bytes should still be counted
assert_eq!(collector.total_bytes_in.load(Ordering::Relaxed), 150);
assert_eq!(collector.total_bytes_out.load(Ordering::Relaxed), 275);
#[test]
fn test_inactive_ip_retention_hard_cap_prunes_oldest() {
    // Five more idle IPs than the hard cap allows, all still inside the TTL.
    const EXCESS: usize = 5;
    let collector = MetricsCollector::with_retention(60);
    let total = MAX_INACTIVE_IPS_RETAINED + EXCESS;
    let ip_for = |i: usize| format!("198.51.{}.{}", i / 255, i % 255);
    let now_ms = current_time_ms();

    for i in 0..total {
        let ip = ip_for(i);
        collector.connection_opened(Some("route-a"), Some(&ip));
        collector.connection_closed(Some("route-a"), Some(&ip));
        // Strictly increasing close times: index 0 is the oldest bucket, so
        // the eviction order is deterministic and checkable below.
        let closed_at_ms = now_ms.saturating_sub((total - i) as u64);
        collector
            .ip_closed_at_ms
            .get(&ip)
            .unwrap()
            .store(closed_at_ms, Ordering::Relaxed);
    }

    collector.sample_all();

    assert!(collector.ip_closed_at_ms.len() <= MAX_INACTIVE_IPS_RETAINED);
    assert!(collector.ip_connections.len() <= MAX_INACTIVE_IPS_RETAINED);

    // The oldest buckets are the ones that were evicted...
    for i in 0..EXCESS {
        let ip = ip_for(i);
        assert!(collector.ip_closed_at_ms.get(&ip).is_none());
        assert!(collector.ip_connections.get(&ip).is_none());
    }
    // ...while the first bucket past the overflow is still retained.
    assert!(collector.ip_closed_at_ms.get(&ip_for(EXCESS)).is_some());
    assert!(collector.ip_connections.get(&ip_for(EXCESS)).is_some());
}
#[test]