Compare commits

12 Commits

| Author | SHA1 | Date |
|---|---|---|
| | 8fa3a51b03 | |
| | 088ef6ab09 | |
| | fdb5ec59bc | |
| | 1ea290a085 | |
| | cb71f32b90 | |
| | 46155ab12c | |
| | 490a310b54 | |
| | 6c5180573a | |
| | 30e5ab308f | |
| | d2a54b3491 | |
| | dc922c97df | |
| | 8d1bae7604 | |
@@ -1,5 +1,43 @@
 # Changelog
 
+## 2026-04-26 - 27.8.2 - fix(rustproxy-metrics)
+
+retain inactive per-IP metric buckets briefly to capture final throughput before pruning
+
+- adds a bounded retention window for closed IP buckets so short-lived transfers are still included in per-IP throughput sampling
+- prunes expired inactive IP tracking by TTL and hard cap to prevent unbounded metric map growth
+- updates Rust and throughput tests to expect zero active connections during the temporary retention period
+
+## 2026-04-26 - 27.8.1 - fix(rustproxy-metrics)
+
+preserve high-throughput IPs in metrics snapshots when active-connection rankings are saturated
+
+- Select snapshot IPs using a blend of active-connection and throughput rankings instead of only active connections
+- Adds a regression test to ensure a high-bandwidth IP remains included when many other IPs have more active connections
+
+## 2026-04-14 - 27.8.0 - feat(metrics)
+
+add per-domain HTTP request rate metrics
+
+- Record canonicalized HTTP request rates per domain in the Rust metrics collector and expose per-second and last-minute values in snapshots.
+- Add TypeScript metrics interfaces and adapter support for requests.byDomain().
+- Cover HTTP domain rate tracking and ensure TLS passthrough SNI traffic does not affect HTTP request rate metrics.
+
+## 2026-04-14 - 27.7.4 - fix(rustproxy metrics)
+
+use stable route metrics keys across HTTP and passthrough listeners
+
+- adds a shared RouteConfig::metrics_key helper that prefers route name and falls back to route id
+- updates HTTP, TCP, UDP, and QUIC metrics labeling to use the shared route metrics key consistently
+- keeps route cancellation and rate limiter indexing bound to route config ids where required
+- adds tests covering metrics key selection behavior
+
+## 2026-04-14 - 27.7.3 - fix(repo)
+
+no changes detected
+
+## 2026-04-14 - 27.7.2 - fix(docs)
+
+clarify metrics documentation for domain normalization and saturating gauges
+
+- Document that per-IP domain keys are normalized to lowercase and have trailing dots stripped before counting.
+- Clarify that the saturating close pattern also applies to connection and UDP active gauges.
+
 ## 2026-04-14 - 27.7.1 - fix(rustproxy-http,rustproxy-metrics)
 
 fix domain-scoped request host detection and harden connection metrics cleanup
@@ -1,6 +1,6 @@
 {
   "name": "@push.rocks/smartproxy",
-  "version": "27.7.1",
+  "version": "27.8.2",
   "private": false,
   "description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
   "main": "dist_ts/index.js",
@@ -78,7 +78,7 @@ Entries are pruned via `retain_routes()` when routes are removed.
 
 All seven maps for an IP are evicted when its active connection count drops to 0. Safety-net pruning in `sample_all()` catches entries orphaned by races. Snapshots cap at 100 IPs, sorted by active connections descending.
 
-**Domain request tracking:** Records which domains each frontend IP has requested. Populated from HTTP Host headers (for HTTP/1.1, HTTP/2, HTTP/3 requests) and from SNI (for TLS passthrough connections). Capped at 256 domains per IP (`MAX_DOMAINS_PER_IP`) to prevent subdomain-spray abuse. Inner DashMap uses 2 shards to minimise base memory per IP (~200 bytes). Common case (IP + domain both known) is two DashMap reads + one atomic increment with zero allocation.
+**Domain request tracking:** Records which domains each frontend IP has requested. Populated from HTTP Host headers (for HTTP/1.1, HTTP/2, HTTP/3 requests) and from SNI (for TLS passthrough connections). Domain keys are normalized to lowercase with any trailing dot stripped so the same hostname does not fragment across multiple counters. Capped at 256 domains per IP (`MAX_DOMAINS_PER_IP`) to prevent subdomain-spray abuse. Inner DashMap uses 2 shards to minimise base memory per IP (~200 bytes). Common case (IP + domain both known) is two DashMap reads + one atomic increment with zero allocation.
 
 ### Per-Backend Metrics (keyed by "host:port")
 
@@ -110,7 +110,7 @@ Tracked via `ProtocolGuard` RAII guards and `FrontendProtocolTracker`. Five prot
 | ws | `ProtocolGuard::frontend("ws")` on WebSocket upgrade |
 | other | Fallback (TCP passthrough without HTTP) |
 
-Uses `fetch_update` for saturating decrements to prevent underflow races.
+Uses `fetch_update` for saturating decrements to prevent underflow races. The same saturating-close pattern is also used for connection and UDP active gauges.
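As an aside, the saturating-decrement pattern the docs describe can be sketched in isolation; this is a minimal illustration of the idea, not the crate's actual gauge code:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Decrement a gauge without ever wrapping below zero, even if a close
/// event races ahead of its matching open: the closure returns None when
/// the gauge is already 0, so fetch_update leaves it untouched.
fn saturating_dec(gauge: &AtomicU64) -> u64 {
    gauge
        .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
            if v > 0 { Some(v - 1) } else { None }
        })
        .unwrap_or(0) // already zero: report 0 instead of underflowing
}

fn main() {
    let gauge = AtomicU64::new(1);
    saturating_dec(&gauge); // 1 -> 0
    saturating_dec(&gauge); // stays 0 instead of wrapping to u64::MAX
    assert_eq!(gauge.load(Ordering::Relaxed), 0);
}
```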
 
 ### Backend Protocol Distribution
@@ -656,6 +656,11 @@ impl RouteConfig {
         self.route_match.ports.to_ports()
     }
 
+    /// Stable key used for frontend route-scoped metrics.
+    pub fn metrics_key(&self) -> Option<&str> {
+        self.name.as_deref().or(self.id.as_deref())
+    }
+
     /// Get the TLS mode for this route (from action-level or first target).
     pub fn tls_mode(&self) -> Option<&crate::tls_types::TlsMode> {
         // Check action-level TLS first
@@ -673,3 +678,63 @@ impl RouteConfig {
         None
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn test_route(name: Option<&str>, id: Option<&str>) -> RouteConfig {
+        RouteConfig {
+            id: id.map(str::to_string),
+            route_match: RouteMatch {
+                ports: PortRange::Single(443),
+                transport: None,
+                domains: None,
+                path: None,
+                client_ip: None,
+                tls_version: None,
+                headers: None,
+                protocol: None,
+            },
+            action: RouteAction {
+                action_type: RouteActionType::Forward,
+                targets: None,
+                tls: None,
+                websocket: None,
+                load_balancing: None,
+                advanced: None,
+                options: None,
+                send_proxy_protocol: None,
+                udp: None,
+            },
+            headers: None,
+            security: None,
+            name: name.map(str::to_string),
+            description: None,
+            priority: None,
+            tags: None,
+            enabled: None,
+        }
+    }
+
+    #[test]
+    fn metrics_key_prefers_name() {
+        let route = test_route(Some("named-route"), Some("route-id"));
+
+        assert_eq!(route.metrics_key(), Some("named-route"));
+    }
+
+    #[test]
+    fn metrics_key_falls_back_to_id() {
+        let route = test_route(None, Some("route-id"));
+
+        assert_eq!(route.metrics_key(), Some("route-id"));
+    }
+
+    #[test]
+    fn metrics_key_is_absent_without_name_or_id() {
+        let route = test_route(None, None);
+
+        assert_eq!(route.metrics_key(), None);
+    }
+}
@@ -639,10 +639,12 @@ impl HttpProxyService {
             }
         };
 
-        let route_id = route_match.route.id.as_deref();
+        let route_config_id = route_match.route.id.as_deref();
+        let route_id = route_match.route.metrics_key();
         let ip_str = ip_string; // reuse from above (avoid redundant to_string())
         self.metrics.record_http_request();
         if let Some(ref h) = host {
+            self.metrics.record_http_domain_request(h);
             self.metrics.record_ip_domain_request(&ip_str, h);
         }
 
@@ -654,7 +656,7 @@ impl HttpProxyService {
             .as_ref()
             .filter(|rl| rl.enabled)
             .map(|rl| {
-                let route_key = route_id.unwrap_or("__default__").to_string();
+                let route_key = route_config_id.unwrap_or("__default__").to_string();
                 self.route_rate_limiters
                     .entry(route_key)
                     .or_insert_with(|| Arc::new(RateLimiter::new(rl.max_requests, rl.window)))
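The hunk above splits the route's metrics label (name-preferring `metrics_key`) from the rate-limiter key (the stable config id). A self-contained sketch of the limiter-map pattern follows; `SimpleRateLimiter` is a stand-in for the crate's `RateLimiter`, whose real API is not shown in this diff:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Stand-in for the proxy's RateLimiter: a fixed-window request counter.
struct SimpleRateLimiter {
    max_requests: u64,
    window: Duration,
    window_start: Instant,
    count: u64,
}

impl SimpleRateLimiter {
    fn new(max_requests: u64, window: Duration) -> Self {
        Self { max_requests, window, window_start: Instant::now(), count: 0 }
    }

    /// Count one request; returns false once the window budget is exhausted.
    fn allow(&mut self) -> bool {
        if self.window_start.elapsed() >= self.window {
            self.window_start = Instant::now();
            self.count = 0;
        }
        self.count += 1;
        self.count <= self.max_requests
    }
}

fn main() {
    // Keyed by the stable route *config id* (not the display name),
    // so limiter state survives route renames.
    let mut limiters: HashMap<String, SimpleRateLimiter> = HashMap::new();
    let route_config_id: Option<&str> = None; // a route without an id
    let key = route_config_id.unwrap_or("__default__").to_string();
    let limiter = limiters
        .entry(key)
        .or_insert_with(|| SimpleRateLimiter::new(2, Duration::from_secs(60)));
    assert!(limiter.allow());
    assert!(limiter.allow());
    assert!(!limiter.allow()); // third request in the window is rejected
}
```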
@@ -3,9 +3,9 @@ use serde::{Deserialize, Serialize};
 use std::collections::{HashMap, HashSet};
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Mutex;
-use std::time::Duration;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
-use crate::throughput::{ThroughputSample, ThroughputTracker};
+use crate::throughput::{RequestRateTracker, ThroughputSample, ThroughputTracker};
 
 /// Aggregated metrics snapshot.
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -26,6 +26,7 @@ pub struct Metrics {
     pub total_http_requests: u64,
     pub http_requests_per_sec: u64,
     pub http_requests_per_sec_recent: u64,
+    pub http_domain_requests: std::collections::HashMap<String, HttpDomainRequestMetrics>,
     // UDP metrics
     pub active_udp_sessions: u64,
     pub total_udp_sessions: u64,
@@ -66,6 +67,14 @@ pub struct IpMetrics {
     pub domain_requests: HashMap<String, u64>,
 }
 
+/// Per-domain HTTP request rate metrics.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct HttpDomainRequestMetrics {
+    pub requests_per_second: u64,
+    pub requests_last_minute: u64,
+}
+
 /// Per-backend metrics (keyed by "host:port").
 #[derive(Debug, Clone, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
@@ -135,15 +144,24 @@ pub struct Statistics {
 /// Default retention for throughput samples (1 hour).
 const DEFAULT_RETENTION_SECONDS: usize = 3600;
 
-/// Maximum number of IPs to include in a snapshot (top by active connections).
+/// Maximum number of IPs to include in a snapshot.
 const MAX_IPS_IN_SNAPSHOT: usize = 100;
 
+/// How long to retain inactive IP metric buckets after the last connection closes.
+const INACTIVE_IP_RETENTION_MS: u64 = 15_000;
+
+/// Hard cap for inactive IP metric buckets retained between sampler passes.
+const MAX_INACTIVE_IPS_RETAINED: usize = MAX_IPS_IN_SNAPSHOT * 10;
+
 /// Maximum number of backends to include in a snapshot (top by total connections).
 const MAX_BACKENDS_IN_SNAPSHOT: usize = 100;
 
 /// Maximum number of distinct domains tracked per IP (prevents subdomain-spray abuse).
 const MAX_DOMAINS_PER_IP: usize = 256;
 
+/// Number of one-second HTTP request samples retained per domain.
+const HTTP_DOMAIN_REQUEST_WINDOW_SECONDS: usize = 60;
+
 fn canonicalize_domain_key(domain: &str) -> Option<String> {
     let normalized = domain.trim().trim_end_matches('.').to_ascii_lowercase();
     if normalized.is_empty() {
@@ -153,6 +171,13 @@ fn canonicalize_domain_key(domain: &str) -> Option<String> {
     }
 }
 
+fn current_time_ms() -> u64 {
+    SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .unwrap_or_default()
+        .as_millis() as u64
+}
+
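The hunk truncates `canonicalize_domain_key` at its empty-string check; a standalone sketch of the full normalization rule, assuming the elided tail simply returns `Some(normalized)`:

```rust
/// Mirror of the diff's normalization rule: trim whitespace, strip
/// trailing dots, lowercase; keys that normalize to empty are rejected.
fn canonicalize_domain_key(domain: &str) -> Option<String> {
    let normalized = domain.trim().trim_end_matches('.').to_ascii_lowercase();
    if normalized.is_empty() {
        None
    } else {
        Some(normalized)
    }
}

fn main() {
    // All spellings collapse onto one counter key.
    assert_eq!(canonicalize_domain_key("Example.COM"), Some("example.com".into()));
    assert_eq!(canonicalize_domain_key("example.com."), Some("example.com".into()));
    assert_eq!(canonicalize_domain_key(" example.com "), Some("example.com".into()));
    // Degenerate inputs are dropped rather than counted under "".
    assert_eq!(canonicalize_domain_key("..."), None);
    assert_eq!(canonicalize_domain_key("   "), None);
}
```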
 /// Metrics collector tracking connections and throughput.
 ///
 /// Design: The hot path (`record_bytes`) is entirely lock-free — it only touches
@@ -174,6 +199,7 @@ pub struct MetricsCollector {
 
     // ── Per-IP tracking ──
     ip_connections: DashMap<String, AtomicU64>,
+    ip_closed_at_ms: DashMap<String, AtomicU64>,
     ip_total_connections: DashMap<String, AtomicU64>,
     ip_bytes_in: DashMap<String, AtomicU64>,
     ip_bytes_out: DashMap<String, AtomicU64>,
@@ -201,6 +227,7 @@ pub struct MetricsCollector {
     total_http_requests: AtomicU64,
     pending_http_requests: AtomicU64,
     http_request_throughput: Mutex<ThroughputTracker>,
+    http_domain_request_rates: DashMap<String, Mutex<RequestRateTracker>>,
 
     // ── UDP metrics ──
     active_udp_sessions: AtomicU64,
@@ -260,6 +287,7 @@ impl MetricsCollector {
             route_bytes_in: DashMap::new(),
             route_bytes_out: DashMap::new(),
             ip_connections: DashMap::new(),
+            ip_closed_at_ms: DashMap::new(),
             ip_total_connections: DashMap::new(),
             ip_bytes_in: DashMap::new(),
             ip_bytes_out: DashMap::new(),
@@ -284,6 +312,7 @@ impl MetricsCollector {
             total_http_requests: AtomicU64::new(0),
             pending_http_requests: AtomicU64::new(0),
             http_request_throughput: Mutex::new(ThroughputTracker::new(retention_seconds)),
+            http_domain_request_rates: DashMap::new(),
             frontend_h1_active: AtomicU64::new(0),
             frontend_h1_total: AtomicU64::new(0),
             frontend_h2_active: AtomicU64::new(0),
@@ -334,6 +363,7 @@ impl MetricsCollector {
             .entry(ip.to_string())
             .or_insert_with(|| AtomicU64::new(0))
             .fetch_add(1, Ordering::Relaxed);
+        self.ip_closed_at_ms.remove(ip);
         self.ip_total_connections
             .entry(ip.to_string())
             .or_insert_with(|| AtomicU64::new(0))
@@ -378,22 +408,88 @@ impl MetricsCollector {
                 }
             })
             .ok();
-            // Clean up zero-count entries to prevent memory growth
+            // Keep inactive IP buckets briefly so pending bytes can still
+            // be sampled into per-IP throughput after short-lived transfers.
             if matches!(prev, Some(v) if v <= 1) {
-                drop(counter);
-                self.ip_connections.remove(ip);
-                // Evict all per-IP tracking data for this IP
-                self.ip_total_connections.remove(ip);
-                self.ip_bytes_in.remove(ip);
-                self.ip_bytes_out.remove(ip);
-                self.ip_pending_tp.remove(ip);
-                self.ip_throughput.remove(ip);
-                self.ip_domain_requests.remove(ip);
+                self.ip_closed_at_ms
+                    .entry(ip.to_string())
+                    .or_insert_with(|| AtomicU64::new(0))
+                    .store(current_time_ms(), Ordering::Relaxed);
             }
         }
     }
 }
 
+    fn remove_inactive_ip_tracking(&self, ip: &str) {
+        if self
+            .ip_connections
+            .get(ip)
+            .map(|c| c.load(Ordering::Relaxed))
+            .unwrap_or(0)
+            > 0
+        {
+            self.ip_closed_at_ms.remove(ip);
+            return;
+        }
+
+        self.ip_connections.remove(ip);
+        self.ip_closed_at_ms.remove(ip);
+        self.ip_total_connections.remove(ip);
+        self.ip_bytes_in.remove(ip);
+        self.ip_bytes_out.remove(ip);
+        self.ip_pending_tp.remove(ip);
+        self.ip_throughput.remove(ip);
+        self.ip_domain_requests.remove(ip);
+    }
+
+    fn prune_inactive_ip_tracking(&self, now_ms: u64) {
+        let cutoff_ms = now_ms.saturating_sub(INACTIVE_IP_RETENTION_MS);
+        let mut inactive_ips: Vec<(String, u64)> = Vec::new();
+        let mut active_markers: Vec<String> = Vec::new();
+
+        for entry in self.ip_closed_at_ms.iter() {
+            let ip = entry.key().clone();
+            let active = self
+                .ip_connections
+                .get(&ip)
+                .map(|c| c.load(Ordering::Relaxed))
+                .unwrap_or(0);
+            if active > 0 {
+                active_markers.push(ip);
+            } else {
+                inactive_ips.push((ip, entry.value().load(Ordering::Relaxed)));
+            }
+        }
+
+        for ip in active_markers {
+            self.ip_closed_at_ms.remove(&ip);
+        }
+
+        let mut remove_ips: HashSet<String> = inactive_ips
+            .iter()
+            .filter(|(_, closed_at_ms)| *closed_at_ms <= cutoff_ms)
+            .map(|(ip, _)| ip.clone())
+            .collect();
+
+        let retained_after_ttl = inactive_ips.len().saturating_sub(remove_ips.len());
+        if retained_after_ttl > MAX_INACTIVE_IPS_RETAINED {
+            inactive_ips.sort_by(|a, b| a.1.cmp(&b.1).then_with(|| a.0.cmp(&b.0)));
+            let mut overflow = retained_after_ttl - MAX_INACTIVE_IPS_RETAINED;
+            for (ip, closed_at_ms) in inactive_ips {
+                if overflow == 0 {
+                    break;
+                }
+                if closed_at_ms > cutoff_ms && remove_ips.insert(ip) {
+                    overflow -= 1;
+                }
+            }
+        }
+
+        for ip in remove_ips {
+            self.remove_inactive_ip_tracking(&ip);
+        }
+    }
+
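The TTL-plus-hard-cap pruning policy in `prune_inactive_ip_tracking` can be illustrated independently of DashMap; a simplified sketch over a plain `HashMap`, keeping the diff's oldest-first tie-break:

```rust
use std::collections::HashMap;

/// Drop buckets older than the TTL; if the survivors still exceed the
/// cap, drop the oldest survivors as well (tie-break on key for
/// determinism, as in the diff).
fn prune(closed_at_ms: &mut HashMap<String, u64>, now_ms: u64, ttl_ms: u64, cap: usize) {
    let cutoff = now_ms.saturating_sub(ttl_ms);
    closed_at_ms.retain(|_, &mut t| t > cutoff); // TTL pass
    if closed_at_ms.len() > cap {
        let mut by_age: Vec<(String, u64)> =
            closed_at_ms.iter().map(|(k, &v)| (k.clone(), v)).collect();
        by_age.sort_by(|a, b| a.1.cmp(&b.1).then_with(|| a.0.cmp(&b.0)));
        let excess = closed_at_ms.len() - cap;
        for (ip, _) in by_age.into_iter().take(excess) {
            closed_at_ms.remove(&ip); // hard-cap pass: evict oldest first
        }
    }
}

fn main() {
    let mut m = HashMap::new();
    m.insert("a".to_string(), 1_000); // expired by TTL
    m.insert("b".to_string(), 9_000); // oldest survivor
    m.insert("c".to_string(), 9_500);
    m.insert("d".to_string(), 9_900);
    prune(&mut m, 10_000, 5_000, 2); // TTL drops "a"; cap of 2 drops "b"
    assert!(!m.contains_key("a") && !m.contains_key("b"));
    assert!(m.contains_key("c") && m.contains_key("d"));
}
```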
     /// Record bytes transferred (lock-free hot path).
     ///
     /// Called per-chunk in the TCP copy loop. Only touches AtomicU64 counters —
@@ -467,9 +563,8 @@ impl MetricsCollector {
 
         // Per-IP tracking: same get()-first pattern to avoid String allocation on hot path.
         if let Some(ip) = source_ip {
-            // Only record per-IP stats if the IP still has active connections.
-            // This prevents orphaned entries when record_bytes races with
-            // connection_closed (which evicts all per-IP data on last close).
+            // Only record per-IP stats if the IP is active or still within the
+            // bounded inactive retention window after its last connection closed.
             if self.ip_connections.contains_key(ip) {
                 if bytes_in > 0 {
                     if let Some(counter) = self.ip_bytes_in.get(ip) {
@@ -522,6 +617,24 @@ impl MetricsCollector {
         self.pending_http_requests.fetch_add(1, Ordering::Relaxed);
     }
 
+    /// Record a real HTTP request for a canonicalized domain.
+    pub fn record_http_domain_request(&self, domain: &str) {
+        let Some(domain) = canonicalize_domain_key(domain) else {
+            return;
+        };
+
+        self.http_domain_request_rates
+            .entry(domain.clone())
+            .or_insert_with(|| {
+                Mutex::new(RequestRateTracker::new(HTTP_DOMAIN_REQUEST_WINDOW_SECONDS))
+            });
+        if let Some(tracker_ref) = self.http_domain_request_rates.get(domain.as_str()) {
+            if let Ok(mut tracker) = tracker_ref.value().lock() {
+                tracker.record_event();
+            }
+        }
+    }
+
     /// Record a domain request/connection for a frontend IP.
     ///
     /// Called per HTTP request (with Host header) and per TCP passthrough
@@ -791,8 +904,7 @@ impl MetricsCollector {
     /// Take a throughput sample on all trackers (cold path, call at 1Hz or configured interval).
     ///
     /// Drains the lock-free pending counters and feeds the accumulated bytes
-    /// into the throughput trackers (under Mutex). This is the only place
-    /// the Mutex is locked.
+    /// into the throughput trackers under their sampling mutexes.
     pub fn sample_all(&self) {
         // Drain global pending bytes and feed into the tracker
         let global_in = self.global_pending_tp_in.swap(0, Ordering::Relaxed);
@@ -873,9 +985,27 @@ impl MetricsCollector {
             tracker.sample();
         }
 
+        // Advance HTTP domain request windows and prune fully idle domains.
+        let mut stale_http_domains = Vec::new();
+        for entry in self.http_domain_request_rates.iter() {
+            if let Ok(mut tracker) = entry.value().lock() {
+                tracker.advance_to_now();
+                if tracker.is_idle() {
+                    stale_http_domains.push(entry.key().clone());
+                }
+            }
+        }
+        for domain in stale_http_domains {
+            self.http_domain_request_rates.remove(&domain);
+        }
+
+        // Keep closed IP buckets only for a short, bounded window so the sampler
+        // can attribute short-lived transfers without leaking per-IP maps.
+        self.prune_inactive_ip_tracking(current_time_ms());
+
         // Safety-net: prune orphaned per-IP entries that have no corresponding
-        // ip_connections entry. This catches any entries created by a race between
-        // record_bytes and connection_closed.
+        // ip_connections entry. This catches any entries created by older races or
+        // by code paths that manually inserted partial per-IP state.
         self.ip_bytes_in
             .retain(|k, _| self.ip_connections.contains_key(k));
         self.ip_bytes_out
@@ -888,6 +1018,8 @@ impl MetricsCollector {
             .retain(|k, _| self.ip_connections.contains_key(k));
         self.ip_domain_requests
             .retain(|k, _| self.ip_connections.contains_key(k));
+        self.ip_closed_at_ms
+            .retain(|k, _| self.ip_connections.contains_key(k));
 
         // Safety-net: prune orphaned backend error/stats entries for backends
         // that have no active or total connections (error-only backends).
@@ -1019,8 +1151,8 @@ impl MetricsCollector {
             );
         }
 
-        // Collect per-IP metrics — only IPs with active connections or total > 0,
-        // capped at top MAX_IPS_IN_SNAPSHOT sorted by active count
+        // Collect per-IP metrics — capped to the IPs most relevant for either
+        // active connection visibility or bandwidth attribution.
         let mut ip_entries: Vec<(String, u64, u64, u64, u64, u64, u64, HashMap<String, u64>)> =
             Vec::new();
         for entry in self.ip_total_connections.iter() {
@@ -1068,9 +1200,54 @@ impl MetricsCollector {
                 domain_requests,
             ));
         }
-        // Sort by active connections descending, then cap
-        ip_entries.sort_by(|a, b| b.1.cmp(&a.1));
-        ip_entries.truncate(MAX_IPS_IN_SNAPSHOT);
+        if ip_entries.len() > MAX_IPS_IN_SNAPSHOT {
+            let mut selected = vec![false; ip_entries.len()];
+            let mut selected_count = 0usize;
+
+            let mut active_rank: Vec<usize> = (0..ip_entries.len()).collect();
+            active_rank.sort_by(|&a, &b| {
+                ip_entries[b]
+                    .1
+                    .cmp(&ip_entries[a].1)
+                    .then_with(|| ip_entries[b].2.cmp(&ip_entries[a].2))
+                    .then_with(|| ip_entries[a].0.cmp(&ip_entries[b].0))
+            });
+
+            let mut throughput_rank: Vec<usize> = (0..ip_entries.len()).collect();
+            throughput_rank.sort_by(|&a, &b| {
+                let a_tp = ip_entries[a].5.saturating_add(ip_entries[a].6);
+                let b_tp = ip_entries[b].5.saturating_add(ip_entries[b].6);
+                let a_bytes = ip_entries[a].3.saturating_add(ip_entries[a].4);
+                let b_bytes = ip_entries[b].3.saturating_add(ip_entries[b].4);
+                b_tp.cmp(&a_tp)
+                    .then_with(|| b_bytes.cmp(&a_bytes))
+                    .then_with(|| ip_entries[b].1.cmp(&ip_entries[a].1))
+                    .then_with(|| ip_entries[a].0.cmp(&ip_entries[b].0))
+            });
+
+            for idx in active_rank.into_iter().take(MAX_IPS_IN_SNAPSHOT / 2) {
+                if !selected[idx] {
+                    selected[idx] = true;
+                    selected_count += 1;
+                }
+            }
+
+            for idx in throughput_rank {
+                if selected_count >= MAX_IPS_IN_SNAPSHOT {
+                    break;
+                }
+                if !selected[idx] {
+                    selected[idx] = true;
+                    selected_count += 1;
+                }
+            }
+
+            ip_entries = ip_entries
+                .into_iter()
+                .enumerate()
+                .filter_map(|(idx, entry)| selected[idx].then_some(entry))
+                .collect();
+        }
 
         let mut ips = std::collections::HashMap::new();
         for (ip, active, total, bytes_in, bytes_out, tp_in, tp_out, domain_requests) in ip_entries {
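The blended selection above is the core of the 27.8.1 fix: half the snapshot budget goes to the top of the active-connections ranking, and the remainder is filled from a throughput ranking, so a single high-bandwidth IP cannot be crowded out by many low-bandwidth scanners. A reduced sketch over `(ip, active, throughput)` tuples:

```rust
/// Select up to `cap` entries: half the budget from the active-connections
/// ranking, the rest filled from the throughput ranking, skipping duplicates.
fn select_blend(entries: &[(&str, u64, u64)], cap: usize) -> Vec<String> {
    let mut by_active: Vec<usize> = (0..entries.len()).collect();
    by_active.sort_by(|&a, &b| entries[b].1.cmp(&entries[a].1));
    let mut by_tp: Vec<usize> = (0..entries.len()).collect();
    by_tp.sort_by(|&a, &b| entries[b].2.cmp(&entries[a].2));

    let mut selected = vec![false; entries.len()];
    let mut picked = Vec::new();
    // First pass: top half of the budget by active connections.
    for &i in by_active.iter().take(cap / 2) {
        if !selected[i] {
            selected[i] = true;
            picked.push(entries[i].0.to_string());
        }
    }
    // Second pass: fill the remaining budget by throughput.
    for &i in &by_tp {
        if picked.len() >= cap {
            break;
        }
        if !selected[i] {
            selected[i] = true;
            picked.push(entries[i].0.to_string());
        }
    }
    picked
}

fn main() {
    // Scanner IPs dominate the active ranking, but the busy downloader
    // survives the cap via the throughput ranking.
    let entries = [
        ("scan1", 5, 10),
        ("scan2", 4, 10),
        ("scan3", 3, 10),
        ("busy", 1, 900_000),
    ];
    let picked = select_blend(&entries, 2);
    assert!(picked.contains(&"busy".to_string()));
    assert_eq!(picked.len(), 2);
}
```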
@@ -1179,6 +1356,24 @@ impl MetricsCollector {
             })
             .unwrap_or((0, 0));
 
+        let mut http_domain_requests = std::collections::HashMap::new();
+        for entry in self.http_domain_request_rates.iter() {
+            if let Ok(mut tracker) = entry.value().lock() {
+                tracker.advance_to_now();
+                let requests_per_second = tracker.last_second();
+                let requests_last_minute = tracker.last_minute();
+                if requests_per_second > 0 || requests_last_minute > 0 {
+                    http_domain_requests.insert(
+                        entry.key().clone(),
+                        HttpDomainRequestMetrics {
+                            requests_per_second,
+                            requests_last_minute,
+                        },
+                    );
+                }
+            }
+        }
+
         Metrics {
             active_connections: self.active_connections(),
             total_connections: self.total_connections(),
@@ -1195,6 +1390,7 @@ impl MetricsCollector {
             total_http_requests: self.total_http_requests.load(Ordering::Relaxed),
             http_requests_per_sec: http_rps,
             http_requests_per_sec_recent: http_rps_recent,
+            http_domain_requests,
             active_udp_sessions: self.active_udp_sessions.load(Ordering::Relaxed),
             total_udp_sessions: self.total_udp_sessions.load(Ordering::Relaxed),
             total_datagrams_in: self.total_datagrams_in.load(Ordering::Relaxed),
@@ -1383,9 +1579,42 @@ mod tests {
             1
         );
 
-        // Close last connection for IP — should be cleaned up
+        // Close last connection for IP — active count should drop to zero,
+        // while the inactive bucket is retained briefly for final sampling.
         collector.connection_closed(Some("route-a"), Some("1.2.3.4"));
-        assert!(collector.ip_connections.get("1.2.3.4").is_none());
+        assert_eq!(
+            collector
+                .ip_connections
+                .get("1.2.3.4")
+                .map(|c| c.load(Ordering::Relaxed))
+                .unwrap_or(0),
+            0
+        );
+        assert!(collector.ip_closed_at_ms.get("1.2.3.4").is_some());
     }
 
+    #[test]
+    fn test_snapshot_retains_high_throughput_ip_over_many_active_ips() {
+        let collector = MetricsCollector::with_retention(60);
+
+        for i in 1..=(MAX_IPS_IN_SNAPSHOT + 20) {
+            let ip = format!("10.0.0.{}", i);
+            collector.connection_opened(Some("scanner-route"), Some(&ip));
+            collector.connection_opened(Some("scanner-route"), Some(&ip));
+        }
+
+        let busy_ip = "203.0.113.10";
+        collector.connection_opened(Some("download-route"), Some(busy_ip));
+        collector.record_bytes(0, 900_000, Some("download-route"), Some(busy_ip));
+        collector.sample_all();
+
+        let snapshot = collector.snapshot();
+        let busy_metrics = snapshot.ips.get(busy_ip).unwrap();
+
+        assert_eq!(snapshot.ips.len(), MAX_IPS_IN_SNAPSHOT);
+        assert_eq!(busy_metrics.active_connections, 1);
+        assert_eq!(busy_metrics.bytes_out, 900_000);
+        assert_eq!(busy_metrics.throughput_out_bytes_per_sec, 900_000);
+    }
+
     #[test]
@@ -1411,13 +1640,36 @@ mod tests {
         collector.connection_closed(Some("route-a"), Some("10.0.0.1"));
         collector.connection_closed(Some("route-a"), Some("10.0.0.1"));
 
-        // All per-IP data for 10.0.0.1 should be evicted
+        // Per-IP data for 10.0.0.1 should be retained until the inactive TTL expires.
+        assert_eq!(
+            collector
+                .ip_connections
+                .get("10.0.0.1")
+                .map(|c| c.load(Ordering::Relaxed))
+                .unwrap_or(0),
+            0
+        );
+        assert!(collector.ip_total_connections.get("10.0.0.1").is_some());
+        assert!(collector.ip_bytes_in.get("10.0.0.1").is_some());
+        assert!(collector.ip_bytes_out.get("10.0.0.1").is_some());
+        assert!(collector.ip_pending_tp.get("10.0.0.1").is_some());
+        assert!(collector.ip_throughput.get("10.0.0.1").is_some());
+
+        collector
+            .ip_closed_at_ms
+            .get("10.0.0.1")
+            .unwrap()
+            .store(0, Ordering::Relaxed);
+        collector.sample_all();
+
+        // Expired inactive buckets are fully evicted to prevent leaks.
         assert!(collector.ip_connections.get("10.0.0.1").is_none());
         assert!(collector.ip_total_connections.get("10.0.0.1").is_none());
         assert!(collector.ip_bytes_in.get("10.0.0.1").is_none());
         assert!(collector.ip_bytes_out.get("10.0.0.1").is_none());
         assert!(collector.ip_pending_tp.get("10.0.0.1").is_none());
         assert!(collector.ip_throughput.get("10.0.0.1").is_none());
+        assert!(collector.ip_closed_at_ms.get("10.0.0.1").is_none());
 
         // 10.0.0.2 should still have data
         assert!(collector.ip_connections.get("10.0.0.2").is_some());
@@ -1444,7 +1696,14 @@ mod tests {
                 .unwrap_or(0),
             0
         );
-        assert!(collector.ip_connections.get("10.0.0.1").is_none());
+        assert_eq!(
+            collector
+                .ip_connections
+                .get("10.0.0.1")
+                .map(|c| c.load(Ordering::Relaxed))
+                .unwrap_or(0),
+            0
+        );
     }
 
     #[test]
@@ -1514,6 +1773,47 @@ mod tests {
|
||||
assert_eq!(snapshot.http_requests_per_sec, 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_http_domain_request_rates_are_canonicalized() {
|
||||
let collector = MetricsCollector::with_retention(60);
|
||||
|
||||
collector.record_http_domain_request("Example.COM");
|
||||
collector.record_http_domain_request("example.com.");
|
||||
collector.record_http_domain_request(" example.com ");
|
||||
|
||||
let now_sec = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs();
|
||||
if let Some(tracker) = collector.http_domain_request_rates.get("example.com") {
|
||||
tracker.value().lock().unwrap().advance_to(now_sec + 1);
|
||||
}
|
||||
|
||||
let snapshot = collector.snapshot();
|
||||
let metrics = snapshot.http_domain_requests.get("example.com").unwrap();
|
||||
assert_eq!(snapshot.http_domain_requests.len(), 1);
|
||||
assert_eq!(metrics.requests_per_second, 3);
|
||||
assert_eq!(metrics.requests_last_minute, 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ip_domain_requests_do_not_affect_http_domain_request_rates() {
|
||||
let collector = MetricsCollector::with_retention(60);
|
||||
|
||||
collector.connection_opened(Some("route-a"), Some("10.0.0.1"));
|
||||
collector.record_ip_domain_request("10.0.0.1", "example.com");
|
||||
|
||||
let snapshot = collector.snapshot();
|
||||
assert!(snapshot.http_domain_requests.is_empty());
|
||||
assert_eq!(
|
||||
snapshot
|
||||
.ips
|
||||
.get("10.0.0.1")
|
||||
.and_then(|ip| ip.domain_requests.get("example.com")),
|
||||
Some(&1)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_retain_routes_prunes_stale() {
|
||||
let collector = MetricsCollector::with_retention(60);
|
||||
@@ -1544,26 +1844,52 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_record_bytes_after_close_no_orphan() {
|
    fn test_closed_ip_keeps_pending_throughput_until_sample() {
        let collector = MetricsCollector::with_retention(60);

        // Open a connection, record bytes, then close
        collector.connection_opened(Some("route-a"), Some("10.0.0.1"));
-       collector.record_bytes(100, 200, Some("route-a"), Some("10.0.0.1"));
+       collector.record_bytes(100, 2000, Some("route-a"), Some("10.0.0.1"));
        collector.connection_closed(Some("route-a"), Some("10.0.0.1"));

        // IP should be fully evicted
        assert!(collector.ip_connections.get("10.0.0.1").is_none());
        collector.sample_all();
        let snapshot = collector.snapshot();
        let ip_metrics = snapshot.ips.get("10.0.0.1").unwrap();

        // Now record_bytes arrives late (simulates race) — should NOT re-create entries
        collector.record_bytes(50, 75, Some("route-a"), Some("10.0.0.1"));
        assert!(collector.ip_bytes_in.get("10.0.0.1").is_none());
        assert!(collector.ip_bytes_out.get("10.0.0.1").is_none());
        assert!(collector.ip_pending_tp.get("10.0.0.1").is_none());
        assert_eq!(ip_metrics.active_connections, 0);
        assert_eq!(ip_metrics.bytes_in, 100);
        assert_eq!(ip_metrics.bytes_out, 2000);
        assert_eq!(ip_metrics.throughput_in_bytes_per_sec, 100);
        assert_eq!(ip_metrics.throughput_out_bytes_per_sec, 2000);
        assert_eq!(snapshot.throughput_out_bytes_per_sec, 2000);
        assert_eq!(
            snapshot
                .routes
                .get("route-a")
                .unwrap()
                .throughput_out_bytes_per_sec,
            2000
        );
-
-       // Global bytes should still be counted
-       assert_eq!(collector.total_bytes_in.load(Ordering::Relaxed), 150);
-       assert_eq!(collector.total_bytes_out.load(Ordering::Relaxed), 275);
    }

    #[test]
    fn test_inactive_ip_retention_hard_cap_prunes_oldest() {
        let collector = MetricsCollector::with_retention(60);

        for i in 0..(MAX_INACTIVE_IPS_RETAINED + 5) {
            let ip = format!("198.51.{}.{}", i / 255, i % 255);
            collector.connection_opened(Some("route-a"), Some(&ip));
            collector.connection_closed(Some("route-a"), Some(&ip));
            collector
                .ip_closed_at_ms
                .get(&ip)
                .unwrap()
                .store(current_time_ms().saturating_sub(1), Ordering::Relaxed);
        }

        collector.sample_all();

        assert!(collector.ip_closed_at_ms.len() <= MAX_INACTIVE_IPS_RETAINED);
        assert!(collector.ip_connections.len() <= MAX_INACTIVE_IPS_RETAINED);
    }

    #[test]
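The retention policy these tests pin down (prune closed-IP buckets by TTL, then enforce a hard cap on whatever remains) can be sketched outside the collector. This is a minimal TypeScript illustration; `pruneInactive`, the `Map<string, number>` shape, and the tiny cap are assumptions for the sketch, not the crate's actual internals.

```typescript
// Illustrative sketch of TTL + hard-cap pruning for closed-IP buckets.
// The real collector stores more state per IP; a Map of ip -> closedAtMs
// is enough to show the two-pass policy.
const MAX_INACTIVE_IPS_RETAINED = 3; // tiny cap for the example

function pruneInactive(
  closedAtMs: Map<string, number>,
  nowMs: number,
  ttlMs: number,
): void {
  // Pass 1 (TTL): drop buckets whose retention window has expired
  for (const [ip, closedAt] of closedAtMs) {
    if (nowMs - closedAt > ttlMs) closedAtMs.delete(ip);
  }
  // Pass 2 (hard cap): evict oldest-closed buckets beyond the limit
  while (closedAtMs.size > MAX_INACTIVE_IPS_RETAINED) {
    let oldestIp: string | null = null;
    let oldestAt = Infinity;
    for (const [ip, at] of closedAtMs) {
      if (at < oldestAt) {
        oldestAt = at;
        oldestIp = ip;
      }
    }
    if (oldestIp === null) break;
    closedAtMs.delete(oldestIp);
  }
}

const closed = new Map<string, number>([
  ['10.0.0.1', 0],
  ['10.0.0.2', 500],
  ['10.0.0.3', 900],
  ['10.0.0.4', 950],
  ['10.0.0.5', 990],
]);
pruneInactive(closed, 1000, 800);
console.log([...closed.keys()]); // → [ '10.0.0.3', '10.0.0.4', '10.0.0.5' ]
```

Bounding the map with both a TTL and a hard cap is what keeps a burst of short-lived clients from growing the metric map without limit.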
@@ -29,6 +29,113 @@ pub struct ThroughputTracker {
    created_at: Instant,
}

+fn unix_timestamp_seconds() -> u64 {
+    SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .unwrap_or_default()
+        .as_secs()
+}
+
+/// Circular buffer for per-second event counts.
+///
+/// Unlike `ThroughputTracker`, events are recorded directly into the current
+/// second so request counts remain stable even when the collector is sampled
+/// more frequently than once per second.
+pub(crate) struct RequestRateTracker {
+    samples: Vec<u64>,
+    write_index: usize,
+    count: usize,
+    capacity: usize,
+    current_second: Option<u64>,
+    current_count: u64,
+}
+
+impl RequestRateTracker {
+    pub(crate) fn new(retention_seconds: usize) -> Self {
+        Self {
+            samples: Vec::with_capacity(retention_seconds.max(1)),
+            write_index: 0,
+            count: 0,
+            capacity: retention_seconds.max(1),
+            current_second: None,
+            current_count: 0,
+        }
+    }
+
+    fn push_sample(&mut self, count: u64) {
+        if self.samples.len() < self.capacity {
+            self.samples.push(count);
+        } else {
+            self.samples[self.write_index] = count;
+        }
+        self.write_index = (self.write_index + 1) % self.capacity;
+        self.count = (self.count + 1).min(self.capacity);
+    }
+
+    pub(crate) fn record_event(&mut self) {
+        self.record_events_at(unix_timestamp_seconds(), 1);
+    }
+
+    pub(crate) fn record_events_at(&mut self, now_sec: u64, count: u64) {
+        self.advance_to(now_sec);
+        self.current_count = self.current_count.saturating_add(count);
+    }
+
+    pub(crate) fn advance_to_now(&mut self) {
+        self.advance_to(unix_timestamp_seconds());
+    }
+
+    pub(crate) fn advance_to(&mut self, now_sec: u64) {
+        match self.current_second {
+            Some(current_second) if now_sec > current_second => {
+                self.push_sample(self.current_count);
+                for _ in 1..(now_sec - current_second) {
+                    self.push_sample(0);
+                }
+                self.current_second = Some(now_sec);
+                self.current_count = 0;
+            }
+            Some(_) => {}
+            None => {
+                self.current_second = Some(now_sec);
+                self.current_count = 0;
+            }
+        }
+    }
+
+    fn sum_recent(&self, window_seconds: usize) -> u64 {
+        let window = window_seconds.min(self.count);
+        if window == 0 {
+            return 0;
+        }
+
+        let mut total = 0u64;
+        for i in 0..window {
+            let idx = if self.write_index >= i + 1 {
+                self.write_index - i - 1
+            } else {
+                self.capacity - (i + 1 - self.write_index)
+            };
+            if idx < self.samples.len() {
+                total += self.samples[idx];
+            }
+        }
+        total
+    }
+
+    pub(crate) fn last_second(&self) -> u64 {
+        self.sum_recent(1)
+    }
+
+    pub(crate) fn last_minute(&self) -> u64 {
+        self.sum_recent(60)
+    }
+
+    pub(crate) fn is_idle(&self) -> bool {
+        self.current_count == 0 && self.sum_recent(self.capacity) == 0
+    }
+}
+
impl ThroughputTracker {
    /// Create a new tracker with the given capacity (seconds of retention).
    pub fn new(retention_seconds: usize) -> Self {
@@ -46,7 +153,8 @@ impl ThroughputTracker {
    /// Record bytes (called from data flow callbacks).
    pub fn record_bytes(&self, bytes_in: u64, bytes_out: u64) {
        self.pending_bytes_in.fetch_add(bytes_in, Ordering::Relaxed);
-       self.pending_bytes_out.fetch_add(bytes_out, Ordering::Relaxed);
+       self.pending_bytes_out
+           .fetch_add(bytes_out, Ordering::Relaxed);
    }

    /// Take a sample (called at 1Hz).
@@ -229,4 +337,41 @@ mod tests {
        let history = tracker.history(10);
        assert!(history.is_empty());
    }
+
+    #[test]
+    fn test_request_rate_tracker_counts_last_second_and_last_minute() {
+        let mut tracker = RequestRateTracker::new(60);
+
+        tracker.record_events_at(100, 2);
+        tracker.record_events_at(100, 3);
+        tracker.advance_to(101);
+
+        assert_eq!(tracker.last_second(), 5);
+        assert_eq!(tracker.last_minute(), 5);
+    }
+
+    #[test]
+    fn test_request_rate_tracker_adds_zero_samples_for_gaps() {
+        let mut tracker = RequestRateTracker::new(60);
+
+        tracker.record_events_at(100, 4);
+        tracker.record_events_at(102, 1);
+        tracker.advance_to(103);
+
+        assert_eq!(tracker.last_second(), 1);
+        assert_eq!(tracker.last_minute(), 5);
+    }
+
+    #[test]
+    fn test_request_rate_tracker_decays_to_zero_over_window() {
+        let mut tracker = RequestRateTracker::new(60);
+
+        tracker.record_events_at(100, 7);
+        tracker.advance_to(101);
+        tracker.advance_to(161);
+
+        assert_eq!(tracker.last_second(), 0);
+        assert_eq!(tracker.last_minute(), 0);
+        assert!(tracker.is_idle());
+    }
}
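Outside the Rust crate, the per-second bucketing that `RequestRateTracker` implements can be sketched in a few lines of TypeScript. `RateWindow` here is a simplified stand-in (it trims an array instead of maintaining a true ring buffer) and is not part of the package; the final example mirrors the gap-filling test above.

```typescript
// Sketch of per-second event bucketing: events land in the current second,
// and advancing the clock flushes the finished second (plus zero-filled
// gap seconds) into a fixed-size window.
class RateWindow {
  private samples: number[] = [];
  private currentSecond: number | null = null;
  private currentCount = 0;

  constructor(private capacity: number) {}

  recordAt(sec: number, count: number): void {
    this.advanceTo(sec);
    this.currentCount += count;
  }

  advanceTo(sec: number): void {
    if (this.currentSecond === null) {
      this.currentSecond = sec;
      return;
    }
    if (sec <= this.currentSecond) return; // same second: keep accumulating
    this.samples.push(this.currentCount);
    for (let s = this.currentSecond + 1; s < sec; s++) {
      this.samples.push(0); // zero-fill seconds with no events
    }
    if (this.samples.length > this.capacity) {
      this.samples = this.samples.slice(-this.capacity);
    }
    this.currentSecond = sec;
    this.currentCount = 0;
  }

  sumRecent(window: number): number {
    return this.samples.slice(-window).reduce((a, b) => a + b, 0);
  }
}

const w = new RateWindow(60);
w.recordAt(100, 4);
w.recordAt(102, 1);
w.advanceTo(103);
console.log(w.sumRecent(1), w.sumRecent(60)); // → 1 5
```

Recording into the current second and only flushing on clock advance is what keeps `last_second()` stable when the collector samples faster than once per second.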
@@ -420,7 +420,7 @@ pub async fn quic_accept_loop(
    }

    conn_tracker.connection_opened(&ip);
-   let route_id = route.name.clone().or(route.id.clone());
+   let route_id = route.metrics_key().map(str::to_string);
    metrics.connection_opened(route_id.as_deref(), Some(&ip_str));

    // Resolve per-route cancel token (child of global cancel)
@@ -541,7 +541,7 @@ async fn handle_quic_stream_forwarding(
    real_client_addr: Option<SocketAddr>,
) -> anyhow::Result<()> {
    let effective_addr = real_client_addr.unwrap_or_else(|| connection.remote_address());
-   let route_id = route.name.as_deref().or(route.id.as_deref());
+   let route_id = route.metrics_key();
    let metrics_arc = metrics;

    // Resolve backend target
@@ -715,10 +715,11 @@ impl TcpListenerManager {
        } else if let Some(target) = quick_match.target {
            let target_host = target.host.first().to_string();
            let target_port = target.port.resolve(port);
-           let route_id = quick_match.route.id.as_deref();
+           let route_config_id = quick_match.route.id.as_deref();
+           let route_id = quick_match.route.metrics_key();

            // Resolve per-route cancel token (child of global cancel)
-           let route_cancel = match route_id {
+           let route_cancel = match route_config_id {
                Some(id) => route_cancels.entry(id.to_string())
                    .or_insert_with(|| cancel.child_token())
                    .clone(),
@@ -733,7 +734,7 @@ impl TcpListenerManager {
                    cancel: conn_cancel.clone(),
                    source_ip: peer_addr.ip(),
                    domain: None, // fast path has no domain
-                   route_id: route_id.map(|s| s.to_string()),
+                   route_id: route_config_id.map(|s| s.to_string()),
                },
            );

@@ -905,12 +906,13 @@ impl TcpListenerManager {
            }
        };

-       let route_id = route_match.route.id.as_deref();
+       let route_config_id = route_match.route.id.as_deref();
+       let route_id = route_match.route.metrics_key();

        // Resolve per-route cancel token (child of global cancel).
        // When this route is removed via updateRoutes, the token is cancelled,
        // terminating all connections on this route.
-       let route_cancel = match route_id {
+       let route_cancel = match route_config_id {
            Some(id) => route_cancels.entry(id.to_string())
                .or_insert_with(|| cancel.child_token())
                .clone(),
@@ -925,7 +927,7 @@ impl TcpListenerManager {
                    cancel: cancel.clone(),
                    source_ip: peer_addr.ip(),
                    domain: domain.clone(),
-                   route_id: route_id.map(|s| s.to_string()),
+                   route_id: route_config_id.map(|s| s.to_string()),
                },
            );

@@ -1314,9 +1316,7 @@ impl TcpListenerManager {
        };

        // Build metadata JSON
-       let route_key = route_match.route.name.as_deref()
-           .or(route_match.route.id.as_deref())
-           .unwrap_or("unknown");
+       let route_key = route_match.route.metrics_key().unwrap_or("unknown");

        let metadata = serde_json::json!({
            "routeKey": route_key,
@@ -617,7 +617,7 @@ impl UdpListenerManager {
        };

        let route = route_match.route;
-       let route_id = route.name.as_deref().or(route.id.as_deref());
+       let route_id = route.metrics_key();

        // Socket handler routes → relay datagram to TS via persistent Unix socket
        if route.action.action_type == RouteActionType::SocketHandler {
@@ -0,0 +1,191 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import { SmartProxy } from '../ts/index.js';
import * as http from 'http';
import * as net from 'net';
import * as tls from 'tls';
import * as fs from 'fs';
import * as path from 'path';
import { fileURLToPath } from 'url';
import { assertPortsFree, findFreePorts } from './helpers/port-allocator.js';

const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);

const CERT_PEM = fs.readFileSync(path.join(__dirname, '..', 'assets', 'certs', 'cert.pem'), 'utf8');
const KEY_PEM = fs.readFileSync(path.join(__dirname, '..', 'assets', 'certs', 'key.pem'), 'utf8');

let httpBackendPort: number;
let tlsBackendPort: number;
let httpProxyPort: number;
let tlsProxyPort: number;

let httpBackend: http.Server;
let tlsBackend: tls.Server;
let proxy: SmartProxy;

async function pollMetrics(proxyToPoll: SmartProxy): Promise<void> {
  await (proxyToPoll as any).metricsAdapter.poll();
}

async function waitForCondition(
  callback: () => Promise<boolean>,
  timeoutMs: number = 5000,
  stepMs: number = 100,
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    if (await callback()) {
      return;
    }
    await new Promise((resolve) => setTimeout(resolve, stepMs));
  }
  throw new Error(`Condition not met within ${timeoutMs}ms`);
}

function hasIpDomainRequest(domain: string): boolean {
  const byIp = proxy.getMetrics().connections.domainRequestsByIP();
  for (const domainMap of byIp.values()) {
    if (domainMap.has(domain)) {
      return true;
    }
  }
  return false;
}

tap.test('setup - backend servers for HTTP domain rate metrics', async () => {
  [httpBackendPort, tlsBackendPort, httpProxyPort, tlsProxyPort] = await findFreePorts(4);

  httpBackend = http.createServer((req, res) => {
    let body = '';
    req.on('data', (chunk) => {
      body += chunk;
    });
    req.on('end', () => {
      res.writeHead(200, { 'Content-Type': 'text/plain' });
      res.end(`ok:${body}`);
    });
  });
  await new Promise<void>((resolve) => {
    httpBackend.listen(httpBackendPort, () => resolve());
  });

  tlsBackend = tls.createServer({ cert: CERT_PEM, key: KEY_PEM }, (socket) => {
    socket.on('data', (data) => {
      socket.write(data);
    });
    socket.on('error', () => {});
  });
  await new Promise<void>((resolve) => {
    tlsBackend.listen(tlsBackendPort, () => resolve());
  });
});

tap.test('setup - start proxy with HTTP and TLS passthrough routes', async () => {
  proxy = new SmartProxy({
    routes: [
      {
        id: 'http-domain-rates',
        name: 'http-domain-rates',
        match: { ports: httpProxyPort, domains: 'example.com' },
        action: {
          type: 'forward',
          targets: [{ host: 'localhost', port: httpBackendPort }],
        },
      },
      {
        id: 'tls-passthrough-domain-rates',
        name: 'tls-passthrough-domain-rates',
        match: { ports: tlsProxyPort, domains: 'passthrough.example.com' },
        action: {
          type: 'forward',
          tls: { mode: 'passthrough' },
          targets: [{ host: 'localhost', port: tlsBackendPort }],
        },
      },
    ],
    metrics: { enabled: true, sampleIntervalMs: 100, retentionSeconds: 60 },
  });

  await proxy.start();
  await new Promise((resolve) => setTimeout(resolve, 300));
});

tap.test('HTTP requests populate per-domain HTTP request rates', async () => {
  for (let i = 0; i < 3; i++) {
    await new Promise<void>((resolve, reject) => {
      const body = `payload-${i}`;
      const req = http.request(
        {
          hostname: 'localhost',
          port: httpProxyPort,
          path: '/echo',
          method: 'POST',
          headers: {
            Host: 'Example.COM',
            'Content-Type': 'text/plain',
            'Content-Length': String(body.length),
          },
        },
        (res) => {
          res.resume();
          res.on('end', () => resolve());
        },
      );
      req.on('error', reject);
      req.end(body);
    });
  }

  await waitForCondition(async () => {
    await pollMetrics(proxy);
    const domainMetrics = proxy.getMetrics().requests.byDomain().get('example.com');
    return (domainMetrics?.lastMinute ?? 0) >= 3 && (domainMetrics?.perSecond ?? 0) > 0;
  });

  const exampleMetrics = proxy.getMetrics().requests.byDomain().get('example.com');
  expect(exampleMetrics).toBeTruthy();
  expect(exampleMetrics?.lastMinute).toEqual(3);
  expect(exampleMetrics?.perSecond).toBeGreaterThan(0);
});

tap.test('TLS passthrough SNI does not inflate HTTP domain request rates', async () => {
  const tlsClient = tls.connect({
    host: 'localhost',
    port: tlsProxyPort,
    servername: 'passthrough.example.com',
    rejectUnauthorized: false,
  });

  await new Promise<void>((resolve, reject) => {
    tlsClient.once('secureConnect', () => resolve());
    tlsClient.once('error', reject);
  });

  const echoPromise = new Promise<void>((resolve, reject) => {
    tlsClient.once('data', () => resolve());
    tlsClient.once('error', reject);
  });
  tlsClient.write(Buffer.from('hello over tls passthrough'));
  await echoPromise;

  await waitForCondition(async () => {
    await pollMetrics(proxy);
    return hasIpDomainRequest('passthrough.example.com');
  });

  const requestRates = proxy.getMetrics().requests.byDomain();
  expect(requestRates.has('passthrough.example.com')).toBeFalse();
  expect(requestRates.get('example.com')?.lastMinute).toEqual(3);
  expect(hasIpDomainRequest('passthrough.example.com')).toBeTrue();

  tlsClient.destroy();
});

tap.test('cleanup - stop proxy and close backend servers', async () => {
  await proxy.stop();
  await new Promise<void>((resolve) => httpBackend.close(() => resolve()));
  await new Promise<void>((resolve) => tlsBackend.close(() => resolve()));
  await assertPortsFree([httpBackendPort, tlsBackendPort, httpProxyPort, tlsProxyPort]);
});

export default tap.start();
@@ -83,6 +83,9 @@ tap.test('should verify new metrics API structure', async () => {
  expect(metrics.throughput).toHaveProperty('history');
  expect(metrics.throughput).toHaveProperty('byRoute');
  expect(metrics.throughput).toHaveProperty('byIP');
+
+ // Check request methods
+ expect(metrics.requests).toHaveProperty('byDomain');
});

tap.test('should track active connections', async (tools) => {
@@ -273,4 +276,4 @@ tap.test('should clean up resources', async () => {
  await assertPortsFree([echoServerPort, proxyPort]);
});

export default tap.start();
@@ -188,10 +188,12 @@ tap.test('TCP forward - real-time byte tracking', async (tools) => {
  const byRoute = m.throughput.byRoute();
  console.log('TCP forward — throughput byRoute:', Array.from(byRoute.entries()));

- // After close, per-IP data should be evicted (memory leak fix)
+ // After close, per-IP buckets are retained briefly for final throughput sampling,
+ // but active connection counts must already be zero.
  const byIPAfter = m.connections.byIP();
  console.log('TCP forward — connections byIP after close:', Array.from(byIPAfter.entries()));
- expect(byIPAfter.size).toEqual(0);
+ expect(byIPAfter.size).toBeGreaterThan(0);
+ expect(Array.from(byIPAfter.values()).every((count) => count === 0)).toEqual(true);

  await proxy.stop();
  await tools.delayFor(200);
@@ -3,6 +3,6 @@
 */
export const commitinfo = {
  name: '@push.rocks/smartproxy',
- version: '27.7.1',
+ version: '27.8.2',
  description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
}
@@ -29,6 +29,11 @@ export interface IThroughputHistoryPoint {
  out: number;
}

+export interface IRequestRateMetrics {
+  perSecond: number;
+  lastMinute: number;
+}
+
/**
 * Main metrics interface with clean, grouped API
 */
@@ -81,6 +86,7 @@ export interface IMetrics {
    perSecond(): number;
    perMinute(): number;
    total(): number;
+   byDomain(): Map<string, IRequestRateMetrics>;
  };

  // Cumulative totals
@@ -185,4 +191,4 @@ export interface IByteTracker {
  bytesOut: number;
  startTime: number;
  lastUpdate: number;
}
@@ -134,6 +134,11 @@ export interface IRustBackendMetrics {
  h2Failures: number;
}

+export interface IRustHttpDomainRequestMetrics {
+  requestsPerSecond: number;
+  requestsLastMinute: number;
+}
+
export interface IRustMetricsSnapshot {
  activeConnections: number;
  totalConnections: number;
@@ -150,6 +155,7 @@ export interface IRustMetricsSnapshot {
  totalHttpRequests: number;
  httpRequestsPerSec: number;
  httpRequestsPerSecRecent: number;
+ httpDomainRequests: Record<string, IRustHttpDomainRequestMetrics>;
  activeUdpSessions: number;
  totalUdpSessions: number;
  totalDatagramsIn: number;
@@ -1,6 +1,6 @@
-import type { IMetrics, IBackendMetrics, IProtocolCacheEntry, IProtocolDistribution, IThroughputData, IThroughputHistoryPoint } from './models/metrics-types.js';
+import type { IMetrics, IBackendMetrics, IProtocolCacheEntry, IProtocolDistribution, IRequestRateMetrics, IThroughputData, IThroughputHistoryPoint } from './models/metrics-types.js';
import type { RustProxyBridge } from './rust-proxy-bridge.js';
-import type { IRustBackendMetrics, IRustIpMetrics, IRustMetricsSnapshot, IRustRouteMetrics } from './models/rust-types.js';
+import type { IRustBackendMetrics, IRustHttpDomainRequestMetrics, IRustIpMetrics, IRustMetricsSnapshot, IRustRouteMetrics } from './models/rust-types.js';

/**
 * Adapts Rust JSON metrics to the IMetrics interface.

@@ -219,6 +219,18 @@ export class RustMetricsAdapter implements IMetrics {
    total: (): number => {
      return this.cache?.totalHttpRequests ?? this.cache?.totalConnections ?? 0;
    },
+   byDomain: (): Map<string, IRequestRateMetrics> => {
+     const result = new Map<string, IRequestRateMetrics>();
+     if (this.cache?.httpDomainRequests) {
+       for (const [domain, metrics] of Object.entries(this.cache.httpDomainRequests) as Array<[string, IRustHttpDomainRequestMetrics]>) {
+         result.set(domain, {
+           perSecond: metrics.requestsPerSecond ?? 0,
+           lastMinute: metrics.requestsLastMinute ?? 0,
+         });
+       }
+     }
+     return result;
+   },
  };

  public totals = {
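The adapter's `byDomain()` is essentially a reshape of the Rust snapshot's plain object into the Map the TypeScript API exposes. A standalone sketch of that conversion, with a hypothetical `toByDomainMap` helper and made-up sample data:

```typescript
// Standalone sketch: convert a Rust-snapshot-style Record into the
// Map<string, { perSecond, lastMinute }> shape of the metrics API.
interface RateMetrics {
  perSecond: number;
  lastMinute: number;
}

function toByDomainMap(
  httpDomainRequests:
    | Record<string, { requestsPerSecond?: number; requestsLastMinute?: number }>
    | undefined,
): Map<string, RateMetrics> {
  const result = new Map<string, RateMetrics>();
  for (const [domain, m] of Object.entries(httpDomainRequests ?? {})) {
    result.set(domain, {
      perSecond: m.requestsPerSecond ?? 0,
      lastMinute: m.requestsLastMinute ?? 0,
    });
  }
  return result;
}

const byDomain = toByDomainMap({
  'example.com': { requestsPerSecond: 2, requestsLastMinute: 37 },
});
console.log(byDomain.get('example.com')); // → { perSecond: 2, lastMinute: 37 }
```

Defaulting missing fields to 0 keeps the TypeScript side tolerant of older Rust snapshots that predate the per-domain counters.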