From 1ea290a085261d8d7d39540d18aaccd7951def20 Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Sun, 26 Apr 2026 09:17:11 +0000 Subject: [PATCH] fix(rustproxy-metrics): preserve high-throughput IPs in metrics snapshots when active-connection rankings are saturated --- changelog.md | 6 ++ .../crates/rustproxy-metrics/src/collector.rs | 81 +++++++++++++++++-- ts/00_commitinfo_data.ts | 2 +- 3 files changed, 82 insertions(+), 7 deletions(-) diff --git a/changelog.md b/changelog.md index 5221205..c657a93 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,11 @@ # Changelog +## 2026-04-26 - 27.8.1 - fix(rustproxy-metrics) +preserve high-throughput IPs in metrics snapshots when active-connection rankings are saturated + +- Select snapshot IPs using a blend of active-connection and throughput rankings instead of only active connections +- Adds a regression test to ensure a high-bandwidth IP remains included when many other IPs have more active connections + ## 2026-04-14 - 27.8.0 - feat(metrics) add per-domain HTTP request rate metrics diff --git a/rust/crates/rustproxy-metrics/src/collector.rs b/rust/crates/rustproxy-metrics/src/collector.rs index 803a553..da37b68 100644 --- a/rust/crates/rustproxy-metrics/src/collector.rs +++ b/rust/crates/rustproxy-metrics/src/collector.rs @@ -144,7 +144,7 @@ pub struct Statistics { /// Default retention for throughput samples (1 hour). const DEFAULT_RETENTION_SECONDS: usize = 3600; -/// Maximum number of IPs to include in a snapshot (top by active connections). +/// Maximum number of IPs to include in a snapshot. const MAX_IPS_IN_SNAPSHOT: usize = 100; /// Maximum number of backends to include in a snapshot (top by total connections). @@ -1064,8 +1064,8 @@ impl MetricsCollector { ); } - // Collect per-IP metrics — only IPs with active connections or total > 0, - // capped at top MAX_IPS_IN_SNAPSHOT sorted by active count + // Collect per-IP metrics — capped to the IPs most relevant for either + // active connection visibility or bandwidth attribution. let mut ip_entries: Vec<(String, u64, u64, u64, u64, u64, u64, HashMap)> = Vec::new(); for entry in self.ip_total_connections.iter() { @@ -1113,9 +1113,54 @@ impl MetricsCollector { domain_requests, )); } - // Sort by active connections descending, then cap - ip_entries.sort_by(|a, b| b.1.cmp(&a.1)); - ip_entries.truncate(MAX_IPS_IN_SNAPSHOT); + if ip_entries.len() > MAX_IPS_IN_SNAPSHOT { + let mut selected = vec![false; ip_entries.len()]; + let mut selected_count = 0usize; + + let mut active_rank: Vec = (0..ip_entries.len()).collect(); + active_rank.sort_by(|&a, &b| { + ip_entries[b] + .1 + .cmp(&ip_entries[a].1) + .then_with(|| ip_entries[b].2.cmp(&ip_entries[a].2)) + .then_with(|| ip_entries[a].0.cmp(&ip_entries[b].0)) + }); + + let mut throughput_rank: Vec = (0..ip_entries.len()).collect(); + throughput_rank.sort_by(|&a, &b| { + let a_tp = ip_entries[a].5.saturating_add(ip_entries[a].6); + let b_tp = ip_entries[b].5.saturating_add(ip_entries[b].6); + let a_bytes = ip_entries[a].3.saturating_add(ip_entries[a].4); + let b_bytes = ip_entries[b].3.saturating_add(ip_entries[b].4); + b_tp.cmp(&a_tp) + .then_with(|| b_bytes.cmp(&a_bytes)) + .then_with(|| ip_entries[b].1.cmp(&ip_entries[a].1)) + .then_with(|| ip_entries[a].0.cmp(&ip_entries[b].0)) + }); + + for idx in active_rank.into_iter().take(MAX_IPS_IN_SNAPSHOT / 2) { + if !selected[idx] { + selected[idx] = true; + selected_count += 1; + } + } + + for idx in throughput_rank { + if selected_count >= MAX_IPS_IN_SNAPSHOT { + break; + } + if !selected[idx] { + selected[idx] = true; + selected_count += 1; + } + } + + ip_entries = ip_entries + .into_iter() + .enumerate() + .filter_map(|(idx, entry)| selected[idx].then_some(entry)) + .collect(); + } let mut ips = std::collections::HashMap::new(); for (ip, active, total, bytes_in, bytes_out, tp_in, tp_out, domain_requests) in ip_entries { @@ -1452,6 +1497,30 @@ mod tests { assert!(collector.ip_connections.get("1.2.3.4").is_none()); } + #[test] + fn test_snapshot_retains_high_throughput_ip_over_many_active_ips() { + let collector = MetricsCollector::with_retention(60); + + for i in 1..=(MAX_IPS_IN_SNAPSHOT + 20) { + let ip = format!("10.0.0.{}", i); + collector.connection_opened(Some("scanner-route"), Some(&ip)); + collector.connection_opened(Some("scanner-route"), Some(&ip)); + } + + let busy_ip = "203.0.113.10"; + collector.connection_opened(Some("download-route"), Some(busy_ip)); + collector.record_bytes(0, 900_000, Some("download-route"), Some(busy_ip)); + collector.sample_all(); + + let snapshot = collector.snapshot(); + let busy_metrics = snapshot.ips.get(busy_ip).unwrap(); + + assert_eq!(snapshot.ips.len(), MAX_IPS_IN_SNAPSHOT); + assert_eq!(busy_metrics.active_connections, 1); + assert_eq!(busy_metrics.bytes_out, 900_000); + assert_eq!(busy_metrics.throughput_out_bytes_per_sec, 900_000); + } + #[test] fn test_per_ip_full_eviction_on_last_close() { let collector = MetricsCollector::with_retention(60); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index db9b86a..a194c75 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartproxy', - version: '27.8.0', + version: '27.8.1', description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.' }