# SmartProxy Metrics System

## Architecture

Two-tier design separating the data plane from the observation plane:

**Hot path (per-chunk, lock-free):** All recording in the proxy data plane touches only `AtomicU64` counters. No `Mutex` is ever acquired on the forwarding path. `CountingBody` batches flushes every 64KB to reduce DashMap shard contention.

**Cold path (1Hz sampling):** A background tokio task drains pending atomics into `ThroughputTracker` circular buffers (Mutex-guarded), producing per-second throughput history. The same task prunes orphaned entries and cleans up rate limiter state.

**Read path (on-demand):** `snapshot()` reads all atomics and locks ThroughputTrackers to build a serializable `Metrics` struct. TypeScript polls this at 1s intervals via IPC.

```
Data Plane (lock-free)                       Background (1Hz)               Read Path
─────────────────────                        ──────────────────             ─────────
record_bytes()             ──> AtomicU64 ──┐
record_http_request()      ──> AtomicU64 ──┤
connection_opened/closed() ──> AtomicU64 ──┤    sample_all()                snapshot()
backend_*()                ──> DashMap   ──┤───> drain atomics          ──> Metrics struct
protocol_*()               ──> AtomicU64 ──┤     feed ThroughputTrackers ──> JSON
datagram_*()               ──> AtomicU64 ──┘     prune orphans           ──> IPC stdout ──> TS cache ──> IMetrics API
```

### Key Types

| Type | Crate | Purpose |
|---|---|---|
| `MetricsCollector` | `rustproxy-metrics` | Central store. All DashMaps, atomics, and throughput trackers |
| `ThroughputTracker` | `rustproxy-metrics` | Circular buffer of 1Hz samples. Default 3600 capacity (1 hour) |
| `ForwardMetricsCtx` | `rustproxy-passthrough` | Carries `Arc<MetricsCollector>` + route_id + source_ip through TCP forwarding |
| `CountingBody` | `rustproxy-http` | Wraps HTTP bodies, batches byte recording per 64KB, flushes on drop |
| `ProtocolGuard` | `rustproxy-http` | RAII guard for frontend/backend protocol active/total counters |
| `ConnectionGuard` | `rustproxy-passthrough` | RAII guard calling `connection_closed()` on drop |
| `RustMetricsAdapter` | TypeScript | Polls Rust via IPC, implements `IMetrics` interface over cached JSON |

---

## What's Collected

### Global Counters

| Metric | Type | Updated by |
|---|---|---|
| Active connections | AtomicU64 | `connection_opened/closed` |
| Total connections (lifetime) | AtomicU64 | `connection_opened` |
| Total bytes in | AtomicU64 | `record_bytes` |
| Total bytes out | AtomicU64 | `record_bytes` |
| Total HTTP requests | AtomicU64 | `record_http_request` |
| Active UDP sessions | AtomicU64 | `udp_session_opened/closed` |
| Total UDP sessions | AtomicU64 | `udp_session_opened` |
| Total datagrams in | AtomicU64 | `record_datagram_in` |
| Total datagrams out | AtomicU64 | `record_datagram_out` |

### Per-Route Metrics (keyed by route ID string)

| Metric | Storage |
|---|---|
| Active connections | `DashMap<String, AtomicU64>` |
| Total connections | `DashMap<String, AtomicU64>` |
| Bytes in / out | `DashMap<String, AtomicU64>` |
| Pending throughput (in, out) | `DashMap<String, (AtomicU64, AtomicU64)>` |
| Throughput history | `DashMap<String, Mutex<ThroughputTracker>>` |

Entries are pruned via `retain_routes()` when routes are removed.

### Per-IP Metrics (keyed by IP string)

