Compare commits

...

4 Commits

Author SHA1 Message Date
jkunz 8fa3a51b03 v27.8.2
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-04-26 11:25:24 +00:00
jkunz 088ef6ab09 fix(rustproxy-metrics): retain inactive per-IP metric buckets briefly to capture final throughput before pruning 2026-04-26 11:25:24 +00:00
jkunz fdb5ec59bc v27.8.1
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-04-26 09:17:11 +00:00
jkunz 1ea290a085 fix(rustproxy-metrics): preserve high-throughput IPs in metrics snapshots when active-connection rankings are saturated 2026-04-26 09:17:11 +00:00
5 changed files with 279 additions and 43 deletions
+13
View File
@@ -1,5 +1,18 @@
# Changelog
## 2026-04-26 - 27.8.2 - fix(rustproxy-metrics)
retain inactive per-IP metric buckets briefly to capture final throughput before pruning
- adds a bounded retention window for closed IP buckets so short-lived transfers are still included in per-IP throughput sampling
- prunes expired inactive IP tracking by TTL and hard cap to prevent unbounded metric map growth
- updates Rust and throughput tests to expect zero active connections during the temporary retention period
## 2026-04-26 - 27.8.1 - fix(rustproxy-metrics)
preserve high-throughput IPs in metrics snapshots when active-connection rankings are saturated
- Select snapshot IPs using a blend of active-connection and throughput rankings instead of only active connections
- Add a regression test to ensure a high-bandwidth IP remains included when many other IPs have more active connections
## 2026-04-14 - 27.8.0 - feat(metrics)
add per-domain HTTP request rate metrics
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "@push.rocks/smartproxy",
"version": "27.8.2",
"private": false,
"description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
"main": "dist_ts/index.js",
+260 -39
View File
@@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
use std::time::Duration;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use crate::throughput::{RequestRateTracker, ThroughputSample, ThroughputTracker};
@@ -144,9 +144,15 @@ pub struct Statistics {
/// Default retention for throughput samples (1 hour).
const DEFAULT_RETENTION_SECONDS: usize = 3600;
/// Maximum number of IPs to include in a snapshot (top by active connections).
/// Maximum number of IPs to include in a snapshot.
const MAX_IPS_IN_SNAPSHOT: usize = 100;
/// How long to retain inactive IP metric buckets after the last connection closes.
const INACTIVE_IP_RETENTION_MS: u64 = 15_000;
/// Hard cap for inactive IP metric buckets retained between sampler passes.
const MAX_INACTIVE_IPS_RETAINED: usize = MAX_IPS_IN_SNAPSHOT * 10;
/// Maximum number of backends to include in a snapshot (top by total connections).
const MAX_BACKENDS_IN_SNAPSHOT: usize = 100;
@@ -165,6 +171,13 @@ fn canonicalize_domain_key(domain: &str) -> Option<String> {
}
}
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reads earlier than the epoch, the error branch
/// yields a zero duration, so the function returns 0 instead of panicking.
fn current_time_ms() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH);
    since_epoch.map(|d| d.as_millis()).unwrap_or_default() as u64
}
/// Metrics collector tracking connections and throughput.
///
/// Design: The hot path (`record_bytes`) is entirely lock-free — it only touches
@@ -186,6 +199,7 @@ pub struct MetricsCollector {
// ── Per-IP tracking ──
ip_connections: DashMap<String, AtomicU64>,
ip_closed_at_ms: DashMap<String, AtomicU64>,
ip_total_connections: DashMap<String, AtomicU64>,
ip_bytes_in: DashMap<String, AtomicU64>,
ip_bytes_out: DashMap<String, AtomicU64>,
@@ -273,6 +287,7 @@ impl MetricsCollector {
route_bytes_in: DashMap::new(),
route_bytes_out: DashMap::new(),
ip_connections: DashMap::new(),
ip_closed_at_ms: DashMap::new(),
ip_total_connections: DashMap::new(),
ip_bytes_in: DashMap::new(),
ip_bytes_out: DashMap::new(),
@@ -348,6 +363,7 @@ impl MetricsCollector {
.entry(ip.to_string())
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(1, Ordering::Relaxed);
self.ip_closed_at_ms.remove(ip);
self.ip_total_connections
.entry(ip.to_string())
.or_insert_with(|| AtomicU64::new(0))
@@ -392,22 +408,88 @@ impl MetricsCollector {
}
})
.ok();
// Clean up zero-count entries to prevent memory growth
// Keep inactive IP buckets briefly so pending bytes can still
// be sampled into per-IP throughput after short-lived transfers.
if matches!(prev, Some(v) if v <= 1) {
drop(counter);
self.ip_connections.remove(ip);
// Evict all per-IP tracking data for this IP
self.ip_total_connections.remove(ip);
self.ip_bytes_in.remove(ip);
self.ip_bytes_out.remove(ip);
self.ip_pending_tp.remove(ip);
self.ip_throughput.remove(ip);
self.ip_domain_requests.remove(ip);
self.ip_closed_at_ms
.entry(ip.to_string())
.or_insert_with(|| AtomicU64::new(0))
.store(current_time_ms(), Ordering::Relaxed);
}
}
}
}
/// Evict every per-IP tracking entry for `ip`, unless the IP has gone
/// active again since its close marker was set — in that case only the
/// stale close marker is cleared and the live buckets are kept.
fn remove_inactive_ip_tracking(&self, ip: &str) {
    // Guard clause: a racing connection_opened may have revived this IP.
    let active = self
        .ip_connections
        .get(ip)
        .map_or(0, |c| c.load(Ordering::Relaxed));
    if active > 0 {
        self.ip_closed_at_ms.remove(ip);
        return;
    }
    // Truly inactive: drop all per-IP maps to reclaim memory.
    self.ip_closed_at_ms.remove(ip);
    self.ip_connections.remove(ip);
    self.ip_total_connections.remove(ip);
    self.ip_bytes_in.remove(ip);
    self.ip_bytes_out.remove(ip);
    self.ip_pending_tp.remove(ip);
    self.ip_throughput.remove(ip);
    self.ip_domain_requests.remove(ip);
}
/// Prune per-IP buckets for closed connections: anything inactive longer
/// than `INACTIVE_IP_RETENTION_MS` is evicted by TTL, and if more than
/// `MAX_INACTIVE_IPS_RETAINED` inactive buckets remain after the TTL pass,
/// the oldest survivors are evicted until the hard cap holds.
fn prune_inactive_ip_tracking(&self, now_ms: u64) {
    let cutoff_ms = now_ms.saturating_sub(INACTIVE_IP_RETENTION_MS);
    let mut inactive_ips: Vec<(String, u64)> = Vec::new();
    let mut active_markers: Vec<String> = Vec::new();
    // Classify each close marker: IPs that regained connections keep their
    // buckets (the marker itself is stale); the rest are pruning candidates.
    for entry in self.ip_closed_at_ms.iter() {
        let ip = entry.key().clone();
        let active = self
            .ip_connections
            .get(&ip)
            .map(|c| c.load(Ordering::Relaxed))
            .unwrap_or(0);
        if active > 0 {
            active_markers.push(ip);
        } else {
            inactive_ips.push((ip, entry.value().load(Ordering::Relaxed)));
        }
    }
    // Removals are deferred until after iteration so the map is never
    // mutated while iterator guards are held.
    for ip in active_markers {
        self.ip_closed_at_ms.remove(&ip);
    }
    // TTL pass: entries closed at or before the cutoff are expired.
    let mut remove_ips: HashSet<String> = inactive_ips
        .iter()
        .filter(|(_, closed_at_ms)| *closed_at_ms <= cutoff_ms)
        .map(|(ip, _)| ip.clone())
        .collect();
    let retained_after_ttl = inactive_ips.len().saturating_sub(remove_ips.len());
    if retained_after_ttl > MAX_INACTIVE_IPS_RETAINED {
        // Hard-cap pass: sort oldest-first (ties broken by IP string for
        // determinism) and evict survivors until back under the cap.
        inactive_ips.sort_by(|a, b| a.1.cmp(&b.1).then_with(|| a.0.cmp(&b.0)));
        let mut overflow = retained_after_ttl - MAX_INACTIVE_IPS_RETAINED;
        for (ip, closed_at_ms) in inactive_ips {
            if overflow == 0 {
                break;
            }
            // Skip entries already expired by TTL; `insert` returning true
            // guards against decrementing twice for the same IP.
            if closed_at_ms > cutoff_ms && remove_ips.insert(ip) {
                overflow -= 1;
            }
        }
    }
    for ip in remove_ips {
        self.remove_inactive_ip_tracking(&ip);
    }
}
/// Record bytes transferred (lock-free hot path).
///
/// Called per-chunk in the TCP copy loop. Only touches AtomicU64 counters —
@@ -481,9 +563,8 @@ impl MetricsCollector {
// Per-IP tracking: same get()-first pattern to avoid String allocation on hot path.
if let Some(ip) = source_ip {
// Only record per-IP stats if the IP still has active connections.
// This prevents orphaned entries when record_bytes races with
// connection_closed (which evicts all per-IP data on last close).
// Only record per-IP stats if the IP is active or still within the
// bounded inactive retention window after its last connection closed.
if self.ip_connections.contains_key(ip) {
if bytes_in > 0 {
if let Some(counter) = self.ip_bytes_in.get(ip) {
@@ -918,9 +999,13 @@ impl MetricsCollector {
self.http_domain_request_rates.remove(&domain);
}
// Keep closed IP buckets only for a short, bounded window so the sampler
// can attribute short-lived transfers without leaking per-IP maps.
self.prune_inactive_ip_tracking(current_time_ms());
// Safety-net: prune orphaned per-IP entries that have no corresponding
// ip_connections entry. This catches any entries created by a race between
// record_bytes and connection_closed.
// ip_connections entry. This catches any entries created by older races or
// by code paths that manually inserted partial per-IP state.
self.ip_bytes_in
.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_bytes_out
@@ -933,6 +1018,8 @@ impl MetricsCollector {
.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_domain_requests
.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_closed_at_ms
.retain(|k, _| self.ip_connections.contains_key(k));
// Safety-net: prune orphaned backend error/stats entries for backends
// that have no active or total connections (error-only backends).
@@ -1064,8 +1151,8 @@ impl MetricsCollector {
);
}
// Collect per-IP metrics — only IPs with active connections or total > 0,
// capped at top MAX_IPS_IN_SNAPSHOT sorted by active count
// Collect per-IP metrics — capped to the IPs most relevant for either
// active connection visibility or bandwidth attribution.
let mut ip_entries: Vec<(String, u64, u64, u64, u64, u64, u64, HashMap<String, u64>)> =
Vec::new();
for entry in self.ip_total_connections.iter() {
@@ -1113,9 +1200,54 @@ impl MetricsCollector {
domain_requests,
));
}
// Sort by active connections descending, then cap
ip_entries.sort_by(|a, b| b.1.cmp(&a.1));
ip_entries.truncate(MAX_IPS_IN_SNAPSHOT);
if ip_entries.len() > MAX_IPS_IN_SNAPSHOT {
let mut selected = vec![false; ip_entries.len()];
let mut selected_count = 0usize;
let mut active_rank: Vec<usize> = (0..ip_entries.len()).collect();
active_rank.sort_by(|&a, &b| {
ip_entries[b]
.1
.cmp(&ip_entries[a].1)
.then_with(|| ip_entries[b].2.cmp(&ip_entries[a].2))
.then_with(|| ip_entries[a].0.cmp(&ip_entries[b].0))
});
let mut throughput_rank: Vec<usize> = (0..ip_entries.len()).collect();
throughput_rank.sort_by(|&a, &b| {
let a_tp = ip_entries[a].5.saturating_add(ip_entries[a].6);
let b_tp = ip_entries[b].5.saturating_add(ip_entries[b].6);
let a_bytes = ip_entries[a].3.saturating_add(ip_entries[a].4);
let b_bytes = ip_entries[b].3.saturating_add(ip_entries[b].4);
b_tp.cmp(&a_tp)
.then_with(|| b_bytes.cmp(&a_bytes))
.then_with(|| ip_entries[b].1.cmp(&ip_entries[a].1))
.then_with(|| ip_entries[a].0.cmp(&ip_entries[b].0))
});
for idx in active_rank.into_iter().take(MAX_IPS_IN_SNAPSHOT / 2) {
if !selected[idx] {
selected[idx] = true;
selected_count += 1;
}
}
for idx in throughput_rank {
if selected_count >= MAX_IPS_IN_SNAPSHOT {
break;
}
if !selected[idx] {
selected[idx] = true;
selected_count += 1;
}
}
ip_entries = ip_entries
.into_iter()
.enumerate()
.filter_map(|(idx, entry)| selected[idx].then_some(entry))
.collect();
}
let mut ips = std::collections::HashMap::new();
for (ip, active, total, bytes_in, bytes_out, tp_in, tp_out, domain_requests) in ip_entries {
@@ -1447,9 +1579,42 @@ mod tests {
1
);
// Close last connection for IP — should be cleaned up
// Close last connection for IP — active count should drop to zero,
// while the inactive bucket is retained briefly for final sampling.
collector.connection_closed(Some("route-a"), Some("1.2.3.4"));
assert!(collector.ip_connections.get("1.2.3.4").is_none());
assert_eq!(
collector
.ip_connections
.get("1.2.3.4")
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0),
0
);
assert!(collector.ip_closed_at_ms.get("1.2.3.4").is_some());
}
#[test]
fn test_snapshot_retains_high_throughput_ip_over_many_active_ips() {
    let collector = MetricsCollector::with_retention(60);
    // Saturate the active-connection ranking: more IPs than fit in a
    // snapshot, each holding two open connections.
    let scanner_total = MAX_IPS_IN_SNAPSHOT + 20;
    for i in 1..=scanner_total {
        let scanner_ip = format!("10.0.0.{}", i);
        for _ in 0..2 {
            collector.connection_opened(Some("scanner-route"), Some(&scanner_ip));
        }
    }
    // A single-connection IP that dominates on throughput instead.
    let busy_ip = "203.0.113.10";
    collector.connection_opened(Some("download-route"), Some(busy_ip));
    collector.record_bytes(0, 900_000, Some("download-route"), Some(busy_ip));
    collector.sample_all();
    let snapshot = collector.snapshot();
    // The snapshot stays capped, yet the high-bandwidth IP must survive
    // the blended active/throughput selection.
    let busy_metrics = snapshot.ips.get(busy_ip).unwrap();
    assert_eq!(snapshot.ips.len(), MAX_IPS_IN_SNAPSHOT);
    assert_eq!(busy_metrics.active_connections, 1);
    assert_eq!(busy_metrics.bytes_out, 900_000);
    assert_eq!(busy_metrics.throughput_out_bytes_per_sec, 900_000);
}
#[test]
@@ -1475,13 +1640,36 @@ mod tests {
collector.connection_closed(Some("route-a"), Some("10.0.0.1"));
collector.connection_closed(Some("route-a"), Some("10.0.0.1"));
// All per-IP data for 10.0.0.1 should be evicted
// Per-IP data for 10.0.0.1 should be retained until the inactive TTL expires.
assert_eq!(
collector
.ip_connections
.get("10.0.0.1")
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0),
0
);
assert!(collector.ip_total_connections.get("10.0.0.1").is_some());
assert!(collector.ip_bytes_in.get("10.0.0.1").is_some());
assert!(collector.ip_bytes_out.get("10.0.0.1").is_some());
assert!(collector.ip_pending_tp.get("10.0.0.1").is_some());
assert!(collector.ip_throughput.get("10.0.0.1").is_some());
collector
.ip_closed_at_ms
.get("10.0.0.1")
.unwrap()
.store(0, Ordering::Relaxed);
collector.sample_all();
// Expired inactive buckets are fully evicted to prevent leaks.
assert!(collector.ip_connections.get("10.0.0.1").is_none());
assert!(collector.ip_total_connections.get("10.0.0.1").is_none());
assert!(collector.ip_bytes_in.get("10.0.0.1").is_none());
assert!(collector.ip_bytes_out.get("10.0.0.1").is_none());
assert!(collector.ip_pending_tp.get("10.0.0.1").is_none());
assert!(collector.ip_throughput.get("10.0.0.1").is_none());
assert!(collector.ip_closed_at_ms.get("10.0.0.1").is_none());
// 10.0.0.2 should still have data
assert!(collector.ip_connections.get("10.0.0.2").is_some());
@@ -1508,7 +1696,14 @@ mod tests {
.unwrap_or(0),
0
);
assert!(collector.ip_connections.get("10.0.0.1").is_none());
assert_eq!(
collector
.ip_connections
.get("10.0.0.1")
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0),
0
);
}
#[test]
@@ -1649,26 +1844,52 @@ mod tests {
}
#[test]
fn test_record_bytes_after_close_no_orphan() {
fn test_closed_ip_keeps_pending_throughput_until_sample() {
let collector = MetricsCollector::with_retention(60);
// Open a connection, record bytes, then close
collector.connection_opened(Some("route-a"), Some("10.0.0.1"));
collector.record_bytes(100, 200, Some("route-a"), Some("10.0.0.1"));
collector.record_bytes(100, 2000, Some("route-a"), Some("10.0.0.1"));
collector.connection_closed(Some("route-a"), Some("10.0.0.1"));
// IP should be fully evicted
assert!(collector.ip_connections.get("10.0.0.1").is_none());
collector.sample_all();
let snapshot = collector.snapshot();
let ip_metrics = snapshot.ips.get("10.0.0.1").unwrap();
// Now record_bytes arrives late (simulates race) — should NOT re-create entries
collector.record_bytes(50, 75, Some("route-a"), Some("10.0.0.1"));
assert!(collector.ip_bytes_in.get("10.0.0.1").is_none());
assert!(collector.ip_bytes_out.get("10.0.0.1").is_none());
assert!(collector.ip_pending_tp.get("10.0.0.1").is_none());
assert_eq!(ip_metrics.active_connections, 0);
assert_eq!(ip_metrics.bytes_in, 100);
assert_eq!(ip_metrics.bytes_out, 2000);
assert_eq!(ip_metrics.throughput_in_bytes_per_sec, 100);
assert_eq!(ip_metrics.throughput_out_bytes_per_sec, 2000);
assert_eq!(snapshot.throughput_out_bytes_per_sec, 2000);
assert_eq!(
snapshot
.routes
.get("route-a")
.unwrap()
.throughput_out_bytes_per_sec,
2000
);
}
// Global bytes should still be counted
assert_eq!(collector.total_bytes_in.load(Ordering::Relaxed), 150);
assert_eq!(collector.total_bytes_out.load(Ordering::Relaxed), 275);
#[test]
fn test_inactive_ip_retention_hard_cap_prunes_oldest() {
    let collector = MetricsCollector::with_retention(60);
    // Create more inactive buckets than the hard cap, each with a close
    // timestamp just 1 ms in the past — i.e. still inside the TTL window.
    let bucket_total = MAX_INACTIVE_IPS_RETAINED + 5;
    for i in 0..bucket_total {
        // i/255 and i%255 map each index to a distinct dotted suffix.
        let ip = format!("198.51.{}.{}", i / 255, i % 255);
        collector.connection_opened(Some("route-a"), Some(&ip));
        collector.connection_closed(Some("route-a"), Some(&ip));
        let marker = collector.ip_closed_at_ms.get(&ip).unwrap();
        marker.store(current_time_ms().saturating_sub(1), Ordering::Relaxed);
    }
    collector.sample_all();
    // The sampler must enforce the hard cap even before any TTL expiry.
    assert!(collector.ip_closed_at_ms.len() <= MAX_INACTIVE_IPS_RETAINED);
    assert!(collector.ip_connections.len() <= MAX_INACTIVE_IPS_RETAINED);
}
#[test]
+4 -2
View File
@@ -188,10 +188,12 @@ tap.test('TCP forward - real-time byte tracking', async (tools) => {
const byRoute = m.throughput.byRoute();
console.log('TCP forward — throughput byRoute:', Array.from(byRoute.entries()));
// After close, per-IP data should be evicted (memory leak fix)
// After close, per-IP buckets are retained briefly for final throughput sampling,
// but active connection counts must already be zero.
const byIPAfter = m.connections.byIP();
console.log('TCP forward — connections byIP after close:', Array.from(byIPAfter.entries()));
expect(byIPAfter.size).toEqual(0);
expect(byIPAfter.size).toBeGreaterThan(0);
expect(Array.from(byIPAfter.values()).every((count) => count === 0)).toEqual(true);
await proxy.stop();
await tools.delayFor(200);
+1 -1
View File
@@ -3,6 +3,6 @@
*/
// Package identity metadata exposed at runtime. The version must match
// package.json ("27.8.2"); the stale duplicate `version` key left over from
// the rendered diff ('27.8.0') is removed — duplicate keys silently resolve
// to the last one, which hides the defect rather than surfacing it.
export const commitinfo = {
  name: '@push.rocks/smartproxy',
  version: '27.8.2',
  description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
}