import * as plugins from '../../plugins.js'; import type { SmartProxy } from './smart-proxy.js'; import type { IProxyStats, IProxyStatsExtended } from './models/metrics-types.js'; import { logger } from '../../core/utils/logger.js'; /** * Collects and computes metrics for SmartProxy on-demand */ export class MetricsCollector implements IProxyStatsExtended { // RPS tracking (the only state we need to maintain) private requestTimestamps: number[] = []; private readonly RPS_WINDOW_SIZE = 60000; // 1 minute window private readonly MAX_TIMESTAMPS = 5000; // Maximum timestamps to keep // Optional caching for performance private cachedMetrics: { timestamp: number; connectionsByRoute?: Map; connectionsByIP?: Map; } = { timestamp: 0 }; private readonly CACHE_TTL = 1000; // 1 second cache // RxJS subscription for connection events private connectionSubscription?: plugins.smartrx.rxjs.Subscription; constructor( private smartProxy: SmartProxy ) { // Subscription will be set up in start() method } /** * Get the current number of active connections */ public getActiveConnections(): number { return this.smartProxy.connectionManager.getConnectionCount(); } /** * Get connection counts grouped by route name */ public getConnectionsByRoute(): Map { const now = Date.now(); // Return cached value if fresh if (this.cachedMetrics.connectionsByRoute && now - this.cachedMetrics.timestamp < this.CACHE_TTL) { return new Map(this.cachedMetrics.connectionsByRoute); } // Compute fresh value const routeCounts = new Map(); const connections = this.smartProxy.connectionManager.getConnections(); if (this.smartProxy.settings?.enableDetailedLogging) { logger.log('debug', `MetricsCollector: Computing route connections`, { totalConnections: connections.size, component: 'metrics' }); } for (const [_, record] of connections) { // Try different ways to get the route name const routeName = (record as any).routeName || record.routeConfig?.name || (record.routeConfig as any)?.routeName || 'unknown'; if (this.smartProxy.settings?.enableDetailedLogging) { logger.log('debug', `MetricsCollector: Connection route info`, { connectionId: record.id, routeName, hasRouteConfig: !!record.routeConfig, routeConfigName: record.routeConfig?.name, routeConfigKeys: record.routeConfig ? Object.keys(record.routeConfig) : [], component: 'metrics' }); } const current = routeCounts.get(routeName) || 0; routeCounts.set(routeName, current + 1); } // Cache and return this.cachedMetrics.connectionsByRoute = routeCounts; this.cachedMetrics.timestamp = now; return new Map(routeCounts); } /** * Get connection counts grouped by IP address */ public getConnectionsByIP(): Map { const now = Date.now(); // Return cached value if fresh if (this.cachedMetrics.connectionsByIP && now - this.cachedMetrics.timestamp < this.CACHE_TTL) { return new Map(this.cachedMetrics.connectionsByIP); } // Compute fresh value const ipCounts = new Map(); for (const [_, record] of this.smartProxy.connectionManager.getConnections()) { const ip = record.remoteIP; const current = ipCounts.get(ip) || 0; ipCounts.set(ip, current + 1); } // Cache and return this.cachedMetrics.connectionsByIP = ipCounts; this.cachedMetrics.timestamp = now; return new Map(ipCounts); } /** * Get the total number of connections since proxy start */ public getTotalConnections(): number { // Get from termination stats const stats = this.smartProxy.connectionManager.getTerminationStats(); let total = this.smartProxy.connectionManager.getConnectionCount(); // Add active connections // Add all terminated connections for (const reason in stats.incoming) { total += stats.incoming[reason]; } return total; } /** * Get the current requests per second rate */ public getRequestsPerSecond(): number { const now = Date.now(); const windowStart = now - this.RPS_WINDOW_SIZE; // Clean old timestamps this.requestTimestamps = this.requestTimestamps.filter(ts => ts > windowStart); // Calculate RPS based on window const requestsInWindow = this.requestTimestamps.length; return requestsInWindow / (this.RPS_WINDOW_SIZE / 1000); } /** * Record a new request for RPS tracking */ public recordRequest(): void { const now = Date.now(); this.requestTimestamps.push(now); // Prevent unbounded growth - clean up more aggressively if (this.requestTimestamps.length > this.MAX_TIMESTAMPS) { // Keep only timestamps within the window const cutoff = now - this.RPS_WINDOW_SIZE; this.requestTimestamps = this.requestTimestamps.filter(ts => ts > cutoff); } } /** * Get total throughput (bytes transferred) */ public getThroughput(): { bytesIn: number; bytesOut: number } { let bytesIn = 0; let bytesOut = 0; // Sum bytes from all active connections for (const [_, record] of this.smartProxy.connectionManager.getConnections()) { bytesIn += record.bytesReceived; bytesOut += record.bytesSent; } return { bytesIn, bytesOut }; } /** * Get throughput rate (bytes per second) for last minute */ public getThroughputRate(): { bytesInPerSec: number; bytesOutPerSec: number } { const now = Date.now(); let recentBytesIn = 0; let recentBytesOut = 0; // Calculate bytes transferred in last minute from active connections for (const [_, record] of this.smartProxy.connectionManager.getConnections()) { const connectionAge = now - record.incomingStartTime; if (connectionAge < 60000) { // Connection started within last minute recentBytesIn += record.bytesReceived; recentBytesOut += record.bytesSent; } else { // For older connections, estimate rate based on average const rate = connectionAge / 60000; recentBytesIn += record.bytesReceived / rate; recentBytesOut += record.bytesSent / rate; } } return { bytesInPerSec: Math.round(recentBytesIn / 60), bytesOutPerSec: Math.round(recentBytesOut / 60) }; } /** * Get top IPs by connection count */ public getTopIPs(limit: number = 10): Array<{ ip: string; connections: number }> { const ipCounts = this.getConnectionsByIP(); const sorted = Array.from(ipCounts.entries()) .sort((a, b) => b[1] - a[1]) .slice(0, limit) .map(([ip, connections]) => ({ ip, connections })); return sorted; } /** * Check if an IP has reached the connection limit */ public isIPBlocked(ip: string, maxConnectionsPerIP: number): boolean { const ipCounts = this.getConnectionsByIP(); const currentConnections = ipCounts.get(ip) || 0; return currentConnections >= maxConnectionsPerIP; } /** * Clean up old request timestamps */ private cleanupOldRequests(): void { const cutoff = Date.now() - this.RPS_WINDOW_SIZE; this.requestTimestamps = this.requestTimestamps.filter(ts => ts > cutoff); } /** * Start the metrics collector and set up subscriptions */ public start(): void { if (!this.smartProxy.routeConnectionHandler) { throw new Error('MetricsCollector: RouteConnectionHandler not available'); } // Subscribe to the newConnectionSubject from RouteConnectionHandler this.connectionSubscription = this.smartProxy.routeConnectionHandler.newConnectionSubject.subscribe({ next: (record) => { this.recordRequest(); // Optional: Log connection details if (this.smartProxy.settings?.enableDetailedLogging) { logger.log('debug', `MetricsCollector: New connection recorded`, { connectionId: record.id, remoteIP: record.remoteIP, routeName: record.routeConfig?.name || 'unknown', component: 'metrics' }); } }, error: (err) => { logger.log('error', `MetricsCollector: Error in connection subscription`, { error: err.message, component: 'metrics' }); } }); logger.log('debug', 'MetricsCollector started', { component: 'metrics' }); } /** * Stop the metrics collector and clean up resources */ public stop(): void { if (this.connectionSubscription) { this.connectionSubscription.unsubscribe(); this.connectionSubscription = undefined; } logger.log('debug', 'MetricsCollector stopped', { component: 'metrics' }); } /** * Alias for stop() for backward compatibility */ public destroy(): void { this.stop(); } }