fix(rustproxy-http,rustproxy-metrics): fix domain-scoped request host detection and harden connection metrics cleanup

This commit is contained in:
2026-04-14 00:54:12 +00:00
parent 6ee7237357
commit a53a2c4ca5
15 changed files with 1813 additions and 590 deletions
+489 -150
View File
@@ -144,6 +144,15 @@ const MAX_BACKENDS_IN_SNAPSHOT: usize = 100;
/// Maximum number of distinct domains tracked per IP (prevents subdomain-spray abuse).
const MAX_DOMAINS_PER_IP: usize = 256;
/// Normalize a domain string into a canonical tracking key.
///
/// Trims surrounding whitespace, strips any trailing dots (FQDN form),
/// and lowercases ASCII so spelling variants of one domain map to the
/// same key. Returns `None` when nothing remains after normalization.
fn canonicalize_domain_key(domain: &str) -> Option<String> {
    let key = domain.trim().trim_end_matches('.').to_ascii_lowercase();
    (!key.is_empty()).then_some(key)
}
/// Metrics collector tracking connections and throughput.
///
/// Design: The hot path (`record_bytes`) is entirely lock-free — it only touches
@@ -334,25 +343,43 @@ impl MetricsCollector {
/// Record a connection closing.
pub fn connection_closed(&self, route_id: Option<&str>, source_ip: Option<&str>) {
self.active_connections.fetch_sub(1, Ordering::Relaxed);
self.active_connections
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
if v > 0 {
Some(v - 1)
} else {
None
}
})
.ok();
if let Some(route_id) = route_id {
if let Some(counter) = self.route_connections.get(route_id) {
let val = counter.load(Ordering::Relaxed);
if val > 0 {
counter.fetch_sub(1, Ordering::Relaxed);
}
counter
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
if v > 0 {
Some(v - 1)
} else {
None
}
})
.ok();
}
}
if let Some(ip) = source_ip {
if let Some(counter) = self.ip_connections.get(ip) {
let val = counter.load(Ordering::Relaxed);
if val > 0 {
counter.fetch_sub(1, Ordering::Relaxed);
}
let prev = counter
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
if v > 0 {
Some(v - 1)
} else {
None
}
})
.ok();
// Clean up zero-count entries to prevent memory growth
if val <= 1 {
if matches!(prev, Some(v) if v <= 1) {
drop(counter);
self.ip_connections.remove(ip);
// Evict all per-IP tracking data for this IP
@@ -371,17 +398,25 @@ impl MetricsCollector {
///
/// Called per-chunk in the TCP copy loop. Only touches AtomicU64 counters —
/// no Mutex is taken. The throughput trackers are fed during `sample_all()`.
pub fn record_bytes(&self, bytes_in: u64, bytes_out: u64, route_id: Option<&str>, source_ip: Option<&str>) {
pub fn record_bytes(
&self,
bytes_in: u64,
bytes_out: u64,
route_id: Option<&str>,
source_ip: Option<&str>,
) {
// Short-circuit: only touch counters for the direction that has data.
// CountingBody always calls with one direction zero — skipping the zero
// direction avoids ~50% of DashMap shard-locked reads per call.
if bytes_in > 0 {
self.total_bytes_in.fetch_add(bytes_in, Ordering::Relaxed);
self.global_pending_tp_in.fetch_add(bytes_in, Ordering::Relaxed);
self.global_pending_tp_in
.fetch_add(bytes_in, Ordering::Relaxed);
}
if bytes_out > 0 {
self.total_bytes_out.fetch_add(bytes_out, Ordering::Relaxed);
self.global_pending_tp_out.fetch_add(bytes_out, Ordering::Relaxed);
self.global_pending_tp_out
.fetch_add(bytes_out, Ordering::Relaxed);
}
// Per-route tracking: use get() first (zero-alloc fast path for existing entries),
@@ -391,7 +426,8 @@ impl MetricsCollector {
if let Some(counter) = self.route_bytes_in.get(route_id) {
counter.fetch_add(bytes_in, Ordering::Relaxed);
} else {
self.route_bytes_in.entry(route_id.to_string())
self.route_bytes_in
.entry(route_id.to_string())
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(bytes_in, Ordering::Relaxed);
}
@@ -400,7 +436,8 @@ impl MetricsCollector {
if let Some(counter) = self.route_bytes_out.get(route_id) {
counter.fetch_add(bytes_out, Ordering::Relaxed);
} else {
self.route_bytes_out.entry(route_id.to_string())
self.route_bytes_out
.entry(route_id.to_string())
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(bytes_out, Ordering::Relaxed);
}
@@ -408,13 +445,23 @@ impl MetricsCollector {
// Accumulate into per-route pending throughput counters (lock-free)
if let Some(entry) = self.route_pending_tp.get(route_id) {
if bytes_in > 0 { entry.0.fetch_add(bytes_in, Ordering::Relaxed); }
if bytes_out > 0 { entry.1.fetch_add(bytes_out, Ordering::Relaxed); }
if bytes_in > 0 {
entry.0.fetch_add(bytes_in, Ordering::Relaxed);
}
if bytes_out > 0 {
entry.1.fetch_add(bytes_out, Ordering::Relaxed);
}
} else {
let entry = self.route_pending_tp.entry(route_id.to_string())
let entry = self
.route_pending_tp
.entry(route_id.to_string())
.or_insert_with(|| (AtomicU64::new(0), AtomicU64::new(0)));
if bytes_in > 0 { entry.0.fetch_add(bytes_in, Ordering::Relaxed); }
if bytes_out > 0 { entry.1.fetch_add(bytes_out, Ordering::Relaxed); }
if bytes_in > 0 {
entry.0.fetch_add(bytes_in, Ordering::Relaxed);
}
if bytes_out > 0 {
entry.1.fetch_add(bytes_out, Ordering::Relaxed);
}
}
}
@@ -428,7 +475,8 @@ impl MetricsCollector {
if let Some(counter) = self.ip_bytes_in.get(ip) {
counter.fetch_add(bytes_in, Ordering::Relaxed);
} else {
self.ip_bytes_in.entry(ip.to_string())
self.ip_bytes_in
.entry(ip.to_string())
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(bytes_in, Ordering::Relaxed);
}
@@ -437,7 +485,8 @@ impl MetricsCollector {
if let Some(counter) = self.ip_bytes_out.get(ip) {
counter.fetch_add(bytes_out, Ordering::Relaxed);
} else {
self.ip_bytes_out.entry(ip.to_string())
self.ip_bytes_out
.entry(ip.to_string())
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(bytes_out, Ordering::Relaxed);
}
@@ -445,13 +494,23 @@ impl MetricsCollector {
// Accumulate into per-IP pending throughput counters (lock-free)
if let Some(entry) = self.ip_pending_tp.get(ip) {
if bytes_in > 0 { entry.0.fetch_add(bytes_in, Ordering::Relaxed); }
if bytes_out > 0 { entry.1.fetch_add(bytes_out, Ordering::Relaxed); }
if bytes_in > 0 {
entry.0.fetch_add(bytes_in, Ordering::Relaxed);
}
if bytes_out > 0 {
entry.1.fetch_add(bytes_out, Ordering::Relaxed);
}
} else {
let entry = self.ip_pending_tp.entry(ip.to_string())
let entry = self
.ip_pending_tp
.entry(ip.to_string())
.or_insert_with(|| (AtomicU64::new(0), AtomicU64::new(0)));
if bytes_in > 0 { entry.0.fetch_add(bytes_in, Ordering::Relaxed); }
if bytes_out > 0 { entry.1.fetch_add(bytes_out, Ordering::Relaxed); }
if bytes_in > 0 {
entry.0.fetch_add(bytes_in, Ordering::Relaxed);
}
if bytes_out > 0 {
entry.1.fetch_add(bytes_out, Ordering::Relaxed);
}
}
}
}
@@ -469,9 +528,13 @@ impl MetricsCollector {
/// connection (with SNI domain). The common case (IP + domain both already
/// tracked) is two DashMap reads + one atomic increment — zero allocation.
pub fn record_ip_domain_request(&self, ip: &str, domain: &str) {
let Some(domain) = canonicalize_domain_key(domain) else {
return;
};
// Fast path: IP already tracked, domain already tracked
if let Some(domains) = self.ip_domain_requests.get(ip) {
if let Some(counter) = domains.get(domain) {
if let Some(counter) = domains.get(domain.as_str()) {
counter.fetch_add(1, Ordering::Relaxed);
return;
}
@@ -480,7 +543,7 @@ impl MetricsCollector {
return;
}
domains
.entry(domain.to_string())
.entry(domain)
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(1, Ordering::Relaxed);
return;
@@ -490,7 +553,7 @@ impl MetricsCollector {
return;
}
let inner = DashMap::with_capacity_and_shard_amount(4, 2);
inner.insert(domain.to_string(), AtomicU64::new(1));
inner.insert(domain, AtomicU64::new(1));
self.ip_domain_requests.insert(ip.to_string(), inner);
}
@@ -504,7 +567,15 @@ impl MetricsCollector {
/// Record a UDP session closed.
pub fn udp_session_closed(&self) {
self.active_udp_sessions.fetch_sub(1, Ordering::Relaxed);
self.active_udp_sessions
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
if v > 0 {
Some(v - 1)
} else {
None
}
})
.ok();
}
/// Record a UDP datagram (inbound or outbound).
@@ -553,9 +624,15 @@ impl MetricsCollector {
let (active, _) = self.frontend_proto_counters(proto);
// Atomic saturating decrement — avoids TOCTOU race where concurrent
// closes could both read val=1, both subtract, wrapping to u64::MAX.
active.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
if v > 0 { Some(v - 1) } else { None }
}).ok();
active
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
if v > 0 {
Some(v - 1)
} else {
None
}
})
.ok();
}
/// Record a backend connection opened with a given protocol.
@@ -569,9 +646,15 @@ impl MetricsCollector {
pub fn backend_protocol_closed(&self, proto: &str) {
let (active, _) = self.backend_proto_counters(proto);
// Atomic saturating decrement — see frontend_protocol_closed for rationale.
active.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
if v > 0 { Some(v - 1) } else { None }
}).ok();
active
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
if v > 0 {
Some(v - 1)
} else {
None
}
})
.ok();
}
// ── Per-backend recording methods ──
@@ -681,17 +764,28 @@ impl MetricsCollector {
/// Remove per-backend metrics for backends no longer in any route target.
pub fn retain_backends(&self, active_backends: &HashSet<String>) {
self.backend_active.retain(|k, _| active_backends.contains(k));
self.backend_total.retain(|k, _| active_backends.contains(k));
self.backend_protocol.retain(|k, _| active_backends.contains(k));
self.backend_connect_errors.retain(|k, _| active_backends.contains(k));
self.backend_handshake_errors.retain(|k, _| active_backends.contains(k));
self.backend_request_errors.retain(|k, _| active_backends.contains(k));
self.backend_connect_time_us.retain(|k, _| active_backends.contains(k));
self.backend_connect_count.retain(|k, _| active_backends.contains(k));
self.backend_pool_hits.retain(|k, _| active_backends.contains(k));
self.backend_pool_misses.retain(|k, _| active_backends.contains(k));
self.backend_h2_failures.retain(|k, _| active_backends.contains(k));
self.backend_active
.retain(|k, _| active_backends.contains(k));
self.backend_total
.retain(|k, _| active_backends.contains(k));
self.backend_protocol
.retain(|k, _| active_backends.contains(k));
self.backend_connect_errors
.retain(|k, _| active_backends.contains(k));
self.backend_handshake_errors
.retain(|k, _| active_backends.contains(k));
self.backend_request_errors
.retain(|k, _| active_backends.contains(k));
self.backend_connect_time_us
.retain(|k, _| active_backends.contains(k));
self.backend_connect_count
.retain(|k, _| active_backends.contains(k));
self.backend_pool_hits
.retain(|k, _| active_backends.contains(k));
self.backend_pool_misses
.retain(|k, _| active_backends.contains(k));
self.backend_h2_failures
.retain(|k, _| active_backends.contains(k));
}
/// Take a throughput sample on all trackers (cold path, call at 1Hz or configured interval).
@@ -782,41 +876,64 @@ impl MetricsCollector {
// Safety-net: prune orphaned per-IP entries that have no corresponding
// ip_connections entry. This catches any entries created by a race between
// record_bytes and connection_closed.
self.ip_bytes_in.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_bytes_out.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_pending_tp.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_throughput.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_total_connections.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_domain_requests.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_bytes_in
.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_bytes_out
.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_pending_tp
.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_throughput
.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_total_connections
.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_domain_requests
.retain(|k, _| self.ip_connections.contains_key(k));
// Safety-net: prune orphaned backend error/stats entries for backends
// that have no active or total connections (error-only backends).
// These accumulate when backend_connect_error/backend_handshake_error
// create entries but backend_connection_opened is never called.
let known_backends: HashSet<String> = self.backend_active.iter()
let known_backends: HashSet<String> = self
.backend_active
.iter()
.map(|e| e.key().clone())
.chain(self.backend_total.iter().map(|e| e.key().clone()))
.collect();
self.backend_connect_errors.retain(|k, _| known_backends.contains(k));
self.backend_handshake_errors.retain(|k, _| known_backends.contains(k));
self.backend_request_errors.retain(|k, _| known_backends.contains(k));
self.backend_connect_time_us.retain(|k, _| known_backends.contains(k));
self.backend_connect_count.retain(|k, _| known_backends.contains(k));
self.backend_pool_hits.retain(|k, _| known_backends.contains(k));
self.backend_pool_misses.retain(|k, _| known_backends.contains(k));
self.backend_h2_failures.retain(|k, _| known_backends.contains(k));
self.backend_protocol.retain(|k, _| known_backends.contains(k));
self.backend_connect_errors
.retain(|k, _| known_backends.contains(k));
self.backend_handshake_errors
.retain(|k, _| known_backends.contains(k));
self.backend_request_errors
.retain(|k, _| known_backends.contains(k));
self.backend_connect_time_us
.retain(|k, _| known_backends.contains(k));
self.backend_connect_count
.retain(|k, _| known_backends.contains(k));
self.backend_pool_hits
.retain(|k, _| known_backends.contains(k));
self.backend_pool_misses
.retain(|k, _| known_backends.contains(k));
self.backend_h2_failures
.retain(|k, _| known_backends.contains(k));
self.backend_protocol
.retain(|k, _| known_backends.contains(k));
}
/// Remove per-route metrics for route IDs that are no longer active.
/// Call this after `update_routes()` to prune stale entries.
pub fn retain_routes(&self, active_route_ids: &HashSet<String>) {
self.route_connections.retain(|k, _| active_route_ids.contains(k));
self.route_total_connections.retain(|k, _| active_route_ids.contains(k));
self.route_bytes_in.retain(|k, _| active_route_ids.contains(k));
self.route_bytes_out.retain(|k, _| active_route_ids.contains(k));
self.route_pending_tp.retain(|k, _| active_route_ids.contains(k));
self.route_throughput.retain(|k, _| active_route_ids.contains(k));
self.route_connections
.retain(|k, _| active_route_ids.contains(k));
self.route_total_connections
.retain(|k, _| active_route_ids.contains(k));
self.route_bytes_in
.retain(|k, _| active_route_ids.contains(k));
self.route_bytes_out
.retain(|k, _| active_route_ids.contains(k));
self.route_pending_tp
.retain(|k, _| active_route_ids.contains(k));
self.route_throughput
.retain(|k, _| active_route_ids.contains(k));
}
/// Get current active connection count.
@@ -859,72 +976,97 @@ impl MetricsCollector {
for entry in self.route_total_connections.iter() {
let route_id = entry.key().clone();
let total = entry.value().load(Ordering::Relaxed);
let active = self.route_connections
let active = self
.route_connections
.get(&route_id)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
let bytes_in = self.route_bytes_in
let bytes_in = self
.route_bytes_in
.get(&route_id)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
let bytes_out = self.route_bytes_out
let bytes_out = self
.route_bytes_out
.get(&route_id)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
let (route_tp_in, route_tp_out, route_recent_in, route_recent_out) = self.route_throughput
let (route_tp_in, route_tp_out, route_recent_in, route_recent_out) = self
.route_throughput
.get(&route_id)
.and_then(|entry| entry.value().lock().ok().map(|t| {
let (i_in, i_out) = t.instant();
let (r_in, r_out) = t.recent();
(i_in, i_out, r_in, r_out)
}))
.and_then(|entry| {
entry.value().lock().ok().map(|t| {
let (i_in, i_out) = t.instant();
let (r_in, r_out) = t.recent();
(i_in, i_out, r_in, r_out)
})
})
.unwrap_or((0, 0, 0, 0));
routes.insert(route_id, RouteMetrics {
active_connections: active,
total_connections: total,
bytes_in,
bytes_out,
throughput_in_bytes_per_sec: route_tp_in,
throughput_out_bytes_per_sec: route_tp_out,
throughput_recent_in_bytes_per_sec: route_recent_in,
throughput_recent_out_bytes_per_sec: route_recent_out,
});
routes.insert(
route_id,
RouteMetrics {
active_connections: active,
total_connections: total,
bytes_in,
bytes_out,
throughput_in_bytes_per_sec: route_tp_in,
throughput_out_bytes_per_sec: route_tp_out,
throughput_recent_in_bytes_per_sec: route_recent_in,
throughput_recent_out_bytes_per_sec: route_recent_out,
},
);
}
// Collect per-IP metrics — only IPs with active connections or total > 0,
// capped at top MAX_IPS_IN_SNAPSHOT sorted by active count
let mut ip_entries: Vec<(String, u64, u64, u64, u64, u64, u64, HashMap<String, u64>)> = Vec::new();
let mut ip_entries: Vec<(String, u64, u64, u64, u64, u64, u64, HashMap<String, u64>)> =
Vec::new();
for entry in self.ip_total_connections.iter() {
let ip = entry.key().clone();
let total = entry.value().load(Ordering::Relaxed);
let active = self.ip_connections
let active = self
.ip_connections
.get(&ip)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
let bytes_in = self.ip_bytes_in
let bytes_in = self
.ip_bytes_in
.get(&ip)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
let bytes_out = self.ip_bytes_out
let bytes_out = self
.ip_bytes_out
.get(&ip)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
let (tp_in, tp_out) = self.ip_throughput
let (tp_in, tp_out) = self
.ip_throughput
.get(&ip)
.and_then(|entry| entry.value().lock().ok().map(|t| t.instant()))
.unwrap_or((0, 0));
// Collect per-domain request counts for this IP
let domain_requests = self.ip_domain_requests
let domain_requests = self
.ip_domain_requests
.get(&ip)
.map(|domains| {
domains.iter()
domains
.iter()
.map(|e| (e.key().clone(), e.value().load(Ordering::Relaxed)))
.collect()
})
.unwrap_or_default();
ip_entries.push((ip, active, total, bytes_in, bytes_out, tp_in, tp_out, domain_requests));
ip_entries.push((
ip,
active,
total,
bytes_in,
bytes_out,
tp_in,
tp_out,
domain_requests,
));
}
// Sort by active connections descending, then cap
ip_entries.sort_by(|a, b| b.1.cmp(&a.1));
@@ -932,15 +1074,18 @@ impl MetricsCollector {
let mut ips = std::collections::HashMap::new();
for (ip, active, total, bytes_in, bytes_out, tp_in, tp_out, domain_requests) in ip_entries {
ips.insert(ip, IpMetrics {
active_connections: active,
total_connections: total,
bytes_in,
bytes_out,
throughput_in_bytes_per_sec: tp_in,
throughput_out_bytes_per_sec: tp_out,
domain_requests,
});
ips.insert(
ip,
IpMetrics {
active_connections: active,
total_connections: total,
bytes_in,
bytes_out,
throughput_in_bytes_per_sec: tp_in,
throughput_out_bytes_per_sec: tp_out,
domain_requests,
},
);
}
// Collect per-backend metrics, capped at top MAX_BACKENDS_IN_SNAPSHOT by total connections
@@ -948,69 +1093,84 @@ impl MetricsCollector {
for entry in self.backend_total.iter() {
let key = entry.key().clone();
let total = entry.value().load(Ordering::Relaxed);
let active = self.backend_active
let active = self
.backend_active
.get(&key)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
let protocol = self.backend_protocol
let protocol = self
.backend_protocol
.get(&key)
.map(|v| v.value().clone())
.unwrap_or_else(|| "unknown".to_string());
let connect_errors = self.backend_connect_errors
let connect_errors = self
.backend_connect_errors
.get(&key)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
let handshake_errors = self.backend_handshake_errors
let handshake_errors = self
.backend_handshake_errors
.get(&key)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
let request_errors = self.backend_request_errors
let request_errors = self
.backend_request_errors
.get(&key)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
let total_connect_time_us = self.backend_connect_time_us
let total_connect_time_us = self
.backend_connect_time_us
.get(&key)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
let connect_count = self.backend_connect_count
let connect_count = self
.backend_connect_count
.get(&key)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
let pool_hits = self.backend_pool_hits
let pool_hits = self
.backend_pool_hits
.get(&key)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
let pool_misses = self.backend_pool_misses
let pool_misses = self
.backend_pool_misses
.get(&key)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
let h2_failures = self.backend_h2_failures
let h2_failures = self
.backend_h2_failures
.get(&key)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0);
backend_entries.push((key, BackendMetrics {
active_connections: active,
total_connections: total,
protocol,
connect_errors,
handshake_errors,
request_errors,
total_connect_time_us,
connect_count,
pool_hits,
pool_misses,
h2_failures,
}));
backend_entries.push((
key,
BackendMetrics {
active_connections: active,
total_connections: total,
protocol,
connect_errors,
handshake_errors,
request_errors,
total_connect_time_us,
connect_count,
pool_hits,
pool_misses,
h2_failures,
},
));
}
// Sort by total connections descending, then cap
backend_entries.sort_by(|a, b| b.1.total_connections.cmp(&a.1.total_connections));
backend_entries.truncate(MAX_BACKENDS_IN_SNAPSHOT);
let backends: std::collections::HashMap<String, BackendMetrics> = backend_entries.into_iter().collect();
let backends: std::collections::HashMap<String, BackendMetrics> =
backend_entries.into_iter().collect();
// HTTP request rates
let (http_rps, http_rps_recent) = self.http_request_throughput
let (http_rps, http_rps_recent) = self
.http_request_throughput
.lock()
.map(|t| {
let (instant, _) = t.instant();
@@ -1185,11 +1345,19 @@ mod tests {
// Check IP active connections (drop DashMap refs immediately to avoid deadlock)
assert_eq!(
collector.ip_connections.get("1.2.3.4").unwrap().load(Ordering::Relaxed),
collector
.ip_connections
.get("1.2.3.4")
.unwrap()
.load(Ordering::Relaxed),
2
);
assert_eq!(
collector.ip_connections.get("5.6.7.8").unwrap().load(Ordering::Relaxed),
collector
.ip_connections
.get("5.6.7.8")
.unwrap()
.load(Ordering::Relaxed),
1
);
@@ -1207,7 +1375,11 @@ mod tests {
// Close connections
collector.connection_closed(Some("route-a"), Some("1.2.3.4"));
assert_eq!(
collector.ip_connections.get("1.2.3.4").unwrap().load(Ordering::Relaxed),
collector
.ip_connections
.get("1.2.3.4")
.unwrap()
.load(Ordering::Relaxed),
1
);
@@ -1252,6 +1424,79 @@ mod tests {
assert!(collector.ip_total_connections.get("10.0.0.2").is_some());
}
/// Closing more connections than were opened must clamp every active
/// gauge at zero instead of wrapping the unsigned counters.
#[test]
fn test_connection_closed_saturates_active_gauges() {
    let metrics = MetricsCollector::new();
    let (route, ip) = (Some("route-a"), Some("10.0.0.1"));

    // Close with nothing open: the global gauge must stay at zero.
    metrics.connection_closed(route, ip);
    assert_eq!(metrics.active_connections(), 0);

    // One open followed by two closes: still clamped at zero everywhere.
    metrics.connection_opened(route, ip);
    metrics.connection_closed(route, ip);
    metrics.connection_closed(route, ip);
    assert_eq!(metrics.active_connections(), 0);

    let route_active = metrics
        .route_connections
        .get("route-a")
        .map(|c| c.load(Ordering::Relaxed))
        .unwrap_or(0);
    assert_eq!(route_active, 0);
    // The per-IP entry is evicted once its count drops to zero.
    assert!(metrics.ip_connections.get("10.0.0.1").is_none());
}
/// The UDP session gauge must saturate at zero on excess closes rather
/// than underflowing the unsigned counter.
#[test]
fn test_udp_session_closed_saturates() {
    let metrics = MetricsCollector::new();

    // Close before any open: the gauge stays pinned at zero.
    metrics.udp_session_closed();
    assert_eq!(metrics.snapshot().active_udp_sessions, 0);

    // One open, two closes: the extra close is a no-op.
    metrics.udp_session_opened();
    for _ in 0..2 {
        metrics.udp_session_closed();
    }
    assert_eq!(metrics.snapshot().active_udp_sessions, 0);
}
/// Domain keys passed to `record_ip_domain_request` are canonicalized
/// (trimmed, lowercased, trailing dot stripped), so spelling variants of
/// one domain collapse into a single per-IP counter entry.
#[test]
fn test_ip_domain_requests_are_canonicalized() {
    let metrics = MetricsCollector::new();
    metrics.connection_opened(Some("route-a"), Some("10.0.0.1"));

    for variant in ["Example.COM", "example.com.", " example.com "] {
        metrics.record_ip_domain_request("10.0.0.1", variant);
    }

    let snapshot = metrics.snapshot();
    let per_ip = snapshot.ips.get("10.0.0.1").unwrap();
    assert_eq!(per_ip.domain_requests.len(), 1);
    assert_eq!(per_ip.domain_requests.get("example.com"), Some(&3));
}
/// Frontend and backend protocol open/close events must show up in the
/// snapshot's per-protocol active and total counters.
#[test]
fn test_protocol_metrics_appear_in_snapshot() {
    let metrics = MetricsCollector::new();

    metrics.frontend_protocol_opened("h2");
    metrics.frontend_protocol_opened("ws");
    metrics.backend_protocol_opened("h3");
    metrics.backend_protocol_opened("ws");

    // Close one protocol on each side; the ws connections stay open.
    metrics.frontend_protocol_closed("h2");
    metrics.backend_protocol_closed("h3");

    let snapshot = metrics.snapshot();
    let fe = &snapshot.frontend_protocols;
    assert_eq!((fe.h2_active, fe.h2_total), (0, 1));
    assert_eq!((fe.ws_active, fe.ws_total), (1, 1));
    let be = &snapshot.backend_protocols;
    assert_eq!((be.h3_active, be.h3_total), (0, 1));
    assert_eq!((be.ws_active, be.ws_total), (1, 1));
}
#[test]
fn test_http_request_tracking() {
let collector = MetricsCollector::with_retention(60);
@@ -1326,9 +1571,16 @@ mod tests {
let collector = MetricsCollector::with_retention(60);
// Manually insert orphaned entries (simulates the race before the guard)
collector.ip_bytes_in.insert("orphan-ip".to_string(), AtomicU64::new(100));
collector.ip_bytes_out.insert("orphan-ip".to_string(), AtomicU64::new(200));
collector.ip_pending_tp.insert("orphan-ip".to_string(), (AtomicU64::new(0), AtomicU64::new(0)));
collector
.ip_bytes_in
.insert("orphan-ip".to_string(), AtomicU64::new(100));
collector
.ip_bytes_out
.insert("orphan-ip".to_string(), AtomicU64::new(200));
collector.ip_pending_tp.insert(
"orphan-ip".to_string(),
(AtomicU64::new(0), AtomicU64::new(0)),
);
// No ip_connections entry for "orphan-ip"
assert!(collector.ip_connections.get("orphan-ip").is_none());
@@ -1366,17 +1618,59 @@ mod tests {
collector.backend_connection_opened(key, Duration::from_millis(15));
collector.backend_connection_opened(key, Duration::from_millis(25));
assert_eq!(collector.backend_active.get(key).unwrap().load(Ordering::Relaxed), 2);
assert_eq!(collector.backend_total.get(key).unwrap().load(Ordering::Relaxed), 2);
assert_eq!(collector.backend_connect_count.get(key).unwrap().load(Ordering::Relaxed), 2);
assert_eq!(
collector
.backend_active
.get(key)
.unwrap()
.load(Ordering::Relaxed),
2
);
assert_eq!(
collector
.backend_total
.get(key)
.unwrap()
.load(Ordering::Relaxed),
2
);
assert_eq!(
collector
.backend_connect_count
.get(key)
.unwrap()
.load(Ordering::Relaxed),
2
);
// 15ms + 25ms = 40ms = 40_000us
assert_eq!(collector.backend_connect_time_us.get(key).unwrap().load(Ordering::Relaxed), 40_000);
assert_eq!(
collector
.backend_connect_time_us
.get(key)
.unwrap()
.load(Ordering::Relaxed),
40_000
);
// Close one
collector.backend_connection_closed(key);
assert_eq!(collector.backend_active.get(key).unwrap().load(Ordering::Relaxed), 1);
assert_eq!(
collector
.backend_active
.get(key)
.unwrap()
.load(Ordering::Relaxed),
1
);
// total stays
assert_eq!(collector.backend_total.get(key).unwrap().load(Ordering::Relaxed), 2);
assert_eq!(
collector
.backend_total
.get(key)
.unwrap()
.load(Ordering::Relaxed),
2
);
// Record errors
collector.backend_connect_error(key);
@@ -1387,12 +1681,54 @@ mod tests {
collector.backend_pool_hit(key);
collector.backend_pool_miss(key);
assert_eq!(collector.backend_connect_errors.get(key).unwrap().load(Ordering::Relaxed), 1);
assert_eq!(collector.backend_handshake_errors.get(key).unwrap().load(Ordering::Relaxed), 1);
assert_eq!(collector.backend_request_errors.get(key).unwrap().load(Ordering::Relaxed), 1);
assert_eq!(collector.backend_h2_failures.get(key).unwrap().load(Ordering::Relaxed), 1);
assert_eq!(collector.backend_pool_hits.get(key).unwrap().load(Ordering::Relaxed), 2);
assert_eq!(collector.backend_pool_misses.get(key).unwrap().load(Ordering::Relaxed), 1);
assert_eq!(
collector
.backend_connect_errors
.get(key)
.unwrap()
.load(Ordering::Relaxed),
1
);
assert_eq!(
collector
.backend_handshake_errors
.get(key)
.unwrap()
.load(Ordering::Relaxed),
1
);
assert_eq!(
collector
.backend_request_errors
.get(key)
.unwrap()
.load(Ordering::Relaxed),
1
);
assert_eq!(
collector
.backend_h2_failures
.get(key)
.unwrap()
.load(Ordering::Relaxed),
1
);
assert_eq!(
collector
.backend_pool_hits
.get(key)
.unwrap()
.load(Ordering::Relaxed),
2
);
assert_eq!(
collector
.backend_pool_misses
.get(key)
.unwrap()
.load(Ordering::Relaxed),
1
);
// Protocol
collector.set_backend_protocol(key, "h1");
@@ -1449,7 +1785,10 @@ mod tests {
assert!(collector.backend_total.get("stale:8080").is_none());
assert!(collector.backend_protocol.get("stale:8080").is_none());
assert!(collector.backend_connect_errors.get("stale:8080").is_none());
assert!(collector.backend_connect_time_us.get("stale:8080").is_none());
assert!(collector
.backend_connect_time_us
.get("stale:8080")
.is_none());
assert!(collector.backend_connect_count.get("stale:8080").is_none());
assert!(collector.backend_pool_hits.get("stale:8080").is_none());
assert!(collector.backend_pool_misses.get("stale:8080").is_none());