diff --git a/changelog.md b/changelog.md index f5951ae..320af44 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,16 @@ # Changelog +## 2026-02-14 - 25.2.0 - feat(metrics) +add per-IP and HTTP-request metrics, propagate source IP through proxy paths, and expose new metrics to the TS adapter + +- Add per-IP tracking and IpMetrics in MetricsCollector (active/total connections, bytes, throughput). +- Add HTTP request counters and tracking (total_http_requests, http_requests_per_sec, recent counters and tests). +- Include throughput history (ThroughputSample serialization, retention and snapshotting) and expose history in snapshots. +- Propagate source IP through HTTP and passthrough code paths: CountingBody.record_bytes and MetricsCollector methods now accept source_ip; connection_opened/closed calls include source IP. +- Introduce ForwardMetricsCtx to carry metrics context (collector, route_id, source_ip) into passthrough forwarding routines; update ConnectionGuard to include source_ip. +- TypeScript adapter (rust-metrics-adapter.ts) updated to return per-IP counts, top IPs, per-IP throughput, throughput history mapping, and HTTP request rates/total where available. +- Numerous unit tests added for per-IP tracking, HTTP request tracking, throughput history and ThroughputTracker.history behavior. + ## 2026-02-13 - 25.1.0 - feat(metrics) add real-time throughput sampling and byte-counting metrics diff --git a/rust/crates/rustproxy-http/src/counting_body.rs b/rust/crates/rustproxy-http/src/counting_body.rs index 364266c..5d3fb87 100644 --- a/rust/crates/rustproxy-http/src/counting_body.rs +++ b/rust/crates/rustproxy-http/src/counting_body.rs @@ -20,6 +20,7 @@ pub struct CountingBody { counted_bytes: AtomicU64, metrics: Arc, route_id: Option, + source_ip: Option, /// Whether we count bytes as "in" (request body) or "out" (response body). direction: Direction, /// Whether we've already reported the bytes (to avoid double-reporting on drop). @@ -41,6 +42,7 @@ impl CountingBody { inner: B, metrics: Arc, route_id: Option, + source_ip: Option, direction: Direction, ) -> Self { Self { @@ -48,6 +50,7 @@ impl CountingBody { counted_bytes: AtomicU64::new(0), metrics, route_id, + source_ip, direction, reported: false, } @@ -66,9 +69,10 @@ impl CountingBody { } let route_id = self.route_id.as_deref(); + let source_ip = self.source_ip.as_deref(); match self.direction { - Direction::In => self.metrics.record_bytes(bytes, 0, route_id), - Direction::Out => self.metrics.record_bytes(0, bytes, route_id), + Direction::In => self.metrics.record_bytes(bytes, 0, route_id, source_ip), + Direction::Out => self.metrics.record_bytes(0, bytes, route_id, source_ip), } } } diff --git a/rust/crates/rustproxy-http/src/proxy_service.rs b/rust/crates/rustproxy-http/src/proxy_service.rs index 7244464..3080d0f 100644 --- a/rust/crates/rustproxy-http/src/proxy_service.rs +++ b/rust/crates/rustproxy-http/src/proxy_service.rs @@ -184,12 +184,14 @@ impl HttpProxyService { }; let route_id = route_match.route.id.as_deref(); - self.metrics.connection_opened(route_id); + let ip_str = peer_addr.ip().to_string(); + self.metrics.record_http_request(); + self.metrics.connection_opened(route_id, Some(&ip_str)); // Apply request filters (IP check, rate limiting, auth) if let Some(ref security) = route_match.route.security { if let Some(response) = RequestFilter::apply(security, &req, &peer_addr) { - self.metrics.connection_closed(route_id); + self.metrics.connection_closed(route_id, Some(&ip_str)); return Ok(response); } } @@ -197,7 +199,7 @@ impl HttpProxyService { // Check for test response (returns immediately, no upstream needed) if let Some(ref advanced) = route_match.route.action.advanced { if let Some(ref test_response) = advanced.test_response { - self.metrics.connection_closed(route_id); + self.metrics.connection_closed(route_id, Some(&ip_str)); return Ok(Self::build_test_response(test_response)); } } @@ -205,7 +207,7 @@ impl HttpProxyService { // Check for static file serving if let Some(ref advanced) = route_match.route.action.advanced { if let Some(ref static_files) = advanced.static_files { - self.metrics.connection_closed(route_id); + self.metrics.connection_closed(route_id, Some(&ip_str)); return Ok(Self::serve_static_file(&path, static_files)); } } @@ -214,7 +216,7 @@ impl HttpProxyService { let target = match route_match.target { Some(t) => t, None => { - self.metrics.connection_closed(route_id); + self.metrics.connection_closed(route_id, Some(&ip_str)); return Ok(error_response(StatusCode::BAD_GATEWAY, "No target available")); } }; @@ -232,7 +234,7 @@ impl HttpProxyService { if is_websocket { let result = self.handle_websocket_upgrade( - req, peer_addr, &upstream, route_match.route, route_id, &upstream_key, cancel, + req, peer_addr, &upstream, route_match.route, route_id, &upstream_key, cancel, &ip_str, ).await; // Note: for WebSocket, connection_ended is called inside // the spawned tunnel task when the connection closes. @@ -280,13 +282,13 @@ impl HttpProxyService { Ok(Err(e)) => { error!("Failed to connect to upstream {}:{}: {}", upstream.host, upstream.port, e); self.upstream_selector.connection_ended(&upstream_key); - self.metrics.connection_closed(route_id); + self.metrics.connection_closed(route_id, Some(&ip_str)); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable")); } Err(_) => { error!("Upstream connect timeout for {}:{}", upstream.host, upstream.port); self.upstream_selector.connection_ended(&upstream_key); - self.metrics.connection_closed(route_id); + self.metrics.connection_closed(route_id, Some(&ip_str)); return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend connect timeout")); } }; @@ -296,10 +298,10 @@ impl HttpProxyService { let result = if use_h2 { // HTTP/2 backend - self.forward_h2(io, parts, body, upstream_headers, &upstream_path, &upstream, route_match.route, route_id).await + self.forward_h2(io, parts, body, upstream_headers, &upstream_path, &upstream, route_match.route, route_id, &ip_str).await } else { // HTTP/1.1 backend (default) - self.forward_h1(io, parts, body, upstream_headers, &upstream_path, &upstream, route_match.route, route_id).await + self.forward_h1(io, parts, body, upstream_headers, &upstream_path, &upstream, route_match.route, route_id, &ip_str).await }; self.upstream_selector.connection_ended(&upstream_key); result @@ -316,12 +318,13 @@ impl HttpProxyService { upstream: &crate::upstream_selector::UpstreamSelection, route: &rustproxy_config::RouteConfig, route_id: Option<&str>, + source_ip: &str, ) -> Result>, hyper::Error> { let (mut sender, conn) = match hyper::client::conn::http1::handshake(io).await { Ok(h) => h, Err(e) => { error!("Upstream handshake failed: {}", e); - self.metrics.connection_closed(route_id); + self.metrics.connection_closed(route_id, Some(source_ip)); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend handshake failed")); } }; @@ -351,6 +354,7 @@ impl HttpProxyService { body, Arc::clone(&self.metrics), route_id.map(|s| s.to_string()), + Some(source_ip.to_string()), Direction::In, ); @@ -361,12 +365,12 @@ impl HttpProxyService { Ok(resp) => resp, Err(e) => { error!("Upstream request failed: {}", e); - self.metrics.connection_closed(route_id); + self.metrics.connection_closed(route_id, Some(source_ip)); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend request failed")); } }; - self.build_streaming_response(upstream_response, route, route_id).await + self.build_streaming_response(upstream_response, route, route_id, source_ip).await } /// Forward request to backend via HTTP/2 with body streaming. @@ -380,13 +384,14 @@ impl HttpProxyService { upstream: &crate::upstream_selector::UpstreamSelection, route: &rustproxy_config::RouteConfig, route_id: Option<&str>, + source_ip: &str, ) -> Result>, hyper::Error> { let exec = hyper_util::rt::TokioExecutor::new(); let (mut sender, conn) = match hyper::client::conn::http2::handshake(exec, io).await { Ok(h) => h, Err(e) => { error!("HTTP/2 upstream handshake failed: {}", e); - self.metrics.connection_closed(route_id); + self.metrics.connection_closed(route_id, Some(source_ip)); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H2 handshake failed")); } }; @@ -415,6 +420,7 @@ impl HttpProxyService { body, Arc::clone(&self.metrics), route_id.map(|s| s.to_string()), + Some(source_ip.to_string()), Direction::In, ); @@ -425,12 +431,12 @@ impl HttpProxyService { Ok(resp) => resp, Err(e) => { error!("HTTP/2 upstream request failed: {}", e); - self.metrics.connection_closed(route_id); + self.metrics.connection_closed(route_id, Some(source_ip)); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H2 request failed")); } }; - self.build_streaming_response(upstream_response, route, route_id).await + self.build_streaming_response(upstream_response, route, route_id, source_ip).await } /// Build the client-facing response from an upstream response, streaming the body. @@ -443,6 +449,7 @@ impl HttpProxyService { upstream_response: Response, route: &rustproxy_config::RouteConfig, route_id: Option<&str>, + source_ip: &str, ) -> Result>, hyper::Error> { let (resp_parts, resp_body) = upstream_response.into_parts(); @@ -461,13 +468,14 @@ impl HttpProxyService { resp_body, Arc::clone(&self.metrics), route_id.map(|s| s.to_string()), + Some(source_ip.to_string()), Direction::Out, ); // Close the connection metric now — the HTTP request/response cycle is done // from the proxy's perspective once we hand the streaming body to hyper. // Bytes will still be counted as they flow. - self.metrics.connection_closed(route_id); + self.metrics.connection_closed(route_id, Some(source_ip)); let body: BoxBody = BoxBody::new(counting_body); @@ -484,6 +492,7 @@ impl HttpProxyService { route_id: Option<&str>, upstream_key: &str, cancel: CancellationToken, + source_ip: &str, ) -> Result>, hyper::Error> { use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -499,7 +508,7 @@ impl HttpProxyService { .unwrap_or(""); if !allowed_origins.is_empty() && !allowed_origins.iter().any(|o| o == "*" || o == origin) { self.upstream_selector.connection_ended(upstream_key); - self.metrics.connection_closed(route_id); + self.metrics.connection_closed(route_id, Some(source_ip)); return Ok(error_response(StatusCode::FORBIDDEN, "Origin not allowed")); } } @@ -516,13 +525,13 @@ impl HttpProxyService { Ok(Err(e)) => { error!("WebSocket: failed to connect upstream {}:{}: {}", upstream.host, upstream.port, e); self.upstream_selector.connection_ended(upstream_key); - self.metrics.connection_closed(route_id); + self.metrics.connection_closed(route_id, Some(source_ip)); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable")); } Err(_) => { error!("WebSocket: upstream connect timeout for {}:{}", upstream.host, upstream.port); self.upstream_selector.connection_ended(upstream_key); - self.metrics.connection_closed(route_id); + self.metrics.connection_closed(route_id, Some(source_ip)); return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend connect timeout")); } }; @@ -584,7 +593,7 @@ impl HttpProxyService { if let Err(e) = upstream_stream.write_all(raw_request.as_bytes()).await { error!("WebSocket: failed to send upgrade request to upstream: {}", e); self.upstream_selector.connection_ended(upstream_key); - self.metrics.connection_closed(route_id); + self.metrics.connection_closed(route_id, Some(source_ip)); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend write failed")); } @@ -595,7 +604,7 @@ impl HttpProxyService { Ok(0) => { error!("WebSocket: upstream closed before completing handshake"); self.upstream_selector.connection_ended(upstream_key); - self.metrics.connection_closed(route_id); + self.metrics.connection_closed(route_id, Some(source_ip)); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend closed")); } Ok(_) => { @@ -609,14 +618,14 @@ impl HttpProxyService { if response_buf.len() > 8192 { error!("WebSocket: upstream response headers too large"); self.upstream_selector.connection_ended(upstream_key); - self.metrics.connection_closed(route_id); + self.metrics.connection_closed(route_id, Some(source_ip)); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend response too large")); } } Err(e) => { error!("WebSocket: failed to read upstream response: {}", e); self.upstream_selector.connection_ended(upstream_key); - self.metrics.connection_closed(route_id); + self.metrics.connection_closed(route_id, Some(source_ip)); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend read failed")); } } @@ -634,7 +643,7 @@ impl HttpProxyService { if status_code != 101 { debug!("WebSocket: upstream rejected upgrade with status {}", status_code); self.upstream_selector.connection_ended(upstream_key); - self.metrics.connection_closed(route_id); + self.metrics.connection_closed(route_id, Some(source_ip)); return Ok(error_response( StatusCode::from_u16(status_code).unwrap_or(StatusCode::BAD_GATEWAY), "WebSocket upgrade rejected by backend", @@ -668,6 +677,7 @@ impl HttpProxyService { let metrics = Arc::clone(&self.metrics); let route_id_owned = route_id.map(|s| s.to_string()); + let source_ip_owned = source_ip.to_string(); let upstream_selector = self.upstream_selector.clone(); let upstream_key_owned = upstream_key.to_string(); @@ -678,7 +688,7 @@ impl HttpProxyService { debug!("WebSocket: client upgrade failed: {}", e); upstream_selector.connection_ended(&upstream_key_owned); if let Some(ref rid) = route_id_owned { - metrics.connection_closed(Some(rid.as_str())); + metrics.connection_closed(Some(rid.as_str()), Some(&source_ip_owned)); } return; } @@ -783,8 +793,8 @@ impl HttpProxyService { upstream_selector.connection_ended(&upstream_key_owned); if let Some(ref rid) = route_id_owned { - metrics.record_bytes(bytes_in, bytes_out, Some(rid.as_str())); - metrics.connection_closed(Some(rid.as_str())); + metrics.record_bytes(bytes_in, bytes_out, Some(rid.as_str()), Some(&source_ip_owned)); + metrics.connection_closed(Some(rid.as_str()), Some(&source_ip_owned)); } }); diff --git a/rust/crates/rustproxy-metrics/src/collector.rs b/rust/crates/rustproxy-metrics/src/collector.rs index 2f3edfb..f7dc7d3 100644 --- a/rust/crates/rustproxy-metrics/src/collector.rs +++ b/rust/crates/rustproxy-metrics/src/collector.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Mutex; -use crate::throughput::ThroughputTracker; +use crate::throughput::{ThroughputSample, ThroughputTracker}; /// Aggregated metrics snapshot. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -18,6 +18,11 @@ pub struct Metrics { pub throughput_recent_in_bytes_per_sec: u64, pub throughput_recent_out_bytes_per_sec: u64, pub routes: std::collections::HashMap, + pub ips: std::collections::HashMap, + pub throughput_history: Vec, + pub total_http_requests: u64, + pub http_requests_per_sec: u64, + pub http_requests_per_sec_recent: u64, } /// Per-route metrics. @@ -34,6 +39,18 @@ pub struct RouteMetrics { pub throughput_recent_out_bytes_per_sec: u64, } +/// Per-IP metrics. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct IpMetrics { + pub active_connections: u64, + pub total_connections: u64, + pub bytes_in: u64, + pub bytes_out: u64, + pub throughput_in_bytes_per_sec: u64, + pub throughput_out_bytes_per_sec: u64, +} + /// Statistics snapshot. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] @@ -48,6 +65,9 @@ pub struct Statistics { /// Default retention for throughput samples (1 hour). const DEFAULT_RETENTION_SECONDS: usize = 3600; +/// Maximum number of IPs to include in a snapshot (top by active connections). +const MAX_IPS_IN_SNAPSHOT: usize = 100; + /// Metrics collector tracking connections and throughput. /// /// Design: The hot path (`record_bytes`) is entirely lock-free — it only touches @@ -67,6 +87,19 @@ pub struct MetricsCollector { route_bytes_in: DashMap, route_bytes_out: DashMap, + // ── Per-IP tracking ── + ip_connections: DashMap, + ip_total_connections: DashMap, + ip_bytes_in: DashMap, + ip_bytes_out: DashMap, + ip_pending_tp: DashMap, + ip_throughput: DashMap>, + + // ── HTTP request tracking ── + total_http_requests: AtomicU64, + pending_http_requests: AtomicU64, + http_request_throughput: Mutex, + // ── Lock-free pending throughput counters (hot path) ── global_pending_tp_in: AtomicU64, global_pending_tp_out: AtomicU64, @@ -94,6 +127,15 @@ impl MetricsCollector { route_total_connections: DashMap::new(), route_bytes_in: DashMap::new(), route_bytes_out: DashMap::new(), + ip_connections: DashMap::new(), + ip_total_connections: DashMap::new(), + ip_bytes_in: DashMap::new(), + ip_bytes_out: DashMap::new(), + ip_pending_tp: DashMap::new(), + ip_throughput: DashMap::new(), + total_http_requests: AtomicU64::new(0), + pending_http_requests: AtomicU64::new(0), + http_request_throughput: Mutex::new(ThroughputTracker::new(retention_seconds)), global_pending_tp_in: AtomicU64::new(0), global_pending_tp_out: AtomicU64::new(0), route_pending_tp: DashMap::new(), @@ -104,7 +146,7 @@ impl MetricsCollector { } /// Record a new connection. - pub fn connection_opened(&self, route_id: Option<&str>) { + pub fn connection_opened(&self, route_id: Option<&str>, source_ip: Option<&str>) { self.active_connections.fetch_add(1, Ordering::Relaxed); self.total_connections.fetch_add(1, Ordering::Relaxed); @@ -118,10 +160,21 @@ impl MetricsCollector { .or_insert_with(|| AtomicU64::new(0)) .fetch_add(1, Ordering::Relaxed); } + + if let Some(ip) = source_ip { + self.ip_connections + .entry(ip.to_string()) + .or_insert_with(|| AtomicU64::new(0)) + .fetch_add(1, Ordering::Relaxed); + self.ip_total_connections + .entry(ip.to_string()) + .or_insert_with(|| AtomicU64::new(0)) + .fetch_add(1, Ordering::Relaxed); + } } /// Record a connection closing. - pub fn connection_closed(&self, route_id: Option<&str>) { + pub fn connection_closed(&self, route_id: Option<&str>, source_ip: Option<&str>) { self.active_connections.fetch_sub(1, Ordering::Relaxed); if let Some(route_id) = route_id { @@ -132,13 +185,27 @@ impl MetricsCollector { } } } + + if let Some(ip) = source_ip { + if let Some(counter) = self.ip_connections.get(ip) { + let val = counter.load(Ordering::Relaxed); + if val > 0 { + counter.fetch_sub(1, Ordering::Relaxed); + } + // Clean up zero-count entries to prevent memory growth + if val <= 1 { + drop(counter); + self.ip_connections.remove(ip); + } + } + } } /// Record bytes transferred (lock-free hot path). /// /// Called per-chunk in the TCP copy loop. Only touches AtomicU64 counters — /// no Mutex is taken. The throughput trackers are fed during `sample_all()`. - pub fn record_bytes(&self, bytes_in: u64, bytes_out: u64, route_id: Option<&str>) { + pub fn record_bytes(&self, bytes_in: u64, bytes_out: u64, route_id: Option<&str>, source_ip: Option<&str>) { self.total_bytes_in.fetch_add(bytes_in, Ordering::Relaxed); self.total_bytes_out.fetch_add(bytes_out, Ordering::Relaxed); @@ -163,6 +230,30 @@ impl MetricsCollector { entry.0.fetch_add(bytes_in, Ordering::Relaxed); entry.1.fetch_add(bytes_out, Ordering::Relaxed); } + + if let Some(ip) = source_ip { + self.ip_bytes_in + .entry(ip.to_string()) + .or_insert_with(|| AtomicU64::new(0)) + .fetch_add(bytes_in, Ordering::Relaxed); + self.ip_bytes_out + .entry(ip.to_string()) + .or_insert_with(|| AtomicU64::new(0)) + .fetch_add(bytes_out, Ordering::Relaxed); + + // Accumulate into per-IP pending throughput counters (lock-free) + let entry = self.ip_pending_tp + .entry(ip.to_string()) + .or_insert_with(|| (AtomicU64::new(0), AtomicU64::new(0))); + entry.0.fetch_add(bytes_in, Ordering::Relaxed); + entry.1.fetch_add(bytes_out, Ordering::Relaxed); + } + } + + /// Record an HTTP request (called once per request in the HTTP proxy). + pub fn record_http_request(&self) { + self.total_http_requests.fetch_add(1, Ordering::Relaxed); + self.pending_http_requests.fetch_add(1, Ordering::Relaxed); } /// Take a throughput sample on all trackers (cold path, call at 1Hz or configured interval). @@ -213,6 +304,42 @@ impl MetricsCollector { } } } + + // Drain per-IP pending bytes and feed into IP throughput trackers + let mut ip_samples: Vec<(String, u64, u64)> = Vec::new(); + for entry in self.ip_pending_tp.iter() { + let ip = entry.key().clone(); + let pending_in = entry.value().0.swap(0, Ordering::Relaxed); + let pending_out = entry.value().1.swap(0, Ordering::Relaxed); + ip_samples.push((ip, pending_in, pending_out)); + } + for (ip, pending_in, pending_out) in &ip_samples { + self.ip_throughput + .entry(ip.clone()) + .or_insert_with(|| Mutex::new(ThroughputTracker::new(retention))); + if let Some(tracker_ref) = self.ip_throughput.get(ip) { + if let Ok(mut tracker) = tracker_ref.value().lock() { + tracker.record_bytes(*pending_in, *pending_out); + tracker.sample(); + } + } + } + // Sample idle IP trackers + for entry in self.ip_throughput.iter() { + if !self.ip_pending_tp.contains_key(entry.key()) { + if let Ok(mut tracker) = entry.value().lock() { + tracker.sample(); + } + } + } + + // Drain pending HTTP request count and feed into HTTP throughput tracker + let pending_reqs = self.pending_http_requests.swap(0, Ordering::Relaxed); + if let Ok(mut tracker) = self.http_request_throughput.lock() { + // Use bytes_in field to track request count (each request = 1 "byte") + tracker.record_bytes(pending_reqs, 0); + tracker.sample(); + } } /// Get current active connection count. @@ -235,19 +362,21 @@ impl MetricsCollector { self.total_bytes_out.load(Ordering::Relaxed) } - /// Get a full metrics snapshot including per-route data. + /// Get a full metrics snapshot including per-route and per-IP data. pub fn snapshot(&self) -> Metrics { let mut routes = std::collections::HashMap::new(); // Get global throughput (instant = last 1 sample, recent = last 10 samples) - let (global_tp_in, global_tp_out, global_recent_in, global_recent_out) = self.global_throughput - .lock() - .map(|t| { - let (i_in, i_out) = t.instant(); - let (r_in, r_out) = t.recent(); - (i_in, i_out, r_in, r_out) - }) - .unwrap_or((0, 0, 0, 0)); + let (global_tp_in, global_tp_out, global_recent_in, global_recent_out, throughput_history) = + self.global_throughput + .lock() + .map(|t| { + let (i_in, i_out) = t.instant(); + let (r_in, r_out) = t.recent(); + let history = t.history(60); + (i_in, i_out, r_in, r_out, history) + }) + .unwrap_or((0, 0, 0, 0, Vec::new())); // Collect per-route metrics for entry in self.route_total_connections.iter() { @@ -287,6 +416,56 @@ impl MetricsCollector { }); } + // Collect per-IP metrics — only IPs with active connections or total > 0, + // capped at top MAX_IPS_IN_SNAPSHOT sorted by active count + let mut ip_entries: Vec<(String, u64, u64, u64, u64, u64, u64)> = Vec::new(); + for entry in self.ip_total_connections.iter() { + let ip = entry.key().clone(); + let total = entry.value().load(Ordering::Relaxed); + let active = self.ip_connections + .get(&ip) + .map(|c| c.load(Ordering::Relaxed)) + .unwrap_or(0); + let bytes_in = self.ip_bytes_in + .get(&ip) + .map(|c| c.load(Ordering::Relaxed)) + .unwrap_or(0); + let bytes_out = self.ip_bytes_out + .get(&ip) + .map(|c| c.load(Ordering::Relaxed)) + .unwrap_or(0); + let (tp_in, tp_out) = self.ip_throughput + .get(&ip) + .and_then(|entry| entry.value().lock().ok().map(|t| t.instant())) + .unwrap_or((0, 0)); + ip_entries.push((ip, active, total, bytes_in, bytes_out, tp_in, tp_out)); + } + // Sort by active connections descending, then cap + ip_entries.sort_by(|a, b| b.1.cmp(&a.1)); + ip_entries.truncate(MAX_IPS_IN_SNAPSHOT); + + let mut ips = std::collections::HashMap::new(); + for (ip, active, total, bytes_in, bytes_out, tp_in, tp_out) in ip_entries { + ips.insert(ip, IpMetrics { + active_connections: active, + total_connections: total, + bytes_in, + bytes_out, + throughput_in_bytes_per_sec: tp_in, + throughput_out_bytes_per_sec: tp_out, + }); + } + + // HTTP request rates + let (http_rps, http_rps_recent) = self.http_request_throughput + .lock() + .map(|t| { + let (instant, _) = t.instant(); + let (recent, _) = t.recent(); + (instant, recent) + }) + .unwrap_or((0, 0)); + Metrics { active_connections: self.active_connections(), total_connections: self.total_connections(), @@ -297,6 +476,11 @@ impl MetricsCollector { throughput_recent_in_bytes_per_sec: global_recent_in, throughput_recent_out_bytes_per_sec: global_recent_out, routes, + ips, + throughput_history, + total_http_requests: self.total_http_requests.load(Ordering::Relaxed), + http_requests_per_sec: http_rps, + http_requests_per_sec_recent: http_rps_recent, } } } @@ -321,10 +505,10 @@ mod tests { #[test] fn test_connection_opened_increments() { let collector = MetricsCollector::new(); - collector.connection_opened(None); + collector.connection_opened(None, None); assert_eq!(collector.active_connections(), 1); assert_eq!(collector.total_connections(), 1); - collector.connection_opened(None); + collector.connection_opened(None, None); assert_eq!(collector.active_connections(), 2); assert_eq!(collector.total_connections(), 2); } @@ -332,10 +516,10 @@ mod tests { #[test] fn test_connection_closed_decrements() { let collector = MetricsCollector::new(); - collector.connection_opened(None); - collector.connection_opened(None); + collector.connection_opened(None, None); + collector.connection_opened(None, None); assert_eq!(collector.active_connections(), 2); - collector.connection_closed(None); + collector.connection_closed(None, None); assert_eq!(collector.active_connections(), 1); // total_connections should stay at 2 assert_eq!(collector.total_connections(), 2); @@ -344,23 +528,23 @@ mod tests { #[test] fn test_route_specific_tracking() { let collector = MetricsCollector::new(); - collector.connection_opened(Some("route-a")); - collector.connection_opened(Some("route-a")); - collector.connection_opened(Some("route-b")); + collector.connection_opened(Some("route-a"), None); + collector.connection_opened(Some("route-a"), None); + collector.connection_opened(Some("route-b"), None); assert_eq!(collector.active_connections(), 3); assert_eq!(collector.total_connections(), 3); - collector.connection_closed(Some("route-a")); + collector.connection_closed(Some("route-a"), None); assert_eq!(collector.active_connections(), 2); } #[test] fn test_record_bytes() { let collector = MetricsCollector::new(); - collector.record_bytes(100, 200, Some("route-a")); - collector.record_bytes(50, 75, Some("route-a")); - collector.record_bytes(25, 30, None); + collector.record_bytes(100, 200, Some("route-a"), None); + collector.record_bytes(50, 75, Some("route-a"), None); + collector.record_bytes(25, 30, None, None); let total_in = collector.total_bytes_in.load(Ordering::Relaxed); let total_out = collector.total_bytes_out.load(Ordering::Relaxed); @@ -377,11 +561,11 @@ mod tests { let collector = MetricsCollector::with_retention(60); // Open a connection so the route appears in the snapshot - collector.connection_opened(Some("route-a")); + collector.connection_opened(Some("route-a"), None); // Record some bytes - collector.record_bytes(1000, 2000, Some("route-a")); - collector.record_bytes(500, 750, None); + collector.record_bytes(1000, 2000, Some("route-a"), None); + collector.record_bytes(500, 750, None, None); // Take a sample (simulates the 1Hz tick) collector.sample_all(); @@ -400,11 +584,85 @@ mod tests { #[test] fn test_throughput_zero_before_sampling() { let collector = MetricsCollector::with_retention(60); - collector.record_bytes(1000, 2000, None); + collector.record_bytes(1000, 2000, None, None); // Without sampling, throughput should be 0 let snapshot = collector.snapshot(); assert_eq!(snapshot.throughput_in_bytes_per_sec, 0); assert_eq!(snapshot.throughput_out_bytes_per_sec, 0); } + + #[test] + fn test_per_ip_tracking() { + let collector = MetricsCollector::with_retention(60); + + collector.connection_opened(Some("route-a"), Some("1.2.3.4")); + collector.connection_opened(Some("route-a"), Some("1.2.3.4")); + collector.connection_opened(Some("route-b"), Some("5.6.7.8")); + + // Check IP active connections (drop DashMap refs immediately to avoid deadlock) + assert_eq!( + collector.ip_connections.get("1.2.3.4").unwrap().load(Ordering::Relaxed), + 2 + ); + assert_eq!( + collector.ip_connections.get("5.6.7.8").unwrap().load(Ordering::Relaxed), + 1 + ); + + // Record bytes per IP + collector.record_bytes(100, 200, Some("route-a"), Some("1.2.3.4")); + collector.record_bytes(300, 400, Some("route-b"), Some("5.6.7.8")); + collector.sample_all(); + + let snapshot = collector.snapshot(); + assert_eq!(snapshot.ips.len(), 2); + let ip1_metrics = snapshot.ips.get("1.2.3.4").unwrap(); + assert_eq!(ip1_metrics.active_connections, 2); + assert_eq!(ip1_metrics.bytes_in, 100); + + // Close connections + collector.connection_closed(Some("route-a"), Some("1.2.3.4")); + assert_eq!( + collector.ip_connections.get("1.2.3.4").unwrap().load(Ordering::Relaxed), + 1 + ); + + // Close last connection for IP — should be cleaned up + collector.connection_closed(Some("route-a"), Some("1.2.3.4")); + assert!(collector.ip_connections.get("1.2.3.4").is_none()); + } + + #[test] + fn test_http_request_tracking() { + let collector = MetricsCollector::with_retention(60); + + collector.record_http_request(); + collector.record_http_request(); + collector.record_http_request(); + + assert_eq!(collector.total_http_requests.load(Ordering::Relaxed), 3); + + collector.sample_all(); + + let snapshot = collector.snapshot(); + assert_eq!(snapshot.total_http_requests, 3); + assert_eq!(snapshot.http_requests_per_sec, 3); + } + + #[test] + fn test_throughput_history_in_snapshot() { + let collector = MetricsCollector::with_retention(60); + + for i in 1..=5 { + collector.record_bytes(i * 100, i * 200, None, None); + collector.sample_all(); + } + + let snapshot = collector.snapshot(); + assert_eq!(snapshot.throughput_history.len(), 5); + // History should be chronological (oldest first) + assert_eq!(snapshot.throughput_history[0].bytes_in, 100); + assert_eq!(snapshot.throughput_history[4].bytes_in, 500); + } } diff --git a/rust/crates/rustproxy-metrics/src/throughput.rs b/rust/crates/rustproxy-metrics/src/throughput.rs index e73833f..1fce59e 100644 --- a/rust/crates/rustproxy-metrics/src/throughput.rs +++ b/rust/crates/rustproxy-metrics/src/throughput.rs @@ -1,8 +1,10 @@ +use serde::{Deserialize, Serialize}; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{Instant, SystemTime, UNIX_EPOCH}; /// A single throughput sample. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] pub struct ThroughputSample { pub timestamp_ms: u64, pub bytes_in: u64, @@ -106,6 +108,27 @@ impl ThroughputTracker { self.throughput(10) } + /// Return the last N samples in chronological order (oldest first). + pub fn history(&self, window_seconds: usize) -> Vec { + let window = window_seconds.min(self.count); + if window == 0 { + return Vec::new(); + } + let mut result = Vec::with_capacity(window); + for i in 0..window { + let idx = if self.write_index >= i + 1 { + self.write_index - i - 1 + } else { + self.capacity - (i + 1 - self.write_index) + }; + if idx < self.samples.len() { + result.push(self.samples[idx]); + } + } + result.reverse(); // Return oldest-first (chronological) + result + } + /// How long this tracker has been alive. pub fn uptime(&self) -> std::time::Duration { self.created_at.elapsed() @@ -170,4 +193,40 @@ mod tests { std::thread::sleep(std::time::Duration::from_millis(10)); assert!(tracker.uptime().as_millis() >= 10); } + + #[test] + fn test_history_returns_chronological() { + let mut tracker = ThroughputTracker::new(60); + for i in 1..=5 { + tracker.record_bytes(i * 100, i * 200); + tracker.sample(); + } + let history = tracker.history(5); + assert_eq!(history.len(), 5); + // First sample should have 100 bytes_in, last should have 500 + assert_eq!(history[0].bytes_in, 100); + assert_eq!(history[4].bytes_in, 500); + } + + #[test] + fn test_history_wraps_around() { + let mut tracker = ThroughputTracker::new(3); // Small capacity + for i in 1..=5 { + tracker.record_bytes(i * 100, i * 200); + tracker.sample(); + } + // Only last 3 should be retained + let history = tracker.history(10); // Ask for more than available + assert_eq!(history.len(), 3); + assert_eq!(history[0].bytes_in, 300); + assert_eq!(history[1].bytes_in, 400); + assert_eq!(history[2].bytes_in, 500); + } + + #[test] + fn test_history_empty() { + let tracker = ThroughputTracker::new(60); + let history = tracker.history(10); + assert!(history.is_empty()); + } } diff --git a/rust/crates/rustproxy-passthrough/src/forwarder.rs b/rust/crates/rustproxy-passthrough/src/forwarder.rs index 6231cc6..e92d628 100644 --- a/rust/crates/rustproxy-passthrough/src/forwarder.rs +++ b/rust/crates/rustproxy-passthrough/src/forwarder.rs @@ -7,6 +7,14 @@ use tracing::debug; use rustproxy_metrics::MetricsCollector; +/// Context for forwarding metrics, replacing the growing tuple pattern. +#[derive(Clone)] +pub struct ForwardMetricsCtx { + pub collector: Arc, + pub route_id: Option, + pub source_ip: Option, +} + /// Perform bidirectional TCP forwarding between client and backend. /// /// This is the core data path for passthrough connections. @@ -73,13 +81,13 @@ pub async fn forward_bidirectional_with_timeouts( inactivity_timeout: std::time::Duration, max_lifetime: std::time::Duration, cancel: CancellationToken, - metrics: Option<(Arc, Option)>, + metrics: Option, ) -> std::io::Result<(u64, u64)> { // Send initial data (peeked bytes) to backend if let Some(data) = initial_data { backend.write_all(data).await?; - if let Some((ref m, ref rid)) = metrics { - m.record_bytes(data.len() as u64, 0, rid.as_deref()); + if let Some(ref ctx) = metrics { + ctx.collector.record_bytes(data.len() as u64, 0, ctx.route_id.as_deref(), ctx.source_ip.as_deref()); } } @@ -105,8 +113,8 @@ pub async fn forward_bidirectional_with_timeouts( } total += n as u64; la1.store(start.elapsed().as_millis() as u64, Ordering::Relaxed); - if let Some((ref m, ref rid)) = metrics_c2b { - m.record_bytes(n as u64, 0, rid.as_deref()); + if let Some(ref ctx) = metrics_c2b { + ctx.collector.record_bytes(n as u64, 0, ctx.route_id.as_deref(), ctx.source_ip.as_deref()); } } let _ = backend_write.shutdown().await; @@ -128,8 +136,8 @@ pub async fn forward_bidirectional_with_timeouts( } total += n as u64; la2.store(start.elapsed().as_millis() as u64, Ordering::Relaxed); - if let Some((ref m, ref rid)) = metrics_b2c { - m.record_bytes(0, n as u64, rid.as_deref()); + if let Some(ref ctx) = metrics_b2c { + ctx.collector.record_bytes(0, n as u64, ctx.route_id.as_deref(), ctx.source_ip.as_deref()); } } let _ = client_write.shutdown().await; @@ -182,4 +190,3 @@ pub async fn forward_bidirectional_with_timeouts( watchdog.abort(); Ok((bytes_in, bytes_out)) } - diff --git a/rust/crates/rustproxy-passthrough/src/tcp_listener.rs b/rust/crates/rustproxy-passthrough/src/tcp_listener.rs index febf9ab..5affcdd 100644 --- a/rust/crates/rustproxy-passthrough/src/tcp_listener.rs +++ b/rust/crates/rustproxy-passthrough/src/tcp_listener.rs @@ -20,14 +20,16 @@ use crate::connection_tracker::ConnectionTracker; struct ConnectionGuard { metrics: Arc, route_id: Option, + source_ip: Option, disarmed: bool, } impl ConnectionGuard { - fn new(metrics: Arc, route_id: Option<&str>) -> Self { + fn new(metrics: Arc, route_id: Option<&str>, source_ip: Option<&str>) -> Self { Self { metrics, route_id: route_id.map(|s| s.to_string()), + source_ip: source_ip.map(|s| s.to_string()), disarmed: false, } } @@ -42,7 +44,7 @@ impl ConnectionGuard { impl Drop for ConnectionGuard { fn drop(&mut self) { if !self.disarmed { - self.metrics.connection_closed(self.route_id.as_deref()); + self.metrics.connection_closed(self.route_id.as_deref(), self.source_ip.as_deref()); } } } @@ -395,6 +397,9 @@ impl TcpListenerManager { stream.set_nodelay(true)?; + // Extract source IP once for all metric calls + let ip_str = peer_addr.ip().to_string(); + // === Fast path: try port-only matching before peeking at data === // This handles "server-speaks-first" protocols where the client // doesn't send initial data (e.g., SMTP, greeting-based protocols). @@ -460,8 +465,8 @@ impl TcpListenerManager { } } - metrics.connection_opened(route_id); - let _fast_guard = ConnectionGuard::new(Arc::clone(&metrics), route_id); + metrics.connection_opened(route_id, Some(&ip_str)); + let _fast_guard = ConnectionGuard::new(Arc::clone(&metrics), route_id, Some(&ip_str)); let connect_timeout = std::time::Duration::from_millis(conn_config.connection_timeout_ms); let inactivity_timeout = std::time::Duration::from_millis(conn_config.socket_timeout_ms); @@ -499,13 +504,21 @@ impl TcpListenerManager { let (_bytes_in, _bytes_out) = forwarder::forward_bidirectional_with_timeouts( stream, backend_w, None, inactivity_timeout, max_lifetime, cancel, - Some((Arc::clone(&metrics), route_id.map(|s| s.to_string()))), + Some(forwarder::ForwardMetricsCtx { + collector: Arc::clone(&metrics), + route_id: route_id.map(|s| s.to_string()), + source_ip: Some(ip_str.clone()), + }), ).await?; } else { let (_bytes_in, _bytes_out) = forwarder::forward_bidirectional_with_timeouts( stream, backend, None, inactivity_timeout, max_lifetime, cancel, - Some((Arc::clone(&metrics), route_id.map(|s| s.to_string()))), + Some(forwarder::ForwardMetricsCtx { + collector: Arc::clone(&metrics), + route_id: route_id.map(|s| s.to_string()), + source_ip: Some(ip_str.clone()), + }), ).await?; } @@ -646,8 +659,8 @@ impl TcpListenerManager { } // Track connection in metrics — guard ensures connection_closed on all exit paths - metrics.connection_opened(route_id); - let _conn_guard = ConnectionGuard::new(Arc::clone(&metrics), route_id); + metrics.connection_opened(route_id, Some(&ip_str)); + let _conn_guard = ConnectionGuard::new(Arc::clone(&metrics), route_id, Some(&ip_str)); // Check if this is a socket-handler route that should be relayed to TypeScript if route_match.route.action.action_type == RouteActionType::SocketHandler { @@ -755,7 +768,11 @@ impl TcpListenerManager { let (_bytes_in, _bytes_out) = forwarder::forward_bidirectional_with_timeouts( stream, backend, Some(&actual_buf), inactivity_timeout, max_lifetime, cancel, - Some((Arc::clone(&metrics), route_id.map(|s| s.to_string()))), + Some(forwarder::ForwardMetricsCtx { + collector: Arc::clone(&metrics), + route_id: route_id.map(|s| s.to_string()), + source_ip: Some(ip_str.clone()), + }), ).await?; Ok(()) } @@ -816,7 +833,11 @@ impl TcpListenerManager { let (_bytes_in, _bytes_out) = Self::forward_bidirectional_split_with_timeouts( tls_read, tls_write, backend_read, backend_write, inactivity_timeout, max_lifetime, - Some((Arc::clone(&metrics), route_id.map(|s| s.to_string()))), + Some(forwarder::ForwardMetricsCtx { + collector: Arc::clone(&metrics), + route_id: route_id.map(|s| s.to_string()), + source_ip: Some(ip_str.clone()), + }), ).await; } Ok(()) @@ -865,7 +886,11 @@ impl TcpListenerManager { let (_bytes_in, _bytes_out) = forwarder::forward_bidirectional_with_timeouts( stream, backend, Some(&actual_buf), inactivity_timeout, max_lifetime, cancel, - Some((Arc::clone(&metrics), route_id.map(|s| s.to_string()))), + Some(forwarder::ForwardMetricsCtx { + collector: Arc::clone(&metrics), + route_id: route_id.map(|s| s.to_string()), + source_ip: Some(ip_str.clone()), + }), ).await?; Ok(()) } @@ -941,12 +966,14 @@ impl TcpListenerManager { let total_in = c2s + initial_len; debug!("Socket handler relay complete for {}: {} bytes in, {} bytes out", route_key, total_in, s2c); - metrics.record_bytes(total_in, s2c, route_id); + let ip = peer_addr.ip().to_string(); + metrics.record_bytes(total_in, s2c, route_id, Some(&ip)); } Err(e) => { // Still record the initial data even on error if initial_len > 0 { - metrics.record_bytes(initial_len, 0, route_id); + let ip = peer_addr.ip().to_string(); + metrics.record_bytes(initial_len, 0, route_id, Some(&ip)); } debug!("Socket handler relay ended for {}: {}", route_key, e); } @@ -1032,7 +1059,11 @@ impl TcpListenerManager { let (_bytes_in, _bytes_out) = Self::forward_bidirectional_split_with_timeouts( client_read, client_write, backend_read, backend_write, inactivity_timeout, max_lifetime, - Some((metrics, route_id.map(|s| s.to_string()))), + Some(forwarder::ForwardMetricsCtx { + collector: metrics, + route_id: route_id.map(|s| s.to_string()), + source_ip: Some(peer_addr.ip().to_string()), + }), ).await; Ok(()) @@ -1078,7 +1109,7 @@ impl TcpListenerManager { mut backend_write: W2, inactivity_timeout: std::time::Duration, max_lifetime: std::time::Duration, - metrics: Option<(Arc, Option)>, + metrics: Option, ) -> (u64, u64) where R1: tokio::io::AsyncRead + Unpin + Send + 'static, @@ -1111,8 +1142,8 @@ impl TcpListenerManager { start.elapsed().as_millis() as u64, Ordering::Relaxed, ); - if let Some((ref m, ref rid)) = metrics_c2b { - m.record_bytes(n as u64, 0, rid.as_deref()); + if let Some(ref ctx) = metrics_c2b { + ctx.collector.record_bytes(n as u64, 0, ctx.route_id.as_deref(), ctx.source_ip.as_deref()); } } let _ = backend_write.shutdown().await; @@ -1137,8 +1168,8 @@ impl TcpListenerManager { start.elapsed().as_millis() as u64, Ordering::Relaxed, ); - if let Some((ref m, ref rid)) = metrics_b2c { - m.record_bytes(0, n as u64, rid.as_deref()); + if let Some(ref ctx) = metrics_b2c { + ctx.collector.record_bytes(0, n as u64, ctx.route_id.as_deref(), ctx.source_ip.as_deref()); } } let _ = client_write.shutdown().await; diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 72ebbfa..07c1d29 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartproxy', - version: '25.1.0', + version: '25.2.0', description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.' } diff --git a/ts/proxies/smart-proxy/rust-metrics-adapter.ts b/ts/proxies/smart-proxy/rust-metrics-adapter.ts index 3eb1add..38f4108 100644 --- a/ts/proxies/smart-proxy/rust-metrics-adapter.ts +++ b/ts/proxies/smart-proxy/rust-metrics-adapter.ts @@ -72,12 +72,23 @@ export class RustMetricsAdapter implements IMetrics { return result; }, byIP: (): Map => { - // Per-IP tracking not yet available from Rust - return new Map(); + const result = new Map(); + if (this.cache?.ips) { + for (const [ip, im] of Object.entries(this.cache.ips)) { + result.set(ip, (im as any).activeConnections ?? 0); + } + } + return result; }, - topIPs: (_limit?: number): Array<{ ip: string; count: number }> => { - // Per-IP tracking not yet available from Rust - return []; + topIPs: (limit: number = 10): Array<{ ip: string; count: number }> => { + const result: Array<{ ip: string; count: number }> = []; + if (this.cache?.ips) { + for (const [ip, im] of Object.entries(this.cache.ips)) { + result.push({ ip, count: (im as any).activeConnections ?? 0 }); + } + } + result.sort((a, b) => b.count - a.count); + return result.slice(0, limit); }, }; @@ -100,9 +111,13 @@ export class RustMetricsAdapter implements IMetrics { custom: (_seconds: number): IThroughputData => { return this.throughput.instant(); }, - history: (_seconds: number): Array => { - // Throughput history not yet available from Rust - return []; + history: (seconds: number): Array => { + if (!this.cache?.throughputHistory) return []; + return this.cache.throughputHistory.slice(-seconds).map((p: any) => ({ + timestamp: p.timestampMs, + in: p.bytesIn, + out: p.bytesOut, + })); }, byRoute: (_windowSeconds?: number): Map => { const result = new Map(); @@ -117,21 +132,28 @@ export class RustMetricsAdapter implements IMetrics { return result; }, byIP: (_windowSeconds?: number): Map => { - return new Map(); + const result = new Map(); + if (this.cache?.ips) { + for (const [ip, im] of Object.entries(this.cache.ips)) { + result.set(ip, { + in: (im as any).throughputInBytesPerSec ?? 0, + out: (im as any).throughputOutBytesPerSec ?? 0, + }); + } + } + return result; }, }; public requests = { perSecond: (): number => { - // Rust tracks connections, not HTTP requests (TCP-level proxy) - return 0; + return this.cache?.httpRequestsPerSec ?? 0; }, perMinute: (): number => { - return 0; + return (this.cache?.httpRequestsPerSecRecent ?? 0) * 60; }, total: (): number => { - // Use total connections as a proxy for total requests - return this.cache?.totalConnections ?? 0; + return this.cache?.totalHttpRequests ?? this.cache?.totalConnections ?? 0; }, };