451 lines
14 KiB
TypeScript
451 lines
14 KiB
TypeScript
import * as plugins from '../../plugins.js';
|
|
import type { SmartProxy } from './smart-proxy.js';
|
|
import type {
|
|
IMetrics,
|
|
IThroughputData,
|
|
IThroughputHistoryPoint,
|
|
IByteTracker
|
|
} from './models/metrics-types.js';
|
|
import { ThroughputTracker } from './throughput-tracker.js';
|
|
import { logger } from '../../core/utils/logger.js';
|
|
|
|
/**
 * Collects and exposes metrics for a SmartProxy instance.
 *
 * Metrics are grouped into the public accessor objects `connections`,
 * `throughput`, `requests`, `totals` and `percentiles`. Data is fed in through
 * `recordRequest()` / `recordBytes()` / `removeConnection()`, and through the
 * new-connection subscription and sampling timer installed by `start()`.
 */
export class MetricsCollector implements IMetrics {
  /** Request timestamps older than this (ms) are discarded. */
  private static readonly REQUEST_RETENTION_MS = 60000;

  /** Hard cap on the number of request timestamps kept in memory. */
  private static readonly MAX_REQUEST_TIMESTAMPS = 5000;

  /** Retention (ms) for per-connection byte snapshots and for idle byte trackers. */
  private static readonly TRACKER_RETENTION_MS = 300000;

  // Throughput tracking
  private throughputTracker: ThroughputTracker;

  // Request tracking
  private requestTimestamps: number[] = [];
  private totalRequests: number = 0;

  // Connection byte tracking for per-route/IP metrics, keyed by connection id
  private connectionByteTrackers = new Map<string, IByteTracker>();

  // Subscriptions
  private samplingInterval?: NodeJS.Timeout;
  private connectionSubscription?: plugins.smartrx.rxjs.Subscription;

  // Configuration
  private readonly sampleIntervalMs: number;
  private readonly retentionSeconds: number;

  /**
   * @param smartProxy Proxy whose connection manager and route connection handler are observed.
   * @param config Optional tuning. `sampleIntervalMs` is the period of the sampling timer
   *   (default 1000); `retentionSeconds` is passed to the ThroughputTracker (default 3600).
   */
  constructor(
    private smartProxy: SmartProxy,
    config?: {
      sampleIntervalMs?: number;
      retentionSeconds?: number;
    }
  ) {
    // `||` (not `??`): a configured value of 0 also falls back to the default.
    this.sampleIntervalMs = config?.sampleIntervalMs || 1000;
    this.retentionSeconds = config?.retentionSeconds || 3600;
    this.throughputTracker = new ThroughputTracker(this.retentionSeconds);
  }

  // Connection metrics implementation
  public connections = {
    /** Number of connections currently held by the connection manager. */
    active: (): number => {
      return this.smartProxy.connectionManager.getConnectionCount();
    },

    /** Active connections plus every count listed in the `incoming` termination stats. */
    total: (): number => {
      const stats = this.smartProxy.connectionManager.getTerminationStats();
      let total = this.smartProxy.connectionManager.getConnectionCount();

      for (const reason in stats.incoming) {
        total += stats.incoming[reason];
      }

      return total;
    },

    /** Active connection count per route name (`'unknown'` when no name is available). */
    byRoute: (): Map<string, number> => {
      const routeCounts = new Map<string, number>();

      for (const [_, record] of this.smartProxy.connectionManager.getConnections()) {
        const routeName = (record as any).routeName ||
          record.routeConfig?.name ||
          'unknown';

        routeCounts.set(routeName, (routeCounts.get(routeName) || 0) + 1);
      }

      return routeCounts;
    },

    /** Active connection count per remote IP. */
    byIP: (): Map<string, number> => {
      const ipCounts = new Map<string, number>();

      for (const [_, record] of this.smartProxy.connectionManager.getConnections()) {
        const ip = record.remoteIP;
        ipCounts.set(ip, (ipCounts.get(ip) || 0) + 1);
      }

      return ipCounts;
    },

    /** The `limit` remote IPs with the most active connections, highest count first. */
    topIPs: (limit: number = 10): Array<{ ip: string; count: number }> => {
      return Array.from(this.connections.byIP().entries())
        .sort((a, b) => b[1] - a[1])
        .slice(0, limit)
        .map(([ip, count]) => ({ ip, count }));
    }
  };

