Compare commits

...

10 Commits

Author SHA1 Message Date
jkunz fdb5ec59bc v27.8.1
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-04-26 09:17:11 +00:00
jkunz 1ea290a085 fix(rustproxy-metrics): preserve high-throughput IPs in metrics snapshots when active-connection rankings are saturated 2026-04-26 09:17:11 +00:00
jkunz cb71f32b90 v27.8.0
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-04-14 12:43:59 +00:00
jkunz 46155ab12c feat(metrics): add per-domain HTTP request rate metrics 2026-04-14 12:43:59 +00:00
jkunz 490a310b54 v27.7.4
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-04-14 09:17:55 +00:00
jkunz 6c5180573a fix(rustproxy metrics): use stable route metrics keys across HTTP and passthrough listeners 2026-04-14 09:17:55 +00:00
jkunz 30e5ab308f v27.7.3
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-04-14 01:14:33 +00:00
jkunz d2a54b3491 fix(repo): no changes detected 2026-04-14 01:14:33 +00:00
jkunz dc922c97df v27.7.2
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-04-14 00:55:25 +00:00
jkunz 8d1bae7604 fix(docs): clarify metrics documentation for domain normalization and saturating gauges 2026-04-14 00:55:25 +00:00
16 changed files with 667 additions and 32 deletions
+31
View File
@@ -1,5 +1,36 @@
# Changelog
## 2026-04-26 - 27.8.1 - fix(rustproxy-metrics)
preserve high-throughput IPs in metrics snapshots when active-connection rankings are saturated
- Select snapshot IPs using a blend of active-connection and throughput rankings instead of only active connections
- Adds a regression test to ensure a high-bandwidth IP remains included when many other IPs have more active connections
## 2026-04-14 - 27.8.0 - feat(metrics)
add per-domain HTTP request rate metrics
- Record canonicalized HTTP request rates per domain in the Rust metrics collector and expose per-second and last-minute values in snapshots.
- Add TypeScript metrics interfaces and adapter support for requests.byDomain().
- Cover HTTP domain rate tracking and ensure TLS passthrough SNI traffic does not affect HTTP request rate metrics.
## 2026-04-14 - 27.7.4 - fix(rustproxy metrics)
use stable route metrics keys across HTTP and passthrough listeners
- adds a shared RouteConfig::metrics_key helper that prefers route name and falls back to route id
- updates HTTP, TCP, UDP, and QUIC metrics labeling to use the shared route metrics key consistently
- keeps route cancellation and rate limiter indexing bound to route config ids where required
- adds tests covering metrics key selection behavior
## 2026-04-14 - 27.7.3 - fix(repo)
no changes detected
## 2026-04-14 - 27.7.2 - fix(docs)
clarify metrics documentation for domain normalization and saturating gauges
- Document that per-IP domain keys are normalized to lowercase and have trailing dots stripped before counting.
- Clarify that the saturating close pattern also applies to connection and UDP active gauges.
## 2026-04-14 - 27.7.1 - fix(rustproxy-http,rustproxy-metrics)
fix domain-scoped request host detection and harden connection metrics cleanup
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "@push.rocks/smartproxy",
"version": "27.7.1",
"version": "27.8.1",
"private": false,
"description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
"main": "dist_ts/index.js",
+2 -2
View File
@@ -78,7 +78,7 @@ Entries are pruned via `retain_routes()` when routes are removed.
All seven maps for an IP are evicted when its active connection count drops to 0. Safety-net pruning in `sample_all()` catches entries orphaned by races. Snapshots cap at 100 IPs, sorted by active connections descending.
**Domain request tracking:** Records which domains each frontend IP has requested. Populated from HTTP Host headers (for HTTP/1.1, HTTP/2, HTTP/3 requests) and from SNI (for TLS passthrough connections). Capped at 256 domains per IP (`MAX_DOMAINS_PER_IP`) to prevent subdomain-spray abuse. Inner DashMap uses 2 shards to minimise base memory per IP (~200 bytes). Common case (IP + domain both known) is two DashMap reads + one atomic increment with zero allocation.
**Domain request tracking:** Records which domains each frontend IP has requested. Populated from HTTP Host headers (for HTTP/1.1, HTTP/2, HTTP/3 requests) and from SNI (for TLS passthrough connections). Domain keys are normalized to lowercase with any trailing dot stripped so the same hostname does not fragment across multiple counters. Capped at 256 domains per IP (`MAX_DOMAINS_PER_IP`) to prevent subdomain-spray abuse. Inner DashMap uses 2 shards to minimise base memory per IP (~200 bytes). Common case (IP + domain both known) is two DashMap reads + one atomic increment with zero allocation.
### Per-Backend Metrics (keyed by "host:port")
@@ -110,7 +110,7 @@ Tracked via `ProtocolGuard` RAII guards and `FrontendProtocolTracker`. Five prot
| ws | `ProtocolGuard::frontend("ws")` on WebSocket upgrade |
| other | Fallback (TCP passthrough without HTTP) |
Uses `fetch_update` for saturating decrements to prevent underflow races.
Uses `fetch_update` for saturating decrements to prevent underflow races. The same saturating-close pattern is also used for connection and UDP active gauges.
### Backend Protocol Distribution
@@ -656,6 +656,11 @@ impl RouteConfig {
self.route_match.ports.to_ports()
}
/// Stable key used for frontend route-scoped metrics.
pub fn metrics_key(&self) -> Option<&str> {
self.name.as_deref().or(self.id.as_deref())
}
/// Get the TLS mode for this route (from action-level or first target).
pub fn tls_mode(&self) -> Option<&crate::tls_types::TlsMode> {
// Check action-level TLS first
@@ -673,3 +678,63 @@ impl RouteConfig {
None
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_route(name: Option<&str>, id: Option<&str>) -> RouteConfig {
RouteConfig {
id: id.map(str::to_string),
route_match: RouteMatch {
ports: PortRange::Single(443),
transport: None,
domains: None,
path: None,
client_ip: None,
tls_version: None,
headers: None,
protocol: None,
},
action: RouteAction {
action_type: RouteActionType::Forward,
targets: None,
tls: None,
websocket: None,
load_balancing: None,
advanced: None,
options: None,
send_proxy_protocol: None,
udp: None,
},
headers: None,
security: None,
name: name.map(str::to_string),
description: None,
priority: None,
tags: None,
enabled: None,
}
}
#[test]
fn metrics_key_prefers_name() {
let route = test_route(Some("named-route"), Some("route-id"));
assert_eq!(route.metrics_key(), Some("named-route"));
}
#[test]
fn metrics_key_falls_back_to_id() {
let route = test_route(None, Some("route-id"));
assert_eq!(route.metrics_key(), Some("route-id"));
}
#[test]
fn metrics_key_is_absent_without_name_or_id() {
let route = test_route(None, None);
assert_eq!(route.metrics_key(), None);
}
}
@@ -639,10 +639,12 @@ impl HttpProxyService {
}
};
let route_id = route_match.route.id.as_deref();
let route_config_id = route_match.route.id.as_deref();
let route_id = route_match.route.metrics_key();
let ip_str = ip_string; // reuse from above (avoid redundant to_string())
self.metrics.record_http_request();
if let Some(ref h) = host {
self.metrics.record_http_domain_request(h);
self.metrics.record_ip_domain_request(&ip_str, h);
}
@@ -654,7 +656,7 @@ impl HttpProxyService {
.as_ref()
.filter(|rl| rl.enabled)
.map(|rl| {
let route_key = route_id.unwrap_or("__default__").to_string();
let route_key = route_config_id.unwrap_or("__default__").to_string();
self.route_rate_limiters
.entry(route_key)
.or_insert_with(|| Arc::new(RateLimiter::new(rl.max_requests, rl.window)))
+183 -9
View File
@@ -5,7 +5,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
use std::time::Duration;
use crate::throughput::{ThroughputSample, ThroughputTracker};
use crate::throughput::{RequestRateTracker, ThroughputSample, ThroughputTracker};
/// Aggregated metrics snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -26,6 +26,7 @@ pub struct Metrics {
pub total_http_requests: u64,
pub http_requests_per_sec: u64,
pub http_requests_per_sec_recent: u64,
pub http_domain_requests: std::collections::HashMap<String, HttpDomainRequestMetrics>,
// UDP metrics
pub active_udp_sessions: u64,
pub total_udp_sessions: u64,
@@ -66,6 +67,14 @@ pub struct IpMetrics {
pub domain_requests: HashMap<String, u64>,
}
/// Per-domain HTTP request rate metrics.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct HttpDomainRequestMetrics {
pub requests_per_second: u64,
pub requests_last_minute: u64,
}
/// Per-backend metrics (keyed by "host:port").
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -135,7 +144,7 @@ pub struct Statistics {
/// Default retention for throughput samples (1 hour).
const DEFAULT_RETENTION_SECONDS: usize = 3600;
/// Maximum number of IPs to include in a snapshot (top by active connections).
/// Maximum number of IPs to include in a snapshot.
const MAX_IPS_IN_SNAPSHOT: usize = 100;
/// Maximum number of backends to include in a snapshot (top by total connections).
@@ -144,6 +153,9 @@ const MAX_BACKENDS_IN_SNAPSHOT: usize = 100;
/// Maximum number of distinct domains tracked per IP (prevents subdomain-spray abuse).
const MAX_DOMAINS_PER_IP: usize = 256;
/// Number of one-second HTTP request samples retained per domain.
const HTTP_DOMAIN_REQUEST_WINDOW_SECONDS: usize = 60;
fn canonicalize_domain_key(domain: &str) -> Option<String> {
let normalized = domain.trim().trim_end_matches('.').to_ascii_lowercase();
if normalized.is_empty() {
@@ -201,6 +213,7 @@ pub struct MetricsCollector {
total_http_requests: AtomicU64,
pending_http_requests: AtomicU64,
http_request_throughput: Mutex<ThroughputTracker>,
http_domain_request_rates: DashMap<String, Mutex<RequestRateTracker>>,
// ── UDP metrics ──
active_udp_sessions: AtomicU64,
@@ -284,6 +297,7 @@ impl MetricsCollector {
total_http_requests: AtomicU64::new(0),
pending_http_requests: AtomicU64::new(0),
http_request_throughput: Mutex::new(ThroughputTracker::new(retention_seconds)),
http_domain_request_rates: DashMap::new(),
frontend_h1_active: AtomicU64::new(0),
frontend_h1_total: AtomicU64::new(0),
frontend_h2_active: AtomicU64::new(0),
@@ -522,6 +536,24 @@ impl MetricsCollector {
self.pending_http_requests.fetch_add(1, Ordering::Relaxed);
}
/// Record a real HTTP request for a canonicalized domain.
pub fn record_http_domain_request(&self, domain: &str) {
let Some(domain) = canonicalize_domain_key(domain) else {
return;
};
self.http_domain_request_rates
.entry(domain.clone())
.or_insert_with(|| {
Mutex::new(RequestRateTracker::new(HTTP_DOMAIN_REQUEST_WINDOW_SECONDS))
});
if let Some(tracker_ref) = self.http_domain_request_rates.get(domain.as_str()) {
if let Ok(mut tracker) = tracker_ref.value().lock() {
tracker.record_event();
}
}
}
/// Record a domain request/connection for a frontend IP.
///
/// Called per HTTP request (with Host header) and per TCP passthrough
@@ -791,8 +823,7 @@ impl MetricsCollector {
/// Take a throughput sample on all trackers (cold path, call at 1Hz or configured interval).
///
/// Drains the lock-free pending counters and feeds the accumulated bytes
/// into the throughput trackers (under Mutex). This is the only place
/// the Mutex is locked.
/// into the throughput trackers under their sampling mutexes.
pub fn sample_all(&self) {
// Drain global pending bytes and feed into the tracker
let global_in = self.global_pending_tp_in.swap(0, Ordering::Relaxed);
@@ -873,6 +904,20 @@ impl MetricsCollector {
tracker.sample();
}
// Advance HTTP domain request windows and prune fully idle domains.
let mut stale_http_domains = Vec::new();
for entry in self.http_domain_request_rates.iter() {
if let Ok(mut tracker) = entry.value().lock() {
tracker.advance_to_now();
if tracker.is_idle() {
stale_http_domains.push(entry.key().clone());
}
}
}
for domain in stale_http_domains {
self.http_domain_request_rates.remove(&domain);
}
// Safety-net: prune orphaned per-IP entries that have no corresponding
// ip_connections entry. This catches any entries created by a race between
// record_bytes and connection_closed.
@@ -1019,8 +1064,8 @@ impl MetricsCollector {
);
}
// Collect per-IP metrics — only IPs with active connections or total > 0,
// capped at top MAX_IPS_IN_SNAPSHOT sorted by active count
// Collect per-IP metrics — capped to the IPs most relevant for either
// active connection visibility or bandwidth attribution.
let mut ip_entries: Vec<(String, u64, u64, u64, u64, u64, u64, HashMap<String, u64>)> =
Vec::new();
for entry in self.ip_total_connections.iter() {
@@ -1068,9 +1113,54 @@ impl MetricsCollector {
domain_requests,
));
}
// Sort by active connections descending, then cap
ip_entries.sort_by(|a, b| b.1.cmp(&a.1));
ip_entries.truncate(MAX_IPS_IN_SNAPSHOT);
if ip_entries.len() > MAX_IPS_IN_SNAPSHOT {
let mut selected = vec![false; ip_entries.len()];
let mut selected_count = 0usize;
let mut active_rank: Vec<usize> = (0..ip_entries.len()).collect();
active_rank.sort_by(|&a, &b| {
ip_entries[b]
.1
.cmp(&ip_entries[a].1)
.then_with(|| ip_entries[b].2.cmp(&ip_entries[a].2))
.then_with(|| ip_entries[a].0.cmp(&ip_entries[b].0))
});
let mut throughput_rank: Vec<usize> = (0..ip_entries.len()).collect();
throughput_rank.sort_by(|&a, &b| {
let a_tp = ip_entries[a].5.saturating_add(ip_entries[a].6);
let b_tp = ip_entries[b].5.saturating_add(ip_entries[b].6);
let a_bytes = ip_entries[a].3.saturating_add(ip_entries[a].4);
let b_bytes = ip_entries[b].3.saturating_add(ip_entries[b].4);
b_tp.cmp(&a_tp)
.then_with(|| b_bytes.cmp(&a_bytes))
.then_with(|| ip_entries[b].1.cmp(&ip_entries[a].1))
.then_with(|| ip_entries[a].0.cmp(&ip_entries[b].0))
});
for idx in active_rank.into_iter().take(MAX_IPS_IN_SNAPSHOT / 2) {
if !selected[idx] {
selected[idx] = true;
selected_count += 1;
}
}
for idx in throughput_rank {
if selected_count >= MAX_IPS_IN_SNAPSHOT {
break;
}
if !selected[idx] {
selected[idx] = true;
selected_count += 1;
}
}
ip_entries = ip_entries
.into_iter()
.enumerate()
.filter_map(|(idx, entry)| selected[idx].then_some(entry))
.collect();
}
let mut ips = std::collections::HashMap::new();
for (ip, active, total, bytes_in, bytes_out, tp_in, tp_out, domain_requests) in ip_entries {
@@ -1179,6 +1269,24 @@ impl MetricsCollector {
})
.unwrap_or((0, 0));
let mut http_domain_requests = std::collections::HashMap::new();
for entry in self.http_domain_request_rates.iter() {
if let Ok(mut tracker) = entry.value().lock() {
tracker.advance_to_now();
let requests_per_second = tracker.last_second();
let requests_last_minute = tracker.last_minute();
if requests_per_second > 0 || requests_last_minute > 0 {
http_domain_requests.insert(
entry.key().clone(),
HttpDomainRequestMetrics {
requests_per_second,
requests_last_minute,
},
);
}
}
}
Metrics {
active_connections: self.active_connections(),
total_connections: self.total_connections(),
@@ -1195,6 +1303,7 @@ impl MetricsCollector {
total_http_requests: self.total_http_requests.load(Ordering::Relaxed),
http_requests_per_sec: http_rps,
http_requests_per_sec_recent: http_rps_recent,
http_domain_requests,
active_udp_sessions: self.active_udp_sessions.load(Ordering::Relaxed),
total_udp_sessions: self.total_udp_sessions.load(Ordering::Relaxed),
total_datagrams_in: self.total_datagrams_in.load(Ordering::Relaxed),
@@ -1388,6 +1497,30 @@ mod tests {
assert!(collector.ip_connections.get("1.2.3.4").is_none());
}
#[test]
fn test_snapshot_retains_high_throughput_ip_over_many_active_ips() {
let collector = MetricsCollector::with_retention(60);
for i in 1..=(MAX_IPS_IN_SNAPSHOT + 20) {
let ip = format!("10.0.0.{}", i);
collector.connection_opened(Some("scanner-route"), Some(&ip));
collector.connection_opened(Some("scanner-route"), Some(&ip));
}
let busy_ip = "203.0.113.10";
collector.connection_opened(Some("download-route"), Some(busy_ip));
collector.record_bytes(0, 900_000, Some("download-route"), Some(busy_ip));
collector.sample_all();
let snapshot = collector.snapshot();
let busy_metrics = snapshot.ips.get(busy_ip).unwrap();
assert_eq!(snapshot.ips.len(), MAX_IPS_IN_SNAPSHOT);
assert_eq!(busy_metrics.active_connections, 1);
assert_eq!(busy_metrics.bytes_out, 900_000);
assert_eq!(busy_metrics.throughput_out_bytes_per_sec, 900_000);
}
#[test]
fn test_per_ip_full_eviction_on_last_close() {
let collector = MetricsCollector::with_retention(60);
@@ -1514,6 +1647,47 @@ mod tests {
assert_eq!(snapshot.http_requests_per_sec, 3);
}
#[test]
fn test_http_domain_request_rates_are_canonicalized() {
let collector = MetricsCollector::with_retention(60);
collector.record_http_domain_request("Example.COM");
collector.record_http_domain_request("example.com.");
collector.record_http_domain_request(" example.com ");
let now_sec = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
if let Some(tracker) = collector.http_domain_request_rates.get("example.com") {
tracker.value().lock().unwrap().advance_to(now_sec + 1);
}
let snapshot = collector.snapshot();
let metrics = snapshot.http_domain_requests.get("example.com").unwrap();
assert_eq!(snapshot.http_domain_requests.len(), 1);
assert_eq!(metrics.requests_per_second, 3);
assert_eq!(metrics.requests_last_minute, 3);
}
#[test]
fn test_ip_domain_requests_do_not_affect_http_domain_request_rates() {
let collector = MetricsCollector::with_retention(60);
collector.connection_opened(Some("route-a"), Some("10.0.0.1"));
collector.record_ip_domain_request("10.0.0.1", "example.com");
let snapshot = collector.snapshot();
assert!(snapshot.http_domain_requests.is_empty());
assert_eq!(
snapshot
.ips
.get("10.0.0.1")
.and_then(|ip| ip.domain_requests.get("example.com")),
Some(&1)
);
}
#[test]
fn test_retain_routes_prunes_stale() {
let collector = MetricsCollector::with_retention(60);
+146 -1
View File
@@ -29,6 +29,113 @@ pub struct ThroughputTracker {
created_at: Instant,
}
fn unix_timestamp_seconds() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
}
/// Circular buffer for per-second event counts.
///
/// Unlike `ThroughputTracker`, events are recorded directly into the current
/// second so request counts remain stable even when the collector is sampled
/// more frequently than once per second.
pub(crate) struct RequestRateTracker {
samples: Vec<u64>,
write_index: usize,
count: usize,
capacity: usize,
current_second: Option<u64>,
current_count: u64,
}
impl RequestRateTracker {
pub(crate) fn new(retention_seconds: usize) -> Self {
Self {
samples: Vec::with_capacity(retention_seconds.max(1)),
write_index: 0,
count: 0,
capacity: retention_seconds.max(1),
current_second: None,
current_count: 0,
}
}
fn push_sample(&mut self, count: u64) {
if self.samples.len() < self.capacity {
self.samples.push(count);
} else {
self.samples[self.write_index] = count;
}
self.write_index = (self.write_index + 1) % self.capacity;
self.count = (self.count + 1).min(self.capacity);
}
pub(crate) fn record_event(&mut self) {
self.record_events_at(unix_timestamp_seconds(), 1);
}
pub(crate) fn record_events_at(&mut self, now_sec: u64, count: u64) {
self.advance_to(now_sec);
self.current_count = self.current_count.saturating_add(count);
}
pub(crate) fn advance_to_now(&mut self) {
self.advance_to(unix_timestamp_seconds());
}
pub(crate) fn advance_to(&mut self, now_sec: u64) {
match self.current_second {
Some(current_second) if now_sec > current_second => {
self.push_sample(self.current_count);
for _ in 1..(now_sec - current_second) {
self.push_sample(0);
}
self.current_second = Some(now_sec);
self.current_count = 0;
}
Some(_) => {}
None => {
self.current_second = Some(now_sec);
self.current_count = 0;
}
}
}
fn sum_recent(&self, window_seconds: usize) -> u64 {
let window = window_seconds.min(self.count);
if window == 0 {
return 0;
}
let mut total = 0u64;
for i in 0..window {
let idx = if self.write_index >= i + 1 {
self.write_index - i - 1
} else {
self.capacity - (i + 1 - self.write_index)
};
if idx < self.samples.len() {
total += self.samples[idx];
}
}
total
}
pub(crate) fn last_second(&self) -> u64 {
self.sum_recent(1)
}
pub(crate) fn last_minute(&self) -> u64 {
self.sum_recent(60)
}
pub(crate) fn is_idle(&self) -> bool {
self.current_count == 0 && self.sum_recent(self.capacity) == 0
}
}
impl ThroughputTracker {
/// Create a new tracker with the given capacity (seconds of retention).
pub fn new(retention_seconds: usize) -> Self {
@@ -46,7 +153,8 @@ impl ThroughputTracker {
/// Record bytes (called from data flow callbacks).
pub fn record_bytes(&self, bytes_in: u64, bytes_out: u64) {
self.pending_bytes_in.fetch_add(bytes_in, Ordering::Relaxed);
self.pending_bytes_out.fetch_add(bytes_out, Ordering::Relaxed);
self.pending_bytes_out
.fetch_add(bytes_out, Ordering::Relaxed);
}
/// Take a sample (called at 1Hz).
@@ -229,4 +337,41 @@ mod tests {
let history = tracker.history(10);
assert!(history.is_empty());
}
#[test]
fn test_request_rate_tracker_counts_last_second_and_last_minute() {
let mut tracker = RequestRateTracker::new(60);
tracker.record_events_at(100, 2);
tracker.record_events_at(100, 3);
tracker.advance_to(101);
assert_eq!(tracker.last_second(), 5);
assert_eq!(tracker.last_minute(), 5);
}
#[test]
fn test_request_rate_tracker_adds_zero_samples_for_gaps() {
let mut tracker = RequestRateTracker::new(60);
tracker.record_events_at(100, 4);
tracker.record_events_at(102, 1);
tracker.advance_to(103);
assert_eq!(tracker.last_second(), 1);
assert_eq!(tracker.last_minute(), 5);
}
#[test]
fn test_request_rate_tracker_decays_to_zero_over_window() {
let mut tracker = RequestRateTracker::new(60);
tracker.record_events_at(100, 7);
tracker.advance_to(101);
tracker.advance_to(161);
assert_eq!(tracker.last_second(), 0);
assert_eq!(tracker.last_minute(), 0);
assert!(tracker.is_idle());
}
}
@@ -420,7 +420,7 @@ pub async fn quic_accept_loop(
}
conn_tracker.connection_opened(&ip);
let route_id = route.name.clone().or(route.id.clone());
let route_id = route.metrics_key().map(str::to_string);
metrics.connection_opened(route_id.as_deref(), Some(&ip_str));
// Resolve per-route cancel token (child of global cancel)
@@ -541,7 +541,7 @@ async fn handle_quic_stream_forwarding(
real_client_addr: Option<SocketAddr>,
) -> anyhow::Result<()> {
let effective_addr = real_client_addr.unwrap_or_else(|| connection.remote_address());
let route_id = route.name.as_deref().or(route.id.as_deref());
let route_id = route.metrics_key();
let metrics_arc = metrics;
// Resolve backend target
@@ -715,10 +715,11 @@ impl TcpListenerManager {
} else if let Some(target) = quick_match.target {
let target_host = target.host.first().to_string();
let target_port = target.port.resolve(port);
let route_id = quick_match.route.id.as_deref();
let route_config_id = quick_match.route.id.as_deref();
let route_id = quick_match.route.metrics_key();
// Resolve per-route cancel token (child of global cancel)
let route_cancel = match route_id {
let route_cancel = match route_config_id {
Some(id) => route_cancels.entry(id.to_string())
.or_insert_with(|| cancel.child_token())
.clone(),
@@ -733,7 +734,7 @@ impl TcpListenerManager {
cancel: conn_cancel.clone(),
source_ip: peer_addr.ip(),
domain: None, // fast path has no domain
route_id: route_id.map(|s| s.to_string()),
route_id: route_config_id.map(|s| s.to_string()),
},
);
@@ -905,12 +906,13 @@ impl TcpListenerManager {
}
};
let route_id = route_match.route.id.as_deref();
let route_config_id = route_match.route.id.as_deref();
let route_id = route_match.route.metrics_key();
// Resolve per-route cancel token (child of global cancel).
// When this route is removed via updateRoutes, the token is cancelled,
// terminating all connections on this route.
let route_cancel = match route_id {
let route_cancel = match route_config_id {
Some(id) => route_cancels.entry(id.to_string())
.or_insert_with(|| cancel.child_token())
.clone(),
@@ -925,7 +927,7 @@ impl TcpListenerManager {
cancel: cancel.clone(),
source_ip: peer_addr.ip(),
domain: domain.clone(),
route_id: route_id.map(|s| s.to_string()),
route_id: route_config_id.map(|s| s.to_string()),
},
);
@@ -1314,9 +1316,7 @@ impl TcpListenerManager {
};
// Build metadata JSON
let route_key = route_match.route.name.as_deref()
.or(route_match.route.id.as_deref())
.unwrap_or("unknown");
let route_key = route_match.route.metrics_key().unwrap_or("unknown");
let metadata = serde_json::json!({
"routeKey": route_key,
@@ -617,7 +617,7 @@ impl UdpListenerManager {
};
let route = route_match.route;
let route_id = route.name.as_deref().or(route.id.as_deref());
let route_id = route.metrics_key();
// Socket handler routes → relay datagram to TS via persistent Unix socket
if route.action.action_type == RouteActionType::SocketHandler {
+191
View File
@@ -0,0 +1,191 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import { SmartProxy } from '../ts/index.js';
import * as http from 'http';
import * as net from 'net';
import * as tls from 'tls';
import * as fs from 'fs';
import * as path from 'path';
import { fileURLToPath } from 'url';
import { assertPortsFree, findFreePorts } from './helpers/port-allocator.js';
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
const CERT_PEM = fs.readFileSync(path.join(__dirname, '..', 'assets', 'certs', 'cert.pem'), 'utf8');
const KEY_PEM = fs.readFileSync(path.join(__dirname, '..', 'assets', 'certs', 'key.pem'), 'utf8');
let httpBackendPort: number;
let tlsBackendPort: number;
let httpProxyPort: number;
let tlsProxyPort: number;
let httpBackend: http.Server;
let tlsBackend: tls.Server;
let proxy: SmartProxy;
async function pollMetrics(proxyToPoll: SmartProxy): Promise<void> {
await (proxyToPoll as any).metricsAdapter.poll();
}
async function waitForCondition(
callback: () => Promise<boolean>,
timeoutMs: number = 5000,
stepMs: number = 100,
): Promise<void> {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
if (await callback()) {
return;
}
await new Promise((resolve) => setTimeout(resolve, stepMs));
}
throw new Error(`Condition not met within ${timeoutMs}ms`);
}
function hasIpDomainRequest(domain: string): boolean {
const byIp = proxy.getMetrics().connections.domainRequestsByIP();
for (const domainMap of byIp.values()) {
if (domainMap.has(domain)) {
return true;
}
}
return false;
}
tap.test('setup - backend servers for HTTP domain rate metrics', async () => {
[httpBackendPort, tlsBackendPort, httpProxyPort, tlsProxyPort] = await findFreePorts(4);
httpBackend = http.createServer((req, res) => {
let body = '';
req.on('data', (chunk) => {
body += chunk;
});
req.on('end', () => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.end(`ok:${body}`);
});
});
await new Promise<void>((resolve) => {
httpBackend.listen(httpBackendPort, () => resolve());
});
tlsBackend = tls.createServer({ cert: CERT_PEM, key: KEY_PEM }, (socket) => {
socket.on('data', (data) => {
socket.write(data);
});
socket.on('error', () => {});
});
await new Promise<void>((resolve) => {
tlsBackend.listen(tlsBackendPort, () => resolve());
});
});
tap.test('setup - start proxy with HTTP and TLS passthrough routes', async () => {
proxy = new SmartProxy({
routes: [
{
id: 'http-domain-rates',
name: 'http-domain-rates',
match: { ports: httpProxyPort, domains: 'example.com' },
action: {
type: 'forward',
targets: [{ host: 'localhost', port: httpBackendPort }],
},
},
{
id: 'tls-passthrough-domain-rates',
name: 'tls-passthrough-domain-rates',
match: { ports: tlsProxyPort, domains: 'passthrough.example.com' },
action: {
type: 'forward',
tls: { mode: 'passthrough' },
targets: [{ host: 'localhost', port: tlsBackendPort }],
},
},
],
metrics: { enabled: true, sampleIntervalMs: 100, retentionSeconds: 60 },
});
await proxy.start();
await new Promise((resolve) => setTimeout(resolve, 300));
});
tap.test('HTTP requests populate per-domain HTTP request rates', async () => {
for (let i = 0; i < 3; i++) {
await new Promise<void>((resolve, reject) => {
const body = `payload-${i}`;
const req = http.request(
{
hostname: 'localhost',
port: httpProxyPort,
path: '/echo',
method: 'POST',
headers: {
Host: 'Example.COM',
'Content-Type': 'text/plain',
'Content-Length': String(body.length),
},
},
(res) => {
res.resume();
res.on('end', () => resolve());
},
);
req.on('error', reject);
req.end(body);
});
}
await waitForCondition(async () => {
await pollMetrics(proxy);
const domainMetrics = proxy.getMetrics().requests.byDomain().get('example.com');
return (domainMetrics?.lastMinute ?? 0) >= 3 && (domainMetrics?.perSecond ?? 0) > 0;
});
const exampleMetrics = proxy.getMetrics().requests.byDomain().get('example.com');
expect(exampleMetrics).toBeTruthy();
expect(exampleMetrics?.lastMinute).toEqual(3);
expect(exampleMetrics?.perSecond).toBeGreaterThan(0);
});
tap.test('TLS passthrough SNI does not inflate HTTP domain request rates', async () => {
const tlsClient = tls.connect({
host: 'localhost',
port: tlsProxyPort,
servername: 'passthrough.example.com',
rejectUnauthorized: false,
});
await new Promise<void>((resolve, reject) => {
tlsClient.once('secureConnect', () => resolve());
tlsClient.once('error', reject);
});
const echoPromise = new Promise<void>((resolve, reject) => {
tlsClient.once('data', () => resolve());
tlsClient.once('error', reject);
});
tlsClient.write(Buffer.from('hello over tls passthrough'));
await echoPromise;
await waitForCondition(async () => {
await pollMetrics(proxy);
return hasIpDomainRequest('passthrough.example.com');
});
const requestRates = proxy.getMetrics().requests.byDomain();
expect(requestRates.has('passthrough.example.com')).toBeFalse();
expect(requestRates.get('example.com')?.lastMinute).toEqual(3);
expect(hasIpDomainRequest('passthrough.example.com')).toBeTrue();
tlsClient.destroy();
});
tap.test('cleanup - stop proxy and close backend servers', async () => {
await proxy.stop();
await new Promise<void>((resolve) => httpBackend.close(() => resolve()));
await new Promise<void>((resolve) => tlsBackend.close(() => resolve()));
await assertPortsFree([httpBackendPort, tlsBackendPort, httpProxyPort, tlsProxyPort]);
});
export default tap.start()
+4 -1
View File
@@ -83,6 +83,9 @@ tap.test('should verify new metrics API structure', async () => {
expect(metrics.throughput).toHaveProperty('history');
expect(metrics.throughput).toHaveProperty('byRoute');
expect(metrics.throughput).toHaveProperty('byIP');
// Check request methods
expect(metrics.requests).toHaveProperty('byDomain');
});
tap.test('should track active connections', async (tools) => {
@@ -273,4 +276,4 @@ tap.test('should clean up resources', async () => {
await assertPortsFree([echoServerPort, proxyPort]);
});
export default tap.start();
export default tap.start();
+1 -1
View File
@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartproxy',
version: '27.7.1',
version: '27.8.1',
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
}
@@ -29,6 +29,11 @@ export interface IThroughputHistoryPoint {
out: number;
}
export interface IRequestRateMetrics {
perSecond: number;
lastMinute: number;
}
/**
* Main metrics interface with clean, grouped API
*/
@@ -81,6 +86,7 @@ export interface IMetrics {
perSecond(): number;
perMinute(): number;
total(): number;
byDomain(): Map<string, IRequestRateMetrics>;
};
// Cumulative totals
@@ -185,4 +191,4 @@ export interface IByteTracker {
bytesOut: number;
startTime: number;
lastUpdate: number;
}
}
@@ -134,6 +134,11 @@ export interface IRustBackendMetrics {
h2Failures: number;
}
export interface IRustHttpDomainRequestMetrics {
requestsPerSecond: number;
requestsLastMinute: number;
}
export interface IRustMetricsSnapshot {
activeConnections: number;
totalConnections: number;
@@ -150,6 +155,7 @@ export interface IRustMetricsSnapshot {
totalHttpRequests: number;
httpRequestsPerSec: number;
httpRequestsPerSecRecent: number;
httpDomainRequests: Record<string, IRustHttpDomainRequestMetrics>;
activeUdpSessions: number;
totalUdpSessions: number;
totalDatagramsIn: number;
+14 -2
View File
@@ -1,6 +1,6 @@
import type { IMetrics, IBackendMetrics, IProtocolCacheEntry, IProtocolDistribution, IThroughputData, IThroughputHistoryPoint } from './models/metrics-types.js';
import type { IMetrics, IBackendMetrics, IProtocolCacheEntry, IProtocolDistribution, IRequestRateMetrics, IThroughputData, IThroughputHistoryPoint } from './models/metrics-types.js';
import type { RustProxyBridge } from './rust-proxy-bridge.js';
import type { IRustBackendMetrics, IRustIpMetrics, IRustMetricsSnapshot, IRustRouteMetrics } from './models/rust-types.js';
import type { IRustBackendMetrics, IRustHttpDomainRequestMetrics, IRustIpMetrics, IRustMetricsSnapshot, IRustRouteMetrics } from './models/rust-types.js';
/**
* Adapts Rust JSON metrics to the IMetrics interface.
@@ -219,6 +219,18 @@ export class RustMetricsAdapter implements IMetrics {
total: (): number => {
return this.cache?.totalHttpRequests ?? this.cache?.totalConnections ?? 0;
},
byDomain: (): Map<string, IRequestRateMetrics> => {
const result = new Map<string, IRequestRateMetrics>();
if (this.cache?.httpDomainRequests) {
for (const [domain, metrics] of Object.entries(this.cache.httpDomainRequests) as Array<[string, IRustHttpDomainRequestMetrics]>) {
result.set(domain, {
perSecond: metrics.requestsPerSecond ?? 0,
lastMinute: metrics.requestsLastMinute ?? 0,
});
}
}
return result;
},
};
public totals = {