feat(metrics): add per-domain HTTP request rate metrics

This commit is contained in:
2026-04-14 12:43:59 +00:00
parent 490a310b54
commit 46155ab12c
10 changed files with 485 additions and 9 deletions
+108 -3
View File
@@ -5,7 +5,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
use std::time::Duration;
use crate::throughput::{ThroughputSample, ThroughputTracker};
use crate::throughput::{RequestRateTracker, ThroughputSample, ThroughputTracker};
/// Aggregated metrics snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -26,6 +26,7 @@ pub struct Metrics {
pub total_http_requests: u64,
pub http_requests_per_sec: u64,
pub http_requests_per_sec_recent: u64,
pub http_domain_requests: std::collections::HashMap<String, HttpDomainRequestMetrics>,
// UDP metrics
pub active_udp_sessions: u64,
pub total_udp_sessions: u64,
@@ -66,6 +67,14 @@ pub struct IpMetrics {
pub domain_requests: HashMap<String, u64>,
}
/// Per-domain HTTP request rate metrics.
///
/// One value per canonicalized domain in `Metrics::http_domain_requests`.
/// Serialized with camelCase field names (`requestsPerSecond`,
/// `requestsLastMinute`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct HttpDomainRequestMetrics {
/// Requests counted in the most recent *completed* one-second sample
/// (the snapshot fills this from `RequestRateTracker::last_second`).
pub requests_per_second: u64,
/// Sum of the most recent completed one-second samples, up to 60 of them
/// (the snapshot fills this from `RequestRateTracker::last_minute`).
pub requests_last_minute: u64,
}
/// Per-backend metrics (keyed by "host:port").
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -144,6 +153,9 @@ const MAX_BACKENDS_IN_SNAPSHOT: usize = 100;
/// Maximum number of distinct domains tracked per IP (prevents subdomain-spray abuse).
const MAX_DOMAINS_PER_IP: usize = 256;
/// Number of one-second HTTP request samples retained per domain.
///
/// `RequestRateTracker::last_minute` sums at most the 60 most recent samples
/// and never more than the tracker retains, so lowering this below 60 shortens
/// the reported "last minute" window to this many seconds.
const HTTP_DOMAIN_REQUEST_WINDOW_SECONDS: usize = 60;
fn canonicalize_domain_key(domain: &str) -> Option<String> {
let normalized = domain.trim().trim_end_matches('.').to_ascii_lowercase();
if normalized.is_empty() {
@@ -201,6 +213,7 @@ pub struct MetricsCollector {
total_http_requests: AtomicU64,
pending_http_requests: AtomicU64,
http_request_throughput: Mutex<ThroughputTracker>,
http_domain_request_rates: DashMap<String, Mutex<RequestRateTracker>>,
// ── UDP metrics ──
active_udp_sessions: AtomicU64,
@@ -284,6 +297,7 @@ impl MetricsCollector {
total_http_requests: AtomicU64::new(0),
pending_http_requests: AtomicU64::new(0),
http_request_throughput: Mutex::new(ThroughputTracker::new(retention_seconds)),
http_domain_request_rates: DashMap::new(),
frontend_h1_active: AtomicU64::new(0),
frontend_h1_total: AtomicU64::new(0),
frontend_h2_active: AtomicU64::new(0),
@@ -522,6 +536,24 @@ impl MetricsCollector {
self.pending_http_requests.fetch_add(1, Ordering::Relaxed);
}
/// Record a real HTTP request for a canonicalized domain.
///
/// `domain` is normalized with `canonicalize_domain_key`; if that yields no
/// key the call is a no-op. Otherwise one event is added to the domain's
/// `RequestRateTracker`, creating the tracker on first use.
///
/// The event is always recorded through a map guard that is still held, so
/// a concurrent prune of idle domains cannot remove the entry between
/// "ensure it exists" and "record into it" (which previously dropped the
/// request silently).
pub fn record_http_domain_request(&self, domain: &str) {
    let Some(domain) = canonicalize_domain_key(domain) else {
        return;
    };

    // Fast path: the domain is already tracked. Only a shard read lock is
    // taken and the key is not cloned.
    if let Some(tracker_ref) = self.http_domain_request_rates.get(domain.as_str()) {
        // A poisoned tracker is skipped, matching the collector's other
        // best-effort mutex handling.
        if let Ok(mut tracker) = tracker_ref.value().lock() {
            tracker.record_event();
        }
        return;
    }

    // Slow path: first request for this domain (or it was just pruned).
    // `entry` keeps the shard write-locked until `tracker_ref` is dropped,
    // so the entry cannot disappear before the event is recorded.
    let tracker_ref = self
        .http_domain_request_rates
        .entry(domain)
        .or_insert_with(|| {
            Mutex::new(RequestRateTracker::new(HTTP_DOMAIN_REQUEST_WINDOW_SECONDS))
        });
    // Bind the lock result to a local so it is dropped before `tracker_ref`.
    let lock_result = tracker_ref.value().lock();
    if let Ok(mut tracker) = lock_result {
        tracker.record_event();
    }
}
/// Record a domain request/connection for a frontend IP.
///
/// Called per HTTP request (with Host header) and per TCP passthrough
@@ -791,8 +823,7 @@ impl MetricsCollector {
/// Take a throughput sample on all trackers (cold path, call at 1Hz or configured interval).
///
/// Drains the lock-free pending counters and feeds the accumulated bytes
/// into the throughput trackers under their sampling mutexes.
pub fn sample_all(&self) {
// Drain global pending bytes and feed into the tracker
let global_in = self.global_pending_tp_in.swap(0, Ordering::Relaxed);
@@ -873,6 +904,20 @@ impl MetricsCollector {
tracker.sample();
}
// Advance HTTP domain request windows and prune fully idle domains.
let mut stale_http_domains = Vec::new();
for entry in self.http_domain_request_rates.iter() {
if let Ok(mut tracker) = entry.value().lock() {
tracker.advance_to_now();
if tracker.is_idle() {
stale_http_domains.push(entry.key().clone());
}
}
}
for domain in stale_http_domains {
self.http_domain_request_rates.remove(&domain);
}
// Safety-net: prune orphaned per-IP entries that have no corresponding
// ip_connections entry. This catches any entries created by a race between
// record_bytes and connection_closed.
@@ -1179,6 +1224,24 @@ impl MetricsCollector {
})
.unwrap_or((0, 0));
let mut http_domain_requests = std::collections::HashMap::new();
for entry in self.http_domain_request_rates.iter() {
if let Ok(mut tracker) = entry.value().lock() {
tracker.advance_to_now();
let requests_per_second = tracker.last_second();
let requests_last_minute = tracker.last_minute();
if requests_per_second > 0 || requests_last_minute > 0 {
http_domain_requests.insert(
entry.key().clone(),
HttpDomainRequestMetrics {
requests_per_second,
requests_last_minute,
},
);
}
}
}
Metrics {
active_connections: self.active_connections(),
total_connections: self.total_connections(),
@@ -1195,6 +1258,7 @@ impl MetricsCollector {
total_http_requests: self.total_http_requests.load(Ordering::Relaxed),
http_requests_per_sec: http_rps,
http_requests_per_sec_recent: http_rps_recent,
http_domain_requests,
active_udp_sessions: self.active_udp_sessions.load(Ordering::Relaxed),
total_udp_sessions: self.total_udp_sessions.load(Ordering::Relaxed),
total_datagrams_in: self.total_datagrams_in.load(Ordering::Relaxed),
@@ -1514,6 +1578,47 @@ mod tests {
assert_eq!(snapshot.http_requests_per_sec, 3);
}
/// Differently spelled variants of one host must collapse into a single
/// canonical key whose counters include every request.
#[test]
fn test_http_domain_request_rates_are_canonicalized() {
    let wall_clock_sec = || {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    };

    // `record_http_domain_request` stamps events with the wall clock. If a
    // second boundary falls between the records (or between the records and
    // the timestamp read), the events are split across samples and the
    // per-second assertion fails spuriously. Retry until all three records
    // are known to have landed in one and the same second.
    let (collector, recorded_sec) = loop {
        let started_sec = wall_clock_sec();
        let collector = MetricsCollector::with_retention(60);
        collector.record_http_domain_request("Example.COM");
        collector.record_http_domain_request("example.com.");
        collector.record_http_domain_request(" example.com ");
        if wall_clock_sec() == started_sec {
            break (collector, started_sec);
        }
    };

    // Close the second that holds the three events so it becomes the most
    // recent completed sample.
    if let Some(tracker) = collector.http_domain_request_rates.get("example.com") {
        tracker.value().lock().unwrap().advance_to(recorded_sec + 1);
    }

    let snapshot = collector.snapshot();
    let metrics = snapshot.http_domain_requests.get("example.com").unwrap();
    assert_eq!(snapshot.http_domain_requests.len(), 1);
    assert_eq!(metrics.requests_per_second, 3);
    assert_eq!(metrics.requests_last_minute, 3);
}
/// Per-IP domain accounting and per-domain HTTP request rates are separate:
/// feeding the former must leave the latter empty.
#[test]
fn test_ip_domain_requests_do_not_affect_http_domain_request_rates() {
    let client_ip = "10.0.0.1";
    let host = "example.com";

    let collector = MetricsCollector::with_retention(60);
    collector.connection_opened(Some("route-a"), Some(client_ip));
    collector.record_ip_domain_request(client_ip, host);

    let snapshot = collector.snapshot();

    // Nothing was recorded through `record_http_domain_request`.
    assert!(snapshot.http_domain_requests.is_empty());

    // The per-IP counter still saw exactly one request for the host.
    let per_ip_count = snapshot
        .ips
        .get(client_ip)
        .and_then(|ip| ip.domain_requests.get(host))
        .copied();
    assert_eq!(per_ip_count, Some(1));
}
#[test]
fn test_retain_routes_prunes_stale() {
let collector = MetricsCollector::with_retention(60);
+146 -1
View File
@@ -29,6 +29,113 @@ pub struct ThroughputTracker {
created_at: Instant,
}
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch.
fn unix_timestamp_seconds() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs()
}

/// Circular buffer for per-second event counts.
///
/// Unlike `ThroughputTracker`, events are recorded directly into the current
/// second so request counts remain stable even when the collector is sampled
/// more frequently than once per second.
///
/// The second currently being filled lives in `current_count`; it is moved
/// into `samples` only when time advances past it, so the `last_*` queries
/// report completed seconds only.
#[derive(Debug)]
pub(crate) struct RequestRateTracker {
    /// Completed one-second samples; grows up to `capacity`, then wraps.
    samples: Vec<u64>,
    /// Slot the next completed sample will be written to.
    write_index: usize,
    /// Number of valid samples (at most `capacity`).
    count: usize,
    /// Maximum number of retained samples (always at least 1).
    capacity: usize,
    /// Second currently being accumulated; `None` until first use.
    current_second: Option<u64>,
    /// Events seen so far during `current_second`.
    current_count: u64,
}

impl RequestRateTracker {
    /// Create a tracker retaining `retention_seconds` one-second samples
    /// (a value of 0 is treated as 1).
    pub(crate) fn new(retention_seconds: usize) -> Self {
        let capacity = retention_seconds.max(1);
        Self {
            samples: Vec::with_capacity(capacity),
            write_index: 0,
            count: 0,
            capacity,
            current_second: None,
            current_count: 0,
        }
    }

    /// Append one completed sample, overwriting the oldest once full.
    fn push_sample(&mut self, count: u64) {
        if self.samples.len() < self.capacity {
            self.samples.push(count);
        } else {
            self.samples[self.write_index] = count;
        }
        self.write_index = (self.write_index + 1) % self.capacity;
        self.count = (self.count + 1).min(self.capacity);
    }

    /// Record a single event at the current wall-clock second.
    pub(crate) fn record_event(&mut self) {
        self.record_events_at(unix_timestamp_seconds(), 1);
    }

    /// Record `count` events at second `now_sec`.
    ///
    /// If `now_sec` is not later than the second being accumulated (for
    /// example after the clock stepped backwards), the events are added to
    /// that second.
    pub(crate) fn record_events_at(&mut self, now_sec: u64, count: u64) {
        self.advance_to(now_sec);
        self.current_count = self.current_count.saturating_add(count);
    }

    /// Roll the window forward to the current wall-clock second.
    pub(crate) fn advance_to_now(&mut self) {
        self.advance_to(unix_timestamp_seconds());
    }

    /// Roll the window forward to `now_sec`.
    ///
    /// Closes the second being accumulated and inserts a zero sample for
    /// every second skipped since. Does nothing when `now_sec` is not later
    /// than the current second.
    pub(crate) fn advance_to(&mut self, now_sec: u64) {
        let Some(current_second) = self.current_second else {
            // First use: start accumulating at `now_sec`.
            self.current_second = Some(now_sec);
            self.current_count = 0;
            return;
        };
        if now_sec <= current_second {
            return;
        }

        self.push_sample(self.current_count);

        // Zero-fill the skipped seconds, but never more than `capacity` of
        // them: that many zeros already overwrite the whole buffer, so extra
        // pushes cannot change any query result. Without the cap, a forward
        // clock jump makes this loop run once per skipped second.
        let skipped_seconds = now_sec - current_second - 1;
        let capacity = u64::try_from(self.capacity).unwrap_or(u64::MAX);
        for _ in 0..skipped_seconds.min(capacity) {
            self.push_sample(0);
        }

        self.current_second = Some(now_sec);
        self.current_count = 0;
    }

    /// Sum of the `window_seconds` most recent completed samples, saturating
    /// at `u64::MAX`.
    fn sum_recent(&self, window_seconds: usize) -> u64 {
        let window = window_seconds.min(self.count);
        (0..window)
            // Walk backwards from the newest sample, wrapping around the ring.
            .map(|age| (self.write_index + self.capacity - 1 - age) % self.capacity)
            .filter_map(|idx| self.samples.get(idx))
            .fold(0u64, |total, &sample| total.saturating_add(sample))
    }

    /// Events counted in the most recent completed second.
    pub(crate) fn last_second(&self) -> u64 {
        self.sum_recent(1)
    }

    /// Events counted over the most recent completed seconds, up to 60.
    pub(crate) fn last_minute(&self) -> u64 {
        self.sum_recent(60)
    }

    /// `true` when neither the current second nor any retained sample holds
    /// an event.
    pub(crate) fn is_idle(&self) -> bool {
        self.current_count == 0 && self.sum_recent(self.capacity) == 0
    }
}
impl ThroughputTracker {
/// Create a new tracker with the given capacity (seconds of retention).
pub fn new(retention_seconds: usize) -> Self {
@@ -46,7 +153,8 @@ impl ThroughputTracker {
/// Record bytes (called from data flow callbacks).
///
/// Adds `bytes_in` and `bytes_out` to the pending inbound and outbound
/// counters with relaxed atomic adds. Each counter is bumped exactly once
/// per call; the outbound add previously appeared twice, which would count
/// `bytes_out` double.
pub fn record_bytes(&self, bytes_in: u64, bytes_out: u64) {
    self.pending_bytes_in.fetch_add(bytes_in, Ordering::Relaxed);
    self.pending_bytes_out
        .fetch_add(bytes_out, Ordering::Relaxed);
}
/// Take a sample (called at 1Hz).
@@ -229,4 +337,41 @@ mod tests {
let history = tracker.history(10);
assert!(history.is_empty());
}
/// Events recorded within one second accumulate into a single sample that
/// becomes visible once that second is closed.
#[test]
fn test_request_rate_tracker_counts_last_second_and_last_minute() {
    let mut rates = RequestRateTracker::new(60);

    // Two batches stamped with the same second.
    for batch in [2, 3] {
        rates.record_events_at(100, batch);
    }
    // Moving to second 101 closes second 100.
    rates.advance_to(101);

    let (per_second, per_minute) = (rates.last_second(), rates.last_minute());
    assert_eq!(per_second, 5);
    assert_eq!(per_minute, 5);
}
/// Seconds without traffic are stored as zero samples, so the newest sample
/// reflects only the latest active second while the minute total keeps both.
#[test]
fn test_request_rate_tracker_adds_zero_samples_for_gaps() {
    let mut rates = RequestRateTracker::new(60);

    // Second 101 is skipped entirely.
    let timeline: [(u64, u64); 2] = [(100, 4), (102, 1)];
    for (second, events) in timeline {
        rates.record_events_at(second, events);
    }
    rates.advance_to(103);

    assert_eq!(rates.last_second(), 1);
    assert_eq!(rates.last_minute(), 5);
}
/// Once a full retention window of empty seconds has passed, the old sample
/// is overwritten and the tracker reports idle.
#[test]
fn test_request_rate_tracker_decays_to_zero_over_window() {
    let mut rates = RequestRateTracker::new(60);
    rates.record_events_at(100, 7);

    // Close second 100, then let 60 more seconds elapse without events.
    for second in [101, 161] {
        rates.advance_to(second);
    }

    assert_eq!(rates.last_second(), 0);
    assert_eq!(rates.last_minute(), 0);
    assert!(rates.is_idle());
}
}