diff --git a/changelog.md b/changelog.md index c657a93..cdc3d7f 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-04-26 - 27.8.2 - fix(rustproxy-metrics) +retain inactive per-IP metric buckets briefly to capture final throughput before pruning + +- adds a bounded retention window for closed IP buckets so short-lived transfers are still included in per-IP throughput sampling +- prunes expired inactive IP tracking by TTL and hard cap to prevent unbounded metric map growth +- updates Rust and throughput tests to expect zero active connections during the temporary retention period + ## 2026-04-26 - 27.8.1 - fix(rustproxy-metrics) preserve high-throughput IPs in metrics snapshots when active-connection rankings are saturated diff --git a/rust/crates/rustproxy-metrics/src/collector.rs b/rust/crates/rustproxy-metrics/src/collector.rs index da37b68..758c8a2 100644 --- a/rust/crates/rustproxy-metrics/src/collector.rs +++ b/rust/crates/rustproxy-metrics/src/collector.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Mutex; -use std::time::Duration; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use crate::throughput::{RequestRateTracker, ThroughputSample, ThroughputTracker}; @@ -147,6 +147,12 @@ const DEFAULT_RETENTION_SECONDS: usize = 3600; /// Maximum number of IPs to include in a snapshot. const MAX_IPS_IN_SNAPSHOT: usize = 100; +/// How long to retain inactive IP metric buckets after the last connection closes. +const INACTIVE_IP_RETENTION_MS: u64 = 15_000; + +/// Hard cap for inactive IP metric buckets retained between sampler passes. +const MAX_INACTIVE_IPS_RETAINED: usize = MAX_IPS_IN_SNAPSHOT * 10; + /// Maximum number of backends to include in a snapshot (top by total connections). const MAX_BACKENDS_IN_SNAPSHOT: usize = 100; @@ -165,6 +171,13 @@ fn canonicalize_domain_key(domain: &str) -> Option { } } +fn current_time_ms() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64 +} + /// Metrics collector tracking connections and throughput. /// /// Design: The hot path (`record_bytes`) is entirely lock-free — it only touches @@ -186,6 +199,7 @@ pub struct MetricsCollector { // ── Per-IP tracking ── ip_connections: DashMap, + ip_closed_at_ms: DashMap, ip_total_connections: DashMap, ip_bytes_in: DashMap, ip_bytes_out: DashMap, @@ -273,6 +287,7 @@ impl MetricsCollector { route_bytes_in: DashMap::new(), route_bytes_out: DashMap::new(), ip_connections: DashMap::new(), + ip_closed_at_ms: DashMap::new(), ip_total_connections: DashMap::new(), ip_bytes_in: DashMap::new(), ip_bytes_out: DashMap::new(), @@ -348,6 +363,7 @@ impl MetricsCollector { .entry(ip.to_string()) .or_insert_with(|| AtomicU64::new(0)) .fetch_add(1, Ordering::Relaxed); + self.ip_closed_at_ms.remove(ip); self.ip_total_connections .entry(ip.to_string()) .or_insert_with(|| AtomicU64::new(0)) @@ -392,22 +408,88 @@ impl MetricsCollector { } }) .ok(); - // Clean up zero-count entries to prevent memory growth + // Keep inactive IP buckets briefly so pending bytes can still + // be sampled into per-IP throughput after short-lived transfers. if matches!(prev, Some(v) if v <= 1) { - drop(counter); - self.ip_connections.remove(ip); - // Evict all per-IP tracking data for this IP - self.ip_total_connections.remove(ip); - self.ip_bytes_in.remove(ip); - self.ip_bytes_out.remove(ip); - self.ip_pending_tp.remove(ip); - self.ip_throughput.remove(ip); - self.ip_domain_requests.remove(ip); + self.ip_closed_at_ms + .entry(ip.to_string()) + .or_insert_with(|| AtomicU64::new(0)) + .store(current_time_ms(), Ordering::Relaxed); } } } } + fn remove_inactive_ip_tracking(&self, ip: &str) { + if self + .ip_connections + .get(ip) + .map(|c| c.load(Ordering::Relaxed)) + .unwrap_or(0) + > 0 + { + self.ip_closed_at_ms.remove(ip); + return; + } + + self.ip_connections.remove(ip); + self.ip_closed_at_ms.remove(ip); + self.ip_total_connections.remove(ip); + self.ip_bytes_in.remove(ip); + self.ip_bytes_out.remove(ip); + self.ip_pending_tp.remove(ip); + self.ip_throughput.remove(ip); + self.ip_domain_requests.remove(ip); + } + + fn prune_inactive_ip_tracking(&self, now_ms: u64) { + let cutoff_ms = now_ms.saturating_sub(INACTIVE_IP_RETENTION_MS); + let mut inactive_ips: Vec<(String, u64)> = Vec::new(); + let mut active_markers: Vec = Vec::new(); + + for entry in self.ip_closed_at_ms.iter() { + let ip = entry.key().clone(); + let active = self + .ip_connections + .get(&ip) + .map(|c| c.load(Ordering::Relaxed)) + .unwrap_or(0); + if active > 0 { + active_markers.push(ip); + } else { + inactive_ips.push((ip, entry.value().load(Ordering::Relaxed))); + } + } + + for ip in active_markers { + self.ip_closed_at_ms.remove(&ip); + } + + let mut remove_ips: HashSet = inactive_ips + .iter() + .filter(|(_, closed_at_ms)| *closed_at_ms <= cutoff_ms) + .map(|(ip, _)| ip.clone()) + .collect(); + + let retained_after_ttl = inactive_ips.len().saturating_sub(remove_ips.len()); + if retained_after_ttl > MAX_INACTIVE_IPS_RETAINED { + inactive_ips.sort_by(|a, b| a.1.cmp(&b.1).then_with(|| a.0.cmp(&b.0))); + let mut overflow = retained_after_ttl - MAX_INACTIVE_IPS_RETAINED; + for (ip, closed_at_ms) in inactive_ips { + if overflow == 0 { + break; + } + if closed_at_ms > cutoff_ms && remove_ips.insert(ip) { + overflow -= 1; + } + } + } + + for ip in remove_ips { + self.remove_inactive_ip_tracking(&ip); + } + } + /// Record bytes transferred (lock-free hot path). /// /// Called per-chunk in the TCP copy loop. Only touches AtomicU64 counters — @@ -481,9 +563,8 @@ impl MetricsCollector { // Per-IP tracking: same get()-first pattern to avoid String allocation on hot path. if let Some(ip) = source_ip { - // Only record per-IP stats if the IP still has active connections. - // This prevents orphaned entries when record_bytes races with - // connection_closed (which evicts all per-IP data on last close). + // Only record per-IP stats if the IP is active or still within the + // bounded inactive retention window after its last connection closed. if self.ip_connections.contains_key(ip) { if bytes_in > 0 { if let Some(counter) = self.ip_bytes_in.get(ip) { @@ -918,9 +999,13 @@ impl MetricsCollector { self.http_domain_request_rates.remove(&domain); } + // Keep closed IP buckets only for a short, bounded window so the sampler + // can attribute short-lived transfers without leaking per-IP maps. + self.prune_inactive_ip_tracking(current_time_ms()); + // Safety-net: prune orphaned per-IP entries that have no corresponding - // ip_connections entry. This catches any entries created by a race between - // record_bytes and connection_closed. + // ip_connections entry. This catches any entries created by older races or + // by code paths that manually inserted partial per-IP state. self.ip_bytes_in .retain(|k, _| self.ip_connections.contains_key(k)); self.ip_bytes_out @@ -933,6 +1018,8 @@ impl MetricsCollector { .retain(|k, _| self.ip_connections.contains_key(k)); self.ip_domain_requests .retain(|k, _| self.ip_connections.contains_key(k)); + self.ip_closed_at_ms + .retain(|k, _| self.ip_connections.contains_key(k)); // Safety-net: prune orphaned backend error/stats entries for backends // that have no active or total connections (error-only backends). @@ -1492,9 +1579,18 @@ mod tests { 1 ); - // Close last connection for IP — should be cleaned up + // Close last connection for IP — active count should drop to zero, + // while the inactive bucket is retained briefly for final sampling. collector.connection_closed(Some("route-a"), Some("1.2.3.4")); - assert!(collector.ip_connections.get("1.2.3.4").is_none()); + assert_eq!( + collector + .ip_connections + .get("1.2.3.4") + .map(|c| c.load(Ordering::Relaxed)) + .unwrap_or(0), + 0 + ); + assert!(collector.ip_closed_at_ms.get("1.2.3.4").is_some()); } #[test] @@ -1544,13 +1640,36 @@ mod tests { collector.connection_closed(Some("route-a"), Some("10.0.0.1")); collector.connection_closed(Some("route-a"), Some("10.0.0.1")); - // All per-IP data for 10.0.0.1 should be evicted + // Per-IP data for 10.0.0.1 should be retained until the inactive TTL expires. + assert_eq!( + collector + .ip_connections + .get("10.0.0.1") + .map(|c| c.load(Ordering::Relaxed)) + .unwrap_or(0), + 0 + ); + assert!(collector.ip_total_connections.get("10.0.0.1").is_some()); + assert!(collector.ip_bytes_in.get("10.0.0.1").is_some()); + assert!(collector.ip_bytes_out.get("10.0.0.1").is_some()); + assert!(collector.ip_pending_tp.get("10.0.0.1").is_some()); + assert!(collector.ip_throughput.get("10.0.0.1").is_some()); + + collector + .ip_closed_at_ms + .get("10.0.0.1") + .unwrap() + .store(0, Ordering::Relaxed); + collector.sample_all(); + + // Expired inactive buckets are fully evicted to prevent leaks. assert!(collector.ip_connections.get("10.0.0.1").is_none()); assert!(collector.ip_total_connections.get("10.0.0.1").is_none()); assert!(collector.ip_bytes_in.get("10.0.0.1").is_none()); assert!(collector.ip_bytes_out.get("10.0.0.1").is_none()); assert!(collector.ip_pending_tp.get("10.0.0.1").is_none()); assert!(collector.ip_throughput.get("10.0.0.1").is_none()); + assert!(collector.ip_closed_at_ms.get("10.0.0.1").is_none()); // 10.0.0.2 should still have data assert!(collector.ip_connections.get("10.0.0.2").is_some()); @@ -1577,7 +1696,14 @@ mod tests { .unwrap_or(0), 0 ); - assert!(collector.ip_connections.get("10.0.0.1").is_none()); + assert_eq!( + collector + .ip_connections + .get("10.0.0.1") + .map(|c| c.load(Ordering::Relaxed)) + .unwrap_or(0), + 0 + ); } #[test] @@ -1718,26 +1844,52 @@ mod tests { } #[test] - fn test_record_bytes_after_close_no_orphan() { + fn test_closed_ip_keeps_pending_throughput_until_sample() { let collector = MetricsCollector::with_retention(60); - // Open a connection, record bytes, then close collector.connection_opened(Some("route-a"), Some("10.0.0.1")); - collector.record_bytes(100, 200, Some("route-a"), Some("10.0.0.1")); + collector.record_bytes(100, 2000, Some("route-a"), Some("10.0.0.1")); collector.connection_closed(Some("route-a"), Some("10.0.0.1")); - // IP should be fully evicted - assert!(collector.ip_connections.get("10.0.0.1").is_none()); + collector.sample_all(); + let snapshot = collector.snapshot(); + let ip_metrics = snapshot.ips.get("10.0.0.1").unwrap(); - // Now record_bytes arrives late (simulates race) — should NOT re-create entries - collector.record_bytes(50, 75, Some("route-a"), Some("10.0.0.1")); - assert!(collector.ip_bytes_in.get("10.0.0.1").is_none()); - assert!(collector.ip_bytes_out.get("10.0.0.1").is_none()); - assert!(collector.ip_pending_tp.get("10.0.0.1").is_none()); + assert_eq!(ip_metrics.active_connections, 0); + assert_eq!(ip_metrics.bytes_in, 100); + assert_eq!(ip_metrics.bytes_out, 2000); + assert_eq!(ip_metrics.throughput_in_bytes_per_sec, 100); + assert_eq!(ip_metrics.throughput_out_bytes_per_sec, 2000); + assert_eq!(snapshot.throughput_out_bytes_per_sec, 2000); + assert_eq!( + snapshot + .routes + .get("route-a") + .unwrap() + .throughput_out_bytes_per_sec, + 2000 + ); + } - // Global bytes should still be counted - assert_eq!(collector.total_bytes_in.load(Ordering::Relaxed), 150); - assert_eq!(collector.total_bytes_out.load(Ordering::Relaxed), 275); + #[test] + fn test_inactive_ip_retention_hard_cap_prunes_oldest() { + let collector = MetricsCollector::with_retention(60); + + for i in 0..(MAX_INACTIVE_IPS_RETAINED + 5) { + let ip = format!("198.51.{}.{}", i / 255, i % 255); + collector.connection_opened(Some("route-a"), Some(&ip)); + collector.connection_closed(Some("route-a"), Some(&ip)); + collector + .ip_closed_at_ms + .get(&ip) + .unwrap() + .store(current_time_ms().saturating_sub(1), Ordering::Relaxed); + } + + collector.sample_all(); + + assert!(collector.ip_closed_at_ms.len() <= MAX_INACTIVE_IPS_RETAINED); + assert!(collector.ip_connections.len() <= MAX_INACTIVE_IPS_RETAINED); } #[test] diff --git a/test/test.throughput.ts b/test/test.throughput.ts index 4dc9e0a..21d7e60 100644 --- a/test/test.throughput.ts +++ b/test/test.throughput.ts @@ -188,10 +188,12 @@ tap.test('TCP forward - real-time byte tracking', async (tools) => { const byRoute = m.throughput.byRoute(); console.log('TCP forward — throughput byRoute:', Array.from(byRoute.entries())); - // After close, per-IP data should be evicted (memory leak fix) + // After close, per-IP buckets are retained briefly for final throughput sampling, + // but active connection counts must already be zero. const byIPAfter = m.connections.byIP(); console.log('TCP forward — connections byIP after close:', Array.from(byIPAfter.entries())); - expect(byIPAfter.size).toEqual(0); + expect(byIPAfter.size).toBeGreaterThan(0); + expect(Array.from(byIPAfter.values()).every((count) => count === 0)).toEqual(true); await proxy.stop(); await tools.delayFor(200); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index a194c75..c6f5aa7 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartproxy', - version: '27.8.1', + version: '27.8.2', description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.' }