# SmartProxy Metrics System

## Architecture

Two-tier design separating the data plane from the observation plane:

- **Hot path (per-chunk, lock-free):** All recording in the proxy data plane touches only AtomicU64 counters. No Mutex is ever acquired on the forwarding path. CountingBody batches flushes every 64KB to reduce DashMap shard contention.
- **Cold path (1Hz sampling):** A background tokio task drains pending atomics into ThroughputTracker circular buffers (Mutex-guarded), producing per-second throughput history. The same task prunes orphaned entries and cleans up rate-limiter state.
- **Read path (on-demand):** snapshot() reads all atomics and locks ThroughputTrackers to build a serializable Metrics struct. TypeScript polls this at 1s intervals via IPC.
```
Data Plane (lock-free)                       Background (1Hz)              Read Path
─────────────────────                        ────────────────              ─────────
record_bytes()             ──> AtomicU64 ──┐
record_http_request()      ──> AtomicU64 ──┤
connection_opened/closed() ──> AtomicU64 ──┤   sample_all()                snapshot()
backend_*()       ──> DashMap<AtomicU64> ──┼──> drain atomics          ──> Metrics struct
protocol_*()               ──> AtomicU64 ──┤    feed ThroughputTrackers──> JSON
datagram_*()               ──> AtomicU64 ──┘    prune orphans          ──> IPC stdout
                                                                       ──> TS cache
                                                                       ──> IMetrics API
```
## Key Types

| Type | Crate | Purpose |
|---|---|---|
| MetricsCollector | rustproxy-metrics | Central store. All DashMaps, atomics, and throughput trackers |
| ThroughputTracker | rustproxy-metrics | Circular buffer of 1Hz samples. Default 3600 capacity (1 hour) |
| ForwardMetricsCtx | rustproxy-passthrough | Carries Arc<MetricsCollector> + route_id + source_ip through TCP forwarding |
| CountingBody | rustproxy-http | Wraps HTTP bodies, batches byte recording per 64KB, flushes on drop |
| ProtocolGuard | rustproxy-http | RAII guard for frontend/backend protocol active/total counters |
| ConnectionGuard | rustproxy-passthrough | RAII guard calling connection_closed() on drop |
| RustMetricsAdapter | TypeScript | Polls Rust via IPC, implements IMetrics interface over cached JSON |
## What's Collected

### Global Counters
| Metric | Type | Updated by |
|---|---|---|
| Active connections | AtomicU64 | connection_opened/closed |
| Total connections (lifetime) | AtomicU64 | connection_opened |
| Total bytes in | AtomicU64 | record_bytes |
| Total bytes out | AtomicU64 | record_bytes |
| Total HTTP requests | AtomicU64 | record_http_request |
| Active UDP sessions | AtomicU64 | udp_session_opened/closed |
| Total UDP sessions | AtomicU64 | udp_session_opened |
| Total datagrams in | AtomicU64 | record_datagram_in |
| Total datagrams out | AtomicU64 | record_datagram_out |
### Per-Route Metrics (keyed by route ID string)
| Metric | Storage |
|---|---|
| Active connections | DashMap<String, AtomicU64> |
| Total connections | DashMap<String, AtomicU64> |
| Bytes in / out | DashMap<String, AtomicU64> |
| Pending throughput (in, out) | DashMap<String, (AtomicU64, AtomicU64)> |
| Throughput history | DashMap<String, Mutex<ThroughputTracker>> |
Entries are pruned via retain_routes() when routes are removed.
### Per-IP Metrics (keyed by IP string)
| Metric | Storage |
|---|---|
| Active connections | DashMap<String, AtomicU64> |
| Total connections | DashMap<String, AtomicU64> |
| Bytes in / out | DashMap<String, AtomicU64> |
| Pending throughput (in, out) | DashMap<String, (AtomicU64, AtomicU64)> |
| Throughput history | DashMap<String, Mutex<ThroughputTracker>> |
| Domain requests | DashMap<String, DashMap<String, AtomicU64>> (IP → domain → count) |
All seven maps for an IP are evicted when its active connection count drops to 0. Safety-net pruning in sample_all() catches entries orphaned by races. Snapshots cap at 100 IPs, sorted by active connections descending.
Domain request tracking: Records which domains each frontend IP has requested. Populated from HTTP Host headers (for HTTP/1.1, HTTP/2, HTTP/3 requests) and from SNI (for TLS passthrough connections). Domain keys are normalized to lowercase with any trailing dot stripped so the same hostname does not fragment across multiple counters. Capped at 256 domains per IP (MAX_DOMAINS_PER_IP) to prevent subdomain-spray abuse. Inner DashMap uses 2 shards to minimise base memory per IP (~200 bytes). Common case (IP + domain both known) is two DashMap reads + one atomic increment with zero allocation.
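The normalization rule above (lowercase, trailing dot stripped) can be sketched as a small helper. This is illustrative only; the function name and signature are assumptions, not the actual rustproxy-metrics API:

```rust
/// Sketch of the domain-key normalization described above: lowercase the
/// hostname and strip any trailing dot(s) so "Example.COM." and
/// "example.com" increment the same per-IP domain counter.
fn normalize_domain(raw: &str) -> String {
    raw.trim_end_matches('.').to_ascii_lowercase()
}
```

Without this step, a fully-qualified form like `example.com.` would create a second counter for the same hostname.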
### Per-Backend Metrics (keyed by "host:port")
| Metric | Storage |
|---|---|
| Active connections | DashMap<String, AtomicU64> |
| Total connections | DashMap<String, AtomicU64> |
| Detected protocol (h1/h2/h3) | DashMap<String, String> |
| Connect errors | DashMap<String, AtomicU64> |
| Handshake errors | DashMap<String, AtomicU64> |
| Request errors | DashMap<String, AtomicU64> |
| Total connect time (microseconds) | DashMap<String, AtomicU64> |
| Connect count | DashMap<String, AtomicU64> |
| Pool hits | DashMap<String, AtomicU64> |
| Pool misses | DashMap<String, AtomicU64> |
| H2 failures (fallback to H1) | DashMap<String, AtomicU64> |
All per-backend maps are evicted when active count reaches 0. Pruned via retain_backends().
### Frontend Protocol Distribution

Tracked via ProtocolGuard RAII guards and FrontendProtocolTracker. Five protocol categories, each with active + total counters (AtomicU64):
| Protocol | Where detected |
|---|---|
| h1 | FrontendProtocolTracker on first HTTP/1.x request |
| h2 | FrontendProtocolTracker on first HTTP/2 request |
| h3 | ProtocolGuard::frontend("h3") in H3ProxyService |
| ws | ProtocolGuard::frontend("ws") on WebSocket upgrade |
| other | Fallback (TCP passthrough without HTTP) |
Uses fetch_update for saturating decrements to prevent underflow races. The same saturating-close pattern is also used for connection and UDP active gauges.
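The saturating-decrement pattern can be sketched with std atomics. A minimal sketch, assuming nothing beyond the standard library; the function name is illustrative:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Sketch of the saturating close described above: a plain fetch_sub on a
/// gauge already at 0 would wrap to u64::MAX, so close paths use
/// fetch_update with saturating_sub instead. Retries on contention.
fn saturating_dec(gauge: &AtomicU64) {
    let _ = gauge.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
        Some(v.saturating_sub(1))
    });
}
```

The closure always returns `Some`, so the update cannot fail; it simply clamps at zero when two close paths race.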
### Backend Protocol Distribution

Same five categories (h1/h2/h3/ws/other), tracked via ProtocolGuard::backend() at connection establishment. Backend h2 failures (fallback to h1) are counted separately.
### Throughput History
ThroughputTracker is a circular buffer storing ThroughputSample { timestamp_ms, bytes_in, bytes_out } at 1Hz.
- Global tracker: 1 instance, default 3600 capacity
- Per-route trackers: 1 per active route
- Per-IP trackers: 1 per connected IP (evicted with the IP)
- HTTP request tracker: reuses ThroughputTracker with bytes_in = request count, bytes_out = 0
Query methods:

- instant() — last 1-second average
- recent() — last 10-second average
- throughput(N) — last N-second average
- history(N) — last N raw samples in chronological order
Snapshots return 60 samples of global throughput history.
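The circular-buffer idea can be sketched in a few lines. Field and method names here are assumptions in the spirit of ThroughputTracker, not the real rustproxy-metrics API:

```rust
/// Minimal fixed-capacity ring of 1Hz samples. Capacity bounds memory;
/// push overwrites the oldest sample once the buffer is full.
#[derive(Clone, Copy, Default)]
struct Sample {
    timestamp_ms: u64,
    bytes_in: u64,
    bytes_out: u64,
}

struct Tracker {
    buf: Vec<Sample>,
    capacity: usize,
    next: usize, // index the next push writes to
    len: usize,  // number of valid samples, <= capacity
}

impl Tracker {
    fn new(capacity: usize) -> Self {
        Tracker { buf: vec![Sample::default(); capacity], capacity, next: 0, len: 0 }
    }

    fn push(&mut self, s: Sample) {
        self.buf[self.next] = s;
        self.next = (self.next + 1) % self.capacity;
        self.len = (self.len + 1).min(self.capacity);
    }

    /// Average bytes_in over the last `n` samples (one sample per second),
    /// clamped to however many samples actually exist.
    fn throughput_in(&self, n: usize) -> u64 {
        let n = n.min(self.len);
        if n == 0 { return 0; }
        let sum: u64 = (1..=n)
            .map(|i| self.buf[(self.next + self.capacity - i) % self.capacity].bytes_in)
            .sum();
        sum / n as u64
    }
}
```

With this shape, instant() is the 1-sample average, recent() the 10-sample average, and history(N) a chronological walk of the last N slots.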
### Protocol Detection Cache
Not part of MetricsCollector. Maintained by HttpProxyService's protocol detection system. Injected into the metrics snapshot at read time by get_metrics().
Each entry records: host, port, domain, detected protocol (h1/h2/h3), H3 port, age, last accessed, last probed, suppression flags, cooldown timers, consecutive failure counts.
## Instrumentation Points

### TCP Passthrough (rustproxy-passthrough)
Connection lifecycle — tcp_listener.rs:

- Accept: `conn_tracker.connection_opened(&ip)` (rate limiter) + `ConnectionTrackerGuard` RAII
- Route match: `metrics.connection_opened(route_id, source_ip)` + `ConnectionGuard` RAII
- Close: both guards call their respective `_closed()` methods on drop
Byte recording — forwarder.rs (forward_bidirectional_with_timeouts):

- Initial peeked data recorded immediately
- Per-chunk in both directions: `record_bytes(n, 0, ...)` / `record_bytes(0, n, ...)`
- Same pattern in `forward_bidirectional_split_with_timeouts` (tcp_listener.rs) for TLS-terminated paths
### HTTP Proxy (rustproxy-http)

Request counting — proxy_service.rs:

- `record_http_request()` called once per request after route matching succeeds
Body byte counting — counting_body.rs wrapping:

- Request bodies: `CountingBody::new(body, ..., Direction::In)` — counts client-to-upstream bytes
- Response bodies: `CountingBody::new(body, ..., Direction::Out)` — counts upstream-to-client bytes
- Batched flush every 64KB (`BYTE_FLUSH_THRESHOLD = 65_536`), remainder flushed on drop
- Also updates the `connection_activity` atomic (idle watchdog) and the `active_requests` counter (streaming detection)
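The batched-flush idea behind CountingBody can be sketched with a plain atomic standing in for the DashMap-backed counter. Names and structure here are illustrative, not the real rustproxy-http type:

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

const BYTE_FLUSH_THRESHOLD: u64 = 65_536;

/// Bytes accumulate in a local field (no shared-state touch per frame) and
/// are pushed to the shared counter only when the 64KB threshold is
/// crossed, or on drop.
struct BatchedCounter {
    shared: Arc<AtomicU64>, // stands in for a DashMap-backed counter
    pending: u64,
}

impl BatchedCounter {
    fn new(shared: Arc<AtomicU64>) -> Self {
        BatchedCounter { shared, pending: 0 }
    }

    fn record(&mut self, n: u64) {
        self.pending += n;
        if self.pending >= BYTE_FLUSH_THRESHOLD {
            self.shared.fetch_add(self.pending, Ordering::Relaxed);
            self.pending = 0;
        }
    }
}

impl Drop for BatchedCounter {
    fn drop(&mut self) {
        // Remainder below the threshold is flushed when the body is dropped.
        if self.pending > 0 {
            self.shared.fetch_add(self.pending, Ordering::Relaxed);
        }
    }
}
```

With typical 16KB body frames, this touches the shared counter once per four frames instead of once per frame.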
Backend metrics — proxy_service.rs:

- `backend_connection_opened(key, connect_time)` — after TCP/TLS connect succeeds
- `backend_connection_closed(key)` — on teardown
- `backend_connect_error(key)` — TCP/TLS connect failure or timeout
- `backend_handshake_error(key)` — H1/H2 protocol handshake failure
- `backend_request_error(key)` — send_request failure
- `backend_h2_failure(key)` — H2 attempted, fell back to H1
- `backend_pool_hit(key)` / `backend_pool_miss(key)` — connection pool reuse
- `set_backend_protocol(key, proto)` — records detected protocol
WebSocket — proxy_service.rs:
- Does NOT use CountingBody; records bytes directly per-chunk in both directions of the bidirectional copy loop
### QUIC (rustproxy-passthrough)

Connection level — quic_handler.rs:

- `connection_opened` / `connection_closed` via `QuicConnGuard` RAII
- `conn_tracker.connection_opened/closed` for rate limiting

Stream level:

- For QUIC-to-TCP stream forwarding: `record_bytes(bytes_in, bytes_out, ...)` called once per stream at completion (post-hoc, not per-chunk)
- For HTTP/3: delegates to `HttpProxyService.handle_request()`, so all HTTP proxy metrics apply
H3 specifics — h3_service.rs:

- `ProtocolGuard::frontend("h3")` tracks the H3 connection
- H3 request bodies: `record_bytes(data.len(), 0, ...)` called directly (not CountingBody) since H3 uses `stream.send_data()`
- H3 response bodies: wrapped in CountingBody like HTTP/1 and HTTP/2
### UDP (rustproxy-passthrough)

Session lifecycle — udp_listener.rs / udp_session.rs:

- `udp_session_opened()` + `connection_opened(route_id, source_ip)` on new session
- `udp_session_closed()` + `connection_closed(route_id, source_ip)` on idle reap or port drain

Datagram counting — udp_listener.rs:

- Inbound: `record_bytes(len, 0, ...)` + `record_datagram_in()`
- Outbound (backend reply): `record_bytes(0, len, ...)` + `record_datagram_out()`
## Sampling Loop

lib.rs spawns a tokio task at a configurable interval (default 1000ms):
```rust
loop {
    tokio::select! {
        _ = cancel => break,
        _ = interval.tick() => {
            metrics.sample_all();
            conn_tracker.cleanup_stale_timestamps();
            http_proxy.cleanup_all_rate_limiters();
        }
    }
}
```
sample_all() performs in one pass:

- Drains `global_pending_tp_in/out` into the global ThroughputTracker, samples
- Drains per-route pending counters into per-route trackers, samples each
- Samples idle route trackers (no new data) to advance their window
- Drains per-IP pending counters into per-IP trackers, samples each
- Drains `pending_http_requests` into the HTTP request throughput tracker
- Prunes orphaned per-IP entries (bytes/throughput maps with no matching ip_connections key)
- Prunes orphaned per-backend entries (error/stats maps with no matching active/total key)
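The drain step described above can be sketched with a swap-to-zero. A minimal sketch using only std atomics; the function name is illustrative:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Sketch of the pending-counter drain in sample_all(): the hot path only
/// ever does fetch_add on the pending atomics; the 1Hz sampler claims the
/// accumulated bytes with swap(0), so no byte is counted twice and the
/// recording side never takes a lock.
fn drain_pending(pending_in: &AtomicU64, pending_out: &AtomicU64) -> (u64, u64) {
    (
        pending_in.swap(0, Ordering::Relaxed),
        pending_out.swap(0, Ordering::Relaxed),
    )
}
```

The returned pair is what gets pushed into the Mutex-guarded tracker as that second's sample.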
## Data Flow: Rust to TypeScript
```
MetricsCollector.snapshot()
  ├── reads all AtomicU64 counters
  ├── iterates DashMaps (routes, IPs, backends)
  ├── locks ThroughputTrackers for instant/recent rates + history
  └── produces Metrics struct

RustProxy::get_metrics()
  ├── calls snapshot()
  ├── enriches with detectedProtocols from HTTP proxy protocol cache
  └── returns Metrics

management.rs "getMetrics" IPC command
  ├── calls get_metrics()
  ├── serde_json::to_value (camelCase)
  └── writes JSON to stdout

RustProxyBridge (TypeScript)
  ├── reads JSON from Rust process stdout
  └── returns parsed object

RustMetricsAdapter
  ├── setInterval polls bridge.getMetrics() every 1s
  ├── stores raw JSON in this.cache
  └── IMetrics methods read synchronously from cache

SmartProxy.getMetrics()
  └── returns the RustMetricsAdapter instance
```
## IPC JSON Shape (Metrics)
```jsonc
{
  "activeConnections": 42,
  "totalConnections": 1000,
  "bytesIn": 123456789,
  "bytesOut": 987654321,
  "throughputInBytesPerSec": 50000,
  "throughputOutBytesPerSec": 80000,
  "throughputRecentInBytesPerSec": 45000,
  "throughputRecentOutBytesPerSec": 75000,
  "routes": {
    "<route-id>": {
      "activeConnections": 5,
      "totalConnections": 100,
      "bytesIn": 0, "bytesOut": 0,
      "throughputInBytesPerSec": 0, "throughputOutBytesPerSec": 0,
      "throughputRecentInBytesPerSec": 0, "throughputRecentOutBytesPerSec": 0
    }
  },
  "ips": {
    "<ip>": {
      "activeConnections": 2, "totalConnections": 10,
      "bytesIn": 0, "bytesOut": 0,
      "throughputInBytesPerSec": 0, "throughputOutBytesPerSec": 0,
      "domainRequests": {
        "example.com": 4821,
        "api.example.com": 312
      }
    }
  },
  "backends": {
    "<host:port>": {
      "activeConnections": 3, "totalConnections": 50,
      "protocol": "h2",
      "connectErrors": 0, "handshakeErrors": 0, "requestErrors": 0,
      "totalConnectTimeUs": 150000, "connectCount": 50,
      "poolHits": 40, "poolMisses": 10, "h2Failures": 1
    }
  },
  "throughputHistory": [
    { "timestampMs": 1713000000000, "bytesIn": 50000, "bytesOut": 80000 }
  ],
  "totalHttpRequests": 5000,
  "httpRequestsPerSec": 100,
  "httpRequestsPerSecRecent": 95,
  "activeUdpSessions": 0, "totalUdpSessions": 5,
  "totalDatagramsIn": 1000, "totalDatagramsOut": 1000,
  "frontendProtocols": {
    "h1Active": 10, "h1Total": 500,
    "h2Active": 5, "h2Total": 200,
    "h3Active": 1, "h3Total": 50,
    "wsActive": 2, "wsTotal": 30,
    "otherActive": 0, "otherTotal": 0
  },
  "backendProtocols": { "...same shape..." },
  "detectedProtocols": [
    {
      "host": "backend", "port": 443, "domain": "example.com",
      "protocol": "h2", "h3Port": 443,
      "ageSecs": 120, "lastAccessedSecs": 5, "lastProbedSecs": 120,
      "h2Suppressed": false, "h3Suppressed": false,
      "h2CooldownRemainingSecs": null, "h3CooldownRemainingSecs": null,
      "h2ConsecutiveFailures": null, "h3ConsecutiveFailures": null
    }
  ]
}
```
## IPC JSON Shape (Statistics)

Lightweight administrative summary, fetched on demand (not polled):
```json
{
  "activeConnections": 42,
  "totalConnections": 1000,
  "routesCount": 5,
  "listeningPorts": [80, 443, 8443],
  "uptimeSeconds": 86400
}
```
## TypeScript Consumer API

SmartProxy.getMetrics() returns an IMetrics object. All members are synchronous methods reading from the polled cache.
### connections

| Method | Return | Source |
|---|---|---|
| active() | number | cache.activeConnections |
| total() | number | cache.totalConnections |
| byRoute() | Map<string, number> | cache.routes[name].activeConnections |
| byIP() | Map<string, number> | cache.ips[ip].activeConnections |
| topIPs(limit?) | Array<{ip, count}> | cache.ips sorted by active desc, default 10 |
| domainRequestsByIP() | Map<string, Map<string, number>> | cache.ips[ip].domainRequests |
| topDomainRequests(limit?) | Array<{ip, domain, count}> | Flattened from all IPs, sorted by count desc, default 20 |
| frontendProtocols() | IProtocolDistribution | cache.frontendProtocols.* |
| backendProtocols() | IProtocolDistribution | cache.backendProtocols.* |
### throughput

| Method | Return | Source |
|---|---|---|
| instant() | {in, out} | cache.throughputInBytesPerSec/Out |
| recent() | {in, out} | cache.throughputRecentInBytesPerSec/Out |
| average() | {in, out} | Falls back to instant() (not wired to windowed average) |
| custom(seconds) | {in, out} | Falls back to instant() (not wired) |
| history(seconds) | IThroughputHistoryPoint[] | cache.throughputHistory sliced to last N entries |
| byRoute(windowSeconds?) | Map<string, {in, out}> | cache.routes[name].throughputIn/OutBytesPerSec |
| byIP(windowSeconds?) | Map<string, {in, out}> | cache.ips[ip].throughputIn/OutBytesPerSec |
### requests

| Method | Return | Source |
|---|---|---|
| perSecond() | number | cache.httpRequestsPerSec |
| perMinute() | number | cache.httpRequestsPerSecRecent * 60 |
| total() | number | cache.totalHttpRequests (fallback: totalConnections) |
### totals

| Method | Return | Source |
|---|---|---|
| bytesIn() | number | cache.bytesIn |
| bytesOut() | number | cache.bytesOut |
| connections() | number | cache.totalConnections |
### backends

| Method | Return | Source |
|---|---|---|
| byBackend() | Map<string, IBackendMetrics> | cache.backends[key].* with computed avgConnectTimeMs and poolHitRate |
| protocols() | Map<string, string> | cache.backends[key].protocol |
| topByErrors(limit?) | IBackendMetrics[] | Sorted by total errors desc |
| detectedProtocols() | IProtocolCacheEntry[] | cache.detectedProtocols passthrough |

IBackendMetrics: { protocol, activeConnections, totalConnections, connectErrors, handshakeErrors, requestErrors, avgConnectTimeMs, poolHitRate, h2Failures }
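The two computed fields can be derived from the raw JSON counters. A sketch with illustrative function names (the adapter computes these in TypeScript; the arithmetic is the same):

```rust
/// avgConnectTimeMs: accumulated connect time is stored in microseconds
/// (totalConnectTimeUs) alongside a connect counter (connectCount).
fn avg_connect_time_ms(total_connect_time_us: u64, connect_count: u64) -> f64 {
    if connect_count == 0 { return 0.0; } // no connects yet: avoid divide-by-zero
    (total_connect_time_us as f64 / connect_count as f64) / 1000.0
}

/// poolHitRate: fraction of backend connection acquisitions served from
/// the pool (poolHits) versus fresh connects (poolMisses).
fn pool_hit_rate(pool_hits: u64, pool_misses: u64) -> f64 {
    let total = pool_hits + pool_misses;
    if total == 0 { return 0.0; }
    pool_hits as f64 / total as f64
}
```

For the sample backend JSON above (150000us over 50 connects, 40 hits / 10 misses) this yields 3.0ms and 0.8.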
### udp

| Method | Return | Source |
|---|---|---|
| activeSessions() | number | cache.activeUdpSessions |
| totalSessions() | number | cache.totalUdpSessions |
| datagramsIn() | number | cache.totalDatagramsIn |
| datagramsOut() | number | cache.totalDatagramsOut |
### percentiles (stub)

connectionDuration() and bytesTransferred() always return zeros. Not implemented.
## Configuration

```typescript
interface IMetricsConfig {
  enabled: boolean;             // default true
  sampleIntervalMs: number;     // default 1000 (1Hz sampling + TS polling)
  retentionSeconds: number;     // default 3600 (ThroughputTracker capacity)
  enableDetailedTracking: boolean;
  enablePercentiles: boolean;
  cacheResultsMs: number;
  prometheusEnabled: boolean;   // not wired
  prometheusPath: string;       // not wired
  prometheusPrefix: string;     // not wired
}
```
Rust-side config (MetricsConfig in rustproxy-config):

```rust
pub struct MetricsConfig {
    pub enabled: Option<bool>,
    pub sample_interval_ms: Option<u64>, // default 1000
    pub retention_seconds: Option<u64>,  // default 3600
}
```
## Design Decisions

**Lock-free hot path.** record_bytes() is the most frequently called method (per-chunk in TCP, per-64KB in HTTP). It only touches AtomicU64 with Relaxed ordering and short-circuits zero-byte directions to skip DashMap lookups entirely.

**CountingBody batching.** HTTP body frames are typically 16KB. Flushing to MetricsCollector every 64KB reduces DashMap shard contention by ~4x compared to per-frame recording.

**RAII guards everywhere.** ConnectionGuard, ConnectionTrackerGuard, QuicConnGuard, ProtocolGuard, and FrontendProtocolTracker all use Drop to guarantee counter cleanup on all exit paths, including panics.

**Saturating decrements.** Protocol counters use fetch_update loops instead of fetch_sub to prevent underflow to u64::MAX from concurrent close races.

**Bounded memory.** Per-IP entries are evicted on last connection close, as are per-backend entries. Snapshots cap IPs and backends at 100 each. sample_all() prunes orphaned entries every second.

**Two-phase throughput.** Pending bytes accumulate in lock-free atomics. The 1Hz cold path drains them into Mutex-guarded ThroughputTrackers. The hot path never contends on a Mutex, while the cold path does minimal work (one drain plus one sample per tracker).
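The lock-free hot-path shape can be sketched with std atomics. This is illustrative, not the real record_bytes (which also keys into per-route/per-IP DashMaps):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

struct Counters {
    bytes_in: AtomicU64,
    bytes_out: AtomicU64,
}

/// Relaxed atomic adds only, with each direction short-circuited when zero:
/// a one-directional chunk skips half the work (and, in the real code, the
/// DashMap lookups for that direction).
fn record_bytes(c: &Counters, n_in: u64, n_out: u64) {
    if n_in > 0 {
        c.bytes_in.fetch_add(n_in, Ordering::Relaxed);
    }
    if n_out > 0 {
        c.bytes_out.fetch_add(n_out, Ordering::Relaxed);
    }
}
```

Relaxed ordering suffices because the counters are independent statistics; no other memory access is ordered against them.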
## Known Gaps

| Gap | Status |
|---|---|
| throughput.average() / throughput.custom(seconds) | Fall back to instant(). Not wired to Rust windowed queries. |
| percentiles.connectionDuration() / percentiles.bytesTransferred() | Stub returning zeros. No histogram in Rust. |
| Prometheus export | Config fields exist but not wired to any exporter. |
| LogDeduplicator | Implemented in rustproxy-metrics but not connected to any call site. |
| Rate limit hit counters | Rate-limited requests return 429 but no counter is recorded in MetricsCollector. |
| QUIC stream byte counting | Post-hoc (per-stream totals after close), not per-chunk like TCP. |
| Throughput history in snapshot | Capped at 60 samples. TS history(seconds) cannot return more than 60 points regardless of retentionSeconds. |
| Per-route total connections / bytes | Available in Rust JSON but IMetrics.connections.byRoute() only exposes active connections. |
| Per-IP total connections / bytes | Available in Rust JSON but IMetrics.connections.byIP() only exposes active connections. |
| IPC response typing | RustProxyBridge declares result: any for both metrics commands. No type-safe response. |