| Metric | Storage |
|---|---|
| Active connections | `DashMap<String, AtomicU64>` |
| Total connections | `DashMap<String, AtomicU64>` |
| Bytes in / out | `DashMap<String, AtomicU64>` |
| Pending throughput (in, out) | `DashMap<String, (AtomicU64, AtomicU64)>` |
| Throughput history | `DashMap<String, Mutex<ThroughputTracker>>` |
| Domain requests | `DashMap<String, DashMap<String, AtomicU64>>` (IP → domain → count) |

All seven maps for an IP are evicted when its active connection count drops to 0. Safety-net pruning in `sample_all()` catches entries orphaned by races.
Snapshots cap at 100 IPs, sorted by active connections descending.

**Domain request tracking:** Records which domains each frontend IP has requested. Populated from HTTP Host headers (for HTTP/1.1, HTTP/2, and HTTP/3 requests) and from SNI (for TLS passthrough connections). Capped at 256 domains per IP (`MAX_DOMAINS_PER_IP`) to prevent subdomain-spray abuse. The inner DashMap uses 2 shards to minimise base memory per IP (~200 bytes). The common case (IP and domain both known) costs two DashMap reads plus one atomic increment, with zero allocation.

### Per-Backend Metrics (keyed by "host:port")

| Metric | Storage |
|---|---|
| Active connections | `DashMap<String, AtomicU64>` |
| Total connections | `DashMap<String, AtomicU64>` |
| Detected protocol (h1/h2/h3) | `DashMap<String, String>` |
| Connect errors | `DashMap<String, AtomicU64>` |
| Handshake errors | `DashMap<String, AtomicU64>` |
| Request errors | `DashMap<String, AtomicU64>` |
| Total connect time (microseconds) | `DashMap<String, AtomicU64>` |
| Connect count | `DashMap<String, AtomicU64>` |
| Pool hits | `DashMap<String, AtomicU64>` |
| Pool misses | `DashMap<String, AtomicU64>` |
| H2 failures (fallback to H1) | `DashMap<String, AtomicU64>` |

All per-backend maps are evicted when the active count reaches 0. Pruned via `retain_backends()`.

### Frontend Protocol Distribution

Tracked via `ProtocolGuard` RAII guards and `FrontendProtocolTracker`. Five protocol categories, each with active + total counters (AtomicU64):

| Protocol | Where detected |
|---|---|
| h1 | `FrontendProtocolTracker` on first HTTP/1.x request |
| h2 | `FrontendProtocolTracker` on first HTTP/2 request |
| h3 | `ProtocolGuard::frontend("h3")` in H3ProxyService |
| ws | `ProtocolGuard::frontend("ws")` on WebSocket upgrade |
| other | Fallback (TCP passthrough without HTTP) |

Uses `fetch_update` for saturating decrements to prevent underflow races.

### Backend Protocol Distribution

Same five categories (h1/h2/h3/ws/other), tracked via `ProtocolGuard::backend()` at connection establishment. Backend h2 failures (fallback to h1) are counted separately.
### Throughput History

`ThroughputTracker` is a circular buffer storing `ThroughputSample { timestamp_ms, bytes_in, bytes_out }` at 1Hz.

- Global tracker: 1 instance, default 3600 capacity
- Per-route trackers: 1 per active route
- Per-IP trackers: 1 per connected IP (evicted with the IP)
- HTTP request tracker: reuses ThroughputTracker with bytes_in = request count, bytes_out = 0

Query methods:

- `instant()` — last 1 second average
- `recent()` — last 10 seconds average
- `throughput(N)` — last N seconds average
- `history(N)` — last N raw samples in chronological order

Snapshots return 60 samples of global throughput history.

### Protocol Detection Cache

Not part of MetricsCollector. Maintained by `HttpProxyService`'s protocol detection system and injected into the metrics snapshot at read time by `get_metrics()`. Each entry records: host, port, domain, detected protocol (h1/h2/h3), H3 port, age, last accessed, last probed, suppression flags, cooldown timers, and consecutive failure counts.

---

## Instrumentation Points