  // Throughput metrics implementation
  public throughput = {
    /** Global rate as reported by the throughput tracker for 1 second. */
    instant: (): IThroughputData => {
      return this.throughputTracker.getRate(1);
    },

    /** Global rate as reported by the throughput tracker for 10 seconds. */
    recent: (): IThroughputData => {
      return this.throughputTracker.getRate(10);
    },

    /** Global rate as reported by the throughput tracker for 60 seconds. */
    average: (): IThroughputData => {
      return this.throughputTracker.getRate(60);
    },

    /** Global rate as reported by the throughput tracker for `seconds`. */
    custom: (seconds: number): IThroughputData => {
      return this.throughputTracker.getRate(seconds);
    },

    /** Throughput history as reported by the throughput tracker for `seconds`. */
    history: (seconds: number): Array<IThroughputHistoryPoint> => {
      return this.throughputTracker.getHistory(seconds);
    },

    /**
     * Bytes per second per route name over the last `windowSeconds`.
     * Only connections with a byte tracker updated inside the window are included.
     */
    byRoute: (windowSeconds: number = 60): Map<string, IThroughputData> => {
      return this.aggregateThroughput(windowSeconds, (tracker) => tracker.routeName);
    },

    /**
     * Bytes per second per remote IP over the last `windowSeconds`.
     * Only connections with a byte tracker updated inside the window are included.
     */
    byIP: (windowSeconds: number = 60): Map<string, IThroughputData> => {
      return this.aggregateThroughput(windowSeconds, (tracker) => tracker.remoteIP);
    }
  };

  // Request metrics implementation
  public requests = {
    /**
     * Requests recorded during the last second.
     * Also drops stored timestamps older than one minute.
     */
    perSecond: (): number => {
      const now = Date.now();
      const oneSecondAgo = now - 1000;

      // Clean old timestamps
      const retentionCutoff = now - MetricsCollector.REQUEST_RETENTION_MS;
      this.requestTimestamps = this.requestTimestamps.filter(ts => ts > retentionCutoff);

      // Count requests in last second
      return this.requestTimestamps.filter(ts => ts > oneSecondAgo).length;
    },

    /**
     * Requests recorded during the last minute. Because at most
     * MAX_REQUEST_TIMESTAMPS timestamps are retained, the result is capped at that value.
     */
    perMinute: (): number => {
      const oneMinuteAgo = Date.now() - 60000;
      return this.requestTimestamps.filter(ts => ts > oneMinuteAgo).length;
    },

    /** Number of requests recorded since this collector was created. */
    total: (): number => {
      return this.totalRequests;
    }
  };

  // Totals implementation
  public totals = {
    /** Sum of `bytesReceived` over the currently active connections. */
    bytesIn: (): number => {
      let total = 0;

      // Sum from all active connections
      for (const [_, record] of this.smartProxy.connectionManager.getConnections()) {
        total += record.bytesReceived;
      }

      // TODO: Add historical data from terminated connections

      return total;
    },

    /** Sum of `bytesSent` over the currently active connections. */
    bytesOut: (): number => {
      let total = 0;

      // Sum from all active connections
      for (const [_, record] of this.smartProxy.connectionManager.getConnections()) {
        total += record.bytesSent;
      }

      // TODO: Add historical data from terminated connections

      return total;
    },

    /** Same value as `connections.total()`. */
    connections: (): number => {
      return this.connections.total();
    }
  };

  // Percentiles implementation (placeholder for now)
  public percentiles = {
    /** Placeholder: always returns zeros. */
    connectionDuration: (): { p50: number; p95: number; p99: number } => {
      // TODO: Implement percentile calculations
      return { p50: 0, p95: 0, p99: 0 };
    },

    /** Placeholder: always returns zeros. */
    bytesTransferred: (): {
      in: { p50: number; p95: number; p99: number };
      out: { p50: number; p95: number; p99: number };
    } => {
      // TODO: Implement percentile calculations
      return {
        in: { p50: 0, p95: 0, p99: 0 },
        out: { p50: 0, p95: 0, p99: 0 }
      };
    }
  };

