From 82ca0381e95a13d512596a7386fa11b61ef39814 Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Mon, 23 Jun 2025 13:19:39 +0000 Subject: [PATCH] fix(metrics): fix metrics --- readme.hints.md | 23 ++- ts/proxies/smart-proxy/metrics-collector.ts | 160 ++++++------------ .../smart-proxy/models/metrics-types.ts | 10 +- 3 files changed, 71 insertions(+), 122 deletions(-) diff --git a/readme.hints.md b/readme.hints.md index d22b4d0..f4b996d 100644 --- a/readme.hints.md +++ b/readme.hints.md @@ -11,10 +11,11 @@ - Hour 2: 2GB total / 60s = 34 MB/s ✗ (appears doubled!) - Hour 3: 3GB total / 60s = 50 MB/s ✗ (keeps rising!) -**Solution**: Implemented snapshot-based byte tracking that calculates actual bytes transferred within each time window: -- Store periodic snapshots of byte counts with timestamps -- Calculate delta between window start and end snapshots -- Divide delta by window duration for accurate throughput +**Solution**: Implemented dedicated ThroughputTracker instances for each route and IP address: +- Each route and IP gets its own throughput tracker with per-second sampling +- Samples are taken every second and stored in a circular buffer +- Rate calculations use actual samples within the requested window +- Default window is now 1 second for real-time accuracy ### What Gets Counted (Network Interface Throughput) @@ -53,15 +54,19 @@ The byte tracking is designed to match network interface throughput (what Unifi/ ### Metrics Architecture -The metrics system has three layers: +The metrics system has multiple layers: 1. **Connection Records** (`record.bytesReceived/bytesSent`): Track total bytes per connection -2. **ThroughputTracker**: Accumulates bytes between samples for global rate calculations (resets each second) -3. **connectionByteTrackers**: Track bytes per connection with snapshots for accurate windowed per-route/IP metrics +2. **Global ThroughputTracker**: Accumulates bytes between samples for overall rate calculations +3. **Per-Route ThroughputTrackers**: Dedicated tracker for each route with per-second sampling +4. **Per-IP ThroughputTrackers**: Dedicated tracker for each IP with per-second sampling +5. **connectionByteTrackers**: Track cumulative bytes and metadata for active connections Key features: -- Global throughput uses sampling with accumulator reset (accurate) -- Per-route/IP throughput uses snapshots to calculate window-specific deltas (accurate) +- All throughput trackers sample every second (1Hz) +- Each tracker maintains a circular buffer of samples (default: 1 hour retention) +- Rate calculations are accurate for any requested window (default: 1 second) - All byte counting happens exactly once at the data flow point +- Unused route/IP trackers are automatically cleaned up when connections close ### Understanding "High" Byte Counts diff --git a/ts/proxies/smart-proxy/metrics-collector.ts b/ts/proxies/smart-proxy/metrics-collector.ts index d268c23..0722ab1 100644 --- a/ts/proxies/smart-proxy/metrics-collector.ts +++ b/ts/proxies/smart-proxy/metrics-collector.ts @@ -15,6 +15,8 @@ import { logger } from '../../core/utils/logger.js'; export class MetricsCollector implements IMetrics { // Throughput tracking private throughputTracker: ThroughputTracker; + private routeThroughputTrackers = new Map(); + private ipThroughputTrackers = new Map(); // Request tracking private requestTimestamps: number[] = []; @@ -119,109 +121,31 @@ export class MetricsCollector implements IMetrics { return this.throughputTracker.getHistory(seconds); }, - byRoute: (windowSeconds: number = 60): Map => { + byRoute: (windowSeconds: number = 1): Map => { const routeThroughput = new Map(); - const now = Date.now(); - const windowStart = now - (windowSeconds * 1000); - // Aggregate bytes by route - calculate actual bytes transferred in window - const routeData = new Map(); - - for (const [_, tracker] of this.connectionByteTrackers) { - // Only include connections that were active within the window - if (tracker.lastUpdate > windowStart) { - let windowBytesIn = 0; - let windowBytesOut = 0; - - if (tracker.windowSnapshots && tracker.windowSnapshots.length > 0) { - // Find the earliest snapshot within or just before the window - let startSnapshot = { timestamp: tracker.startTime, bytesIn: 0, bytesOut: 0 }; - for (const snapshot of tracker.windowSnapshots) { - if (snapshot.timestamp <= windowStart) { - startSnapshot = snapshot; - } else { - break; - } - } - - // Calculate bytes transferred since window start - windowBytesIn = tracker.bytesIn - startSnapshot.bytesIn; - windowBytesOut = tracker.bytesOut - startSnapshot.bytesOut; - } else if (tracker.startTime > windowStart) { - // Connection started within window, use all its bytes - windowBytesIn = tracker.bytesIn; - windowBytesOut = tracker.bytesOut; - } - - // Add to route totals - const current = routeData.get(tracker.routeName) || { bytesIn: 0, bytesOut: 0 }; - current.bytesIn += windowBytesIn; - current.bytesOut += windowBytesOut; - routeData.set(tracker.routeName, current); + // Get throughput from each route's dedicated tracker + for (const [route, tracker] of this.routeThroughputTrackers) { + const rate = tracker.getRate(windowSeconds); + if (rate.in > 0 || rate.out > 0) { + routeThroughput.set(route, rate); } } - // Convert to rates (bytes per second) - for (const [route, data] of routeData) { - routeThroughput.set(route, { - in: Math.round(data.bytesIn / windowSeconds), - out: Math.round(data.bytesOut / windowSeconds) - }); - } - return routeThroughput; }, - byIP: (windowSeconds: number = 60): Map => { + byIP: (windowSeconds: number = 1): Map => { const ipThroughput = new Map(); - const now = Date.now(); - const windowStart = now - (windowSeconds * 1000); - // Aggregate bytes by IP - calculate actual bytes transferred in window - const ipData = new Map(); - - for (const [_, tracker] of this.connectionByteTrackers) { - // Only include connections that were active within the window - if (tracker.lastUpdate > windowStart) { - let windowBytesIn = 0; - let windowBytesOut = 0; - - if (tracker.windowSnapshots && tracker.windowSnapshots.length > 0) { - // Find the earliest snapshot within or just before the window - let startSnapshot = { timestamp: tracker.startTime, bytesIn: 0, bytesOut: 0 }; - for (const snapshot of tracker.windowSnapshots) { - if (snapshot.timestamp <= windowStart) { - startSnapshot = snapshot; - } else { - break; - } - } - - // Calculate bytes transferred since window start - windowBytesIn = tracker.bytesIn - startSnapshot.bytesIn; - windowBytesOut = tracker.bytesOut - startSnapshot.bytesOut; - } else if (tracker.startTime > windowStart) { - // Connection started within window, use all its bytes - windowBytesIn = tracker.bytesIn; - windowBytesOut = tracker.bytesOut; - } - - // Add to IP totals - const current = ipData.get(tracker.remoteIP) || { bytesIn: 0, bytesOut: 0 }; - current.bytesIn += windowBytesIn; - current.bytesOut += windowBytesOut; - ipData.set(tracker.remoteIP, current); + // Get throughput from each IP's dedicated tracker + for (const [ip, tracker] of this.ipThroughputTrackers) { + const rate = tracker.getRate(windowSeconds); + if (rate.in > 0 || rate.out > 0) { + ipThroughput.set(ip, rate); } } - // Convert to rates (bytes per second) - for (const [ip, data] of ipData) { - ipThroughput.set(ip, { - in: Math.round(data.bytesIn / windowSeconds), - out: Math.round(data.bytesOut / windowSeconds) - }); - } - return ipThroughput; } }; @@ -322,8 +246,7 @@ export class MetricsCollector implements IMetrics { bytesIn: 0, bytesOut: 0, startTime: now, - lastUpdate: now, - windowSnapshots: [] // Initialize empty snapshots array + lastUpdate: now }); // Cleanup old request timestamps @@ -353,21 +276,21 @@ export class MetricsCollector implements IMetrics { tracker.bytesOut += bytesOut; tracker.lastUpdate = Date.now(); - // Initialize snapshots array if not present - if (!tracker.windowSnapshots) { - tracker.windowSnapshots = []; + // Update per-route throughput tracker + let routeTracker = this.routeThroughputTrackers.get(tracker.routeName); + if (!routeTracker) { + routeTracker = new ThroughputTracker(this.retentionSeconds); + this.routeThroughputTrackers.set(tracker.routeName, routeTracker); } + routeTracker.recordBytes(bytesIn, bytesOut); - // Add current snapshot - we'll use these for accurate windowed calculations - tracker.windowSnapshots.push({ - timestamp: Date.now(), - bytesIn: tracker.bytesIn, - bytesOut: tracker.bytesOut - }); - - // Keep only snapshots from last 5 minutes to prevent memory growth - const fiveMinutesAgo = Date.now() - 300000; - tracker.windowSnapshots = tracker.windowSnapshots.filter(s => s.timestamp > fiveMinutesAgo); + // Update per-IP throughput tracker + let ipTracker = this.ipThroughputTrackers.get(tracker.remoteIP); + if (!ipTracker) { + ipTracker = new ThroughputTracker(this.retentionSeconds); + this.ipThroughputTrackers.set(tracker.remoteIP, ipTracker); + } + ipTracker.recordBytes(bytesIn, bytesOut); } } @@ -388,8 +311,19 @@ export class MetricsCollector implements IMetrics { // Start periodic sampling this.samplingInterval = setInterval(() => { + // Sample global throughput this.throughputTracker.takeSample(); + // Sample per-route throughput + for (const [_, tracker] of this.routeThroughputTrackers) { + tracker.takeSample(); + } + + // Sample per-IP throughput + for (const [_, tracker] of this.ipThroughputTrackers) { + tracker.takeSample(); + } + // Clean up old connection trackers (connections closed more than 5 minutes ago) const cutoff = Date.now() - 300000; for (const [id, tracker] of this.connectionByteTrackers) { @@ -397,6 +331,22 @@ export class MetricsCollector implements IMetrics { this.connectionByteTrackers.delete(id); } } + + // Clean up unused route trackers + const activeRoutes = new Set(Array.from(this.connectionByteTrackers.values()).map(t => t.routeName)); + for (const [route, _] of this.routeThroughputTrackers) { + if (!activeRoutes.has(route)) { + this.routeThroughputTrackers.delete(route); + } + } + + // Clean up unused IP trackers + const activeIPs = new Set(Array.from(this.connectionByteTrackers.values()).map(t => t.remoteIP)); + for (const [ip, _] of this.ipThroughputTrackers) { + if (!activeIPs.has(ip)) { + this.ipThroughputTrackers.delete(ip); + } + } }, this.sampleIntervalMs); // Subscribe to new connections diff --git a/ts/proxies/smart-proxy/models/metrics-types.ts b/ts/proxies/smart-proxy/models/metrics-types.ts index 94ce869..d958ca1 100644 --- a/ts/proxies/smart-proxy/models/metrics-types.ts +++ b/ts/proxies/smart-proxy/models/metrics-types.ts @@ -49,8 +49,8 @@ export interface IMetrics { average(): IThroughputData; // Last 60 seconds custom(seconds: number): IThroughputData; history(seconds: number): Array; - byRoute(windowSeconds?: number): Map; - byIP(windowSeconds?: number): Map; + byRoute(windowSeconds?: number): Map; // Default: 1 second + byIP(windowSeconds?: number): Map; // Default: 1 second }; // Request metrics @@ -109,10 +109,4 @@ export interface IByteTracker { bytesOut: number; startTime: number; lastUpdate: number; - // Track bytes at window boundaries for rate calculation - windowSnapshots?: Array<{ - timestamp: number; - bytesIn: number; - bytesOut: number; - }>; } \ No newline at end of file