fix(metrics): improve metrics
This commit is contained in:
@ -1,258 +1,341 @@
|
||||
import * as plugins from '../../plugins.js';
|
||||
import type { SmartProxy } from './smart-proxy.js';
|
||||
import type { IProxyStats, IProxyStatsExtended } from './models/metrics-types.js';
|
||||
import type {
|
||||
IMetrics,
|
||||
IThroughputData,
|
||||
IThroughputHistoryPoint,
|
||||
IByteTracker
|
||||
} from './models/metrics-types.js';
|
||||
import { ThroughputTracker } from './throughput-tracker.js';
|
||||
import { logger } from '../../core/utils/logger.js';
|
||||
|
||||
/**
|
||||
* Collects and computes metrics for SmartProxy on-demand
|
||||
* Collects and provides metrics for SmartProxy with clean API
|
||||
*/
|
||||
export class MetricsCollector implements IProxyStatsExtended {
|
||||
// RPS tracking (the only state we need to maintain)
|
||||
export class MetricsCollector implements IMetrics {
|
||||
// Throughput tracking
|
||||
private throughputTracker: ThroughputTracker;
|
||||
|
||||
// Request tracking
|
||||
private requestTimestamps: number[] = [];
|
||||
private readonly RPS_WINDOW_SIZE = 60000; // 1 minute window
|
||||
private readonly MAX_TIMESTAMPS = 5000; // Maximum timestamps to keep
|
||||
private totalRequests: number = 0;
|
||||
|
||||
// Optional caching for performance
|
||||
private cachedMetrics: {
|
||||
timestamp: number;
|
||||
connectionsByRoute?: Map<string, number>;
|
||||
connectionsByIP?: Map<string, number>;
|
||||
} = { timestamp: 0 };
|
||||
// Connection byte tracking for per-route/IP metrics
|
||||
private connectionByteTrackers = new Map<string, IByteTracker>();
|
||||
|
||||
private readonly CACHE_TTL = 1000; // 1 second cache
|
||||
|
||||
// RxJS subscription for connection events
|
||||
// Subscriptions
|
||||
private samplingInterval?: NodeJS.Timeout;
|
||||
private connectionSubscription?: plugins.smartrx.rxjs.Subscription;
|
||||
|
||||
// Configuration
|
||||
private readonly sampleIntervalMs: number;
|
||||
private readonly retentionSeconds: number;
|
||||
|
||||
constructor(
|
||||
private smartProxy: SmartProxy
|
||||
private smartProxy: SmartProxy,
|
||||
config?: {
|
||||
sampleIntervalMs?: number;
|
||||
retentionSeconds?: number;
|
||||
}
|
||||
) {
|
||||
// Subscription will be set up in start() method
|
||||
this.sampleIntervalMs = config?.sampleIntervalMs || 1000;
|
||||
this.retentionSeconds = config?.retentionSeconds || 3600;
|
||||
this.throughputTracker = new ThroughputTracker(this.retentionSeconds);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current number of active connections
|
||||
*/
|
||||
public getActiveConnections(): number {
|
||||
return this.smartProxy.connectionManager.getConnectionCount();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get connection counts grouped by route name
|
||||
*/
|
||||
public getConnectionsByRoute(): Map<string, number> {
|
||||
const now = Date.now();
|
||||
// Connection metrics implementation
|
||||
public connections = {
|
||||
active: (): number => {
|
||||
return this.smartProxy.connectionManager.getConnectionCount();
|
||||
},
|
||||
|
||||
// Return cached value if fresh
|
||||
if (this.cachedMetrics.connectionsByRoute &&
|
||||
now - this.cachedMetrics.timestamp < this.CACHE_TTL) {
|
||||
return new Map(this.cachedMetrics.connectionsByRoute);
|
||||
}
|
||||
|
||||
// Compute fresh value
|
||||
const routeCounts = new Map<string, number>();
|
||||
const connections = this.smartProxy.connectionManager.getConnections();
|
||||
|
||||
if (this.smartProxy.settings?.enableDetailedLogging) {
|
||||
logger.log('debug', `MetricsCollector: Computing route connections`, {
|
||||
totalConnections: connections.size,
|
||||
component: 'metrics'
|
||||
});
|
||||
}
|
||||
|
||||
for (const [_, record] of connections) {
|
||||
// Try different ways to get the route name
|
||||
const routeName = (record as any).routeName ||
|
||||
record.routeConfig?.name ||
|
||||
(record.routeConfig as any)?.routeName ||
|
||||
'unknown';
|
||||
total: (): number => {
|
||||
const stats = this.smartProxy.connectionManager.getTerminationStats();
|
||||
let total = this.smartProxy.connectionManager.getConnectionCount();
|
||||
|
||||
if (this.smartProxy.settings?.enableDetailedLogging) {
|
||||
logger.log('debug', `MetricsCollector: Connection route info`, {
|
||||
connectionId: record.id,
|
||||
routeName,
|
||||
hasRouteConfig: !!record.routeConfig,
|
||||
routeConfigName: record.routeConfig?.name,
|
||||
routeConfigKeys: record.routeConfig ? Object.keys(record.routeConfig) : [],
|
||||
component: 'metrics'
|
||||
for (const reason in stats.incoming) {
|
||||
total += stats.incoming[reason];
|
||||
}
|
||||
|
||||
return total;
|
||||
},
|
||||
|
||||
byRoute: (): Map<string, number> => {
|
||||
const routeCounts = new Map<string, number>();
|
||||
const connections = this.smartProxy.connectionManager.getConnections();
|
||||
|
||||
for (const [_, record] of connections) {
|
||||
const routeName = (record as any).routeName ||
|
||||
record.routeConfig?.name ||
|
||||
'unknown';
|
||||
|
||||
const current = routeCounts.get(routeName) || 0;
|
||||
routeCounts.set(routeName, current + 1);
|
||||
}
|
||||
|
||||
return routeCounts;
|
||||
},
|
||||
|
||||
byIP: (): Map<string, number> => {
|
||||
const ipCounts = new Map<string, number>();
|
||||
|
||||
for (const [_, record] of this.smartProxy.connectionManager.getConnections()) {
|
||||
const ip = record.remoteIP;
|
||||
const current = ipCounts.get(ip) || 0;
|
||||
ipCounts.set(ip, current + 1);
|
||||
}
|
||||
|
||||
return ipCounts;
|
||||
},
|
||||
|
||||
topIPs: (limit: number = 10): Array<{ ip: string; count: number }> => {
|
||||
const ipCounts = this.connections.byIP();
|
||||
return Array.from(ipCounts.entries())
|
||||
.sort((a, b) => b[1] - a[1])
|
||||
.slice(0, limit)
|
||||
.map(([ip, count]) => ({ ip, count }));
|
||||
}
|
||||
};
|
||||
|
||||
// Throughput metrics implementation
|
||||
public throughput = {
|
||||
instant: (): IThroughputData => {
|
||||
return this.throughputTracker.getRate(1);
|
||||
},
|
||||
|
||||
recent: (): IThroughputData => {
|
||||
return this.throughputTracker.getRate(10);
|
||||
},
|
||||
|
||||
average: (): IThroughputData => {
|
||||
return this.throughputTracker.getRate(60);
|
||||
},
|
||||
|
||||
custom: (seconds: number): IThroughputData => {
|
||||
return this.throughputTracker.getRate(seconds);
|
||||
},
|
||||
|
||||
history: (seconds: number): Array<IThroughputHistoryPoint> => {
|
||||
return this.throughputTracker.getHistory(seconds);
|
||||
},
|
||||
|
||||
byRoute: (windowSeconds: number = 60): Map<string, IThroughputData> => {
|
||||
const routeThroughput = new Map<string, IThroughputData>();
|
||||
const now = Date.now();
|
||||
const windowStart = now - (windowSeconds * 1000);
|
||||
|
||||
// Aggregate bytes by route from trackers
|
||||
const routeBytes = new Map<string, { in: number; out: number }>();
|
||||
|
||||
for (const [_, tracker] of this.connectionByteTrackers) {
|
||||
if (tracker.lastUpdate > windowStart) {
|
||||
const current = routeBytes.get(tracker.routeName) || { in: 0, out: 0 };
|
||||
current.in += tracker.bytesIn;
|
||||
current.out += tracker.bytesOut;
|
||||
routeBytes.set(tracker.routeName, current);
|
||||
}
|
||||
}
|
||||
|
||||
// Convert to rates
|
||||
for (const [route, bytes] of routeBytes) {
|
||||
routeThroughput.set(route, {
|
||||
in: Math.round(bytes.in / windowSeconds),
|
||||
out: Math.round(bytes.out / windowSeconds)
|
||||
});
|
||||
}
|
||||
|
||||
const current = routeCounts.get(routeName) || 0;
|
||||
routeCounts.set(routeName, current + 1);
|
||||
}
|
||||
return routeThroughput;
|
||||
},
|
||||
|
||||
// Cache and return
|
||||
this.cachedMetrics.connectionsByRoute = routeCounts;
|
||||
this.cachedMetrics.timestamp = now;
|
||||
return new Map(routeCounts);
|
||||
}
|
||||
byIP: (windowSeconds: number = 60): Map<string, IThroughputData> => {
|
||||
const ipThroughput = new Map<string, IThroughputData>();
|
||||
const now = Date.now();
|
||||
const windowStart = now - (windowSeconds * 1000);
|
||||
|
||||
// Aggregate bytes by IP from trackers
|
||||
const ipBytes = new Map<string, { in: number; out: number }>();
|
||||
|
||||
for (const [_, tracker] of this.connectionByteTrackers) {
|
||||
if (tracker.lastUpdate > windowStart) {
|
||||
const current = ipBytes.get(tracker.remoteIP) || { in: 0, out: 0 };
|
||||
current.in += tracker.bytesIn;
|
||||
current.out += tracker.bytesOut;
|
||||
ipBytes.set(tracker.remoteIP, current);
|
||||
}
|
||||
}
|
||||
|
||||
// Convert to rates
|
||||
for (const [ip, bytes] of ipBytes) {
|
||||
ipThroughput.set(ip, {
|
||||
in: Math.round(bytes.in / windowSeconds),
|
||||
out: Math.round(bytes.out / windowSeconds)
|
||||
});
|
||||
}
|
||||
|
||||
return ipThroughput;
|
||||
}
|
||||
};
|
||||
|
||||
// Request metrics implementation
|
||||
public requests = {
|
||||
perSecond: (): number => {
|
||||
const now = Date.now();
|
||||
const oneSecondAgo = now - 1000;
|
||||
|
||||
// Clean old timestamps
|
||||
this.requestTimestamps = this.requestTimestamps.filter(ts => ts > now - 60000);
|
||||
|
||||
// Count requests in last second
|
||||
const recentRequests = this.requestTimestamps.filter(ts => ts > oneSecondAgo);
|
||||
return recentRequests.length;
|
||||
},
|
||||
|
||||
perMinute: (): number => {
|
||||
const now = Date.now();
|
||||
const oneMinuteAgo = now - 60000;
|
||||
|
||||
// Count requests in last minute
|
||||
const recentRequests = this.requestTimestamps.filter(ts => ts > oneMinuteAgo);
|
||||
return recentRequests.length;
|
||||
},
|
||||
|
||||
total: (): number => {
|
||||
return this.totalRequests;
|
||||
}
|
||||
};
|
||||
|
||||
// Totals implementation
|
||||
public totals = {
|
||||
bytesIn: (): number => {
|
||||
let total = 0;
|
||||
|
||||
// Sum from all active connections
|
||||
for (const [_, record] of this.smartProxy.connectionManager.getConnections()) {
|
||||
total += record.bytesReceived;
|
||||
}
|
||||
|
||||
// TODO: Add historical data from terminated connections
|
||||
|
||||
return total;
|
||||
},
|
||||
|
||||
bytesOut: (): number => {
|
||||
let total = 0;
|
||||
|
||||
// Sum from all active connections
|
||||
for (const [_, record] of this.smartProxy.connectionManager.getConnections()) {
|
||||
total += record.bytesSent;
|
||||
}
|
||||
|
||||
// TODO: Add historical data from terminated connections
|
||||
|
||||
return total;
|
||||
},
|
||||
|
||||
connections: (): number => {
|
||||
return this.connections.total();
|
||||
}
|
||||
};
|
||||
|
||||
// Percentiles implementation (placeholder for now)
|
||||
public percentiles = {
|
||||
connectionDuration: (): { p50: number; p95: number; p99: number } => {
|
||||
// TODO: Implement percentile calculations
|
||||
return { p50: 0, p95: 0, p99: 0 };
|
||||
},
|
||||
|
||||
bytesTransferred: (): {
|
||||
in: { p50: number; p95: number; p99: number };
|
||||
out: { p50: number; p95: number; p99: number };
|
||||
} => {
|
||||
// TODO: Implement percentile calculations
|
||||
return {
|
||||
in: { p50: 0, p95: 0, p99: 0 },
|
||||
out: { p50: 0, p95: 0, p99: 0 }
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Get connection counts grouped by IP address
|
||||
* Record a new request
|
||||
*/
|
||||
public getConnectionsByIP(): Map<string, number> {
|
||||
const now = Date.now();
|
||||
|
||||
// Return cached value if fresh
|
||||
if (this.cachedMetrics.connectionsByIP &&
|
||||
now - this.cachedMetrics.timestamp < this.CACHE_TTL) {
|
||||
return new Map(this.cachedMetrics.connectionsByIP);
|
||||
}
|
||||
|
||||
// Compute fresh value
|
||||
const ipCounts = new Map<string, number>();
|
||||
for (const [_, record] of this.smartProxy.connectionManager.getConnections()) {
|
||||
const ip = record.remoteIP;
|
||||
const current = ipCounts.get(ip) || 0;
|
||||
ipCounts.set(ip, current + 1);
|
||||
}
|
||||
|
||||
// Cache and return
|
||||
this.cachedMetrics.connectionsByIP = ipCounts;
|
||||
this.cachedMetrics.timestamp = now;
|
||||
return new Map(ipCounts);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the total number of connections since proxy start
|
||||
*/
|
||||
public getTotalConnections(): number {
|
||||
// Get from termination stats
|
||||
const stats = this.smartProxy.connectionManager.getTerminationStats();
|
||||
let total = this.smartProxy.connectionManager.getConnectionCount(); // Add active connections
|
||||
|
||||
// Add all terminated connections
|
||||
for (const reason in stats.incoming) {
|
||||
total += stats.incoming[reason];
|
||||
}
|
||||
|
||||
return total;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current requests per second rate
|
||||
*/
|
||||
public getRequestsPerSecond(): number {
|
||||
const now = Date.now();
|
||||
const windowStart = now - this.RPS_WINDOW_SIZE;
|
||||
|
||||
// Clean old timestamps
|
||||
this.requestTimestamps = this.requestTimestamps.filter(ts => ts > windowStart);
|
||||
|
||||
// Calculate RPS based on window
|
||||
const requestsInWindow = this.requestTimestamps.length;
|
||||
return requestsInWindow / (this.RPS_WINDOW_SIZE / 1000);
|
||||
}
|
||||
|
||||
/**
|
||||
* Record a new request for RPS tracking
|
||||
*/
|
||||
public recordRequest(): void {
|
||||
public recordRequest(connectionId: string, routeName: string, remoteIP: string): void {
|
||||
const now = Date.now();
|
||||
this.requestTimestamps.push(now);
|
||||
this.totalRequests++;
|
||||
|
||||
// Prevent unbounded growth - clean up more aggressively
|
||||
if (this.requestTimestamps.length > this.MAX_TIMESTAMPS) {
|
||||
// Keep only timestamps within the window
|
||||
const cutoff = now - this.RPS_WINDOW_SIZE;
|
||||
// Initialize byte tracker for this connection
|
||||
this.connectionByteTrackers.set(connectionId, {
|
||||
connectionId,
|
||||
routeName,
|
||||
remoteIP,
|
||||
bytesIn: 0,
|
||||
bytesOut: 0,
|
||||
lastUpdate: now
|
||||
});
|
||||
|
||||
// Cleanup old request timestamps (keep last minute only)
|
||||
if (this.requestTimestamps.length > 1000) {
|
||||
const cutoff = now - 60000;
|
||||
this.requestTimestamps = this.requestTimestamps.filter(ts => ts > cutoff);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get total throughput (bytes transferred)
|
||||
* Record bytes transferred for a connection
|
||||
*/
|
||||
public getThroughput(): { bytesIn: number; bytesOut: number } {
|
||||
let bytesIn = 0;
|
||||
let bytesOut = 0;
|
||||
public recordBytes(connectionId: string, bytesIn: number, bytesOut: number): void {
|
||||
// Update global throughput tracker
|
||||
this.throughputTracker.recordBytes(bytesIn, bytesOut);
|
||||
|
||||
// Sum bytes from all active connections
|
||||
for (const [_, record] of this.smartProxy.connectionManager.getConnections()) {
|
||||
bytesIn += record.bytesReceived;
|
||||
bytesOut += record.bytesSent;
|
||||
// Update connection-specific tracker
|
||||
const tracker = this.connectionByteTrackers.get(connectionId);
|
||||
if (tracker) {
|
||||
tracker.bytesIn += bytesIn;
|
||||
tracker.bytesOut += bytesOut;
|
||||
tracker.lastUpdate = Date.now();
|
||||
}
|
||||
|
||||
return { bytesIn, bytesOut };
|
||||
}
|
||||
|
||||
/**
|
||||
* Get throughput rate (bytes per second) for last minute
|
||||
* Clean up tracking for a closed connection
|
||||
*/
|
||||
public getThroughputRate(): { bytesInPerSec: number; bytesOutPerSec: number } {
|
||||
const now = Date.now();
|
||||
let recentBytesIn = 0;
|
||||
let recentBytesOut = 0;
|
||||
|
||||
// Calculate bytes transferred in last minute from active connections
|
||||
for (const [_, record] of this.smartProxy.connectionManager.getConnections()) {
|
||||
const connectionAge = now - record.incomingStartTime;
|
||||
if (connectionAge < 60000) { // Connection started within last minute
|
||||
recentBytesIn += record.bytesReceived;
|
||||
recentBytesOut += record.bytesSent;
|
||||
} else {
|
||||
// For older connections, estimate rate based on average
|
||||
const rate = connectionAge / 60000;
|
||||
recentBytesIn += record.bytesReceived / rate;
|
||||
recentBytesOut += record.bytesSent / rate;
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
bytesInPerSec: Math.round(recentBytesIn / 60),
|
||||
bytesOutPerSec: Math.round(recentBytesOut / 60)
|
||||
};
|
||||
public removeConnection(connectionId: string): void {
|
||||
this.connectionByteTrackers.delete(connectionId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get top IPs by connection count
|
||||
*/
|
||||
public getTopIPs(limit: number = 10): Array<{ ip: string; connections: number }> {
|
||||
const ipCounts = this.getConnectionsByIP();
|
||||
const sorted = Array.from(ipCounts.entries())
|
||||
.sort((a, b) => b[1] - a[1])
|
||||
.slice(0, limit)
|
||||
.map(([ip, connections]) => ({ ip, connections }));
|
||||
|
||||
return sorted;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if an IP has reached the connection limit
|
||||
*/
|
||||
public isIPBlocked(ip: string, maxConnectionsPerIP: number): boolean {
|
||||
const ipCounts = this.getConnectionsByIP();
|
||||
const currentConnections = ipCounts.get(ip) || 0;
|
||||
return currentConnections >= maxConnectionsPerIP;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean up old request timestamps
|
||||
*/
|
||||
private cleanupOldRequests(): void {
|
||||
const cutoff = Date.now() - this.RPS_WINDOW_SIZE;
|
||||
this.requestTimestamps = this.requestTimestamps.filter(ts => ts > cutoff);
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the metrics collector and set up subscriptions
|
||||
* Start the metrics collector
|
||||
*/
|
||||
public start(): void {
|
||||
if (!this.smartProxy.routeConnectionHandler) {
|
||||
throw new Error('MetricsCollector: RouteConnectionHandler not available');
|
||||
}
|
||||
|
||||
// Subscribe to the newConnectionSubject from RouteConnectionHandler
|
||||
// Start periodic sampling
|
||||
this.samplingInterval = setInterval(() => {
|
||||
this.throughputTracker.takeSample();
|
||||
|
||||
// Clean up old connection trackers (connections closed more than 5 minutes ago)
|
||||
const cutoff = Date.now() - 300000;
|
||||
for (const [id, tracker] of this.connectionByteTrackers) {
|
||||
if (tracker.lastUpdate < cutoff) {
|
||||
this.connectionByteTrackers.delete(id);
|
||||
}
|
||||
}
|
||||
}, this.sampleIntervalMs);
|
||||
|
||||
// Subscribe to new connections
|
||||
this.connectionSubscription = this.smartProxy.routeConnectionHandler.newConnectionSubject.subscribe({
|
||||
next: (record) => {
|
||||
this.recordRequest();
|
||||
const routeName = record.routeConfig?.name || 'unknown';
|
||||
this.recordRequest(record.id, routeName, record.remoteIP);
|
||||
|
||||
// Optional: Log connection details
|
||||
if (this.smartProxy.settings?.enableDetailedLogging) {
|
||||
logger.log('debug', `MetricsCollector: New connection recorded`, {
|
||||
connectionId: record.id,
|
||||
remoteIP: record.remoteIP,
|
||||
routeName: record.routeConfig?.name || 'unknown',
|
||||
routeName,
|
||||
component: 'metrics'
|
||||
});
|
||||
}
|
||||
@ -269,9 +352,14 @@ export class MetricsCollector implements IProxyStatsExtended {
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the metrics collector and clean up resources
|
||||
* Stop the metrics collector
|
||||
*/
|
||||
public stop(): void {
|
||||
if (this.samplingInterval) {
|
||||
clearInterval(this.samplingInterval);
|
||||
this.samplingInterval = undefined;
|
||||
}
|
||||
|
||||
if (this.connectionSubscription) {
|
||||
this.connectionSubscription.unsubscribe();
|
||||
this.connectionSubscription = undefined;
|
||||
@ -281,7 +369,7 @@ export class MetricsCollector implements IProxyStatsExtended {
|
||||
}
|
||||
|
||||
/**
|
||||
* Alias for stop() for backward compatibility
|
||||
* Alias for stop() for compatibility
|
||||
*/
|
||||
public destroy(): void {
|
||||
this.stop();
|
||||
|
Reference in New Issue
Block a user