  /**
   * Record a new request and (re)initialize the byte tracker for its connection.
   *
   * @param connectionId Key under which the byte tracker is stored.
   * @param routeName Route name used for per-route throughput aggregation.
   * @param remoteIP Remote address used for per-IP throughput aggregation.
   */
  public recordRequest(connectionId: string, routeName: string, remoteIP: string): void {
    const now = Date.now();
    this.requestTimestamps.push(now);
    this.totalRequests++;

    // Initialize byte tracker for this connection
    this.connectionByteTrackers.set(connectionId, {
      connectionId,
      routeName,
      remoteIP,
      bytesIn: 0,
      bytesOut: 0,
      startTime: now,
      lastUpdate: now,
      windowSnapshots: [] // Initialize empty snapshots array
    });

    // Cleanup old request timestamps once the list grows past the cap
    if (this.requestTimestamps.length > MetricsCollector.MAX_REQUEST_TIMESTAMPS) {
      // First try to clean up old timestamps (older than 1 minute)
      const cutoff = now - MetricsCollector.REQUEST_RETENTION_MS;
      this.requestTimestamps = this.requestTimestamps.filter(ts => ts > cutoff);

      // If still too many, enforce the hard cap by keeping the most recent entries
      if (this.requestTimestamps.length > MetricsCollector.MAX_REQUEST_TIMESTAMPS) {
        this.requestTimestamps = this.requestTimestamps.slice(-MetricsCollector.MAX_REQUEST_TIMESTAMPS);
      }
    }
  }

  /**
   * Record bytes transferred for a connection.
   *
   * The global throughput tracker is always updated. The per-connection tracker
   * is updated only if one exists for `connectionId`; otherwise the per-route/IP
   * part is skipped.
   *
   * @param connectionId Connection the bytes belong to.
   * @param bytesIn Bytes added to the connection's inbound counter.
   * @param bytesOut Bytes added to the connection's outbound counter.
   */
  public recordBytes(connectionId: string, bytesIn: number, bytesOut: number): void {
    // Update global throughput tracker
    this.throughputTracker.recordBytes(bytesIn, bytesOut);

    // Update connection-specific tracker
    const tracker = this.connectionByteTrackers.get(connectionId);
    if (!tracker) {
      return;
    }

    const now = Date.now();
    tracker.bytesIn += bytesIn;
    tracker.bytesOut += bytesOut;
    tracker.lastUpdate = now;

    // Initialize snapshots array if not present
    if (!tracker.windowSnapshots) {
      tracker.windowSnapshots = [];
    }
    const snapshots = tracker.windowSnapshots;

    // Add current cumulative snapshot - used for windowed calculations
    snapshots.push({
      timestamp: now,
      bytesIn: tracker.bytesIn,
      bytesOut: tracker.bytesOut
    });

    // Bound memory: drop snapshots older than the retention period, but keep the
    // newest stale one. It is the baseline for any window starting after it;
    // without it, all bytes since connection start would be counted as in-window.
    const cutoff = now - MetricsCollector.TRACKER_RETENTION_MS;
    const firstFresh = snapshots.findIndex(s => s.timestamp > cutoff);
    const staleCount = firstFresh === -1 ? snapshots.length : firstFresh;
    if (staleCount > 1) {
      snapshots.splice(0, staleCount - 1);
    }
  }

  /**
   * Clean up tracking for a closed connection
   *
   * @param connectionId Connection whose byte tracker is deleted.
   */
  public removeConnection(connectionId: string): void {
    this.connectionByteTrackers.delete(connectionId);
  }

  /**
   * Start the metrics collector: installs the periodic sampling timer and
   * subscribes to new connections. Calling it while already started is a no-op,
   * so no timer or subscription is leaked.
   *
   * @throws Error if the proxy has no `routeConnectionHandler`.
   */
  public start(): void {
    if (!this.smartProxy.routeConnectionHandler) {
      throw new Error('MetricsCollector: RouteConnectionHandler not available');
    }

    // Already running: a second timer/subscription would be unreachable by stop().
    if (this.samplingInterval || this.connectionSubscription) {
      return;
    }

    // Start periodic sampling
    this.samplingInterval = setInterval(() => {
      this.throughputTracker.takeSample();
      this.pruneIdleTrackers();
    }, this.sampleIntervalMs);

    // Subscribe to new connections
    this.connectionSubscription = this.smartProxy.routeConnectionHandler.newConnectionSubject.subscribe({
      next: (record) => {
        const routeName = record.routeConfig?.name || 'unknown';
        this.recordRequest(record.id, routeName, record.remoteIP);

        if (this.smartProxy.settings?.enableDetailedLogging) {
          logger.log('debug', `MetricsCollector: New connection recorded`, {
            connectionId: record.id,
            remoteIP: record.remoteIP,
            routeName,
            component: 'metrics'
          });
        }
      },
      error: (err) => {
        logger.log('error', `MetricsCollector: Error in connection subscription`, {
          // The emitted value is not guaranteed to be an Error instance.
          error: err instanceof Error ? err.message : String(err),
          component: 'metrics'
        });
      }
    });

    logger.log('debug', 'MetricsCollector started', { component: 'metrics' });
  }

