Files
smartproxy/ts/proxies/smart-proxy/metrics-collector.ts

377 lines
11 KiB
TypeScript
Raw Normal View History

import * as plugins from '../../plugins.js';
import type { SmartProxy } from './smart-proxy.js';
2025-06-22 22:28:37 +00:00
import type {
IMetrics,
IThroughputData,
IThroughputHistoryPoint,
IByteTracker
} from './models/metrics-types.js';
import { ThroughputTracker } from './throughput-tracker.js';
import { logger } from '../../core/utils/logger.js';
/**
2025-06-22 22:28:37 +00:00
* Collects and provides metrics for SmartProxy with clean API
*/
2025-06-22 22:28:37 +00:00
export class MetricsCollector implements IMetrics {
// Throughput tracking
private throughputTracker: ThroughputTracker;
2025-06-22 22:28:37 +00:00
// Request tracking
private requestTimestamps: number[] = [];
private totalRequests: number = 0;
2025-06-22 22:28:37 +00:00
// Connection byte tracking for per-route/IP metrics
private connectionByteTrackers = new Map<string, IByteTracker>();
2025-06-22 22:28:37 +00:00
// Subscriptions
private samplingInterval?: NodeJS.Timeout;
private connectionSubscription?: plugins.smartrx.rxjs.Subscription;
2025-06-22 22:28:37 +00:00
// Configuration
private readonly sampleIntervalMs: number;
private readonly retentionSeconds: number;
constructor(
2025-06-22 22:28:37 +00:00
private smartProxy: SmartProxy,
config?: {
sampleIntervalMs?: number;
retentionSeconds?: number;
}
) {
2025-06-22 22:28:37 +00:00
this.sampleIntervalMs = config?.sampleIntervalMs || 1000;
this.retentionSeconds = config?.retentionSeconds || 3600;
this.throughputTracker = new ThroughputTracker(this.retentionSeconds);
}
2025-06-22 22:28:37 +00:00
// Connection metrics implementation
public connections = {
active: (): number => {
return this.smartProxy.connectionManager.getConnectionCount();
},
2025-06-22 22:28:37 +00:00
total: (): number => {
const stats = this.smartProxy.connectionManager.getTerminationStats();
let total = this.smartProxy.connectionManager.getConnectionCount();
for (const reason in stats.incoming) {
total += stats.incoming[reason];
}
return total;
},
2025-06-22 22:28:37 +00:00
byRoute: (): Map<string, number> => {
const routeCounts = new Map<string, number>();
const connections = this.smartProxy.connectionManager.getConnections();
for (const [_, record] of connections) {
const routeName = (record as any).routeName ||
record.routeConfig?.name ||
'unknown';
const current = routeCounts.get(routeName) || 0;
routeCounts.set(routeName, current + 1);
}
return routeCounts;
},
2025-06-22 22:28:37 +00:00
byIP: (): Map<string, number> => {
const ipCounts = new Map<string, number>();
2025-06-22 22:28:37 +00:00
for (const [_, record] of this.smartProxy.connectionManager.getConnections()) {
const ip = record.remoteIP;
const current = ipCounts.get(ip) || 0;
ipCounts.set(ip, current + 1);
}
2025-06-22 22:28:37 +00:00
return ipCounts;
},
2025-06-22 22:28:37 +00:00
topIPs: (limit: number = 10): Array<{ ip: string; count: number }> => {
const ipCounts = this.connections.byIP();
return Array.from(ipCounts.entries())
.sort((a, b) => b[1] - a[1])
.slice(0, limit)
.map(([ip, count]) => ({ ip, count }));
}
};
2025-06-22 22:28:37 +00:00
// Throughput metrics implementation
public throughput = {
instant: (): IThroughputData => {
return this.throughputTracker.getRate(1);
},
2025-06-22 22:28:37 +00:00
recent: (): IThroughputData => {
return this.throughputTracker.getRate(10);
},
2025-06-22 22:28:37 +00:00
average: (): IThroughputData => {
return this.throughputTracker.getRate(60);
},
2025-06-22 22:28:37 +00:00
custom: (seconds: number): IThroughputData => {
return this.throughputTracker.getRate(seconds);
},
2025-06-22 22:28:37 +00:00
history: (seconds: number): Array<IThroughputHistoryPoint> => {
return this.throughputTracker.getHistory(seconds);
},
2025-06-22 22:28:37 +00:00
byRoute: (windowSeconds: number = 60): Map<string, IThroughputData> => {
const routeThroughput = new Map<string, IThroughputData>();
const now = Date.now();
const windowStart = now - (windowSeconds * 1000);
// Aggregate bytes by route from trackers
const routeBytes = new Map<string, { in: number; out: number }>();
for (const [_, tracker] of this.connectionByteTrackers) {
if (tracker.lastUpdate > windowStart) {
const current = routeBytes.get(tracker.routeName) || { in: 0, out: 0 };
current.in += tracker.bytesIn;
current.out += tracker.bytesOut;
routeBytes.set(tracker.routeName, current);
}
}
// Convert to rates
for (const [route, bytes] of routeBytes) {
routeThroughput.set(route, {
in: Math.round(bytes.in / windowSeconds),
out: Math.round(bytes.out / windowSeconds)
});
}
return routeThroughput;
},
byIP: (windowSeconds: number = 60): Map<string, IThroughputData> => {
const ipThroughput = new Map<string, IThroughputData>();
const now = Date.now();
const windowStart = now - (windowSeconds * 1000);
// Aggregate bytes by IP from trackers
const ipBytes = new Map<string, { in: number; out: number }>();
for (const [_, tracker] of this.connectionByteTrackers) {
if (tracker.lastUpdate > windowStart) {
const current = ipBytes.get(tracker.remoteIP) || { in: 0, out: 0 };
current.in += tracker.bytesIn;
current.out += tracker.bytesOut;
ipBytes.set(tracker.remoteIP, current);
}
}
// Convert to rates
for (const [ip, bytes] of ipBytes) {
ipThroughput.set(ip, {
in: Math.round(bytes.in / windowSeconds),
out: Math.round(bytes.out / windowSeconds)
});
}
return ipThroughput;
}
};
2025-06-22 22:28:37 +00:00
// Request metrics implementation
public requests = {
perSecond: (): number => {
const now = Date.now();
const oneSecondAgo = now - 1000;
// Clean old timestamps
this.requestTimestamps = this.requestTimestamps.filter(ts => ts > now - 60000);
// Count requests in last second
const recentRequests = this.requestTimestamps.filter(ts => ts > oneSecondAgo);
return recentRequests.length;
},
2025-06-22 22:28:37 +00:00
perMinute: (): number => {
const now = Date.now();
const oneMinuteAgo = now - 60000;
// Count requests in last minute
const recentRequests = this.requestTimestamps.filter(ts => ts > oneMinuteAgo);
return recentRequests.length;
},
2025-06-22 22:28:37 +00:00
total: (): number => {
return this.totalRequests;
}
};
2025-06-22 22:28:37 +00:00
// Totals implementation
public totals = {
bytesIn: (): number => {
let total = 0;
// Sum from all active connections
for (const [_, record] of this.smartProxy.connectionManager.getConnections()) {
total += record.bytesReceived;
}
// TODO: Add historical data from terminated connections
return total;
},
2025-06-22 22:28:37 +00:00
bytesOut: (): number => {
let total = 0;
// Sum from all active connections
for (const [_, record] of this.smartProxy.connectionManager.getConnections()) {
total += record.bytesSent;
}
// TODO: Add historical data from terminated connections
return total;
},
connections: (): number => {
return this.connections.total();
}
2025-06-22 22:28:37 +00:00
};
2025-06-22 22:28:37 +00:00
// Percentiles implementation (placeholder for now)
public percentiles = {
connectionDuration: (): { p50: number; p95: number; p99: number } => {
// TODO: Implement percentile calculations
return { p50: 0, p95: 0, p99: 0 };
},
2025-06-22 22:28:37 +00:00
bytesTransferred: (): {
in: { p50: number; p95: number; p99: number };
out: { p50: number; p95: number; p99: number };
} => {
// TODO: Implement percentile calculations
return {
in: { p50: 0, p95: 0, p99: 0 },
out: { p50: 0, p95: 0, p99: 0 }
};
}
2025-06-22 22:28:37 +00:00
};
/**
2025-06-22 22:28:37 +00:00
* Record a new request
*/
2025-06-22 22:28:37 +00:00
public recordRequest(connectionId: string, routeName: string, remoteIP: string): void {
const now = Date.now();
2025-06-22 22:28:37 +00:00
this.requestTimestamps.push(now);
this.totalRequests++;
2025-06-22 22:28:37 +00:00
// Initialize byte tracker for this connection
this.connectionByteTrackers.set(connectionId, {
connectionId,
routeName,
remoteIP,
bytesIn: 0,
bytesOut: 0,
lastUpdate: now
});
2025-06-22 22:28:37 +00:00
// Cleanup old request timestamps (keep last minute only)
if (this.requestTimestamps.length > 1000) {
const cutoff = now - 60000;
this.requestTimestamps = this.requestTimestamps.filter(ts => ts > cutoff);
}
}
/**
2025-06-22 22:28:37 +00:00
* Record bytes transferred for a connection
*/
2025-06-22 22:28:37 +00:00
public recordBytes(connectionId: string, bytesIn: number, bytesOut: number): void {
// Update global throughput tracker
this.throughputTracker.recordBytes(bytesIn, bytesOut);
2025-06-22 22:28:37 +00:00
// Update connection-specific tracker
const tracker = this.connectionByteTrackers.get(connectionId);
if (tracker) {
tracker.bytesIn += bytesIn;
tracker.bytesOut += bytesOut;
tracker.lastUpdate = Date.now();
}
}
/**
2025-06-22 22:28:37 +00:00
* Clean up tracking for a closed connection
*/
2025-06-22 22:28:37 +00:00
public removeConnection(connectionId: string): void {
this.connectionByteTrackers.delete(connectionId);
}
/**
2025-06-22 22:28:37 +00:00
* Start the metrics collector
*/
public start(): void {
if (!this.smartProxy.routeConnectionHandler) {
throw new Error('MetricsCollector: RouteConnectionHandler not available');
}
2025-06-22 22:28:37 +00:00
// Start periodic sampling
this.samplingInterval = setInterval(() => {
this.throughputTracker.takeSample();
// Clean up old connection trackers (connections closed more than 5 minutes ago)
const cutoff = Date.now() - 300000;
for (const [id, tracker] of this.connectionByteTrackers) {
if (tracker.lastUpdate < cutoff) {
this.connectionByteTrackers.delete(id);
}
}
}, this.sampleIntervalMs);
// Subscribe to new connections
this.connectionSubscription = this.smartProxy.routeConnectionHandler.newConnectionSubject.subscribe({
next: (record) => {
2025-06-22 22:28:37 +00:00
const routeName = record.routeConfig?.name || 'unknown';
this.recordRequest(record.id, routeName, record.remoteIP);
if (this.smartProxy.settings?.enableDetailedLogging) {
logger.log('debug', `MetricsCollector: New connection recorded`, {
connectionId: record.id,
remoteIP: record.remoteIP,
2025-06-22 22:28:37 +00:00
routeName,
component: 'metrics'
});
}
},
error: (err) => {
logger.log('error', `MetricsCollector: Error in connection subscription`, {
error: err.message,
component: 'metrics'
});
}
});
logger.log('debug', 'MetricsCollector started', { component: 'metrics' });
}
/**
2025-06-22 22:28:37 +00:00
* Stop the metrics collector
*/
public stop(): void {
2025-06-22 22:28:37 +00:00
if (this.samplingInterval) {
clearInterval(this.samplingInterval);
this.samplingInterval = undefined;
}
if (this.connectionSubscription) {
this.connectionSubscription.unsubscribe();
this.connectionSubscription = undefined;
}
logger.log('debug', 'MetricsCollector stopped', { component: 'metrics' });
}
/**
2025-06-22 22:28:37 +00:00
* Alias for stop() for compatibility
*/
public destroy(): void {
this.stop();
}
}