Compare commits

...

10 Commits

Author SHA1 Message Date
jkunz 8fa3a51b03 v27.8.2
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-04-26 11:25:24 +00:00
jkunz 088ef6ab09 fix(rustproxy-metrics): retain inactive per-IP metric buckets briefly to capture final throughput before pruning 2026-04-26 11:25:24 +00:00
jkunz fdb5ec59bc v27.8.1
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-04-26 09:17:11 +00:00
jkunz 1ea290a085 fix(rustproxy-metrics): preserve high-throughput IPs in metrics snapshots when active-connection rankings are saturated 2026-04-26 09:17:11 +00:00
jkunz cb71f32b90 v27.8.0
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-04-14 12:43:59 +00:00
jkunz 46155ab12c feat(metrics): add per-domain HTTP request rate metrics 2026-04-14 12:43:59 +00:00
jkunz 490a310b54 v27.7.4
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-04-14 09:17:55 +00:00
jkunz 6c5180573a fix(rustproxy metrics): use stable route metrics keys across HTTP and passthrough listeners 2026-04-14 09:17:55 +00:00
jkunz 30e5ab308f v27.7.3
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-04-14 01:14:33 +00:00
jkunz d2a54b3491 fix(repo): no changes detected 2026-04-14 01:14:33 +00:00
16 changed files with 855 additions and 65 deletions
+32
View File
@@ -1,5 +1,37 @@
# Changelog
## 2026-04-26 - 27.8.2 - fix(rustproxy-metrics)
retain inactive per-IP metric buckets briefly to capture final throughput before pruning
- adds a bounded retention window for closed IP buckets so short-lived transfers are still included in per-IP throughput sampling
- prunes expired inactive IP tracking by TTL and hard cap to prevent unbounded metric map growth
- updates Rust and throughput tests to expect zero active connections during the temporary retention period
## 2026-04-26 - 27.8.1 - fix(rustproxy-metrics)
preserve high-throughput IPs in metrics snapshots when active-connection rankings are saturated
- Select snapshot IPs using a blend of active-connection and throughput rankings instead of only active connections
- Adds a regression test to ensure a high-bandwidth IP remains included when many other IPs have more active connections
## 2026-04-14 - 27.8.0 - feat(metrics)
add per-domain HTTP request rate metrics
- Record canonicalized HTTP request rates per domain in the Rust metrics collector and expose per-second and last-minute values in snapshots.
- Add TypeScript metrics interfaces and adapter support for requests.byDomain().
- Cover HTTP domain rate tracking and ensure TLS passthrough SNI traffic does not affect HTTP request rate metrics.
## 2026-04-14 - 27.7.4 - fix(rustproxy metrics)
use stable route metrics keys across HTTP and passthrough listeners
- adds a shared RouteConfig::metrics_key helper that prefers route name and falls back to route id
- updates HTTP, TCP, UDP, and QUIC metrics labeling to use the shared route metrics key consistently
- keeps route cancellation and rate limiter indexing bound to route config ids where required
- adds tests covering metrics key selection behavior
## 2026-04-14 - 27.7.3 - fix(repo)
no changes detected
## 2026-04-14 - 27.7.2 - fix(docs)
clarify metrics documentation for domain normalization and saturating gauges
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "@push.rocks/smartproxy",
"version": "27.7.2",
"version": "27.8.2",
"private": false,
"description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
"main": "dist_ts/index.js",
@@ -656,6 +656,11 @@ impl RouteConfig {
self.route_match.ports.to_ports()
}
/// Stable key used for frontend route-scoped metrics.
pub fn metrics_key(&self) -> Option<&str> {
self.name.as_deref().or(self.id.as_deref())
}
/// Get the TLS mode for this route (from action-level or first target).
pub fn tls_mode(&self) -> Option<&crate::tls_types::TlsMode> {
// Check action-level TLS first
@@ -673,3 +678,63 @@ impl RouteConfig {
None
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_route(name: Option<&str>, id: Option<&str>) -> RouteConfig {
RouteConfig {
id: id.map(str::to_string),
route_match: RouteMatch {
ports: PortRange::Single(443),
transport: None,
domains: None,
path: None,
client_ip: None,
tls_version: None,
headers: None,
protocol: None,
},
action: RouteAction {
action_type: RouteActionType::Forward,
targets: None,
tls: None,
websocket: None,
load_balancing: None,
advanced: None,
options: None,
send_proxy_protocol: None,
udp: None,
},
headers: None,
security: None,
name: name.map(str::to_string),
description: None,
priority: None,
tags: None,
enabled: None,
}
}
#[test]
fn metrics_key_prefers_name() {
let route = test_route(Some("named-route"), Some("route-id"));
assert_eq!(route.metrics_key(), Some("named-route"));
}
#[test]
fn metrics_key_falls_back_to_id() {
let route = test_route(None, Some("route-id"));
assert_eq!(route.metrics_key(), Some("route-id"));
}
#[test]
fn metrics_key_is_absent_without_name_or_id() {
let route = test_route(None, None);
assert_eq!(route.metrics_key(), None);
}
}
@@ -639,10 +639,12 @@ impl HttpProxyService {
}
};
let route_id = route_match.route.id.as_deref();
let route_config_id = route_match.route.id.as_deref();
let route_id = route_match.route.metrics_key();
let ip_str = ip_string; // reuse from above (avoid redundant to_string())
self.metrics.record_http_request();
if let Some(ref h) = host {
self.metrics.record_http_domain_request(h);
self.metrics.record_ip_domain_request(&ip_str, h);
}
@@ -654,7 +656,7 @@ impl HttpProxyService {
.as_ref()
.filter(|rl| rl.enabled)
.map(|rl| {
let route_key = route_id.unwrap_or("__default__").to_string();
let route_key = route_config_id.unwrap_or("__default__").to_string();
self.route_rate_limiters
.entry(route_key)
.or_insert_with(|| Arc::new(RateLimiter::new(rl.max_requests, rl.window)))
+368 -42
View File
@@ -3,9 +3,9 @@ use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
use std::time::Duration;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use crate::throughput::{ThroughputSample, ThroughputTracker};
use crate::throughput::{RequestRateTracker, ThroughputSample, ThroughputTracker};
/// Aggregated metrics snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -26,6 +26,7 @@ pub struct Metrics {
pub total_http_requests: u64,
pub http_requests_per_sec: u64,
pub http_requests_per_sec_recent: u64,
pub http_domain_requests: std::collections::HashMap<String, HttpDomainRequestMetrics>,
// UDP metrics
pub active_udp_sessions: u64,
pub total_udp_sessions: u64,
@@ -66,6 +67,14 @@ pub struct IpMetrics {
pub domain_requests: HashMap<String, u64>,
}
/// Per-domain HTTP request rate metrics.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct HttpDomainRequestMetrics {
pub requests_per_second: u64,
pub requests_last_minute: u64,
}
/// Per-backend metrics (keyed by "host:port").
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -135,15 +144,24 @@ pub struct Statistics {
/// Default retention for throughput samples (1 hour).
const DEFAULT_RETENTION_SECONDS: usize = 3600;
/// Maximum number of IPs to include in a snapshot (top by active connections).
/// Maximum number of IPs to include in a snapshot.
const MAX_IPS_IN_SNAPSHOT: usize = 100;
/// How long to retain inactive IP metric buckets after the last connection closes.
const INACTIVE_IP_RETENTION_MS: u64 = 15_000;
/// Hard cap for inactive IP metric buckets retained between sampler passes.
const MAX_INACTIVE_IPS_RETAINED: usize = MAX_IPS_IN_SNAPSHOT * 10;
/// Maximum number of backends to include in a snapshot (top by total connections).
const MAX_BACKENDS_IN_SNAPSHOT: usize = 100;
/// Maximum number of distinct domains tracked per IP (prevents subdomain-spray abuse).
const MAX_DOMAINS_PER_IP: usize = 256;
/// Number of one-second HTTP request samples retained per domain.
const HTTP_DOMAIN_REQUEST_WINDOW_SECONDS: usize = 60;
fn canonicalize_domain_key(domain: &str) -> Option<String> {
let normalized = domain.trim().trim_end_matches('.').to_ascii_lowercase();
if normalized.is_empty() {
@@ -153,6 +171,13 @@ fn canonicalize_domain_key(domain: &str) -> Option<String> {
}
}
fn current_time_ms() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64
}
/// Metrics collector tracking connections and throughput.
///
/// Design: The hot path (`record_bytes`) is entirely lock-free — it only touches
@@ -174,6 +199,7 @@ pub struct MetricsCollector {
// ── Per-IP tracking ──
ip_connections: DashMap<String, AtomicU64>,
ip_closed_at_ms: DashMap<String, AtomicU64>,
ip_total_connections: DashMap<String, AtomicU64>,
ip_bytes_in: DashMap<String, AtomicU64>,
ip_bytes_out: DashMap<String, AtomicU64>,
@@ -201,6 +227,7 @@ pub struct MetricsCollector {
total_http_requests: AtomicU64,
pending_http_requests: AtomicU64,
http_request_throughput: Mutex<ThroughputTracker>,
http_domain_request_rates: DashMap<String, Mutex<RequestRateTracker>>,
// ── UDP metrics ──
active_udp_sessions: AtomicU64,
@@ -260,6 +287,7 @@ impl MetricsCollector {
route_bytes_in: DashMap::new(),
route_bytes_out: DashMap::new(),
ip_connections: DashMap::new(),
ip_closed_at_ms: DashMap::new(),
ip_total_connections: DashMap::new(),
ip_bytes_in: DashMap::new(),
ip_bytes_out: DashMap::new(),
@@ -284,6 +312,7 @@ impl MetricsCollector {
total_http_requests: AtomicU64::new(0),
pending_http_requests: AtomicU64::new(0),
http_request_throughput: Mutex::new(ThroughputTracker::new(retention_seconds)),
http_domain_request_rates: DashMap::new(),
frontend_h1_active: AtomicU64::new(0),
frontend_h1_total: AtomicU64::new(0),
frontend_h2_active: AtomicU64::new(0),
@@ -334,6 +363,7 @@ impl MetricsCollector {
.entry(ip.to_string())
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(1, Ordering::Relaxed);
self.ip_closed_at_ms.remove(ip);
self.ip_total_connections
.entry(ip.to_string())
.or_insert_with(|| AtomicU64::new(0))
@@ -378,22 +408,88 @@ impl MetricsCollector {
}
})
.ok();
// Clean up zero-count entries to prevent memory growth
// Keep inactive IP buckets briefly so pending bytes can still
// be sampled into per-IP throughput after short-lived transfers.
if matches!(prev, Some(v) if v <= 1) {
drop(counter);
self.ip_connections.remove(ip);
// Evict all per-IP tracking data for this IP
self.ip_total_connections.remove(ip);
self.ip_bytes_in.remove(ip);
self.ip_bytes_out.remove(ip);
self.ip_pending_tp.remove(ip);
self.ip_throughput.remove(ip);
self.ip_domain_requests.remove(ip);
self.ip_closed_at_ms
.entry(ip.to_string())
.or_insert_with(|| AtomicU64::new(0))
.store(current_time_ms(), Ordering::Relaxed);
}
}
}
}
fn remove_inactive_ip_tracking(&self, ip: &str) {
if self
.ip_connections
.get(ip)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0)
> 0
{
self.ip_closed_at_ms.remove(ip);
return;
}
self.ip_connections.remove(ip);
self.ip_closed_at_ms.remove(ip);
self.ip_total_connections.remove(ip);
self.ip_bytes_in.remove(ip);
self.ip_bytes_out.remove(ip);
self.ip_pending_tp.remove(ip);
self.ip_throughput.remove(ip);
self.ip_domain_requests.remove(ip);
}
fn prune_inactive_ip_tracking(&self, now_ms: u64) {
let cutoff_ms = now_ms.saturating_sub(INACTIVE_IP_RETENTION_MS);
let mut inactive_ips: Vec<(String, u64)> = Vec::new();
let mut active_markers: Vec<String> = Vec::new();
for entry in self.ip_closed_at_ms.iter() {
let ip = entry.key().clone();
let active = self
.ip_connections
.get(&ip)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
if active > 0 {
active_markers.push(ip);
} else {
inactive_ips.push((ip, entry.value().load(Ordering::Relaxed)));
}
}
for ip in active_markers {
self.ip_closed_at_ms.remove(&ip);
}
let mut remove_ips: HashSet<String> = inactive_ips
.iter()
.filter(|(_, closed_at_ms)| *closed_at_ms <= cutoff_ms)
.map(|(ip, _)| ip.clone())
.collect();
let retained_after_ttl = inactive_ips.len().saturating_sub(remove_ips.len());
if retained_after_ttl > MAX_INACTIVE_IPS_RETAINED {
inactive_ips.sort_by(|a, b| a.1.cmp(&b.1).then_with(|| a.0.cmp(&b.0)));
let mut overflow = retained_after_ttl - MAX_INACTIVE_IPS_RETAINED;
for (ip, closed_at_ms) in inactive_ips {
if overflow == 0 {
break;
}
if closed_at_ms > cutoff_ms && remove_ips.insert(ip) {
overflow -= 1;
}
}
}
for ip in remove_ips {
self.remove_inactive_ip_tracking(&ip);
}
}
/// Record bytes transferred (lock-free hot path).
///
/// Called per-chunk in the TCP copy loop. Only touches AtomicU64 counters —
@@ -467,9 +563,8 @@ impl MetricsCollector {
// Per-IP tracking: same get()-first pattern to avoid String allocation on hot path.
if let Some(ip) = source_ip {
// Only record per-IP stats if the IP still has active connections.
// This prevents orphaned entries when record_bytes races with
// connection_closed (which evicts all per-IP data on last close).
// Only record per-IP stats if the IP is active or still within the
// bounded inactive retention window after its last connection closed.
if self.ip_connections.contains_key(ip) {
if bytes_in > 0 {
if let Some(counter) = self.ip_bytes_in.get(ip) {
@@ -522,6 +617,24 @@ impl MetricsCollector {
self.pending_http_requests.fetch_add(1, Ordering::Relaxed);
}
/// Record a real HTTP request for a canonicalized domain.
pub fn record_http_domain_request(&self, domain: &str) {
let Some(domain) = canonicalize_domain_key(domain) else {
return;
};
self.http_domain_request_rates
.entry(domain.clone())
.or_insert_with(|| {
Mutex::new(RequestRateTracker::new(HTTP_DOMAIN_REQUEST_WINDOW_SECONDS))
});
if let Some(tracker_ref) = self.http_domain_request_rates.get(domain.as_str()) {
if let Ok(mut tracker) = tracker_ref.value().lock() {
tracker.record_event();
}
}
}
/// Record a domain request/connection for a frontend IP.
///
/// Called per HTTP request (with Host header) and per TCP passthrough
@@ -791,8 +904,7 @@ impl MetricsCollector {
/// Take a throughput sample on all trackers (cold path, call at 1Hz or configured interval).
///
/// Drains the lock-free pending counters and feeds the accumulated bytes
/// into the throughput trackers (under Mutex). This is the only place
/// the Mutex is locked.
/// into the throughput trackers under their sampling mutexes.
pub fn sample_all(&self) {
// Drain global pending bytes and feed into the tracker
let global_in = self.global_pending_tp_in.swap(0, Ordering::Relaxed);
@@ -873,9 +985,27 @@ impl MetricsCollector {
tracker.sample();
}
// Advance HTTP domain request windows and prune fully idle domains.
let mut stale_http_domains = Vec::new();
for entry in self.http_domain_request_rates.iter() {
if let Ok(mut tracker) = entry.value().lock() {
tracker.advance_to_now();
if tracker.is_idle() {
stale_http_domains.push(entry.key().clone());
}
}
}
for domain in stale_http_domains {
self.http_domain_request_rates.remove(&domain);
}
// Keep closed IP buckets only for a short, bounded window so the sampler
// can attribute short-lived transfers without leaking per-IP maps.
self.prune_inactive_ip_tracking(current_time_ms());
// Safety-net: prune orphaned per-IP entries that have no corresponding
// ip_connections entry. This catches any entries created by a race between
// record_bytes and connection_closed.
// ip_connections entry. This catches any entries created by older races or
// by code paths that manually inserted partial per-IP state.
self.ip_bytes_in
.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_bytes_out
@@ -888,6 +1018,8 @@ impl MetricsCollector {
.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_domain_requests
.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_closed_at_ms
.retain(|k, _| self.ip_connections.contains_key(k));
// Safety-net: prune orphaned backend error/stats entries for backends
// that have no active or total connections (error-only backends).
@@ -1019,8 +1151,8 @@ impl MetricsCollector {
);
}
// Collect per-IP metrics — only IPs with active connections or total > 0,
// capped at top MAX_IPS_IN_SNAPSHOT sorted by active count
// Collect per-IP metrics — capped to the IPs most relevant for either
// active connection visibility or bandwidth attribution.
let mut ip_entries: Vec<(String, u64, u64, u64, u64, u64, u64, HashMap<String, u64>)> =
Vec::new();
for entry in self.ip_total_connections.iter() {
@@ -1068,9 +1200,54 @@ impl MetricsCollector {
domain_requests,
));
}
// Sort by active connections descending, then cap
ip_entries.sort_by(|a, b| b.1.cmp(&a.1));
ip_entries.truncate(MAX_IPS_IN_SNAPSHOT);
if ip_entries.len() > MAX_IPS_IN_SNAPSHOT {
let mut selected = vec![false; ip_entries.len()];
let mut selected_count = 0usize;
let mut active_rank: Vec<usize> = (0..ip_entries.len()).collect();
active_rank.sort_by(|&a, &b| {
ip_entries[b]
.1
.cmp(&ip_entries[a].1)
.then_with(|| ip_entries[b].2.cmp(&ip_entries[a].2))
.then_with(|| ip_entries[a].0.cmp(&ip_entries[b].0))
});
let mut throughput_rank: Vec<usize> = (0..ip_entries.len()).collect();
throughput_rank.sort_by(|&a, &b| {
let a_tp = ip_entries[a].5.saturating_add(ip_entries[a].6);
let b_tp = ip_entries[b].5.saturating_add(ip_entries[b].6);
let a_bytes = ip_entries[a].3.saturating_add(ip_entries[a].4);
let b_bytes = ip_entries[b].3.saturating_add(ip_entries[b].4);
b_tp.cmp(&a_tp)
.then_with(|| b_bytes.cmp(&a_bytes))
.then_with(|| ip_entries[b].1.cmp(&ip_entries[a].1))
.then_with(|| ip_entries[a].0.cmp(&ip_entries[b].0))
});
for idx in active_rank.into_iter().take(MAX_IPS_IN_SNAPSHOT / 2) {
if !selected[idx] {
selected[idx] = true;
selected_count += 1;
}
}
for idx in throughput_rank {
if selected_count >= MAX_IPS_IN_SNAPSHOT {
break;
}
if !selected[idx] {
selected[idx] = true;
selected_count += 1;
}
}
ip_entries = ip_entries
.into_iter()
.enumerate()
.filter_map(|(idx, entry)| selected[idx].then_some(entry))
.collect();
}
let mut ips = std::collections::HashMap::new();
for (ip, active, total, bytes_in, bytes_out, tp_in, tp_out, domain_requests) in ip_entries {
@@ -1179,6 +1356,24 @@ impl MetricsCollector {
})
.unwrap_or((0, 0));
let mut http_domain_requests = std::collections::HashMap::new();
for entry in self.http_domain_request_rates.iter() {
if let Ok(mut tracker) = entry.value().lock() {
tracker.advance_to_now();
let requests_per_second = tracker.last_second();
let requests_last_minute = tracker.last_minute();
if requests_per_second > 0 || requests_last_minute > 0 {
http_domain_requests.insert(
entry.key().clone(),
HttpDomainRequestMetrics {
requests_per_second,
requests_last_minute,
},
);
}
}
}
Metrics {
active_connections: self.active_connections(),
total_connections: self.total_connections(),
@@ -1195,6 +1390,7 @@ impl MetricsCollector {
total_http_requests: self.total_http_requests.load(Ordering::Relaxed),
http_requests_per_sec: http_rps,
http_requests_per_sec_recent: http_rps_recent,
http_domain_requests,
active_udp_sessions: self.active_udp_sessions.load(Ordering::Relaxed),
total_udp_sessions: self.total_udp_sessions.load(Ordering::Relaxed),
total_datagrams_in: self.total_datagrams_in.load(Ordering::Relaxed),
@@ -1383,9 +1579,42 @@ mod tests {
1
);
// Close last connection for IP — should be cleaned up
// Close last connection for IP — active count should drop to zero,
// while the inactive bucket is retained briefly for final sampling.
collector.connection_closed(Some("route-a"), Some("1.2.3.4"));
assert!(collector.ip_connections.get("1.2.3.4").is_none());
assert_eq!(
collector
.ip_connections
.get("1.2.3.4")
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0),
0
);
assert!(collector.ip_closed_at_ms.get("1.2.3.4").is_some());
}
#[test]
fn test_snapshot_retains_high_throughput_ip_over_many_active_ips() {
let collector = MetricsCollector::with_retention(60);
for i in 1..=(MAX_IPS_IN_SNAPSHOT + 20) {
let ip = format!("10.0.0.{}", i);
collector.connection_opened(Some("scanner-route"), Some(&ip));
collector.connection_opened(Some("scanner-route"), Some(&ip));
}
let busy_ip = "203.0.113.10";
collector.connection_opened(Some("download-route"), Some(busy_ip));
collector.record_bytes(0, 900_000, Some("download-route"), Some(busy_ip));
collector.sample_all();
let snapshot = collector.snapshot();
let busy_metrics = snapshot.ips.get(busy_ip).unwrap();
assert_eq!(snapshot.ips.len(), MAX_IPS_IN_SNAPSHOT);
assert_eq!(busy_metrics.active_connections, 1);
assert_eq!(busy_metrics.bytes_out, 900_000);
assert_eq!(busy_metrics.throughput_out_bytes_per_sec, 900_000);
}
#[test]
@@ -1411,13 +1640,36 @@ mod tests {
collector.connection_closed(Some("route-a"), Some("10.0.0.1"));
collector.connection_closed(Some("route-a"), Some("10.0.0.1"));
// All per-IP data for 10.0.0.1 should be evicted
// Per-IP data for 10.0.0.1 should be retained until the inactive TTL expires.
assert_eq!(
collector
.ip_connections
.get("10.0.0.1")
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0),
0
);
assert!(collector.ip_total_connections.get("10.0.0.1").is_some());
assert!(collector.ip_bytes_in.get("10.0.0.1").is_some());
assert!(collector.ip_bytes_out.get("10.0.0.1").is_some());
assert!(collector.ip_pending_tp.get("10.0.0.1").is_some());
assert!(collector.ip_throughput.get("10.0.0.1").is_some());
collector
.ip_closed_at_ms
.get("10.0.0.1")
.unwrap()
.store(0, Ordering::Relaxed);
collector.sample_all();
// Expired inactive buckets are fully evicted to prevent leaks.
assert!(collector.ip_connections.get("10.0.0.1").is_none());
assert!(collector.ip_total_connections.get("10.0.0.1").is_none());
assert!(collector.ip_bytes_in.get("10.0.0.1").is_none());
assert!(collector.ip_bytes_out.get("10.0.0.1").is_none());
assert!(collector.ip_pending_tp.get("10.0.0.1").is_none());
assert!(collector.ip_throughput.get("10.0.0.1").is_none());
assert!(collector.ip_closed_at_ms.get("10.0.0.1").is_none());
// 10.0.0.2 should still have data
assert!(collector.ip_connections.get("10.0.0.2").is_some());
@@ -1444,7 +1696,14 @@ mod tests {
.unwrap_or(0),
0
);
assert!(collector.ip_connections.get("10.0.0.1").is_none());
assert_eq!(
collector
.ip_connections
.get("10.0.0.1")
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0),
0
);
}
#[test]
@@ -1514,6 +1773,47 @@ mod tests {
assert_eq!(snapshot.http_requests_per_sec, 3);
}
#[test]
fn test_http_domain_request_rates_are_canonicalized() {
let collector = MetricsCollector::with_retention(60);
collector.record_http_domain_request("Example.COM");
collector.record_http_domain_request("example.com.");
collector.record_http_domain_request(" example.com ");
let now_sec = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
if let Some(tracker) = collector.http_domain_request_rates.get("example.com") {
tracker.value().lock().unwrap().advance_to(now_sec + 1);
}
let snapshot = collector.snapshot();
let metrics = snapshot.http_domain_requests.get("example.com").unwrap();
assert_eq!(snapshot.http_domain_requests.len(), 1);
assert_eq!(metrics.requests_per_second, 3);
assert_eq!(metrics.requests_last_minute, 3);
}
#[test]
fn test_ip_domain_requests_do_not_affect_http_domain_request_rates() {
let collector = MetricsCollector::with_retention(60);
collector.connection_opened(Some("route-a"), Some("10.0.0.1"));
collector.record_ip_domain_request("10.0.0.1", "example.com");
let snapshot = collector.snapshot();
assert!(snapshot.http_domain_requests.is_empty());
assert_eq!(
snapshot
.ips
.get("10.0.0.1")
.and_then(|ip| ip.domain_requests.get("example.com")),
Some(&1)
);
}
#[test]
fn test_retain_routes_prunes_stale() {
let collector = MetricsCollector::with_retention(60);
@@ -1544,26 +1844,52 @@ mod tests {
}
#[test]
fn test_record_bytes_after_close_no_orphan() {
fn test_closed_ip_keeps_pending_throughput_until_sample() {
let collector = MetricsCollector::with_retention(60);
// Open a connection, record bytes, then close
collector.connection_opened(Some("route-a"), Some("10.0.0.1"));
collector.record_bytes(100, 200, Some("route-a"), Some("10.0.0.1"));
collector.record_bytes(100, 2000, Some("route-a"), Some("10.0.0.1"));
collector.connection_closed(Some("route-a"), Some("10.0.0.1"));
// IP should be fully evicted
assert!(collector.ip_connections.get("10.0.0.1").is_none());
collector.sample_all();
let snapshot = collector.snapshot();
let ip_metrics = snapshot.ips.get("10.0.0.1").unwrap();
// Now record_bytes arrives late (simulates race) — should NOT re-create entries
collector.record_bytes(50, 75, Some("route-a"), Some("10.0.0.1"));
assert!(collector.ip_bytes_in.get("10.0.0.1").is_none());
assert!(collector.ip_bytes_out.get("10.0.0.1").is_none());
assert!(collector.ip_pending_tp.get("10.0.0.1").is_none());
assert_eq!(ip_metrics.active_connections, 0);
assert_eq!(ip_metrics.bytes_in, 100);
assert_eq!(ip_metrics.bytes_out, 2000);
assert_eq!(ip_metrics.throughput_in_bytes_per_sec, 100);
assert_eq!(ip_metrics.throughput_out_bytes_per_sec, 2000);
assert_eq!(snapshot.throughput_out_bytes_per_sec, 2000);
assert_eq!(
snapshot
.routes
.get("route-a")
.unwrap()
.throughput_out_bytes_per_sec,
2000
);
}
// Global bytes should still be counted
assert_eq!(collector.total_bytes_in.load(Ordering::Relaxed), 150);
assert_eq!(collector.total_bytes_out.load(Ordering::Relaxed), 275);
#[test]
fn test_inactive_ip_retention_hard_cap_prunes_oldest() {
let collector = MetricsCollector::with_retention(60);
for i in 0..(MAX_INACTIVE_IPS_RETAINED + 5) {
let ip = format!("198.51.{}.{}", i / 255, i % 255);
collector.connection_opened(Some("route-a"), Some(&ip));
collector.connection_closed(Some("route-a"), Some(&ip));
collector
.ip_closed_at_ms
.get(&ip)
.unwrap()
.store(current_time_ms().saturating_sub(1), Ordering::Relaxed);
}
collector.sample_all();
assert!(collector.ip_closed_at_ms.len() <= MAX_INACTIVE_IPS_RETAINED);
assert!(collector.ip_connections.len() <= MAX_INACTIVE_IPS_RETAINED);
}
#[test]
+146 -1
View File
@@ -29,6 +29,113 @@ pub struct ThroughputTracker {
created_at: Instant,
}
fn unix_timestamp_seconds() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
}
/// Circular buffer for per-second event counts.
///
/// Unlike `ThroughputTracker`, events are recorded directly into the current
/// second so request counts remain stable even when the collector is sampled
/// more frequently than once per second.
pub(crate) struct RequestRateTracker {
samples: Vec<u64>,
write_index: usize,
count: usize,
capacity: usize,
current_second: Option<u64>,
current_count: u64,
}
impl RequestRateTracker {
pub(crate) fn new(retention_seconds: usize) -> Self {
Self {
samples: Vec::with_capacity(retention_seconds.max(1)),
write_index: 0,
count: 0,
capacity: retention_seconds.max(1),
current_second: None,
current_count: 0,
}
}
fn push_sample(&mut self, count: u64) {
if self.samples.len() < self.capacity {
self.samples.push(count);
} else {
self.samples[self.write_index] = count;
}
self.write_index = (self.write_index + 1) % self.capacity;
self.count = (self.count + 1).min(self.capacity);
}
pub(crate) fn record_event(&mut self) {
self.record_events_at(unix_timestamp_seconds(), 1);
}
pub(crate) fn record_events_at(&mut self, now_sec: u64, count: u64) {
self.advance_to(now_sec);
self.current_count = self.current_count.saturating_add(count);
}
pub(crate) fn advance_to_now(&mut self) {
self.advance_to(unix_timestamp_seconds());
}
pub(crate) fn advance_to(&mut self, now_sec: u64) {
match self.current_second {
Some(current_second) if now_sec > current_second => {
self.push_sample(self.current_count);
for _ in 1..(now_sec - current_second) {
self.push_sample(0);
}
self.current_second = Some(now_sec);
self.current_count = 0;
}
Some(_) => {}
None => {
self.current_second = Some(now_sec);
self.current_count = 0;
}
}
}
fn sum_recent(&self, window_seconds: usize) -> u64 {
let window = window_seconds.min(self.count);
if window == 0 {
return 0;
}
let mut total = 0u64;
for i in 0..window {
let idx = if self.write_index >= i + 1 {
self.write_index - i - 1
} else {
self.capacity - (i + 1 - self.write_index)
};
if idx < self.samples.len() {
total += self.samples[idx];
}
}
total
}
pub(crate) fn last_second(&self) -> u64 {
self.sum_recent(1)
}
pub(crate) fn last_minute(&self) -> u64 {
self.sum_recent(60)
}
pub(crate) fn is_idle(&self) -> bool {
self.current_count == 0 && self.sum_recent(self.capacity) == 0
}
}
impl ThroughputTracker {
/// Create a new tracker with the given capacity (seconds of retention).
pub fn new(retention_seconds: usize) -> Self {
@@ -46,7 +153,8 @@ impl ThroughputTracker {
/// Record bytes (called from data flow callbacks).
pub fn record_bytes(&self, bytes_in: u64, bytes_out: u64) {
self.pending_bytes_in.fetch_add(bytes_in, Ordering::Relaxed);
self.pending_bytes_out.fetch_add(bytes_out, Ordering::Relaxed);
self.pending_bytes_out
.fetch_add(bytes_out, Ordering::Relaxed);
}
/// Take a sample (called at 1Hz).
@@ -229,4 +337,41 @@ mod tests {
let history = tracker.history(10);
assert!(history.is_empty());
}
#[test]
fn test_request_rate_tracker_counts_last_second_and_last_minute() {
let mut tracker = RequestRateTracker::new(60);
tracker.record_events_at(100, 2);
tracker.record_events_at(100, 3);
tracker.advance_to(101);
assert_eq!(tracker.last_second(), 5);
assert_eq!(tracker.last_minute(), 5);
}
#[test]
fn test_request_rate_tracker_adds_zero_samples_for_gaps() {
let mut tracker = RequestRateTracker::new(60);
tracker.record_events_at(100, 4);
tracker.record_events_at(102, 1);
tracker.advance_to(103);
assert_eq!(tracker.last_second(), 1);
assert_eq!(tracker.last_minute(), 5);
}
#[test]
fn test_request_rate_tracker_decays_to_zero_over_window() {
let mut tracker = RequestRateTracker::new(60);
tracker.record_events_at(100, 7);
tracker.advance_to(101);
tracker.advance_to(161);
assert_eq!(tracker.last_second(), 0);
assert_eq!(tracker.last_minute(), 0);
assert!(tracker.is_idle());
}
}
@@ -420,7 +420,7 @@ pub async fn quic_accept_loop(
}
conn_tracker.connection_opened(&ip);
let route_id = route.name.clone().or(route.id.clone());
let route_id = route.metrics_key().map(str::to_string);
metrics.connection_opened(route_id.as_deref(), Some(&ip_str));
// Resolve per-route cancel token (child of global cancel)
@@ -541,7 +541,7 @@ async fn handle_quic_stream_forwarding(
real_client_addr: Option<SocketAddr>,
) -> anyhow::Result<()> {
let effective_addr = real_client_addr.unwrap_or_else(|| connection.remote_address());
let route_id = route.name.as_deref().or(route.id.as_deref());
let route_id = route.metrics_key();
let metrics_arc = metrics;
// Resolve backend target
@@ -715,10 +715,11 @@ impl TcpListenerManager {
} else if let Some(target) = quick_match.target {
let target_host = target.host.first().to_string();
let target_port = target.port.resolve(port);
let route_id = quick_match.route.id.as_deref();
let route_config_id = quick_match.route.id.as_deref();
let route_id = quick_match.route.metrics_key();
// Resolve per-route cancel token (child of global cancel)
let route_cancel = match route_id {
let route_cancel = match route_config_id {
Some(id) => route_cancels.entry(id.to_string())
.or_insert_with(|| cancel.child_token())
.clone(),
@@ -733,7 +734,7 @@ impl TcpListenerManager {
cancel: conn_cancel.clone(),
source_ip: peer_addr.ip(),
domain: None, // fast path has no domain
route_id: route_id.map(|s| s.to_string()),
route_id: route_config_id.map(|s| s.to_string()),
},
);
@@ -905,12 +906,13 @@ impl TcpListenerManager {
}
};
let route_id = route_match.route.id.as_deref();
let route_config_id = route_match.route.id.as_deref();
let route_id = route_match.route.metrics_key();
// Resolve per-route cancel token (child of global cancel).
// When this route is removed via updateRoutes, the token is cancelled,
// terminating all connections on this route.
let route_cancel = match route_id {
let route_cancel = match route_config_id {
Some(id) => route_cancels.entry(id.to_string())
.or_insert_with(|| cancel.child_token())
.clone(),
@@ -925,7 +927,7 @@ impl TcpListenerManager {
cancel: cancel.clone(),
source_ip: peer_addr.ip(),
domain: domain.clone(),
route_id: route_id.map(|s| s.to_string()),
route_id: route_config_id.map(|s| s.to_string()),
},
);
@@ -1314,9 +1316,7 @@ impl TcpListenerManager {
};
// Build metadata JSON
let route_key = route_match.route.name.as_deref()
.or(route_match.route.id.as_deref())
.unwrap_or("unknown");
let route_key = route_match.route.metrics_key().unwrap_or("unknown");
let metadata = serde_json::json!({
"routeKey": route_key,
@@ -617,7 +617,7 @@ impl UdpListenerManager {
};
let route = route_match.route;
let route_id = route.name.as_deref().or(route.id.as_deref());
let route_id = route.metrics_key();
// Socket handler routes → relay datagram to TS via persistent Unix socket
if route.action.action_type == RouteActionType::SocketHandler {
+191
View File
@@ -0,0 +1,191 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import { SmartProxy } from '../ts/index.js';
import * as http from 'http';
import * as net from 'net';
import * as tls from 'tls';
import * as fs from 'fs';
import * as path from 'path';
import { fileURLToPath } from 'url';
import { assertPortsFree, findFreePorts } from './helpers/port-allocator.js';
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
const CERT_PEM = fs.readFileSync(path.join(__dirname, '..', 'assets', 'certs', 'cert.pem'), 'utf8');
const KEY_PEM = fs.readFileSync(path.join(__dirname, '..', 'assets', 'certs', 'key.pem'), 'utf8');
let httpBackendPort: number;
let tlsBackendPort: number;
let httpProxyPort: number;
let tlsProxyPort: number;
let httpBackend: http.Server;
let tlsBackend: tls.Server;
let proxy: SmartProxy;
async function pollMetrics(proxyToPoll: SmartProxy): Promise<void> {
await (proxyToPoll as any).metricsAdapter.poll();
}
async function waitForCondition(
callback: () => Promise<boolean>,
timeoutMs: number = 5000,
stepMs: number = 100,
): Promise<void> {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
if (await callback()) {
return;
}
await new Promise((resolve) => setTimeout(resolve, stepMs));
}
throw new Error(`Condition not met within ${timeoutMs}ms`);
}
function hasIpDomainRequest(domain: string): boolean {
const byIp = proxy.getMetrics().connections.domainRequestsByIP();
for (const domainMap of byIp.values()) {
if (domainMap.has(domain)) {
return true;
}
}
return false;
}
tap.test('setup - backend servers for HTTP domain rate metrics', async () => {
[httpBackendPort, tlsBackendPort, httpProxyPort, tlsProxyPort] = await findFreePorts(4);
httpBackend = http.createServer((req, res) => {
let body = '';
req.on('data', (chunk) => {
body += chunk;
});
req.on('end', () => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.end(`ok:${body}`);
});
});
await new Promise<void>((resolve) => {
httpBackend.listen(httpBackendPort, () => resolve());
});
tlsBackend = tls.createServer({ cert: CERT_PEM, key: KEY_PEM }, (socket) => {
socket.on('data', (data) => {
socket.write(data);
});
socket.on('error', () => {});
});
await new Promise<void>((resolve) => {
tlsBackend.listen(tlsBackendPort, () => resolve());
});
});
tap.test('setup - start proxy with HTTP and TLS passthrough routes', async () => {
proxy = new SmartProxy({
routes: [
{
id: 'http-domain-rates',
name: 'http-domain-rates',
match: { ports: httpProxyPort, domains: 'example.com' },
action: {
type: 'forward',
targets: [{ host: 'localhost', port: httpBackendPort }],
},
},
{
id: 'tls-passthrough-domain-rates',
name: 'tls-passthrough-domain-rates',
match: { ports: tlsProxyPort, domains: 'passthrough.example.com' },
action: {
type: 'forward',
tls: { mode: 'passthrough' },
targets: [{ host: 'localhost', port: tlsBackendPort }],
},
},
],
metrics: { enabled: true, sampleIntervalMs: 100, retentionSeconds: 60 },
});
await proxy.start();
await new Promise((resolve) => setTimeout(resolve, 300));
});
tap.test('HTTP requests populate per-domain HTTP request rates', async () => {
for (let i = 0; i < 3; i++) {
await new Promise<void>((resolve, reject) => {
const body = `payload-${i}`;
const req = http.request(
{
hostname: 'localhost',
port: httpProxyPort,
path: '/echo',
method: 'POST',
headers: {
Host: 'Example.COM',
'Content-Type': 'text/plain',
'Content-Length': String(body.length),
},
},
(res) => {
res.resume();
res.on('end', () => resolve());
},
);
req.on('error', reject);
req.end(body);
});
}
await waitForCondition(async () => {
await pollMetrics(proxy);
const domainMetrics = proxy.getMetrics().requests.byDomain().get('example.com');
return (domainMetrics?.lastMinute ?? 0) >= 3 && (domainMetrics?.perSecond ?? 0) > 0;
});
const exampleMetrics = proxy.getMetrics().requests.byDomain().get('example.com');
expect(exampleMetrics).toBeTruthy();
expect(exampleMetrics?.lastMinute).toEqual(3);
expect(exampleMetrics?.perSecond).toBeGreaterThan(0);
});
tap.test('TLS passthrough SNI does not inflate HTTP domain request rates', async () => {
const tlsClient = tls.connect({
host: 'localhost',
port: tlsProxyPort,
servername: 'passthrough.example.com',
rejectUnauthorized: false,
});
await new Promise<void>((resolve, reject) => {
tlsClient.once('secureConnect', () => resolve());
tlsClient.once('error', reject);
});
const echoPromise = new Promise<void>((resolve, reject) => {
tlsClient.once('data', () => resolve());
tlsClient.once('error', reject);
});
tlsClient.write(Buffer.from('hello over tls passthrough'));
await echoPromise;
await waitForCondition(async () => {
await pollMetrics(proxy);
return hasIpDomainRequest('passthrough.example.com');
});
const requestRates = proxy.getMetrics().requests.byDomain();
expect(requestRates.has('passthrough.example.com')).toBeFalse();
expect(requestRates.get('example.com')?.lastMinute).toEqual(3);
expect(hasIpDomainRequest('passthrough.example.com')).toBeTrue();
tlsClient.destroy();
});
tap.test('cleanup - stop proxy and close backend servers', async () => {
await proxy.stop();
await new Promise<void>((resolve) => httpBackend.close(() => resolve()));
await new Promise<void>((resolve) => tlsBackend.close(() => resolve()));
await assertPortsFree([httpBackendPort, tlsBackendPort, httpProxyPort, tlsProxyPort]);
});
export default tap.start()
+4 -1
View File
@@ -83,6 +83,9 @@ tap.test('should verify new metrics API structure', async () => {
expect(metrics.throughput).toHaveProperty('history');
expect(metrics.throughput).toHaveProperty('byRoute');
expect(metrics.throughput).toHaveProperty('byIP');
// Check request methods
expect(metrics.requests).toHaveProperty('byDomain');
});
tap.test('should track active connections', async (tools) => {
@@ -273,4 +276,4 @@ tap.test('should clean up resources', async () => {
await assertPortsFree([echoServerPort, proxyPort]);
});
export default tap.start();
export default tap.start();
+4 -2
View File
@@ -188,10 +188,12 @@ tap.test('TCP forward - real-time byte tracking', async (tools) => {
const byRoute = m.throughput.byRoute();
console.log('TCP forward — throughput byRoute:', Array.from(byRoute.entries()));
// After close, per-IP data should be evicted (memory leak fix)
// After close, per-IP buckets are retained briefly for final throughput sampling,
// but active connection counts must already be zero.
const byIPAfter = m.connections.byIP();
console.log('TCP forward — connections byIP after close:', Array.from(byIPAfter.entries()));
expect(byIPAfter.size).toEqual(0);
expect(byIPAfter.size).toBeGreaterThan(0);
expect(Array.from(byIPAfter.values()).every((count) => count === 0)).toEqual(true);
await proxy.stop();
await tools.delayFor(200);
+1 -1
View File
@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartproxy',
version: '27.7.2',
version: '27.8.2',
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
}
@@ -29,6 +29,11 @@ export interface IThroughputHistoryPoint {
out: number;
}
export interface IRequestRateMetrics {
perSecond: number;
lastMinute: number;
}
/**
* Main metrics interface with clean, grouped API
*/
@@ -81,6 +86,7 @@ export interface IMetrics {
perSecond(): number;
perMinute(): number;
total(): number;
byDomain(): Map<string, IRequestRateMetrics>;
};
// Cumulative totals
@@ -185,4 +191,4 @@ export interface IByteTracker {
bytesOut: number;
startTime: number;
lastUpdate: number;
}
}
@@ -134,6 +134,11 @@ export interface IRustBackendMetrics {
h2Failures: number;
}
export interface IRustHttpDomainRequestMetrics {
requestsPerSecond: number;
requestsLastMinute: number;
}
export interface IRustMetricsSnapshot {
activeConnections: number;
totalConnections: number;
@@ -150,6 +155,7 @@ export interface IRustMetricsSnapshot {
totalHttpRequests: number;
httpRequestsPerSec: number;
httpRequestsPerSecRecent: number;
httpDomainRequests: Record<string, IRustHttpDomainRequestMetrics>;
activeUdpSessions: number;
totalUdpSessions: number;
totalDatagramsIn: number;
+14 -2
View File
@@ -1,6 +1,6 @@
import type { IMetrics, IBackendMetrics, IProtocolCacheEntry, IProtocolDistribution, IThroughputData, IThroughputHistoryPoint } from './models/metrics-types.js';
import type { IMetrics, IBackendMetrics, IProtocolCacheEntry, IProtocolDistribution, IRequestRateMetrics, IThroughputData, IThroughputHistoryPoint } from './models/metrics-types.js';
import type { RustProxyBridge } from './rust-proxy-bridge.js';
import type { IRustBackendMetrics, IRustIpMetrics, IRustMetricsSnapshot, IRustRouteMetrics } from './models/rust-types.js';
import type { IRustBackendMetrics, IRustHttpDomainRequestMetrics, IRustIpMetrics, IRustMetricsSnapshot, IRustRouteMetrics } from './models/rust-types.js';
/**
* Adapts Rust JSON metrics to the IMetrics interface.
@@ -219,6 +219,18 @@ export class RustMetricsAdapter implements IMetrics {
total: (): number => {
return this.cache?.totalHttpRequests ?? this.cache?.totalConnections ?? 0;
},
byDomain: (): Map<string, IRequestRateMetrics> => {
const result = new Map<string, IRequestRateMetrics>();
if (this.cache?.httpDomainRequests) {
for (const [domain, metrics] of Object.entries(this.cache.httpDomainRequests) as Array<[string, IRustHttpDomainRequestMetrics]>) {
result.set(domain, {
perSecond: metrics.requestsPerSecond ?? 0,
lastMinute: metrics.requestsLastMinute ?? 0,
});
}
}
return result;
},
};
public totals = {