feat(metrics): track per-IP domain request metrics across HTTP and TCP passthrough traffic

This commit is contained in:
2026-04-13 18:33:28 +00:00
parent e988d935b6
commit 781634446a
8 changed files with 593 additions and 5 deletions
@@ -626,6 +626,9 @@ impl HttpProxyService {
let route_id = route_match.route.id.as_deref();
let ip_str = ip_string; // reuse from above (avoid redundant to_string())
self.metrics.record_http_request();
if let Some(ref h) = host {
self.metrics.record_ip_domain_request(&ip_str, h);
}
// Apply request filters (IP check, rate limiting, auth)
if let Some(ref security) = route_match.route.security {
+57 -4
View File
@@ -1,6 +1,6 @@
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
use std::time::Duration;
@@ -62,6 +62,8 @@ pub struct IpMetrics {
pub bytes_out: u64,
pub throughput_in_bytes_per_sec: u64,
pub throughput_out_bytes_per_sec: u64,
/// Per-domain request/connection counts for this IP.
pub domain_requests: HashMap<String, u64>,
}
/// Per-backend metrics (keyed by "host:port").
@@ -139,6 +141,9 @@ const MAX_IPS_IN_SNAPSHOT: usize = 100;
/// Maximum number of backends to include in a snapshot (top by total connections).
const MAX_BACKENDS_IN_SNAPSHOT: usize = 100;
/// Maximum number of distinct domains tracked per IP (prevents subdomain-spray abuse).
const MAX_DOMAINS_PER_IP: usize = 256;
/// Metrics collector tracking connections and throughput.
///
/// Design: The hot path (`record_bytes`) is entirely lock-free — it only touches
@@ -165,6 +170,10 @@ pub struct MetricsCollector {
ip_bytes_out: DashMap<String, AtomicU64>,
ip_pending_tp: DashMap<String, (AtomicU64, AtomicU64)>,
ip_throughput: DashMap<String, Mutex<ThroughputTracker>>,
/// Per-IP domain request counts: IP → { domain → count }.
/// Tracks which domains each frontend IP has requested (via HTTP Host/SNI).
/// Inner DashMap uses 2 shards to minimise base memory per IP.
ip_domain_requests: DashMap<String, DashMap<String, AtomicU64>>,
// ── Per-backend tracking (keyed by "host:port") ──
backend_active: DashMap<String, AtomicU64>,
@@ -247,6 +256,7 @@ impl MetricsCollector {
ip_bytes_out: DashMap::new(),
ip_pending_tp: DashMap::new(),
ip_throughput: DashMap::new(),
ip_domain_requests: DashMap::new(),
backend_active: DashMap::new(),
backend_total: DashMap::new(),
backend_protocol: DashMap::new(),
@@ -351,6 +361,7 @@ impl MetricsCollector {
self.ip_bytes_out.remove(ip);
self.ip_pending_tp.remove(ip);
self.ip_throughput.remove(ip);
self.ip_domain_requests.remove(ip);
}
}
}
@@ -452,6 +463,37 @@ impl MetricsCollector {
self.pending_http_requests.fetch_add(1, Ordering::Relaxed);
}
/// Record a domain request/connection for a frontend IP.
///
/// Called per HTTP request (with Host header) and per TCP passthrough
/// connection (with SNI domain). The common case (IP + domain both already
/// tracked) is two DashMap reads + one atomic increment — zero allocation.
pub fn record_ip_domain_request(&self, ip: &str, domain: &str) {
// Fast path: IP already tracked, domain already tracked
if let Some(domains) = self.ip_domain_requests.get(ip) {
if let Some(counter) = domains.get(domain) {
counter.fetch_add(1, Ordering::Relaxed);
return;
}
// New domain for this IP — enforce cap
if domains.len() >= MAX_DOMAINS_PER_IP {
return;
}
domains
.entry(domain.to_string())
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(1, Ordering::Relaxed);
return;
}
// New IP — only create if the IP has active connections
if !self.ip_connections.contains_key(ip) {
return;
}
let inner = DashMap::with_capacity_and_shard_amount(4, 2);
inner.insert(domain.to_string(), AtomicU64::new(1));
self.ip_domain_requests.insert(ip.to_string(), inner);
}
// ── UDP session recording methods ──
/// Record a new UDP session opened.
@@ -745,6 +787,7 @@ impl MetricsCollector {
self.ip_pending_tp.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_throughput.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_total_connections.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_domain_requests.retain(|k, _| self.ip_connections.contains_key(k));
// Safety-net: prune orphaned backend error/stats entries for backends
// that have no active or total connections (error-only backends).
@@ -852,7 +895,7 @@ impl MetricsCollector {
// Collect per-IP metrics — only IPs with active connections or total > 0,
// capped at top MAX_IPS_IN_SNAPSHOT sorted by active count
let mut ip_entries: Vec<(String, u64, u64, u64, u64, u64, u64)> = Vec::new();
let mut ip_entries: Vec<(String, u64, u64, u64, u64, u64, u64, HashMap<String, u64>)> = Vec::new();
for entry in self.ip_total_connections.iter() {
let ip = entry.key().clone();
let total = entry.value().load(Ordering::Relaxed);
@@ -872,14 +915,23 @@ impl MetricsCollector {
.get(&ip)
.and_then(|entry| entry.value().lock().ok().map(|t| t.instant()))
.unwrap_or((0, 0));
ip_entries.push((ip, active, total, bytes_in, bytes_out, tp_in, tp_out));
// Collect per-domain request counts for this IP
let domain_requests = self.ip_domain_requests
.get(&ip)
.map(|domains| {
domains.iter()
.map(|e| (e.key().clone(), e.value().load(Ordering::Relaxed)))
.collect()
})
.unwrap_or_default();
ip_entries.push((ip, active, total, bytes_in, bytes_out, tp_in, tp_out, domain_requests));
}
// Sort by active connections descending, then cap
ip_entries.sort_by(|a, b| b.1.cmp(&a.1));
ip_entries.truncate(MAX_IPS_IN_SNAPSHOT);
let mut ips = std::collections::HashMap::new();
for (ip, active, total, bytes_in, bytes_out, tp_in, tp_out) in ip_entries {
for (ip, active, total, bytes_in, bytes_out, tp_in, tp_out, domain_requests) in ip_entries {
ips.insert(ip, IpMetrics {
active_connections: active,
total_connections: total,
@@ -887,6 +939,7 @@ impl MetricsCollector {
bytes_out,
throughput_in_bytes_per_sec: tp_in,
throughput_out_bytes_per_sec: tp_out,
domain_requests,
});
}
@@ -943,6 +943,9 @@ impl TcpListenerManager {
// Track connection in metrics — guard ensures connection_closed on all exit paths
metrics.connection_opened(route_id, Some(&ip_str));
if let Some(d) = effective_domain {
metrics.record_ip_domain_request(&ip_str, d);
}
let _conn_guard = ConnectionGuard::new(Arc::clone(&metrics), route_id, Some(&ip_str));
// Check if this is a socket-handler route that should be relayed to TypeScript