### TCP Passthrough (`rustproxy-passthrough`)

**Connection lifecycle** — `tcp_listener.rs`:

- Accept: `conn_tracker.connection_opened(&ip)` (rate limiter) + `ConnectionTrackerGuard` RAII
- Route match: `metrics.connection_opened(route_id, source_ip)` + `ConnectionGuard` RAII
- Close: both guards call their respective `_closed()` methods on drop

**Byte recording** — `forwarder.rs` (`forward_bidirectional_with_timeouts`):

- Initial peeked data recorded immediately
- Per-chunk in both directions: `record_bytes(n, 0, ...)` / `record_bytes(0, n, ...)`
- Same pattern in `forward_bidirectional_split_with_timeouts` (tcp_listener.rs) for TLS-terminated paths

### HTTP Proxy (`rustproxy-http`)

**Request counting** — `proxy_service.rs`:

- `record_http_request()` called once per request after route matching succeeds

**Body byte counting** — `counting_body.rs` wrapping:

- Request bodies: `CountingBody::new(body, ..., Direction::In)` — counts client-to-upstream bytes
- Response bodies: `CountingBody::new(body, ..., Direction::Out)` — counts upstream-to-client bytes
- Batched flush every 64KB (`BYTE_FLUSH_THRESHOLD = 65_536`), remainder flushed on drop
- Also updates the `connection_activity` atomic (idle watchdog) and `active_requests` counter (streaming detection)

**Backend metrics** — `proxy_service.rs`:

- `backend_connection_opened(key, connect_time)` — after TCP/TLS connect succeeds
- `backend_connection_closed(key)` — on teardown
- `backend_connect_error(key)` — TCP/TLS connect failure or timeout
- `backend_handshake_error(key)` — H1/H2 protocol handshake failure
- `backend_request_error(key)` — send_request failure
- `backend_h2_failure(key)` — H2 attempted, fell back to H1
- `backend_pool_hit(key)` / `backend_pool_miss(key)` — connection pool reuse
- `set_backend_protocol(key, proto)` — records detected protocol

**WebSocket** — `proxy_service.rs`:

- Does NOT use CountingBody; records bytes directly per-chunk in both directions of the bidirectional copy loop

### QUIC (`rustproxy-passthrough`)

**Connection level** — `quic_handler.rs`:

- `connection_opened` / `connection_closed` via `QuicConnGuard` RAII
- `conn_tracker.connection_opened/closed` for rate limiting

**Stream level**:

- QUIC-to-TCP stream forwarding: `record_bytes(bytes_in, bytes_out, ...)` called once per stream at completion (post-hoc, not per-chunk)
- HTTP/3: delegates to `HttpProxyService.handle_request()`, so all HTTP proxy metrics apply

**H3 specifics** — `h3_service.rs`:

- `ProtocolGuard::frontend("h3")` tracks the H3 connection
- H3 request bodies: `record_bytes(data.len(), 0, ...)` called directly (not CountingBody) since H3 uses `stream.send_data()`
- H3 response bodies: wrapped in CountingBody like HTTP/1 and HTTP/2

### UDP (`rustproxy-passthrough`)

**Session lifecycle** — `udp_listener.rs` / `udp_session.rs`:

- `udp_session_opened()` + `connection_opened(route_id, source_ip)` on new session
- `udp_session_closed()` + `connection_closed(route_id, source_ip)` on idle reap or port drain

**Datagram counting** — `udp_listener.rs`:

- Inbound: `record_bytes(len, 0, ...)` + `record_datagram_in()`
- Outbound (backend reply): `record_bytes(0, len, ...)` + `record_datagram_out()`

---

## Sampling Loop

`lib.rs` spawns a tokio task at a configurable interval (default 1000ms):

```rust
loop {
    tokio::select! {
        _ = cancel => break,
        _ = interval.tick() => {
            metrics.sample_all();
            conn_tracker.cleanup_stale_timestamps();
            http_proxy.cleanup_all_rate_limiters();
        }
    }
}
```

