diff --git a/changelog.md b/changelog.md index 6241afe..9847109 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,13 @@ # Changelog +## 2026-04-13 - 27.6.0 - feat(metrics) +track per-IP domain request metrics across HTTP and TCP passthrough traffic + +- records domain request counts per frontend IP from HTTP Host headers and TCP SNI +- exposes per-IP domain maps and top IP-domain request pairs through the TypeScript metrics adapter +- bounds per-IP domain tracking and prunes stale entries to limit memory growth +- adds metrics system documentation covering architecture, collected data, and known gaps + ## 2026-04-06 - 27.5.0 - feat(security) add domain-scoped IP allow list support across HTTP and passthrough filtering diff --git a/readme.metrics.md b/readme.metrics.md new file mode 100644 index 0000000..9782101 --- /dev/null +++ b/readme.metrics.md @@ -0,0 +1,484 @@ +# SmartProxy Metrics System + +## Architecture + +Two-tier design separating the data plane from the observation plane: + +**Hot path (per-chunk, lock-free):** All recording in the proxy data plane touches only `AtomicU64` counters. No `Mutex` is ever acquired on the forwarding path. `CountingBody` batches flushes every 64KB to reduce DashMap shard contention. + +**Cold path (1Hz sampling):** A background tokio task drains pending atomics into `ThroughputTracker` circular buffers (Mutex-guarded), producing per-second throughput history. Same task prunes orphaned entries and cleans up rate limiter state. + +**Read path (on-demand):** `snapshot()` reads all atomics and locks ThroughputTrackers to build a serializable `Metrics` struct. TypeScript polls this at 1s intervals via IPC. + +``` +Data Plane (lock-free) Background (1Hz) Read Path +───────────────────── ────────────────── ───────── +record_bytes() ──> AtomicU64 ──┐ +record_http_request() ──> AtomicU64 ──┤ +connection_opened/closed() ──> AtomicU64 ──┤ sample_all() snapshot() +backend_*() ──> DashMap ──┤────> drain atomics ──────> Metrics struct +protocol_*() ──> AtomicU64 ──┤ feed ThroughputTrackers ──> JSON +datagram_*() ──> AtomicU64 ──┘ prune orphans ──> IPC stdout + ──> TS cache + ──> IMetrics API +``` + +### Key Types + +| Type | Crate | Purpose | +|---|---|---| +| `MetricsCollector` | `rustproxy-metrics` | Central store. All DashMaps, atomics, and throughput trackers | +| `ThroughputTracker` | `rustproxy-metrics` | Circular buffer of 1Hz samples. Default 3600 capacity (1 hour) | +| `ForwardMetricsCtx` | `rustproxy-passthrough` | Carries `Arc` + route_id + source_ip through TCP forwarding | +| `CountingBody` | `rustproxy-http` | Wraps HTTP bodies, batches byte recording per 64KB, flushes on drop | +| `ProtocolGuard` | `rustproxy-http` | RAII guard for frontend/backend protocol active/total counters | +| `ConnectionGuard` | `rustproxy-passthrough` | RAII guard calling `connection_closed()` on drop | +| `RustMetricsAdapter` | TypeScript | Polls Rust via IPC, implements `IMetrics` interface over cached JSON | + +--- + +## What's Collected + +### Global Counters + +| Metric | Type | Updated by | +|---|---|---| +| Active connections | AtomicU64 | `connection_opened/closed` | +| Total connections (lifetime) | AtomicU64 | `connection_opened` | +| Total bytes in | AtomicU64 | `record_bytes` | +| Total bytes out | AtomicU64 | `record_bytes` | +| Total HTTP requests | AtomicU64 | `record_http_request` | +| Active UDP sessions | AtomicU64 | `udp_session_opened/closed` | +| Total UDP sessions | AtomicU64 | `udp_session_opened` | +| Total datagrams in | AtomicU64 | `record_datagram_in` | +| Total datagrams out | AtomicU64 | `record_datagram_out` | + +### Per-Route Metrics (keyed by route ID string) + +| Metric | Storage | +|---|---| +| Active connections | `DashMap` | +| Total connections | `DashMap` | +| Bytes in / out | `DashMap` | +| Pending throughput (in, out) | `DashMap` | +| Throughput history | `DashMap>` | + +Entries are pruned via `retain_routes()` when routes are removed. + +### Per-IP Metrics (keyed by IP string) + +| Metric | Storage | +|---|---| +| Active connections | `DashMap` | +| Total connections | `DashMap` | +| Bytes in / out | `DashMap` | +| Pending throughput (in, out) | `DashMap` | +| Throughput history | `DashMap>` | +| Domain requests | `DashMap>` (IP → domain → count) | + +All seven maps for an IP are evicted when its active connection count drops to 0. Safety-net pruning in `sample_all()` catches entries orphaned by races. Snapshots cap at 100 IPs, sorted by active connections descending. + +**Domain request tracking:** Records which domains each frontend IP has requested. Populated from HTTP Host headers (for HTTP/1.1, HTTP/2, HTTP/3 requests) and from SNI (for TLS passthrough connections). Capped at 256 domains per IP (`MAX_DOMAINS_PER_IP`) to prevent subdomain-spray abuse. Inner DashMap uses 2 shards to minimise base memory per IP (~200 bytes). Common case (IP + domain both known) is two DashMap reads + one atomic increment with zero allocation. + +### Per-Backend Metrics (keyed by "host:port") + +| Metric | Storage | +|---|---| +| Active connections | `DashMap` | +| Total connections | `DashMap` | +| Detected protocol (h1/h2/h3) | `DashMap` | +| Connect errors | `DashMap` | +| Handshake errors | `DashMap` | +| Request errors | `DashMap` | +| Total connect time (microseconds) | `DashMap` | +| Connect count | `DashMap` | +| Pool hits | `DashMap` | +| Pool misses | `DashMap` | +| H2 failures (fallback to H1) | `DashMap` | + +All per-backend maps are evicted when active count reaches 0. Pruned via `retain_backends()`. + +### Frontend Protocol Distribution + +Tracked via `ProtocolGuard` RAII guards and `FrontendProtocolTracker`. Five protocol categories, each with active + total counters (AtomicU64): + +| Protocol | Where detected | +|---|---| +| h1 | `FrontendProtocolTracker` on first HTTP/1.x request | +| h2 | `FrontendProtocolTracker` on first HTTP/2 request | +| h3 | `ProtocolGuard::frontend("h3")` in H3ProxyService | +| ws | `ProtocolGuard::frontend("ws")` on WebSocket upgrade | +| other | Fallback (TCP passthrough without HTTP) | + +Uses `fetch_update` for saturating decrements to prevent underflow races. + +### Backend Protocol Distribution + +Same five categories (h1/h2/h3/ws/other), tracked via `ProtocolGuard::backend()` at connection establishment. Backend h2 failures (fallback to h1) are separately counted. + +### Throughput History + +`ThroughputTracker` is a circular buffer storing `ThroughputSample { timestamp_ms, bytes_in, bytes_out }` at 1Hz. + +- Global tracker: 1 instance, default 3600 capacity +- Per-route trackers: 1 per active route +- Per-IP trackers: 1 per connected IP (evicted with the IP) +- HTTP request tracker: reuses ThroughputTracker with bytes_in = request count, bytes_out = 0 + +Query methods: +- `instant()` — last 1 second average +- `recent()` — last 10 seconds average +- `throughput(N)` — last N seconds average +- `history(N)` — last N raw samples in chronological order + +Snapshots return 60 samples of global throughput history. + +### Protocol Detection Cache + +Not part of MetricsCollector. Maintained by `HttpProxyService`'s protocol detection system. Injected into the metrics snapshot at read time by `get_metrics()`. + +Each entry records: host, port, domain, detected protocol (h1/h2/h3), H3 port, age, last accessed, last probed, suppression flags, cooldown timers, consecutive failure counts. + +--- + +## Instrumentation Points + +### TCP Passthrough (`rustproxy-passthrough`) + +**Connection lifecycle** — `tcp_listener.rs`: +- Accept: `conn_tracker.connection_opened(&ip)` (rate limiter) + `ConnectionTrackerGuard` RAII +- Route match: `metrics.connection_opened(route_id, source_ip)` + `ConnectionGuard` RAII +- Close: Both guards call their respective `_closed()` methods on drop + +**Byte recording** — `forwarder.rs` (`forward_bidirectional_with_timeouts`): +- Initial peeked data recorded immediately +- Per-chunk in both directions: `record_bytes(n, 0, ...)` / `record_bytes(0, n, ...)` +- Same pattern in `forward_bidirectional_split_with_timeouts` (tcp_listener.rs) for TLS-terminated paths + +### HTTP Proxy (`rustproxy-http`) + +**Request counting** — `proxy_service.rs`: +- `record_http_request()` called once per request after route matching succeeds + +**Body byte counting** — `counting_body.rs` wrapping: +- Request bodies: `CountingBody::new(body, ..., Direction::In)` — counts client-to-upstream bytes +- Response bodies: `CountingBody::new(body, ..., Direction::Out)` — counts upstream-to-client bytes +- Batched flush every 64KB (`BYTE_FLUSH_THRESHOLD = 65_536`), remainder flushed on drop +- Also updates `connection_activity` atomic (idle watchdog) and `active_requests` counter (streaming detection) + +**Backend metrics** — `proxy_service.rs`: +- `backend_connection_opened(key, connect_time)` — after TCP/TLS connect succeeds +- `backend_connection_closed(key)` — on teardown +- `backend_connect_error(key)` — TCP/TLS connect failure or timeout +- `backend_handshake_error(key)` — H1/H2 protocol handshake failure +- `backend_request_error(key)` — send_request failure +- `backend_h2_failure(key)` — H2 attempted, fell back to H1 +- `backend_pool_hit(key)` / `backend_pool_miss(key)` — connection pool reuse +- `set_backend_protocol(key, proto)` — records detected protocol + +**WebSocket** — `proxy_service.rs`: +- Does NOT use CountingBody; records bytes directly per-chunk in both directions of the bidirectional copy loop + +### QUIC (`rustproxy-passthrough`) + +**Connection level** — `quic_handler.rs`: +- `connection_opened` / `connection_closed` via `QuicConnGuard` RAII +- `conn_tracker.connection_opened/closed` for rate limiting + +**Stream level**: +- For QUIC-to-TCP stream forwarding: `record_bytes(bytes_in, bytes_out, ...)` called once per stream at completion (post-hoc, not per-chunk) +- For HTTP/3: delegates to `HttpProxyService.handle_request()`, so all HTTP proxy metrics apply + +**H3 specifics** — `h3_service.rs`: +- `ProtocolGuard::frontend("h3")` tracks the H3 connection +- H3 request bodies: `record_bytes(data.len(), 0, ...)` called directly (not CountingBody) since H3 uses `stream.send_data()` +- H3 response bodies: wrapped in CountingBody like HTTP/1 and HTTP/2 + +### UDP (`rustproxy-passthrough`) + +**Session lifecycle** — `udp_listener.rs` / `udp_session.rs`: +- `udp_session_opened()` + `connection_opened(route_id, source_ip)` on new session +- `udp_session_closed()` + `connection_closed(route_id, source_ip)` on idle reap or port drain + +**Datagram counting** — `udp_listener.rs`: +- Inbound: `record_bytes(len, 0, ...)` + `record_datagram_in()` +- Outbound (backend reply): `record_bytes(0, len, ...)` + `record_datagram_out()` + +--- + +## Sampling Loop + +`lib.rs` spawns a tokio task at configurable interval (default 1000ms): + +```rust +loop { + tokio::select! { + _ = cancel => break, + _ = interval.tick() => { + metrics.sample_all(); + conn_tracker.cleanup_stale_timestamps(); + http_proxy.cleanup_all_rate_limiters(); + } + } +} +``` + +`sample_all()` performs in one pass: +1. Drains `global_pending_tp_in/out` into global ThroughputTracker, samples +2. Drains per-route pending counters into per-route trackers, samples each +3. Samples idle route trackers (no new data) to advance their window +4. Drains per-IP pending counters into per-IP trackers, samples each +5. Drains `pending_http_requests` into HTTP request throughput tracker +6. Prunes orphaned per-IP entries (bytes/throughput maps with no matching ip_connections key) +7. Prunes orphaned per-backend entries (error/stats maps with no matching active/total key) + +--- + +## Data Flow: Rust to TypeScript + +``` +MetricsCollector.snapshot() + ├── reads all AtomicU64 counters + ├── iterates DashMaps (routes, IPs, backends) + ├── locks ThroughputTrackers for instant/recent rates + history + └── produces Metrics struct + +RustProxy::get_metrics() + ├── calls snapshot() + ├── enriches with detectedProtocols from HTTP proxy protocol cache + └── returns Metrics + +management.rs "getMetrics" IPC command + ├── calls get_metrics() + ├── serde_json::to_value (camelCase) + └── writes JSON to stdout + +RustProxyBridge (TypeScript) + ├── reads JSON from Rust process stdout + └── returns parsed object + +RustMetricsAdapter + ├── setInterval polls bridge.getMetrics() every 1s + ├── stores raw JSON in this.cache + └── IMetrics methods read synchronously from cache + +SmartProxy.getMetrics() + └── returns the RustMetricsAdapter instance +``` + +### IPC JSON Shape (Metrics) + +```json +{ + "activeConnections": 42, + "totalConnections": 1000, + "bytesIn": 123456789, + "bytesOut": 987654321, + "throughputInBytesPerSec": 50000, + "throughputOutBytesPerSec": 80000, + "throughputRecentInBytesPerSec": 45000, + "throughputRecentOutBytesPerSec": 75000, + "routes": { + "": { + "activeConnections": 5, + "totalConnections": 100, + "bytesIn": 0, "bytesOut": 0, + "throughputInBytesPerSec": 0, "throughputOutBytesPerSec": 0, + "throughputRecentInBytesPerSec": 0, "throughputRecentOutBytesPerSec": 0 + } + }, + "ips": { + "": { + "activeConnections": 2, "totalConnections": 10, + "bytesIn": 0, "bytesOut": 0, + "throughputInBytesPerSec": 0, "throughputOutBytesPerSec": 0, + "domainRequests": { + "example.com": 4821, + "api.example.com": 312 + } + } + }, + "backends": { + "": { + "activeConnections": 3, "totalConnections": 50, + "protocol": "h2", + "connectErrors": 0, "handshakeErrors": 0, "requestErrors": 0, + "totalConnectTimeUs": 150000, "connectCount": 50, + "poolHits": 40, "poolMisses": 10, "h2Failures": 1 + } + }, + "throughputHistory": [ + { "timestampMs": 1713000000000, "bytesIn": 50000, "bytesOut": 80000 } + ], + "totalHttpRequests": 5000, + "httpRequestsPerSec": 100, + "httpRequestsPerSecRecent": 95, + "activeUdpSessions": 0, "totalUdpSessions": 5, + "totalDatagramsIn": 1000, "totalDatagramsOut": 1000, + "frontendProtocols": { + "h1Active": 10, "h1Total": 500, + "h2Active": 5, "h2Total": 200, + "h3Active": 1, "h3Total": 50, + "wsActive": 2, "wsTotal": 30, + "otherActive": 0, "otherTotal": 0 + }, + "backendProtocols": { "...same shape..." }, + "detectedProtocols": [ + { + "host": "backend", "port": 443, "domain": "example.com", + "protocol": "h2", "h3Port": 443, + "ageSecs": 120, "lastAccessedSecs": 5, "lastProbedSecs": 120, + "h2Suppressed": false, "h3Suppressed": false, + "h2CooldownRemainingSecs": null, "h3CooldownRemainingSecs": null, + "h2ConsecutiveFailures": null, "h3ConsecutiveFailures": null + } + ] +} +``` + +### IPC JSON Shape (Statistics) + +Lightweight administrative summary, fetched on-demand (not polled): + +```json +{ + "activeConnections": 42, + "totalConnections": 1000, + "routesCount": 5, + "listeningPorts": [80, 443, 8443], + "uptimeSeconds": 86400 +} +``` + +--- + +## TypeScript Consumer API + +`SmartProxy.getMetrics()` returns an `IMetrics` object. All members are synchronous methods reading from the polled cache. + +### connections + +| Method | Return | Source | +|---|---|---| +| `active()` | `number` | `cache.activeConnections` | +| `total()` | `number` | `cache.totalConnections` | +| `byRoute()` | `Map` | `cache.routes[name].activeConnections` | +| `byIP()` | `Map` | `cache.ips[ip].activeConnections` | +| `topIPs(limit?)` | `Array<{ip, count}>` | `cache.ips` sorted by active desc, default 10 | +| `domainRequestsByIP()` | `Map>` | `cache.ips[ip].domainRequests` | +| `topDomainRequests(limit?)` | `Array<{ip, domain, count}>` | Flattened from all IPs, sorted by count desc, default 20 | +| `frontendProtocols()` | `IProtocolDistribution` | `cache.frontendProtocols.*` | +| `backendProtocols()` | `IProtocolDistribution` | `cache.backendProtocols.*` | + +### throughput + +| Method | Return | Source | +|---|---|---| +| `instant()` | `{in, out}` | `cache.throughputInBytesPerSec/Out` | +| `recent()` | `{in, out}` | `cache.throughputRecentInBytesPerSec/Out` | +| `average()` | `{in, out}` | Falls back to `instant()` (not wired to windowed average) | +| `custom(seconds)` | `{in, out}` | Falls back to `instant()` (not wired) | +| `history(seconds)` | `IThroughputHistoryPoint[]` | `cache.throughputHistory` sliced to last N entries | +| `byRoute(windowSeconds?)` | `Map` | `cache.routes[name].throughputIn/OutBytesPerSec` | +| `byIP(windowSeconds?)` | `Map` | `cache.ips[ip].throughputIn/OutBytesPerSec` | + +### requests + +| Method | Return | Source | +|---|---|---| +| `perSecond()` | `number` | `cache.httpRequestsPerSec` | +| `perMinute()` | `number` | `cache.httpRequestsPerSecRecent * 60` | +| `total()` | `number` | `cache.totalHttpRequests` (fallback: totalConnections) | + +### totals + +| Method | Return | Source | +|---|---|---| +| `bytesIn()` | `number` | `cache.bytesIn` | +| `bytesOut()` | `number` | `cache.bytesOut` | +| `connections()` | `number` | `cache.totalConnections` | + +### backends + +| Method | Return | Source | +|---|---|---| +| `byBackend()` | `Map` | `cache.backends[key].*` with computed `avgConnectTimeMs` and `poolHitRate` | +| `protocols()` | `Map` | `cache.backends[key].protocol` | +| `topByErrors(limit?)` | `IBackendMetrics[]` | Sorted by total errors desc | +| `detectedProtocols()` | `IProtocolCacheEntry[]` | `cache.detectedProtocols` passthrough | + +`IBackendMetrics`: `{ protocol, activeConnections, totalConnections, connectErrors, handshakeErrors, requestErrors, avgConnectTimeMs, poolHitRate, h2Failures }` + +### udp + +| Method | Return | Source | +|---|---|---| +| `activeSessions()` | `number` | `cache.activeUdpSessions` | +| `totalSessions()` | `number` | `cache.totalUdpSessions` | +| `datagramsIn()` | `number` | `cache.totalDatagramsIn` | +| `datagramsOut()` | `number` | `cache.totalDatagramsOut` | + +### percentiles (stub) + +`connectionDuration()` and `bytesTransferred()` always return zeros. Not implemented. + +--- + +## Configuration + +```typescript +interface IMetricsConfig { + enabled: boolean; // default true + sampleIntervalMs: number; // default 1000 (1Hz sampling + TS polling) + retentionSeconds: number; // default 3600 (ThroughputTracker capacity) + enableDetailedTracking: boolean; + enablePercentiles: boolean; + cacheResultsMs: number; + prometheusEnabled: boolean; // not wired + prometheusPath: string; // not wired + prometheusPrefix: string; // not wired +} +``` + +Rust-side config (`MetricsConfig` in `rustproxy-config`): + +```rust +pub struct MetricsConfig { + pub enabled: Option, + pub sample_interval_ms: Option, // default 1000 + pub retention_seconds: Option, // default 3600 +} +``` + +--- + +## Design Decisions + +**Lock-free hot path.** `record_bytes()` is the most frequently called method (per-chunk in TCP, per-64KB in HTTP). It only touches `AtomicU64` with `Relaxed` ordering and short-circuits zero-byte directions to skip DashMap lookups entirely. + +**CountingBody batching.** HTTP body frames are typically 16KB. Flushing to MetricsCollector every 64KB reduces DashMap shard contention by ~4x compared to per-frame recording. + +**RAII guards everywhere.** `ConnectionGuard`, `ConnectionTrackerGuard`, `QuicConnGuard`, `ProtocolGuard`, `FrontendProtocolTracker` all use Drop to guarantee counter cleanup on all exit paths including panics. + +**Saturating decrements.** Protocol counters use `fetch_update` loops instead of `fetch_sub` to prevent underflow to `u64::MAX` from concurrent close races. + +**Bounded memory.** Per-IP entries evicted on last connection close. Per-backend entries evicted on last connection close. Snapshot caps IPs and backends at 100 each. `sample_all()` prunes orphaned entries every second. + +**Two-phase throughput.** Pending bytes accumulate in lock-free atomics. The 1Hz cold path drains them into Mutex-guarded ThroughputTrackers. This means the hot path never contends on a Mutex, while the cold path does minimal work (one drain + one sample per tracker). + +--- + +## Known Gaps + +| Gap | Status | +|---|---| +| `throughput.average()` / `throughput.custom(seconds)` | Fall back to `instant()`. Not wired to Rust windowed queries. | +| `percentiles.connectionDuration()` / `percentiles.bytesTransferred()` | Stub returning zeros. No histogram in Rust. | +| Prometheus export | Config fields exist but not wired to any exporter. | +| `LogDeduplicator` | Implemented in `rustproxy-metrics` but not connected to any call site. | +| Rate limit hit counters | Rate-limited requests return 429 but no counter is recorded in MetricsCollector. | +| QUIC stream byte counting | Post-hoc (per-stream totals after close), not per-chunk like TCP. | +| Throughput history in snapshot | Capped at 60 samples. TS `history(seconds)` cannot return more than 60 points regardless of `retentionSeconds`. | +| Per-route total connections / bytes | Available in Rust JSON but `IMetrics.connections.byRoute()` only exposes active connections. | +| Per-IP total connections / bytes | Available in Rust JSON but `IMetrics.connections.byIP()` only exposes active connections. | +| IPC response typing | `RustProxyBridge` declares `result: any` for both metrics commands. No type-safe response. | diff --git a/rust/crates/rustproxy-http/src/proxy_service.rs b/rust/crates/rustproxy-http/src/proxy_service.rs index c3632eb..be305ef 100644 --- a/rust/crates/rustproxy-http/src/proxy_service.rs +++ b/rust/crates/rustproxy-http/src/proxy_service.rs @@ -626,6 +626,9 @@ impl HttpProxyService { let route_id = route_match.route.id.as_deref(); let ip_str = ip_string; // reuse from above (avoid redundant to_string()) self.metrics.record_http_request(); + if let Some(ref h) = host { + self.metrics.record_ip_domain_request(&ip_str, h); + } // Apply request filters (IP check, rate limiting, auth) if let Some(ref security) = route_match.route.security { diff --git a/rust/crates/rustproxy-metrics/src/collector.rs b/rust/crates/rustproxy-metrics/src/collector.rs index 862c160..8392b96 100644 --- a/rust/crates/rustproxy-metrics/src/collector.rs +++ b/rust/crates/rustproxy-metrics/src/collector.rs @@ -1,6 +1,6 @@ use dashmap::DashMap; use serde::{Deserialize, Serialize}; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Mutex; use std::time::Duration; @@ -62,6 +62,8 @@ pub struct IpMetrics { pub bytes_out: u64, pub throughput_in_bytes_per_sec: u64, pub throughput_out_bytes_per_sec: u64, + /// Per-domain request/connection counts for this IP. + pub domain_requests: HashMap, } /// Per-backend metrics (keyed by "host:port"). @@ -139,6 +141,9 @@ const MAX_IPS_IN_SNAPSHOT: usize = 100; /// Maximum number of backends to include in a snapshot (top by total connections). const MAX_BACKENDS_IN_SNAPSHOT: usize = 100; +/// Maximum number of distinct domains tracked per IP (prevents subdomain-spray abuse). +const MAX_DOMAINS_PER_IP: usize = 256; + /// Metrics collector tracking connections and throughput. /// /// Design: The hot path (`record_bytes`) is entirely lock-free — it only touches @@ -165,6 +170,10 @@ pub struct MetricsCollector { ip_bytes_out: DashMap, ip_pending_tp: DashMap, ip_throughput: DashMap>, + /// Per-IP domain request counts: IP → { domain → count }. + /// Tracks which domains each frontend IP has requested (via HTTP Host/SNI). + /// Inner DashMap uses 2 shards to minimise base memory per IP. + ip_domain_requests: DashMap>, // ── Per-backend tracking (keyed by "host:port") ── backend_active: DashMap, @@ -247,6 +256,7 @@ impl MetricsCollector { ip_bytes_out: DashMap::new(), ip_pending_tp: DashMap::new(), ip_throughput: DashMap::new(), + ip_domain_requests: DashMap::new(), backend_active: DashMap::new(), backend_total: DashMap::new(), backend_protocol: DashMap::new(), @@ -351,6 +361,7 @@ impl MetricsCollector { self.ip_bytes_out.remove(ip); self.ip_pending_tp.remove(ip); self.ip_throughput.remove(ip); + self.ip_domain_requests.remove(ip); } } } @@ -452,6 +463,37 @@ impl MetricsCollector { self.pending_http_requests.fetch_add(1, Ordering::Relaxed); } + /// Record a domain request/connection for a frontend IP. + /// + /// Called per HTTP request (with Host header) and per TCP passthrough + /// connection (with SNI domain). The common case (IP + domain both already + /// tracked) is two DashMap reads + one atomic increment — zero allocation. + pub fn record_ip_domain_request(&self, ip: &str, domain: &str) { + // Fast path: IP already tracked, domain already tracked + if let Some(domains) = self.ip_domain_requests.get(ip) { + if let Some(counter) = domains.get(domain) { + counter.fetch_add(1, Ordering::Relaxed); + return; + } + // New domain for this IP — enforce cap + if domains.len() >= MAX_DOMAINS_PER_IP { + return; + } + domains + .entry(domain.to_string()) + .or_insert_with(|| AtomicU64::new(0)) + .fetch_add(1, Ordering::Relaxed); + return; + } + // New IP — only create if the IP has active connections + if !self.ip_connections.contains_key(ip) { + return; + } + let inner = DashMap::with_capacity_and_shard_amount(4, 2); + inner.insert(domain.to_string(), AtomicU64::new(1)); + self.ip_domain_requests.insert(ip.to_string(), inner); + } + // ── UDP session recording methods ── /// Record a new UDP session opened. @@ -745,6 +787,7 @@ impl MetricsCollector { self.ip_pending_tp.retain(|k, _| self.ip_connections.contains_key(k)); self.ip_throughput.retain(|k, _| self.ip_connections.contains_key(k)); self.ip_total_connections.retain(|k, _| self.ip_connections.contains_key(k)); + self.ip_domain_requests.retain(|k, _| self.ip_connections.contains_key(k)); // Safety-net: prune orphaned backend error/stats entries for backends // that have no active or total connections (error-only backends). @@ -852,7 +895,7 @@ impl MetricsCollector { // Collect per-IP metrics — only IPs with active connections or total > 0, // capped at top MAX_IPS_IN_SNAPSHOT sorted by active count - let mut ip_entries: Vec<(String, u64, u64, u64, u64, u64, u64)> = Vec::new(); + let mut ip_entries: Vec<(String, u64, u64, u64, u64, u64, u64, HashMap)> = Vec::new(); for entry in self.ip_total_connections.iter() { let ip = entry.key().clone(); let total = entry.value().load(Ordering::Relaxed); @@ -872,14 +915,23 @@ impl MetricsCollector { .get(&ip) .and_then(|entry| entry.value().lock().ok().map(|t| t.instant())) .unwrap_or((0, 0)); - ip_entries.push((ip, active, total, bytes_in, bytes_out, tp_in, tp_out)); + // Collect per-domain request counts for this IP + let domain_requests = self.ip_domain_requests + .get(&ip) + .map(|domains| { + domains.iter() + .map(|e| (e.key().clone(), e.value().load(Ordering::Relaxed))) + .collect() + }) + .unwrap_or_default(); + ip_entries.push((ip, active, total, bytes_in, bytes_out, tp_in, tp_out, domain_requests)); } // Sort by active connections descending, then cap ip_entries.sort_by(|a, b| b.1.cmp(&a.1)); ip_entries.truncate(MAX_IPS_IN_SNAPSHOT); let mut ips = std::collections::HashMap::new(); - for (ip, active, total, bytes_in, bytes_out, tp_in, tp_out) in ip_entries { + for (ip, active, total, bytes_in, bytes_out, tp_in, tp_out, domain_requests) in ip_entries { ips.insert(ip, IpMetrics { active_connections: active, total_connections: total, @@ -887,6 +939,7 @@ impl MetricsCollector { bytes_out, throughput_in_bytes_per_sec: tp_in, throughput_out_bytes_per_sec: tp_out, + domain_requests, }); } diff --git a/rust/crates/rustproxy-passthrough/src/tcp_listener.rs b/rust/crates/rustproxy-passthrough/src/tcp_listener.rs index 29ed085..0d94330 100644 --- a/rust/crates/rustproxy-passthrough/src/tcp_listener.rs +++ b/rust/crates/rustproxy-passthrough/src/tcp_listener.rs @@ -943,6 +943,9 @@ impl TcpListenerManager { // Track connection in metrics — guard ensures connection_closed on all exit paths metrics.connection_opened(route_id, Some(&ip_str)); + if let Some(d) = effective_domain { + metrics.record_ip_domain_request(&ip_str, d); + } let _conn_guard = ConnectionGuard::new(Arc::clone(&metrics), route_id, Some(&ip_str)); // Check if this is a socket-handler route that should be relayed to TypeScript diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 23d7709..4e7cfcc 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartproxy', - version: '27.5.0', + version: '27.6.0', description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.' } diff --git a/ts/proxies/smart-proxy/models/metrics-types.ts b/ts/proxies/smart-proxy/models/metrics-types.ts index dabd510..b847a90 100644 --- a/ts/proxies/smart-proxy/models/metrics-types.ts +++ b/ts/proxies/smart-proxy/models/metrics-types.ts @@ -57,6 +57,10 @@ export interface IMetrics { byRoute(): Map; byIP(): Map; topIPs(limit?: number): Array<{ ip: string; count: number }>; + /** Per-IP domain request counts: IP -> { domain -> count }. */ + domainRequestsByIP(): Map>; + /** Top IP-domain pairs sorted by request count descending. */ + topDomainRequests(limit?: number): Array<{ ip: string; domain: string; count: number }>; frontendProtocols(): IProtocolDistribution; backendProtocols(): IProtocolDistribution; }; diff --git a/ts/proxies/smart-proxy/rust-metrics-adapter.ts b/ts/proxies/smart-proxy/rust-metrics-adapter.ts index 5617583..16be275 100644 --- a/ts/proxies/smart-proxy/rust-metrics-adapter.ts +++ b/ts/proxies/smart-proxy/rust-metrics-adapter.ts @@ -90,6 +90,39 @@ export class RustMetricsAdapter implements IMetrics { result.sort((a, b) => b.count - a.count); return result.slice(0, limit); }, + domainRequestsByIP: (): Map> => { + const result = new Map>(); + if (this.cache?.ips) { + for (const [ip, im] of Object.entries(this.cache.ips)) { + const dr = (im as any).domainRequests; + if (dr && typeof dr === 'object') { + const domainMap = new Map(); + for (const [domain, count] of Object.entries(dr)) { + domainMap.set(domain, count as number); + } + if (domainMap.size > 0) { + result.set(ip, domainMap); + } + } + } + } + return result; + }, + topDomainRequests: (limit: number = 20): Array<{ ip: string; domain: string; count: number }> => { + const result: Array<{ ip: string; domain: string; count: number }> = []; + if (this.cache?.ips) { + for (const [ip, im] of Object.entries(this.cache.ips)) { + const dr = (im as any).domainRequests; + if (dr && typeof dr === 'object') { + for (const [domain, count] of Object.entries(dr)) { + result.push({ ip, domain, count: count as number }); + } + } + } + } + result.sort((a, b) => b.count - a.count); + return result.slice(0, limit); + }, frontendProtocols: (): IProtocolDistribution => { const fp = this.cache?.frontendProtocols; return {