smartproxy/readme.metrics.md

SmartProxy Metrics System

Architecture

Two-tier design separating the data plane from the observation plane:

Hot path (per-chunk, lock-free): All recording in the proxy data plane touches only AtomicU64 counters. No Mutex is ever acquired on the forwarding path. CountingBody batches flushes every 64KB to reduce DashMap shard contention.

Cold path (1Hz sampling): A background tokio task drains pending atomics into ThroughputTracker circular buffers (Mutex-guarded), producing per-second throughput history. Same task prunes orphaned entries and cleans up rate limiter state.

Read path (on-demand): snapshot() reads all atomics and locks ThroughputTrackers to build a serializable Metrics struct. TypeScript polls this at 1s intervals via IPC.

```text
Data Plane (lock-free)                Background (1Hz)              Read Path
─────────────────────                 ──────────────────             ─────────
record_bytes() ──> AtomicU64 ──┐
record_http_request() ──> AtomicU64 ──┤
connection_opened/closed() ──> AtomicU64 ──┤     sample_all()        snapshot()
backend_*() ──> DashMap<AtomicU64> ──┤────> drain atomics ──────> Metrics struct
protocol_*() ──> AtomicU64 ──┤       feed ThroughputTrackers      ──> JSON
datagram_*() ──> AtomicU64 ──┘       prune orphans                ──> IPC stdout
                                                                   ──> TS cache
                                                                   ──> IMetrics API
```
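The hot-path contract above can be sketched in a few lines. This is a hypothetical, minimal stand-in for the real `MetricsCollector` (field names `pending_tp_in`/`pending_tp_out` follow the document; the struct itself is illustrative): every recording call is a `Relaxed` atomic add, zero-byte directions short-circuit, and no lock is ever taken.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical subset of MetricsCollector's global counters.
struct Counters {
    bytes_in: AtomicU64,
    bytes_out: AtomicU64,
    pending_tp_in: AtomicU64,  // drained by the 1Hz cold path
    pending_tp_out: AtomicU64,
}

impl Counters {
    fn new() -> Self {
        Counters {
            bytes_in: AtomicU64::new(0),
            bytes_out: AtomicU64::new(0),
            pending_tp_in: AtomicU64::new(0),
            pending_tp_out: AtomicU64::new(0),
        }
    }

    /// Per-chunk recording: only Relaxed atomic adds, skipping zero directions.
    fn record_bytes(&self, n_in: u64, n_out: u64) {
        if n_in > 0 {
            self.bytes_in.fetch_add(n_in, Ordering::Relaxed);
            self.pending_tp_in.fetch_add(n_in, Ordering::Relaxed);
        }
        if n_out > 0 {
            self.bytes_out.fetch_add(n_out, Ordering::Relaxed);
            self.pending_tp_out.fetch_add(n_out, Ordering::Relaxed);
        }
    }
}

fn main() {
    let c = Counters::new();
    c.record_bytes(1500, 0); // inbound chunk
    c.record_bytes(0, 900);  // outbound chunk
    assert_eq!(c.bytes_in.load(Ordering::Relaxed), 1500);
    assert_eq!(c.bytes_out.load(Ordering::Relaxed), 900);
}
```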

Key Types

| Type | Crate | Purpose |
|------|-------|---------|
| `MetricsCollector` | rustproxy-metrics | Central store: all DashMaps, atomics, and throughput trackers |
| `ThroughputTracker` | rustproxy-metrics | Circular buffer of 1Hz samples; default capacity 3600 (1 hour) |
| `ForwardMetricsCtx` | rustproxy-passthrough | Carries `Arc<MetricsCollector>` + route_id + source_ip through TCP forwarding |
| `CountingBody` | rustproxy-http | Wraps HTTP bodies, batches byte recording per 64KB, flushes on drop |
| `ProtocolGuard` | rustproxy-http | RAII guard for frontend/backend protocol active/total counters |
| `ConnectionGuard` | rustproxy-passthrough | RAII guard calling `connection_closed()` on drop |
| `RustMetricsAdapter` | TypeScript | Polls Rust via IPC; implements the `IMetrics` interface over cached JSON |

What's Collected

Global Counters

| Metric | Type | Updated by |
|--------|------|------------|
| Active connections | AtomicU64 | `connection_opened/closed` |
| Total connections (lifetime) | AtomicU64 | `connection_opened` |
| Total bytes in | AtomicU64 | `record_bytes` |
| Total bytes out | AtomicU64 | `record_bytes` |
| Total HTTP requests | AtomicU64 | `record_http_request` |
| Active UDP sessions | AtomicU64 | `udp_session_opened/closed` |
| Total UDP sessions | AtomicU64 | `udp_session_opened` |
| Total datagrams in | AtomicU64 | `record_datagram_in` |
| Total datagrams out | AtomicU64 | `record_datagram_out` |

Per-Route Metrics (keyed by route ID string)

| Metric | Storage |
|--------|---------|
| Active connections | `DashMap<String, AtomicU64>` |
| Total connections | `DashMap<String, AtomicU64>` |
| Bytes in / out | `DashMap<String, AtomicU64>` |
| Pending throughput (in, out) | `DashMap<String, (AtomicU64, AtomicU64)>` |
| Throughput history | `DashMap<String, Mutex<ThroughputTracker>>` |

Entries are pruned via retain_routes() when routes are removed.

Per-IP Metrics (keyed by IP string)

| Metric | Storage |
|--------|---------|
| Active connections | `DashMap<String, AtomicU64>` |
| Total connections | `DashMap<String, AtomicU64>` |
| Bytes in / out | `DashMap<String, AtomicU64>` |
| Pending throughput (in, out) | `DashMap<String, (AtomicU64, AtomicU64)>` |
| Throughput history | `DashMap<String, Mutex<ThroughputTracker>>` |
| Domain requests | `DashMap<String, DashMap<String, AtomicU64>>` (IP → domain → count) |

All seven maps for an IP are evicted when its active connection count drops to 0. Safety-net pruning in sample_all() catches entries orphaned by races. Snapshots cap at 100 IPs, sorted by active connections descending.

Domain request tracking: Records which domains each frontend IP has requested. Populated from HTTP Host headers (for HTTP/1.1, HTTP/2, HTTP/3 requests) and from SNI (for TLS passthrough connections). Domain keys are normalized to lowercase with any trailing dot stripped so the same hostname does not fragment across multiple counters. Capped at 256 domains per IP (MAX_DOMAINS_PER_IP) to prevent subdomain-spray abuse. Inner DashMap uses 2 shards to minimise base memory per IP (~200 bytes). Common case (IP + domain both known) is two DashMap reads + one atomic increment with zero allocation.
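The normalization rule above (lowercase, trailing dot stripped) can be sketched as a small helper. The function name `normalize_domain` is hypothetical, not the actual SmartProxy identifier; the behaviour follows the paragraph:

```rust
/// Normalize a domain key so the same hostname does not fragment across
/// multiple counters: strip one trailing dot, then lowercase (ASCII).
/// Hypothetical helper name, illustrating the rule described in the doc.
fn normalize_domain(raw: &str) -> String {
    let trimmed = raw.strip_suffix('.').unwrap_or(raw);
    trimmed.to_ascii_lowercase()
}

fn main() {
    // "API.Example.COM." and "api.example.com" share one counter.
    assert_eq!(normalize_domain("API.Example.COM."), "api.example.com");
    assert_eq!(normalize_domain("api.example.com"), "api.example.com");
}
```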

Per-Backend Metrics (keyed by "host:port")

| Metric | Storage |
|--------|---------|
| Active connections | `DashMap<String, AtomicU64>` |
| Total connections | `DashMap<String, AtomicU64>` |
| Detected protocol (h1/h2/h3) | `DashMap<String, String>` |
| Connect errors | `DashMap<String, AtomicU64>` |
| Handshake errors | `DashMap<String, AtomicU64>` |
| Request errors | `DashMap<String, AtomicU64>` |
| Total connect time (microseconds) | `DashMap<String, AtomicU64>` |
| Connect count | `DashMap<String, AtomicU64>` |
| Pool hits | `DashMap<String, AtomicU64>` |
| Pool misses | `DashMap<String, AtomicU64>` |
| H2 failures (fallback to H1) | `DashMap<String, AtomicU64>` |

All per-backend maps are evicted when a backend's active connection count reaches 0, and are also pruned via retain_backends() when backends are removed.

Frontend Protocol Distribution

Tracked via ProtocolGuard RAII guards and FrontendProtocolTracker. Five protocol categories, each with active + total counters (AtomicU64):

| Protocol | Where detected |
|----------|----------------|
| h1 | `FrontendProtocolTracker` on first HTTP/1.x request |
| h2 | `FrontendProtocolTracker` on first HTTP/2 request |
| h3 | `ProtocolGuard::frontend("h3")` in `H3ProxyService` |
| ws | `ProtocolGuard::frontend("ws")` on WebSocket upgrade |
| other | Fallback (TCP passthrough without HTTP) |

Uses fetch_update for saturating decrements to prevent underflow races. The same saturating-close pattern is also used for connection and UDP active gauges.
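The saturating-close pattern can be shown in isolation. This is a sketch of the technique named above (the helper name is hypothetical), using `std::sync::atomic::AtomicU64::fetch_update`:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Saturating decrement: if two close events race, the gauge stops at 0
/// instead of wrapping to u64::MAX as a bare fetch_sub would.
fn saturating_dec(gauge: &AtomicU64) {
    let _ = gauge.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
        if v > 0 { Some(v - 1) } else { None } // None leaves 0 untouched
    });
}

fn main() {
    let active = AtomicU64::new(1);
    saturating_dec(&active); // 1 -> 0
    saturating_dec(&active); // already 0: stays 0, no underflow
    assert_eq!(active.load(Ordering::Relaxed), 0);
}
```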

Backend Protocol Distribution

Same five categories (h1/h2/h3/ws/other), tracked via ProtocolGuard::backend() at connection establishment. Backend h2 failures (fallback to h1) are separately counted.

Throughput History

ThroughputTracker is a circular buffer storing ThroughputSample { timestamp_ms, bytes_in, bytes_out } at 1Hz.

  • Global tracker: 1 instance, default 3600 capacity
  • Per-route trackers: 1 per active route
  • Per-IP trackers: 1 per connected IP (evicted with the IP)
  • HTTP request tracker: reuses ThroughputTracker with bytes_in = request count, bytes_out = 0

Query methods:

  • instant() — last 1 second average
  • recent() — last 10 seconds average
  • throughput(N) — last N seconds average
  • history(N) — last N raw samples in chronological order

Snapshots return 60 samples of global throughput history.
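A minimal sketch of the circular-buffer semantics described above. Field and method names (`instant`, `recent`, `throughput(N)`) follow the document; the `VecDeque`-based internals are an assumption, not the actual implementation:

```rust
use std::collections::VecDeque;

struct ThroughputSample { timestamp_ms: u64, bytes_in: u64, bytes_out: u64 }

/// Sketch of ThroughputTracker: push one sample per second, evict past
/// capacity, average over the last N samples (1 sample == 1 second).
struct Tracker { samples: VecDeque<ThroughputSample>, capacity: usize }

impl Tracker {
    fn new(capacity: usize) -> Self { Tracker { samples: VecDeque::new(), capacity } }

    fn push(&mut self, s: ThroughputSample) {
        if self.samples.len() == self.capacity { self.samples.pop_front(); }
        self.samples.push_back(s);
    }

    /// Average (bytes_in/sec, bytes_out/sec) over the last `n` samples.
    fn throughput(&self, n: usize) -> (u64, u64) {
        let (mut i, mut o, mut count) = (0u64, 0u64, 0u64);
        for s in self.samples.iter().rev().take(n) {
            i += s.bytes_in; o += s.bytes_out; count += 1;
        }
        if count == 0 { (0, 0) } else { (i / count, o / count) }
    }

    fn instant(&self) -> (u64, u64) { self.throughput(1) }
    fn recent(&self) -> (u64, u64) { self.throughput(10) }
}

fn main() {
    let mut t = Tracker::new(3600);
    for sec in 0..3u64 {
        t.push(ThroughputSample { timestamp_ms: sec * 1000, bytes_in: 100 * (sec + 1), bytes_out: 0 });
    }
    assert_eq!(t.instant(), (300, 0));     // last sample only
    assert_eq!(t.throughput(3), (200, 0)); // (100 + 200 + 300) / 3
    let _ = t.recent();
}
```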

Protocol Detection Cache

Not part of MetricsCollector. Maintained by HttpProxyService's protocol detection system. Injected into the metrics snapshot at read time by get_metrics().

Each entry records: host, port, domain, detected protocol (h1/h2/h3), H3 port, age, last accessed, last probed, suppression flags, cooldown timers, consecutive failure counts.


Instrumentation Points

TCP Passthrough (rustproxy-passthrough)

Connection lifecycle (tcp_listener.rs):

  • Accept: conn_tracker.connection_opened(&ip) (rate limiter) + ConnectionTrackerGuard RAII
  • Route match: metrics.connection_opened(route_id, source_ip) + ConnectionGuard RAII
  • Close: Both guards call their respective _closed() methods on drop

Byte recording (forwarder.rs, forward_bidirectional_with_timeouts):

  • Initial peeked data recorded immediately
  • Per-chunk in both directions: record_bytes(n, 0, ...) / record_bytes(0, n, ...)
  • Same pattern in forward_bidirectional_split_with_timeouts (tcp_listener.rs) for TLS-terminated paths

HTTP Proxy (rustproxy-http)

Request counting (proxy_service.rs):

  • record_http_request() called once per request after route matching succeeds

Body byte counting (counting_body.rs) wraps:

  • Request bodies: CountingBody::new(body, ..., Direction::In) — counts client-to-upstream bytes
  • Response bodies: CountingBody::new(body, ..., Direction::Out) — counts upstream-to-client bytes
  • Batched flush every 64KB (BYTE_FLUSH_THRESHOLD = 65_536), remainder flushed on drop
  • Also updates connection_activity atomic (idle watchdog) and active_requests counter (streaming detection)
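The batching strategy above can be sketched with a hypothetical stand-in for `CountingBody`'s counting logic (the real type wraps an HTTP body frame stream and also feeds the idle watchdog; `Batcher` here only shows the 64KB flush plus flush-on-drop behaviour):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

const BYTE_FLUSH_THRESHOLD: u64 = 65_536;

/// Hypothetical stand-in for CountingBody's byte batching: accumulate
/// locally, flush to the shared counter every 64KB and again on Drop.
struct Batcher {
    pending: u64,
    sink: Arc<AtomicU64>, // stands in for a MetricsCollector counter
}

impl Batcher {
    fn record(&mut self, n: u64) {
        self.pending += n;
        if self.pending >= BYTE_FLUSH_THRESHOLD {
            self.sink.fetch_add(self.pending, Ordering::Relaxed);
            self.pending = 0;
        }
    }
}

impl Drop for Batcher {
    fn drop(&mut self) {
        // Remainder below the threshold is flushed when the body finishes.
        if self.pending > 0 {
            self.sink.fetch_add(self.pending, Ordering::Relaxed);
        }
    }
}

fn main() {
    let sink = Arc::new(AtomicU64::new(0));
    {
        let mut b = Batcher { pending: 0, sink: Arc::clone(&sink) };
        for _ in 0..5 { b.record(16 * 1024); } // flush fires once at 64 KiB
    } // drop flushes the trailing 16 KiB
    assert_eq!(sink.load(Ordering::Relaxed), 80 * 1024);
}
```

With typical 16KB body frames, the shared counter is touched once per four frames instead of once per frame.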

Backend metrics (proxy_service.rs):

  • backend_connection_opened(key, connect_time) — after TCP/TLS connect succeeds
  • backend_connection_closed(key) — on teardown
  • backend_connect_error(key) — TCP/TLS connect failure or timeout
  • backend_handshake_error(key) — H1/H2 protocol handshake failure
  • backend_request_error(key) — send_request failure
  • backend_h2_failure(key) — H2 attempted, fell back to H1
  • backend_pool_hit(key) / backend_pool_miss(key) — connection pool reuse
  • set_backend_protocol(key, proto) — records detected protocol

WebSocket (proxy_service.rs):

  • Does NOT use CountingBody; records bytes directly per-chunk in both directions of the bidirectional copy loop

QUIC (rustproxy-passthrough)

Connection level (quic_handler.rs):

  • connection_opened / connection_closed via QuicConnGuard RAII
  • conn_tracker.connection_opened/closed for rate limiting

Stream level:

  • For QUIC-to-TCP stream forwarding: record_bytes(bytes_in, bytes_out, ...) called once per stream at completion (post-hoc, not per-chunk)
  • For HTTP/3: delegates to HttpProxyService.handle_request(), so all HTTP proxy metrics apply

H3 specifics (h3_service.rs):

  • ProtocolGuard::frontend("h3") tracks the H3 connection
  • H3 request bodies: record_bytes(data.len(), 0, ...) called directly (not CountingBody) since H3 uses stream.send_data()
  • H3 response bodies: wrapped in CountingBody like HTTP/1 and HTTP/2

UDP (rustproxy-passthrough)

Session lifecycle (udp_listener.rs / udp_session.rs):

  • udp_session_opened() + connection_opened(route_id, source_ip) on new session
  • udp_session_closed() + connection_closed(route_id, source_ip) on idle reap or port drain

Datagram counting (udp_listener.rs):

  • Inbound: record_bytes(len, 0, ...) + record_datagram_in()
  • Outbound (backend reply): record_bytes(0, len, ...) + record_datagram_out()

Sampling Loop

lib.rs spawns a tokio task at configurable interval (default 1000ms):

```rust
loop {
    tokio::select! {
        _ = cancel => break,
        _ = interval.tick() => {
            metrics.sample_all();
            conn_tracker.cleanup_stale_timestamps();
            http_proxy.cleanup_all_rate_limiters();
        }
    }
}
```

sample_all() performs in one pass:

  1. Drains global_pending_tp_in/out into global ThroughputTracker, samples
  2. Drains per-route pending counters into per-route trackers, samples each
  3. Samples idle route trackers (no new data) to advance their window
  4. Drains per-IP pending counters into per-IP trackers, samples each
  5. Drains pending_http_requests into HTTP request throughput tracker
  6. Prunes orphaned per-IP entries (bytes/throughput maps with no matching ip_connections key)
  7. Prunes orphaned per-backend entries (error/stats maps with no matching active/total key)

Data Flow: Rust to TypeScript

MetricsCollector.snapshot()
    ├── reads all AtomicU64 counters
    ├── iterates DashMaps (routes, IPs, backends)
    ├── locks ThroughputTrackers for instant/recent rates + history
    └── produces Metrics struct

RustProxy::get_metrics()
    ├── calls snapshot()
    ├── enriches with detectedProtocols from HTTP proxy protocol cache
    └── returns Metrics

management.rs "getMetrics" IPC command
    ├── calls get_metrics()
    ├── serde_json::to_value (camelCase)
    └── writes JSON to stdout

RustProxyBridge (TypeScript)
    ├── reads JSON from Rust process stdout
    └── returns parsed object

RustMetricsAdapter
    ├── setInterval polls bridge.getMetrics() every 1s
    ├── stores raw JSON in this.cache
    └── IMetrics methods read synchronously from cache

SmartProxy.getMetrics()
    └── returns the RustMetricsAdapter instance

IPC JSON Shape (Metrics)

```jsonc
{
  "activeConnections": 42,
  "totalConnections": 1000,
  "bytesIn": 123456789,
  "bytesOut": 987654321,
  "throughputInBytesPerSec": 50000,
  "throughputOutBytesPerSec": 80000,
  "throughputRecentInBytesPerSec": 45000,
  "throughputRecentOutBytesPerSec": 75000,
  "routes": {
    "<route-id>": {
      "activeConnections": 5,
      "totalConnections": 100,
      "bytesIn": 0, "bytesOut": 0,
      "throughputInBytesPerSec": 0, "throughputOutBytesPerSec": 0,
      "throughputRecentInBytesPerSec": 0, "throughputRecentOutBytesPerSec": 0
    }
  },
  "ips": {
    "<ip>": {
      "activeConnections": 2, "totalConnections": 10,
      "bytesIn": 0, "bytesOut": 0,
      "throughputInBytesPerSec": 0, "throughputOutBytesPerSec": 0,
      "domainRequests": {
        "example.com": 4821,
        "api.example.com": 312
      }
    }
  },
  "backends": {
    "<host:port>": {
      "activeConnections": 3, "totalConnections": 50,
      "protocol": "h2",
      "connectErrors": 0, "handshakeErrors": 0, "requestErrors": 0,
      "totalConnectTimeUs": 150000, "connectCount": 50,
      "poolHits": 40, "poolMisses": 10, "h2Failures": 1
    }
  },
  "throughputHistory": [
    { "timestampMs": 1713000000000, "bytesIn": 50000, "bytesOut": 80000 }
  ],
  "totalHttpRequests": 5000,
  "httpRequestsPerSec": 100,
  "httpRequestsPerSecRecent": 95,
  "activeUdpSessions": 0, "totalUdpSessions": 5,
  "totalDatagramsIn": 1000, "totalDatagramsOut": 1000,
  "frontendProtocols": {
    "h1Active": 10, "h1Total": 500,
    "h2Active": 5, "h2Total": 200,
    "h3Active": 1, "h3Total": 50,
    "wsActive": 2, "wsTotal": 30,
    "otherActive": 0, "otherTotal": 0
  },
  "backendProtocols": { /* same shape as frontendProtocols */ },
  "detectedProtocols": [
    {
      "host": "backend", "port": 443, "domain": "example.com",
      "protocol": "h2", "h3Port": 443,
      "ageSecs": 120, "lastAccessedSecs": 5, "lastProbedSecs": 120,
      "h2Suppressed": false, "h3Suppressed": false,
      "h2CooldownRemainingSecs": null, "h3CooldownRemainingSecs": null,
      "h2ConsecutiveFailures": null, "h3ConsecutiveFailures": null
    }
  ]
}
```

IPC JSON Shape (Statistics)

Lightweight administrative summary, fetched on-demand (not polled):

```json
{
  "activeConnections": 42,
  "totalConnections": 1000,
  "routesCount": 5,
  "listeningPorts": [80, 443, 8443],
  "uptimeSeconds": 86400
}
```

TypeScript Consumer API

SmartProxy.getMetrics() returns an IMetrics object. All members are synchronous methods reading from the polled cache.

connections

| Method | Return | Source |
|--------|--------|--------|
| `active()` | `number` | `cache.activeConnections` |
| `total()` | `number` | `cache.totalConnections` |
| `byRoute()` | `Map<string, number>` | `cache.routes[name].activeConnections` |
| `byIP()` | `Map<string, number>` | `cache.ips[ip].activeConnections` |
| `topIPs(limit?)` | `Array<{ip, count}>` | `cache.ips` sorted by active desc; default 10 |
| `domainRequestsByIP()` | `Map<string, Map<string, number>>` | `cache.ips[ip].domainRequests` |
| `topDomainRequests(limit?)` | `Array<{ip, domain, count}>` | Flattened from all IPs, sorted by count desc; default 20 |
| `frontendProtocols()` | `IProtocolDistribution` | `cache.frontendProtocols.*` |
| `backendProtocols()` | `IProtocolDistribution` | `cache.backendProtocols.*` |

throughput

| Method | Return | Source |
|--------|--------|--------|
| `instant()` | `{in, out}` | `cache.throughputInBytesPerSec/Out` |
| `recent()` | `{in, out}` | `cache.throughputRecentInBytesPerSec/Out` |
| `average()` | `{in, out}` | Falls back to `instant()` (not wired to windowed average) |
| `custom(seconds)` | `{in, out}` | Falls back to `instant()` (not wired) |
| `history(seconds)` | `IThroughputHistoryPoint[]` | `cache.throughputHistory` sliced to last N entries |
| `byRoute(windowSeconds?)` | `Map<string, {in, out}>` | `cache.routes[name].throughputIn/OutBytesPerSec` |
| `byIP(windowSeconds?)` | `Map<string, {in, out}>` | `cache.ips[ip].throughputIn/OutBytesPerSec` |

requests

| Method | Return | Source |
|--------|--------|--------|
| `perSecond()` | `number` | `cache.httpRequestsPerSec` |
| `perMinute()` | `number` | `cache.httpRequestsPerSecRecent * 60` |
| `total()` | `number` | `cache.totalHttpRequests` (fallback: `totalConnections`) |

totals

| Method | Return | Source |
|--------|--------|--------|
| `bytesIn()` | `number` | `cache.bytesIn` |
| `bytesOut()` | `number` | `cache.bytesOut` |
| `connections()` | `number` | `cache.totalConnections` |

backends

| Method | Return | Source |
|--------|--------|--------|
| `byBackend()` | `Map<string, IBackendMetrics>` | `cache.backends[key].*` with computed `avgConnectTimeMs` and `poolHitRate` |
| `protocols()` | `Map<string, string>` | `cache.backends[key].protocol` |
| `topByErrors(limit?)` | `IBackendMetrics[]` | Sorted by total errors desc |
| `detectedProtocols()` | `IProtocolCacheEntry[]` | `cache.detectedProtocols` passthrough |

IBackendMetrics: { protocol, activeConnections, totalConnections, connectErrors, handshakeErrors, requestErrors, avgConnectTimeMs, poolHitRate, h2Failures }
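The two computed fields follow directly from the raw counters in the backend snapshot. The formulas below are assumptions consistent with the field names, sketched in Rust:

```rust
/// Average connect time in milliseconds, derived from totalConnectTimeUs
/// and connectCount (assumed formula; guards against division by zero).
fn avg_connect_time_ms(total_connect_time_us: u64, connect_count: u64) -> f64 {
    if connect_count == 0 {
        0.0
    } else {
        total_connect_time_us as f64 / connect_count as f64 / 1000.0
    }
}

/// Pool hit rate in [0, 1], derived from poolHits and poolMisses.
fn pool_hit_rate(hits: u64, misses: u64) -> f64 {
    let total = hits + misses;
    if total == 0 { 0.0 } else { hits as f64 / total as f64 }
}

fn main() {
    // Values from the sample backend snapshot above: 150000 us over 50 connects,
    // 40 pool hits and 10 misses.
    assert_eq!(avg_connect_time_ms(150_000, 50), 3.0);
    assert_eq!(pool_hit_rate(40, 10), 0.8);
}
```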

udp

| Method | Return | Source |
|--------|--------|--------|
| `activeSessions()` | `number` | `cache.activeUdpSessions` |
| `totalSessions()` | `number` | `cache.totalUdpSessions` |
| `datagramsIn()` | `number` | `cache.totalDatagramsIn` |
| `datagramsOut()` | `number` | `cache.totalDatagramsOut` |

percentiles (stub)

connectionDuration() and bytesTransferred() always return zeros. Not implemented.


Configuration

```typescript
interface IMetricsConfig {
  enabled: boolean;              // default true
  sampleIntervalMs: number;      // default 1000 (1Hz sampling + TS polling)
  retentionSeconds: number;      // default 3600 (ThroughputTracker capacity)
  enableDetailedTracking: boolean;
  enablePercentiles: boolean;
  cacheResultsMs: number;
  prometheusEnabled: boolean;    // not wired
  prometheusPath: string;        // not wired
  prometheusPrefix: string;      // not wired
}
```

Rust-side config (MetricsConfig in rustproxy-config):

```rust
pub struct MetricsConfig {
    pub enabled: Option<bool>,
    pub sample_interval_ms: Option<u64>,   // default 1000
    pub retention_seconds: Option<u64>,    // default 3600
}
```

Design Decisions

Lock-free hot path. record_bytes() is the most frequently called method (per-chunk in TCP, per-64KB in HTTP). It only touches AtomicU64 with Relaxed ordering and short-circuits zero-byte directions to skip DashMap lookups entirely.

CountingBody batching. HTTP body frames are typically 16KB. Flushing to MetricsCollector every 64KB reduces DashMap shard contention by ~4x compared to per-frame recording.

RAII guards everywhere. ConnectionGuard, ConnectionTrackerGuard, QuicConnGuard, ProtocolGuard, FrontendProtocolTracker all use Drop to guarantee counter cleanup on all exit paths including panics.

Saturating decrements. Protocol counters use fetch_update loops instead of fetch_sub to prevent underflow to u64::MAX from concurrent close races.

Bounded memory. Per-IP entries evicted on last connection close. Per-backend entries evicted on last connection close. Snapshot caps IPs and backends at 100 each. sample_all() prunes orphaned entries every second.

Two-phase throughput. Pending bytes accumulate in lock-free atomics. The 1Hz cold path drains them into Mutex-guarded ThroughputTrackers. This means the hot path never contends on a Mutex, while the cold path does minimal work (one drain + one sample per tracker).


Known Gaps

| Gap | Status |
|-----|--------|
| `throughput.average()` / `throughput.custom(seconds)` | Fall back to `instant()`; not wired to Rust windowed queries |
| `percentiles.connectionDuration()` / `percentiles.bytesTransferred()` | Stubs returning zeros; no histogram in Rust |
| Prometheus export | Config fields exist but are not wired to any exporter |
| `LogDeduplicator` | Implemented in rustproxy-metrics but not connected to any call site |
| Rate limit hit counters | Rate-limited requests return 429, but no counter is recorded in `MetricsCollector` |
| QUIC stream byte counting | Post-hoc (per-stream totals after close), not per-chunk like TCP |
| Throughput history in snapshot | Capped at 60 samples; TS `history(seconds)` cannot return more than 60 points regardless of `retentionSeconds` |
| Per-route total connections / bytes | Available in Rust JSON, but `IMetrics.connections.byRoute()` only exposes active connections |
| Per-IP total connections / bytes | Available in Rust JSON, but `IMetrics.connections.byIP()` only exposes active connections |
| IPC response typing | `RustProxyBridge` declares `result: any` for both metrics commands; no type-safe response |