`sample_all()` performs in one pass:

1. Drains `global_pending_tp_in/out` into the global ThroughputTracker, samples
2. Drains per-route pending counters into per-route trackers, samples each
3. Samples idle route trackers (no new data) to advance their window
4. Drains per-IP pending counters into per-IP trackers, samples each
5. Drains `pending_http_requests` into the HTTP request throughput tracker
6. Prunes orphaned per-IP entries (bytes/throughput maps with no matching ip_connections key)
7. Prunes orphaned per-backend entries (error/stats maps with no matching active/total key)

---

## Data Flow: Rust to TypeScript

```
MetricsCollector.snapshot()
├── reads all AtomicU64 counters
├── iterates DashMaps (routes, IPs, backends)
├── locks ThroughputTrackers for instant/recent rates + history
└── produces Metrics struct

RustProxy::get_metrics()
├── calls snapshot()
├── enriches with detectedProtocols from HTTP proxy protocol cache
└── returns Metrics

management.rs "getMetrics" IPC command
├── calls get_metrics()
├── serde_json::to_value (camelCase)
└── writes JSON to stdout

RustProxyBridge (TypeScript)
├── reads JSON from Rust process stdout
└── returns parsed object

RustMetricsAdapter
├── setInterval polls bridge.getMetrics() every 1s
├── stores raw JSON in this.cache
└── IMetrics methods read synchronously from cache

SmartProxy.getMetrics()
└── returns the RustMetricsAdapter instance
```

### IPC JSON Shape (Metrics)

```json
{
  "activeConnections": 42,
  "totalConnections": 1000,
  "bytesIn": 123456789,
  "bytesOut": 987654321,
  "throughputInBytesPerSec": 50000,
  "throughputOutBytesPerSec": 80000,
  "throughputRecentInBytesPerSec": 45000,
  "throughputRecentOutBytesPerSec": 75000,
  "routes": {
    "<routeId>": {
      "activeConnections": 5,
      "totalConnections": 100,
      "bytesIn": 0,
      "bytesOut": 0,
      "throughputInBytesPerSec": 0,
      "throughputOutBytesPerSec": 0,
      "throughputRecentInBytesPerSec": 0,
      "throughputRecentOutBytesPerSec": 0
    }
  },
  "ips": {
    "<ip>": {
      "activeConnections": 2,
      "totalConnections": 10,
      "bytesIn": 0,
      "bytesOut": 0,
      "throughputInBytesPerSec": 0,
      "throughputOutBytesPerSec": 0,
      "domainRequests": { "example.com": 4821, "api.example.com": 312 }
    }
  },
  "backends": {
    "<host:port>": {
      "activeConnections": 3,
      "totalConnections": 50,
      "protocol": "h2",
      "connectErrors": 0,
      "handshakeErrors": 0,
      "requestErrors": 0,
      "totalConnectTimeUs": 150000,
      "connectCount": 50,
      "poolHits": 40,
      "poolMisses": 10,
      "h2Failures": 1
    }
  },
  "throughputHistory": [
    { "timestampMs": 1713000000000, "bytesIn": 50000, "bytesOut": 80000 }
  ],
  "totalHttpRequests": 5000,
  "httpRequestsPerSec": 100,
  "httpRequestsPerSecRecent": 95,
  "activeUdpSessions": 0,
  "totalUdpSessions": 5,
  "totalDatagramsIn": 1000,
  "totalDatagramsOut": 1000,
  "frontendProtocols": {
    "h1Active": 10, "h1Total": 500,
    "h2Active": 5, "h2Total": 200,
    "h3Active": 1, "h3Total": 50,
    "wsActive": 2, "wsTotal": 30,
    "otherActive": 0, "otherTotal": 0
  },
  "backendProtocols": { "...same shape..." },
  "detectedProtocols": [
    {
      "host": "backend", "port": 443, "domain": "example.com",
      "protocol": "h2", "h3Port": 443,
      "ageSecs": 120, "lastAccessedSecs": 5, "lastProbedSecs": 120,
      "h2Suppressed": false, "h3Suppressed": false,
      "h2CooldownRemainingSecs": null, "h3CooldownRemainingSecs": null,
      "h2ConsecutiveFailures": null, "h3ConsecutiveFailures": null
    }
  ]
}
```