  /**
   * Stop the metrics collector: clears the sampling timer and unsubscribes from
   * new connections. Safe to call when not started.
   */
  public stop(): void {
    if (this.samplingInterval) {
      clearInterval(this.samplingInterval);
      this.samplingInterval = undefined;
    }

    if (this.connectionSubscription) {
      this.connectionSubscription.unsubscribe();
      this.connectionSubscription = undefined;
    }

    logger.log('debug', 'MetricsCollector stopped', { component: 'metrics' });
  }

  /**
   * Alias for stop() for compatibility
   */
  public destroy(): void {
    this.stop();
  }

  /**
   * Shared implementation of `throughput.byRoute` / `throughput.byIP`: sums the
   * in-window bytes of every recently updated tracker per key and converts the
   * sums to rounded bytes per second.
   */
  private aggregateThroughput(
    windowSeconds: number,
    keyOf: (tracker: IByteTracker) => string
  ): Map<string, IThroughputData> {
    const windowStart = Date.now() - (windowSeconds * 1000);
    const totals = new Map<string, { bytesIn: number; bytesOut: number }>();

    for (const tracker of this.connectionByteTrackers.values()) {
      // Only include connections that were active within the window
      if (!(tracker.lastUpdate > windowStart)) {
        continue;
      }

      const windowBytes = this.bytesInWindow(tracker, windowStart);
      const key = keyOf(tracker);
      const sum = totals.get(key) || { bytesIn: 0, bytesOut: 0 };
      sum.bytesIn += windowBytes.bytesIn;
      sum.bytesOut += windowBytes.bytesOut;
      totals.set(key, sum);
    }

    // Convert to rates (bytes per second)
    const rates = new Map<string, IThroughputData>();
    for (const [key, sum] of totals) {
      rates.set(key, {
        in: Math.round(sum.bytesIn / windowSeconds),
        out: Math.round(sum.bytesOut / windowSeconds)
      });
    }

    return rates;
  }

  /** Bytes a single tracker transferred since `windowStart` (ms epoch). */
  private bytesInWindow(
    tracker: IByteTracker,
    windowStart: number
  ): { bytesIn: number; bytesOut: number } {
    const snapshots = tracker.windowSnapshots;

    if (snapshots && snapshots.length > 0) {
      // Baseline = latest snapshot at or before the window start; when there is
      // none, the connection's zero counters at start time are used.
      let baseline = { timestamp: tracker.startTime, bytesIn: 0, bytesOut: 0 };
      for (const snapshot of snapshots) {
        if (snapshot.timestamp > windowStart) {
          break; // snapshots are appended in time order
        }
        baseline = snapshot;
      }

      return {
        bytesIn: tracker.bytesIn - baseline.bytesIn,
        bytesOut: tracker.bytesOut - baseline.bytesOut
      };
    }

    if (tracker.startTime > windowStart) {
      // Connection started within window, use all its bytes
      return { bytesIn: tracker.bytesIn, bytesOut: tracker.bytesOut };
    }

    return { bytesIn: 0, bytesOut: 0 };
  }

  /**
   * Delete byte trackers whose last update is older than the retention period.
   * This is based on idle time only; it does not check whether the connection
   * is still open.
   */
  private pruneIdleTrackers(): void {
    const cutoff = Date.now() - MetricsCollector.TRACKER_RETENTION_MS;
    for (const [id, tracker] of this.connectionByteTrackers) {
      if (tracker.lastUpdate < cutoff) {
        this.connectionByteTrackers.delete(id);
      }
    }
  }
}