# SmartProxy Metrics System

## Architecture

Two-tier design separating the data plane from the observation plane:

**Hot path (per-chunk, lock-free):** All recording in the proxy data plane touches only `AtomicU64` counters. No `Mutex` is ever acquired on the forwarding path. `CountingBody` batches flushes every 64KB to reduce DashMap shard contention.

**Cold path (1Hz sampling):** A background tokio task drains pending atomics into `ThroughputTracker` circular buffers (Mutex-guarded), producing per-second throughput history. The same task prunes orphaned entries and cleans up rate limiter state.

**Read path (on-demand):** `snapshot()` reads all atomics and locks ThroughputTrackers to build a serializable `Metrics` struct. TypeScript polls this at 1s intervals via IPC.

```
Data Plane (lock-free)                       Background (1Hz)             Read Path
─────────────────────                        ────────────────             ─────────
record_bytes()             ──> AtomicU64 ──┐
record_http_request()      ──> AtomicU64 ──┤
connection_opened/closed() ──> AtomicU64 ──┤  sample_all()                snapshot()
backend_*()                ──> DashMap   ──┼──> drain atomics       ──────> Metrics struct
protocol_*()               ──> AtomicU64 ──┤    feed ThroughputTrackers    ──> JSON
datagram_*()               ──> AtomicU64 ──┘    prune orphans              ──> IPC stdout ──> TS cache ──> IMetrics API
```

### Key Types

| Type | Crate | Purpose |
|---|---|---|
| `MetricsCollector` | `rustproxy-metrics` | Central store. All DashMaps, atomics, and throughput trackers |
| `ThroughputTracker` | `rustproxy-metrics` | Circular buffer of 1Hz samples. Default 3600 capacity (1 hour) |
| `ForwardMetricsCtx` | `rustproxy-passthrough` | Carries `Arc<MetricsCollector>` + route_id + source_ip through TCP forwarding |
| `CountingBody` | `rustproxy-http` | Wraps HTTP bodies, batches byte recording per 64KB, flushes on drop |
| `ProtocolGuard` | `rustproxy-http` | RAII guard for frontend/backend protocol active/total counters |
| `ConnectionGuard` | `rustproxy-passthrough` | RAII guard calling `connection_closed()` on drop |
| `RustMetricsAdapter` | TypeScript | Polls Rust via IPC, implements `IMetrics` interface over cached JSON |

---

## What's Collected

### Global Counters

| Metric | Type | Updated by |
|---|---|---|
| Active connections | AtomicU64 | `connection_opened/closed` |
| Total connections (lifetime) | AtomicU64 | `connection_opened` |
| Total bytes in | AtomicU64 | `record_bytes` |
| Total bytes out | AtomicU64 | `record_bytes` |
| Total HTTP requests | AtomicU64 | `record_http_request` |
| Active UDP sessions | AtomicU64 | `udp_session_opened/closed` |
| Total UDP sessions | AtomicU64 | `udp_session_opened` |
| Total datagrams in | AtomicU64 | `record_datagram_in` |
| Total datagrams out | AtomicU64 | `record_datagram_out` |

### Per-Route Metrics (keyed by route ID string)

| Metric | Storage |
|---|---|
| Active connections | `DashMap<String, AtomicU64>` |
| Total connections | `DashMap<String, AtomicU64>` |
| Bytes in / out | `DashMap<String, AtomicU64>` |
| Pending throughput (in, out) | `DashMap<String, AtomicU64>` |
| Throughput history | `DashMap<String, Mutex<ThroughputTracker>>` |

Entries are pruned via `retain_routes()` when routes are removed.

### Per-IP Metrics (keyed by IP string)

| Metric | Storage |
|---|---|
| Active connections | `DashMap<String, AtomicU64>` |
| Total connections | `DashMap<String, AtomicU64>` |
| Bytes in / out | `DashMap<String, AtomicU64>` |
| Pending throughput (in, out) | `DashMap<String, AtomicU64>` |
| Throughput history | `DashMap<String, Mutex<ThroughputTracker>>` |
| Domain requests | `DashMap<String, DashMap<String, AtomicU64>>` (IP → domain → count) |

All seven maps for an IP are evicted when its active connection count drops to 0. Safety-net pruning in `sample_all()` catches entries orphaned by races.
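The evict-on-zero flow can be sketched as follows. This is a simplified, dependency-free illustration: `Mutex<HashMap>` stands in for the real DashMap shards, the type name `PerIpMetrics` is hypothetical, and only two of the seven per-IP maps are shown.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical stand-in for the per-IP slice of the collector.
struct PerIpMetrics {
    active: Mutex<HashMap<String, u64>>,
    bytes_in: Mutex<HashMap<String, u64>>,
}

impl PerIpMetrics {
    fn new() -> Self {
        Self {
            active: Mutex::new(HashMap::new()),
            bytes_in: Mutex::new(HashMap::new()),
        }
    }

    fn connection_opened(&self, ip: &str) {
        *self.active.lock().unwrap().entry(ip.to_string()).or_insert(0) += 1;
    }

    fn connection_closed(&self, ip: &str) {
        let mut active = self.active.lock().unwrap();
        if let Some(n) = active.get_mut(ip) {
            *n = n.saturating_sub(1);
            if *n == 0 {
                // Last connection gone: evict every map keyed by this IP
                // so disconnected clients cost no memory.
                active.remove(ip);
                self.bytes_in.lock().unwrap().remove(ip);
            }
        }
    }
}
```

The safety-net pruning in `sample_all()` exists because, with independent maps, a byte recorded concurrently with the final close can leave an entry with no matching `active` key; the 1Hz pass removes such strays.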
Snapshots cap at 100 IPs, sorted by active connections descending.

**Domain request tracking:** Records which domains each frontend IP has requested. Populated from HTTP Host headers (for HTTP/1.1, HTTP/2, HTTP/3 requests) and from SNI (for TLS passthrough connections). Domain keys are normalized to lowercase with any trailing dot stripped so the same hostname does not fragment across multiple counters. Capped at 256 domains per IP (`MAX_DOMAINS_PER_IP`) to prevent subdomain-spray abuse. The inner DashMap uses 2 shards to minimise base memory per IP (~200 bytes). The common case (IP + domain both known) is two DashMap reads + one atomic increment with zero allocation.

### Per-Backend Metrics (keyed by "host:port")

| Metric | Storage |
|---|---|
| Active connections | `DashMap<String, AtomicU64>` |
| Total connections | `DashMap<String, AtomicU64>` |
| Detected protocol (h1/h2/h3) | `DashMap<String, String>` |
| Connect errors | `DashMap<String, AtomicU64>` |
| Handshake errors | `DashMap<String, AtomicU64>` |
| Request errors | `DashMap<String, AtomicU64>` |
| Total connect time (microseconds) | `DashMap<String, AtomicU64>` |
| Connect count | `DashMap<String, AtomicU64>` |
| Pool hits | `DashMap<String, AtomicU64>` |
| Pool misses | `DashMap<String, AtomicU64>` |
| H2 failures (fallback to H1) | `DashMap<String, AtomicU64>` |

All per-backend maps are evicted when the active count reaches 0. Pruned via `retain_backends()`.

### Frontend Protocol Distribution

Tracked via `ProtocolGuard` RAII guards and `FrontendProtocolTracker`. Five protocol categories, each with active + total counters (AtomicU64):

| Protocol | Where detected |
|---|---|
| h1 | `FrontendProtocolTracker` on first HTTP/1.x request |
| h2 | `FrontendProtocolTracker` on first HTTP/2 request |
| h3 | `ProtocolGuard::frontend("h3")` in H3ProxyService |
| ws | `ProtocolGuard::frontend("ws")` on WebSocket upgrade |
| other | Fallback (TCP passthrough without HTTP) |

Uses `fetch_update` for saturating decrements to prevent underflow races. The same saturating-close pattern is also used for connection and UDP active gauges.
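The saturating-decrement pattern looks roughly like this — a minimal sketch, not the codebase's exact function (the real memory orderings may differ):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Decrement an "active" gauge without wrapping. A plain `fetch_sub(1)`
/// on a gauge that is already 0 would wrap to u64::MAX; `fetch_update`
/// runs a CAS loop and `checked_sub` returns None at 0, which makes the
/// update a no-op instead.
fn saturating_dec(gauge: &AtomicU64) {
    let _ = gauge.fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| v.checked_sub(1));
}
```

The CAS loop only matters under contention; in the common case it is a single compare-and-swap, so the cost over `fetch_sub` is negligible while the underflow race (two concurrent closes observing the same pre-decrement value) is eliminated.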
### Backend Protocol Distribution

Same five categories (h1/h2/h3/ws/other), tracked via `ProtocolGuard::backend()` at connection establishment. Backend h2 failures (fallback to h1) are separately counted.

### Throughput History

`ThroughputTracker` is a circular buffer storing `ThroughputSample { timestamp_ms, bytes_in, bytes_out }` at 1Hz.

- Global tracker: 1 instance, default 3600 capacity
- Per-route trackers: 1 per active route
- Per-IP trackers: 1 per connected IP (evicted with the IP)
- HTTP request tracker: reuses ThroughputTracker with bytes_in = request count, bytes_out = 0

Query methods:

- `instant()` — last 1 second average
- `recent()` — last 10 seconds average
- `throughput(N)` — last N seconds average
- `history(N)` — last N raw samples in chronological order

Snapshots return 60 samples of global throughput history.

### Protocol Detection Cache

Not part of MetricsCollector. Maintained by `HttpProxyService`'s protocol detection system. Injected into the metrics snapshot at read time by `get_metrics()`. Each entry records: host, port, domain, detected protocol (h1/h2/h3), H3 port, age, last accessed, last probed, suppression flags, cooldown timers, consecutive failure counts.
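The `ThroughputTracker` ring buffer and the pending-atomic drain described above can be sketched together in a dependency-free form. Names and signatures here are illustrative, not the crate's actual API: the hot path bumps a pending atomic, and the 1Hz sampler drains it into a bounded `VecDeque` acting as the circular buffer.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};

struct ThroughputSample {
    timestamp_ms: u64,
    bytes_in: u64,
    bytes_out: u64,
}

struct ThroughputTracker {
    samples: VecDeque<ThroughputSample>,
    capacity: usize, // default 3600 = one hour at 1Hz
}

impl ThroughputTracker {
    fn new(capacity: usize) -> Self {
        Self { samples: VecDeque::with_capacity(capacity), capacity }
    }

    /// Called once per second by the sampler with the drained deltas.
    fn sample(&mut self, timestamp_ms: u64, bytes_in: u64, bytes_out: u64) {
        if self.samples.len() == self.capacity {
            self.samples.pop_front(); // drop the oldest: circular buffer
        }
        self.samples.push_back(ThroughputSample { timestamp_ms, bytes_in, bytes_out });
    }

    /// Average bytes/sec over the last `n` samples (instant() == throughput(1),
    /// recent() == throughput(10) in this sketch).
    fn throughput(&self, n: usize) -> (u64, u64) {
        let (mut i, mut o, mut count) = (0u64, 0u64, 0u64);
        for s in self.samples.iter().rev().take(n) {
            i += s.bytes_in;
            o += s.bytes_out;
            count += 1;
        }
        if count == 0 { (0, 0) } else { (i / count, o / count) }
    }
}

// Hot path: lock-free accumulation into a pending counter.
static PENDING_IN: AtomicU64 = AtomicU64::new(0);

fn record_bytes_in(n: u64) {
    PENDING_IN.fetch_add(n, Ordering::Relaxed);
}

// Cold path (1Hz): swap the pending counter to zero and feed the tracker.
fn sample_once(tracker: &mut ThroughputTracker, now_ms: u64) {
    let drained = PENDING_IN.swap(0, Ordering::Relaxed);
    tracker.sample(now_ms, drained, 0);
}
```

The `swap(0, …)` is the key move: the hot path never blocks on the tracker's Mutex, and no bytes are lost or double-counted across sampling ticks.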
---

## Instrumentation Points

### TCP Passthrough (`rustproxy-passthrough`)

**Connection lifecycle** — `tcp_listener.rs`:

- Accept: `conn_tracker.connection_opened(&ip)` (rate limiter) + `ConnectionTrackerGuard` RAII
- Route match: `metrics.connection_opened(route_id, source_ip)` + `ConnectionGuard` RAII
- Close: both guards call their respective `_closed()` methods on drop

**Byte recording** — `forwarder.rs` (`forward_bidirectional_with_timeouts`):

- Initial peeked data recorded immediately
- Per-chunk in both directions: `record_bytes(n, 0, ...)` / `record_bytes(0, n, ...)`
- Same pattern in `forward_bidirectional_split_with_timeouts` (tcp_listener.rs) for TLS-terminated paths

### HTTP Proxy (`rustproxy-http`)

**Request counting** — `proxy_service.rs`:

- `record_http_request()` called once per request after route matching succeeds

**Body byte counting** — `counting_body.rs` wrapping:

- Request bodies: `CountingBody::new(body, ..., Direction::In)` — counts client-to-upstream bytes
- Response bodies: `CountingBody::new(body, ..., Direction::Out)` — counts upstream-to-client bytes
- Batched flush every 64KB (`BYTE_FLUSH_THRESHOLD = 65_536`), remainder flushed on drop
- Also updates the `connection_activity` atomic (idle watchdog) and the `active_requests` counter (streaming detection)

**Backend metrics** — `proxy_service.rs`:

- `backend_connection_opened(key, connect_time)` — after TCP/TLS connect succeeds
- `backend_connection_closed(key)` — on teardown
- `backend_connect_error(key)` — TCP/TLS connect failure or timeout
- `backend_handshake_error(key)` — H1/H2 protocol handshake failure
- `backend_request_error(key)` — send_request failure
- `backend_h2_failure(key)` — H2 attempted, fell back to H1
- `backend_pool_hit(key)` / `backend_pool_miss(key)` — connection pool reuse
- `set_backend_protocol(key, proto)` — records detected protocol

**WebSocket** — `proxy_service.rs`:

- Does NOT use CountingBody; records bytes directly per-chunk in both directions of the bidirectional copy loop

### QUIC (`rustproxy-passthrough`)

**Connection level** — `quic_handler.rs`:

- `connection_opened` / `connection_closed` via `QuicConnGuard` RAII
- `conn_tracker.connection_opened/closed` for rate limiting

**Stream level**:

- QUIC-to-TCP stream forwarding: `record_bytes(bytes_in, bytes_out, ...)` called once per stream at completion (post-hoc, not per-chunk)
- HTTP/3: delegates to `HttpProxyService.handle_request()`, so all HTTP proxy metrics apply

**H3 specifics** — `h3_service.rs`:

- `ProtocolGuard::frontend("h3")` tracks the H3 connection
- H3 request bodies: `record_bytes(data.len(), 0, ...)` called directly (not CountingBody) since H3 uses `stream.send_data()`
- H3 response bodies: wrapped in CountingBody like HTTP/1 and HTTP/2

### UDP (`rustproxy-passthrough`)

**Session lifecycle** — `udp_listener.rs` / `udp_session.rs`:

- `udp_session_opened()` + `connection_opened(route_id, source_ip)` on new session
- `udp_session_closed()` + `connection_closed(route_id, source_ip)` on idle reap or port drain

**Datagram counting** — `udp_listener.rs`:

- Inbound: `record_bytes(len, 0, ...)` + `record_datagram_in()`
- Outbound (backend reply): `record_bytes(0, len, ...)` + `record_datagram_out()`

---

## Sampling Loop

`lib.rs` spawns a tokio task at a configurable interval (default 1000ms):

```rust
loop {
    tokio::select! {
        _ = cancel => break,
        _ = interval.tick() => {
            metrics.sample_all();
            conn_tracker.cleanup_stale_timestamps();
            http_proxy.cleanup_all_rate_limiters();
        }
    }
}
```

`sample_all()` performs in one pass:

1. Drains `global_pending_tp_in/out` into the global ThroughputTracker, samples
2. Drains per-route pending counters into per-route trackers, samples each
3. Samples idle route trackers (no new data) to advance their window
4. Drains per-IP pending counters into per-IP trackers, samples each
5. Drains `pending_http_requests` into the HTTP request throughput tracker
6. Prunes orphaned per-IP entries (bytes/throughput maps with no matching ip_connections key)
7. Prunes orphaned per-backend entries (error/stats maps with no matching active/total key)

---

## Data Flow: Rust to TypeScript

```
MetricsCollector.snapshot()
├── reads all AtomicU64 counters
├── iterates DashMaps (routes, IPs, backends)
├── locks ThroughputTrackers for instant/recent rates + history
└── produces Metrics struct

RustProxy::get_metrics()
├── calls snapshot()
├── enriches with detectedProtocols from HTTP proxy protocol cache
└── returns Metrics

management.rs "getMetrics" IPC command
├── calls get_metrics()
├── serde_json::to_value (camelCase)
└── writes JSON to stdout

RustProxyBridge (TypeScript)
├── reads JSON from Rust process stdout
└── returns parsed object

RustMetricsAdapter
├── setInterval polls bridge.getMetrics() every 1s
├── stores raw JSON in this.cache
└── IMetrics methods read synchronously from cache

SmartProxy.getMetrics()
└── returns the RustMetricsAdapter instance
```

### IPC JSON Shape (Metrics)

```json
{
  "activeConnections": 42,
  "totalConnections": 1000,
  "bytesIn": 123456789,
  "bytesOut": 987654321,
  "throughputInBytesPerSec": 50000,
  "throughputOutBytesPerSec": 80000,
  "throughputRecentInBytesPerSec": 45000,
  "throughputRecentOutBytesPerSec": 75000,
  "routes": {
    "<route-id>": {
      "activeConnections": 5,
      "totalConnections": 100,
      "bytesIn": 0,
      "bytesOut": 0,
      "throughputInBytesPerSec": 0,
      "throughputOutBytesPerSec": 0,
      "throughputRecentInBytesPerSec": 0,
      "throughputRecentOutBytesPerSec": 0
    }
  },
  "ips": {
    "<ip>": {
      "activeConnections": 2,
      "totalConnections": 10,
      "bytesIn": 0,
      "bytesOut": 0,
      "throughputInBytesPerSec": 0,
      "throughputOutBytesPerSec": 0,
      "domainRequests": {
        "example.com": 4821,
        "api.example.com": 312
      }
    }
  },
  "backends": {
    "<host:port>": {
      "activeConnections": 3,
      "totalConnections": 50,
      "protocol": "h2",
      "connectErrors": 0,
      "handshakeErrors": 0,
      "requestErrors": 0,
      "totalConnectTimeUs": 150000,
      "connectCount": 50,
      "poolHits": 40,
      "poolMisses": 10,
      "h2Failures": 1
    }
  },
  "throughputHistory": [
    { "timestampMs": 1713000000000, "bytesIn": 50000, "bytesOut": 80000 }
  ],
  "totalHttpRequests": 5000,
  "httpRequestsPerSec": 100,
  "httpRequestsPerSecRecent": 95,
  "activeUdpSessions": 0,
  "totalUdpSessions": 5,
  "totalDatagramsIn": 1000,
  "totalDatagramsOut": 1000,
  "frontendProtocols": {
    "h1Active": 10, "h1Total": 500,
    "h2Active": 5, "h2Total": 200,
    "h3Active": 1, "h3Total": 50,
    "wsActive": 2, "wsTotal": 30,
    "otherActive": 0, "otherTotal": 0
  },
  "backendProtocols": { "...same shape..." },
  "detectedProtocols": [
    {
      "host": "backend",
      "port": 443,
      "domain": "example.com",
      "protocol": "h2",
      "h3Port": 443,
      "ageSecs": 120,
      "lastAccessedSecs": 5,
      "lastProbedSecs": 120,
      "h2Suppressed": false,
      "h3Suppressed": false,
      "h2CooldownRemainingSecs": null,
      "h3CooldownRemainingSecs": null,
      "h2ConsecutiveFailures": null,
      "h3ConsecutiveFailures": null
    }
  ]
}
```

### IPC JSON Shape (Statistics)

Lightweight administrative summary, fetched on-demand (not polled):

```json
{
  "activeConnections": 42,
  "totalConnections": 1000,
  "routesCount": 5,
  "listeningPorts": [80, 443, 8443],
  "uptimeSeconds": 86400
}
```

---

## TypeScript Consumer API

`SmartProxy.getMetrics()` returns an `IMetrics` object. All members are synchronous methods reading from the polled cache.
### connections

| Method | Return | Source |
|---|---|---|
| `active()` | `number` | `cache.activeConnections` |
| `total()` | `number` | `cache.totalConnections` |
| `byRoute()` | `Map<string, number>` | `cache.routes[name].activeConnections` |
| `byIP()` | `Map<string, number>` | `cache.ips[ip].activeConnections` |
| `topIPs(limit?)` | `Array<{ip, count}>` | `cache.ips` sorted by active desc, default 10 |
| `domainRequestsByIP()` | `Map<string, Map<string, number>>` | `cache.ips[ip].domainRequests` |
| `topDomainRequests(limit?)` | `Array<{ip, domain, count}>` | Flattened from all IPs, sorted by count desc, default 20 |
| `frontendProtocols()` | `IProtocolDistribution` | `cache.frontendProtocols.*` |
| `backendProtocols()` | `IProtocolDistribution` | `cache.backendProtocols.*` |

### throughput

| Method | Return | Source |
|---|---|---|
| `instant()` | `{in, out}` | `cache.throughputInBytesPerSec/Out` |
| `recent()` | `{in, out}` | `cache.throughputRecentInBytesPerSec/Out` |
| `average()` | `{in, out}` | Falls back to `instant()` (not wired to windowed average) |
| `custom(seconds)` | `{in, out}` | Falls back to `instant()` (not wired) |
| `history(seconds)` | `IThroughputHistoryPoint[]` | `cache.throughputHistory` sliced to last N entries |
| `byRoute(windowSeconds?)` | `Map<string, {in, out}>` | `cache.routes[name].throughputIn/OutBytesPerSec` |
| `byIP(windowSeconds?)` | `Map<string, {in, out}>` | `cache.ips[ip].throughputIn/OutBytesPerSec` |

### requests

| Method | Return | Source |
|---|---|---|
| `perSecond()` | `number` | `cache.httpRequestsPerSec` |
| `perMinute()` | `number` | `cache.httpRequestsPerSecRecent * 60` |
| `total()` | `number` | `cache.totalHttpRequests` (fallback: totalConnections) |

### totals

| Method | Return | Source |
|---|---|---|
| `bytesIn()` | `number` | `cache.bytesIn` |
| `bytesOut()` | `number` | `cache.bytesOut` |
| `connections()` | `number` | `cache.totalConnections` |

### backends

| Method | Return | Source |
|---|---|---|
| `byBackend()` | `Map<string, IBackendMetrics>` | `cache.backends[key].*` with computed `avgConnectTimeMs` and `poolHitRate` |
| `protocols()` | `Map<string, string>` | `cache.backends[key].protocol` |
| `topByErrors(limit?)` | `IBackendMetrics[]` | Sorted by total errors desc |
| `detectedProtocols()` | `IProtocolCacheEntry[]` | `cache.detectedProtocols` passthrough |

`IBackendMetrics`: `{ protocol, activeConnections, totalConnections, connectErrors, handshakeErrors, requestErrors, avgConnectTimeMs, poolHitRate, h2Failures }`

### udp

| Method | Return | Source |
|---|---|---|
| `activeSessions()` | `number` | `cache.activeUdpSessions` |
| `totalSessions()` | `number` | `cache.totalUdpSessions` |
| `datagramsIn()` | `number` | `cache.totalDatagramsIn` |
| `datagramsOut()` | `number` | `cache.totalDatagramsOut` |

### percentiles (stub)

`connectionDuration()` and `bytesTransferred()` always return zeros. Not implemented.

---

## Configuration

```typescript
interface IMetricsConfig {
  enabled: boolean;              // default true
  sampleIntervalMs: number;      // default 1000 (1Hz sampling + TS polling)
  retentionSeconds: number;      // default 3600 (ThroughputTracker capacity)
  enableDetailedTracking: boolean;
  enablePercentiles: boolean;
  cacheResultsMs: number;
  prometheusEnabled: boolean;    // not wired
  prometheusPath: string;        // not wired
  prometheusPrefix: string;      // not wired
}
```

Rust-side config (`MetricsConfig` in `rustproxy-config`):

```rust
pub struct MetricsConfig {
    pub enabled: Option<bool>,
    pub sample_interval_ms: Option<u64>, // default 1000
    pub retention_seconds: Option<u64>,  // default 3600
}
```

---

## Design Decisions

**Lock-free hot path.** `record_bytes()` is the most frequently called method (per-chunk in TCP, per-64KB in HTTP). It only touches `AtomicU64` with `Relaxed` ordering and short-circuits zero-byte directions to skip DashMap lookups entirely.

**CountingBody batching.** HTTP body frames are typically 16KB. Flushing to MetricsCollector every 64KB reduces DashMap shard contention by ~4x compared to per-frame recording.
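The batching idea can be illustrated standalone. `BatchedCounter` below is hypothetical — the real `CountingBody` wraps an HTTP body and flushes into the collector's maps — but the accounting is the same: count frames locally, touch the shared counter only every 64KB, and flush the remainder on Drop so short bodies are still counted.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

const BYTE_FLUSH_THRESHOLD: u64 = 65_536; // 64KB

/// Hypothetical stand-in for CountingBody's byte accounting.
struct BatchedCounter<'a> {
    local: u64,             // bytes seen since the last flush
    shared: &'a AtomicU64,  // stands in for the shared metrics counter
}

impl<'a> BatchedCounter<'a> {
    fn new(shared: &'a AtomicU64) -> Self {
        Self { local: 0, shared }
    }

    /// Called once per body frame.
    fn record(&mut self, n: u64) {
        self.local += n;
        if self.local >= BYTE_FLUSH_THRESHOLD {
            self.shared.fetch_add(self.local, Ordering::Relaxed);
            self.local = 0;
        }
    }
}

impl Drop for BatchedCounter<'_> {
    fn drop(&mut self) {
        // Flush the remainder so bodies smaller than 64KB are counted too.
        if self.local > 0 {
            self.shared.fetch_add(self.local, Ordering::Relaxed);
        }
    }
}
```

With typical 16KB frames, four frames accumulate locally before one shared `fetch_add` — the ~4x reduction in shard contention the design notes claim.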
**RAII guards everywhere.** `ConnectionGuard`, `ConnectionTrackerGuard`, `QuicConnGuard`, `ProtocolGuard`, `FrontendProtocolTracker` all use Drop to guarantee counter cleanup on all exit paths including panics.

**Saturating decrements.** Protocol counters use `fetch_update` loops instead of `fetch_sub` to prevent underflow to `u64::MAX` from concurrent close races.

**Bounded memory.** Per-IP entries evicted on last connection close. Per-backend entries evicted on last connection close. Snapshot caps IPs and backends at 100 each. `sample_all()` prunes orphaned entries every second.

**Two-phase throughput.** Pending bytes accumulate in lock-free atomics. The 1Hz cold path drains them into Mutex-guarded ThroughputTrackers. This means the hot path never contends on a Mutex, while the cold path does minimal work (one drain + one sample per tracker).

---

## Known Gaps

| Gap | Status |
|---|---|
| `throughput.average()` / `throughput.custom(seconds)` | Fall back to `instant()`. Not wired to Rust windowed queries. |
| `percentiles.connectionDuration()` / `percentiles.bytesTransferred()` | Stubs returning zeros. No histogram in Rust. |
| Prometheus export | Config fields exist but are not wired to any exporter. |
| `LogDeduplicator` | Implemented in `rustproxy-metrics` but not connected to any call site. |
| Rate limit hit counters | Rate-limited requests return 429 but no counter is recorded in MetricsCollector. |
| QUIC stream byte counting | Post-hoc (per-stream totals after close), not per-chunk like TCP. |
| Throughput history in snapshot | Capped at 60 samples. TS `history(seconds)` cannot return more than 60 points regardless of `retentionSeconds`. |
| Per-route total connections / bytes | Available in Rust JSON but `IMetrics.connections.byRoute()` only exposes active connections. |
| Per-IP total connections / bytes | Available in Rust JSON but `IMetrics.connections.byIP()` only exposes active connections. |
| IPC response typing | `RustProxyBridge` declares `result: any` for both metrics commands. No type-safe response. |