### IPC JSON Shape (Statistics)

Lightweight administrative summary, fetched on-demand (not polled):

```json
{
  "activeConnections": 42,
  "totalConnections": 1000,
  "routesCount": 5,
  "listeningPorts": [80, 443, 8443],
  "uptimeSeconds": 86400
}
```

---

## TypeScript Consumer API

`SmartProxy.getMetrics()` returns an `IMetrics` object. All members are synchronous methods reading from the polled cache.
### connections

| Method | Return | Source |
|---|---|---|
| `active()` | `number` | `cache.activeConnections` |
| `total()` | `number` | `cache.totalConnections` |
| `byRoute()` | `Map<string, number>` | `cache.routes[name].activeConnections` |
| `byIP()` | `Map<string, number>` | `cache.ips[ip].activeConnections` |
| `topIPs(limit?)` | `Array<{ip, count}>` | `cache.ips` sorted by active desc, default 10 |
| `domainRequestsByIP()` | `Map<string, Map<string, number>>` | `cache.ips[ip].domainRequests` |
| `topDomainRequests(limit?)` | `Array<{ip, domain, count}>` | Flattened from all IPs, sorted by count desc, default 20 |
| `frontendProtocols()` | `IProtocolDistribution` | `cache.frontendProtocols.*` |
| `backendProtocols()` | `IProtocolDistribution` | `cache.backendProtocols.*` |

### throughput

| Method | Return | Source |
|---|---|---|
| `instant()` | `{in, out}` | `cache.throughputInBytesPerSec/Out` |
| `recent()` | `{in, out}` | `cache.throughputRecentInBytesPerSec/Out` |
| `average()` | `{in, out}` | Falls back to `instant()` (not wired to windowed average) |
| `custom(seconds)` | `{in, out}` | Falls back to `instant()` (not wired) |
| `history(seconds)` | `IThroughputHistoryPoint[]` | `cache.throughputHistory` sliced to last N entries |
| `byRoute(windowSeconds?)` | `Map<string, {in, out}>` | `cache.routes[name].throughputIn/OutBytesPerSec` |
| `byIP(windowSeconds?)` | `Map<string, {in, out}>` | `cache.ips[ip].throughputIn/OutBytesPerSec` |

### requests

| Method | Return | Source |
|---|---|---|
| `perSecond()` | `number` | `cache.httpRequestsPerSec` |
| `perMinute()` | `number` | `cache.httpRequestsPerSecRecent * 60` |
| `total()` | `number` | `cache.totalHttpRequests` (fallback: totalConnections) |

### totals

| Method | Return | Source |
|---|---|---|
| `bytesIn()` | `number` | `cache.bytesIn` |
| `bytesOut()` | `number` | `cache.bytesOut` |
| `connections()` | `number` | `cache.totalConnections` |

### backends

| Method | Return | Source |
|---|---|---|
| `byBackend()` | `Map<string, IBackendMetrics>` | `cache.backends[key].*` with computed `avgConnectTimeMs` and `poolHitRate` |
| `protocols()` | `Map<string, string>` | `cache.backends[key].protocol` |
| `topByErrors(limit?)` | `IBackendMetrics[]` | Sorted by total errors desc |
| `detectedProtocols()` | `IProtocolCacheEntry[]` | `cache.detectedProtocols` passthrough |

`IBackendMetrics`: `{ protocol, activeConnections, totalConnections, connectErrors, handshakeErrors, requestErrors, avgConnectTimeMs, poolHitRate, h2Failures }`

### udp

| Method | Return | Source |
|---|---|---|
| `activeSessions()` | `number` | `cache.activeUdpSessions` |
| `totalSessions()` | `number` | `cache.totalUdpSessions` |
| `datagramsIn()` | `number` | `cache.totalDatagramsIn` |
| `datagramsOut()` | `number` | `cache.totalDatagramsOut` |

### percentiles (stub)

`connectionDuration()` and `bytesTransferred()` always return zeros. Not implemented.

---

## Configuration

```typescript
interface IMetricsConfig {
  enabled: boolean;           // default true
  sampleIntervalMs: number;   // default 1000 (1Hz sampling + TS polling)
  retentionSeconds: number;   // default 3600 (ThroughputTracker capacity)
  enableDetailedTracking: boolean;
  enablePercentiles: boolean;
  cacheResultsMs: number;
  prometheusEnabled: boolean; // not wired
  prometheusPath: string;     // not wired
  prometheusPrefix: string;   // not wired
}
```

Rust-side config (`MetricsConfig` in `rustproxy-config`):

```rust
pub struct MetricsConfig {
    pub enabled: Option<bool>,
    pub sample_interval_ms: Option<u64>, // default 1000
    pub retention_seconds: Option<u64>,  // default 3600
}
```

---

## Design Decisions

**Lock-free hot path.** `record_bytes()` is the most frequently called method (per-chunk in TCP, per-64KB in HTTP). It only touches `AtomicU64` with `Relaxed` ordering and short-circuits zero-byte directions to skip DashMap lookups entirely.

**CountingBody batching.** HTTP body frames are typically 16KB. Flushing to MetricsCollector every 64KB reduces DashMap shard contention by ~4x compared to per-frame recording.
**RAII guards everywhere.** `ConnectionGuard`, `ConnectionTrackerGuard`, `QuicConnGuard`, `ProtocolGuard`, and `FrontendProtocolTracker` all use Drop to guarantee counter cleanup on every exit path, including panics.

**Saturating decrements.** Protocol counters use `fetch_update` loops instead of `fetch_sub` to prevent underflow to `u64::MAX` from concurrent close races.

**Bounded memory.** Per-IP and per-backend entries are evicted on last connection close. Snapshots cap IPs and backends at 100 each. `sample_all()` prunes orphaned entries every second.

**Two-phase throughput.** Pending bytes accumulate in lock-free atomics. The 1Hz cold path drains them into Mutex-guarded ThroughputTrackers. The hot path therefore never contends on a Mutex, while the cold path does minimal work (one drain plus one sample per tracker).

---

## Known Gaps

| Gap | Status |
|---|---|
| `throughput.average()` / `throughput.custom(seconds)` | Fall back to `instant()`. Not wired to Rust windowed queries. |
| `percentiles.connectionDuration()` / `percentiles.bytesTransferred()` | Stubs returning zeros. No histogram in Rust. |
| Prometheus export | Config fields exist but are not wired to any exporter. |
| `LogDeduplicator` | Implemented in `rustproxy-metrics` but not connected to any call site. |
| Rate limit hit counters | Rate-limited requests return 429, but no counter is recorded in MetricsCollector. |
| QUIC stream byte counting | Post-hoc (per-stream totals after close), not per-chunk like TCP. |
| Throughput history in snapshot | Capped at 60 samples. TS `history(seconds)` cannot return more than 60 points regardless of `retentionSeconds`. |
| Per-route total connections / bytes | Available in Rust JSON but `IMetrics.connections.byRoute()` only exposes active connections. |
| Per-IP total connections / bytes | Available in Rust JSON but `IMetrics.connections.byIP()` only exposes active connections. |
| IPC response typing | `RustProxyBridge` declares `result: any` for both metrics commands. No type-safe response. |