feat(metrics): add comprehensive metrics collection system
Implement real-time stats tracking including connection counts, request metrics, bandwidth usage, and route-specific monitoring. Adds MetricsCollector with observable streams for reactive monitoring integration.
This commit is contained in:
@ -30,6 +30,7 @@ import * as smartacmeHandlers from '@push.rocks/smartacme/dist_ts/handlers/index
|
||||
import * as smartlog from '@push.rocks/smartlog';
|
||||
import * as smartlogDestinationLocal from '@push.rocks/smartlog/destination-local';
|
||||
import * as taskbuffer from '@push.rocks/taskbuffer';
|
||||
import * as smartrx from '@push.rocks/smartrx';
|
||||
|
||||
export {
|
||||
lik,
|
||||
@ -45,6 +46,7 @@ export {
|
||||
smartlog,
|
||||
smartlogDestinationLocal,
|
||||
taskbuffer,
|
||||
smartrx,
|
||||
};
|
||||
|
||||
// third party scope
|
||||
|
285
ts/proxies/smart-proxy/metrics-collector.ts
Normal file
285
ts/proxies/smart-proxy/metrics-collector.ts
Normal file
@ -0,0 +1,285 @@
|
||||
import * as plugins from '../../plugins.js';
|
||||
import type { SmartProxy } from './smart-proxy.js';
|
||||
import type { IProxyStats, IProxyStatsExtended } from './models/metrics-types.js';
|
||||
import { logger } from '../../core/utils/logger.js';
|
||||
|
||||
/**
|
||||
* Collects and computes metrics for SmartProxy on-demand
|
||||
*/
|
||||
export class MetricsCollector implements IProxyStatsExtended {
|
||||
// RPS tracking (the only state we need to maintain)
|
||||
private requestTimestamps: number[] = [];
|
||||
private readonly RPS_WINDOW_SIZE = 60000; // 1 minute window
|
||||
|
||||
// Optional caching for performance
|
||||
private cachedMetrics: {
|
||||
timestamp: number;
|
||||
connectionsByRoute?: Map<string, number>;
|
||||
connectionsByIP?: Map<string, number>;
|
||||
} = { timestamp: 0 };
|
||||
|
||||
private readonly CACHE_TTL = 1000; // 1 second cache
|
||||
|
||||
// RxJS subscription for connection events
|
||||
private connectionSubscription?: plugins.smartrx.rxjs.Subscription;
|
||||
|
||||
constructor(
|
||||
private smartProxy: SmartProxy
|
||||
) {
|
||||
// Subscription will be set up in start() method
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current number of active connections
|
||||
*/
|
||||
public getActiveConnections(): number {
|
||||
return this.smartProxy.connectionManager.getConnectionCount();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get connection counts grouped by route name
|
||||
*/
|
||||
public getConnectionsByRoute(): Map<string, number> {
|
||||
const now = Date.now();
|
||||
|
||||
// Return cached value if fresh
|
||||
if (this.cachedMetrics.connectionsByRoute &&
|
||||
now - this.cachedMetrics.timestamp < this.CACHE_TTL) {
|
||||
return new Map(this.cachedMetrics.connectionsByRoute);
|
||||
}
|
||||
|
||||
// Compute fresh value
|
||||
const routeCounts = new Map<string, number>();
|
||||
const connections = this.smartProxy.connectionManager.getConnections();
|
||||
|
||||
if (this.smartProxy.settings?.enableDetailedLogging) {
|
||||
logger.log('debug', `MetricsCollector: Computing route connections`, {
|
||||
totalConnections: connections.size,
|
||||
component: 'metrics'
|
||||
});
|
||||
}
|
||||
|
||||
for (const [_, record] of connections) {
|
||||
// Try different ways to get the route name
|
||||
const routeName = (record as any).routeName ||
|
||||
record.routeConfig?.name ||
|
||||
(record.routeConfig as any)?.routeName ||
|
||||
'unknown';
|
||||
|
||||
if (this.smartProxy.settings?.enableDetailedLogging) {
|
||||
logger.log('debug', `MetricsCollector: Connection route info`, {
|
||||
connectionId: record.id,
|
||||
routeName,
|
||||
hasRouteConfig: !!record.routeConfig,
|
||||
routeConfigName: record.routeConfig?.name,
|
||||
routeConfigKeys: record.routeConfig ? Object.keys(record.routeConfig) : [],
|
||||
component: 'metrics'
|
||||
});
|
||||
}
|
||||
|
||||
const current = routeCounts.get(routeName) || 0;
|
||||
routeCounts.set(routeName, current + 1);
|
||||
}
|
||||
|
||||
// Cache and return
|
||||
this.cachedMetrics.connectionsByRoute = routeCounts;
|
||||
this.cachedMetrics.timestamp = now;
|
||||
return new Map(routeCounts);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get connection counts grouped by IP address
|
||||
*/
|
||||
public getConnectionsByIP(): Map<string, number> {
|
||||
const now = Date.now();
|
||||
|
||||
// Return cached value if fresh
|
||||
if (this.cachedMetrics.connectionsByIP &&
|
||||
now - this.cachedMetrics.timestamp < this.CACHE_TTL) {
|
||||
return new Map(this.cachedMetrics.connectionsByIP);
|
||||
}
|
||||
|
||||
// Compute fresh value
|
||||
const ipCounts = new Map<string, number>();
|
||||
for (const [_, record] of this.smartProxy.connectionManager.getConnections()) {
|
||||
const ip = record.remoteIP;
|
||||
const current = ipCounts.get(ip) || 0;
|
||||
ipCounts.set(ip, current + 1);
|
||||
}
|
||||
|
||||
// Cache and return
|
||||
this.cachedMetrics.connectionsByIP = ipCounts;
|
||||
this.cachedMetrics.timestamp = now;
|
||||
return new Map(ipCounts);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the total number of connections since proxy start
|
||||
*/
|
||||
public getTotalConnections(): number {
|
||||
// Get from termination stats
|
||||
const stats = this.smartProxy.connectionManager.getTerminationStats();
|
||||
let total = this.smartProxy.connectionManager.getConnectionCount(); // Add active connections
|
||||
|
||||
// Add all terminated connections
|
||||
for (const reason in stats.incoming) {
|
||||
total += stats.incoming[reason];
|
||||
}
|
||||
|
||||
return total;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current requests per second rate
|
||||
*/
|
||||
public getRequestsPerSecond(): number {
|
||||
const now = Date.now();
|
||||
const windowStart = now - this.RPS_WINDOW_SIZE;
|
||||
|
||||
// Clean old timestamps
|
||||
this.requestTimestamps = this.requestTimestamps.filter(ts => ts > windowStart);
|
||||
|
||||
// Calculate RPS based on window
|
||||
const requestsInWindow = this.requestTimestamps.length;
|
||||
return requestsInWindow / (this.RPS_WINDOW_SIZE / 1000);
|
||||
}
|
||||
|
||||
/**
|
||||
* Record a new request for RPS tracking
|
||||
*/
|
||||
public recordRequest(): void {
|
||||
this.requestTimestamps.push(Date.now());
|
||||
|
||||
// Prevent unbounded growth
|
||||
if (this.requestTimestamps.length > 10000) {
|
||||
this.cleanupOldRequests();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get total throughput (bytes transferred)
|
||||
*/
|
||||
public getThroughput(): { bytesIn: number; bytesOut: number } {
|
||||
let bytesIn = 0;
|
||||
let bytesOut = 0;
|
||||
|
||||
// Sum bytes from all active connections
|
||||
for (const [_, record] of this.smartProxy.connectionManager.getConnections()) {
|
||||
bytesIn += record.bytesReceived;
|
||||
bytesOut += record.bytesSent;
|
||||
}
|
||||
|
||||
return { bytesIn, bytesOut };
|
||||
}
|
||||
|
||||
/**
|
||||
* Get throughput rate (bytes per second) for last minute
|
||||
*/
|
||||
public getThroughputRate(): { bytesInPerSec: number; bytesOutPerSec: number } {
|
||||
const now = Date.now();
|
||||
let recentBytesIn = 0;
|
||||
let recentBytesOut = 0;
|
||||
|
||||
// Calculate bytes transferred in last minute from active connections
|
||||
for (const [_, record] of this.smartProxy.connectionManager.getConnections()) {
|
||||
const connectionAge = now - record.incomingStartTime;
|
||||
if (connectionAge < 60000) { // Connection started within last minute
|
||||
recentBytesIn += record.bytesReceived;
|
||||
recentBytesOut += record.bytesSent;
|
||||
} else {
|
||||
// For older connections, estimate rate based on average
|
||||
const rate = connectionAge / 60000;
|
||||
recentBytesIn += record.bytesReceived / rate;
|
||||
recentBytesOut += record.bytesSent / rate;
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
bytesInPerSec: Math.round(recentBytesIn / 60),
|
||||
bytesOutPerSec: Math.round(recentBytesOut / 60)
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Get top IPs by connection count
|
||||
*/
|
||||
public getTopIPs(limit: number = 10): Array<{ ip: string; connections: number }> {
|
||||
const ipCounts = this.getConnectionsByIP();
|
||||
const sorted = Array.from(ipCounts.entries())
|
||||
.sort((a, b) => b[1] - a[1])
|
||||
.slice(0, limit)
|
||||
.map(([ip, connections]) => ({ ip, connections }));
|
||||
|
||||
return sorted;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if an IP has reached the connection limit
|
||||
*/
|
||||
public isIPBlocked(ip: string, maxConnectionsPerIP: number): boolean {
|
||||
const ipCounts = this.getConnectionsByIP();
|
||||
const currentConnections = ipCounts.get(ip) || 0;
|
||||
return currentConnections >= maxConnectionsPerIP;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean up old request timestamps
|
||||
*/
|
||||
private cleanupOldRequests(): void {
|
||||
const cutoff = Date.now() - this.RPS_WINDOW_SIZE;
|
||||
this.requestTimestamps = this.requestTimestamps.filter(ts => ts > cutoff);
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the metrics collector and set up subscriptions
|
||||
*/
|
||||
public start(): void {
|
||||
if (!this.smartProxy.routeConnectionHandler) {
|
||||
throw new Error('MetricsCollector: RouteConnectionHandler not available');
|
||||
}
|
||||
|
||||
// Subscribe to the newConnectionSubject from RouteConnectionHandler
|
||||
this.connectionSubscription = this.smartProxy.routeConnectionHandler.newConnectionSubject.subscribe({
|
||||
next: (record) => {
|
||||
this.recordRequest();
|
||||
|
||||
// Optional: Log connection details
|
||||
if (this.smartProxy.settings?.enableDetailedLogging) {
|
||||
logger.log('debug', `MetricsCollector: New connection recorded`, {
|
||||
connectionId: record.id,
|
||||
remoteIP: record.remoteIP,
|
||||
routeName: record.routeConfig?.name || 'unknown',
|
||||
component: 'metrics'
|
||||
});
|
||||
}
|
||||
},
|
||||
error: (err) => {
|
||||
logger.log('error', `MetricsCollector: Error in connection subscription`, {
|
||||
error: err.message,
|
||||
component: 'metrics'
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
logger.log('debug', 'MetricsCollector started', { component: 'metrics' });
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the metrics collector and clean up resources
|
||||
*/
|
||||
public stop(): void {
|
||||
if (this.connectionSubscription) {
|
||||
this.connectionSubscription.unsubscribe();
|
||||
this.connectionSubscription = undefined;
|
||||
}
|
||||
|
||||
logger.log('debug', 'MetricsCollector stopped', { component: 'metrics' });
|
||||
}
|
||||
|
||||
/**
|
||||
* Alias for stop() for backward compatibility
|
||||
*/
|
||||
public destroy(): void {
|
||||
this.stop();
|
||||
}
|
||||
}
|
@ -4,3 +4,4 @@
|
||||
// Export everything except IAcmeOptions from interfaces
|
||||
export type { ISmartProxyOptions, IConnectionRecord, TSmartProxyCertProvisionObject } from './interfaces.js';
|
||||
export * from './route-types.js';
|
||||
export * from './metrics-types.js';
|
||||
|
54
ts/proxies/smart-proxy/models/metrics-types.ts
Normal file
54
ts/proxies/smart-proxy/models/metrics-types.ts
Normal file
@ -0,0 +1,54 @@
|
||||
/**
|
||||
* Interface for proxy statistics and metrics
|
||||
*/
|
||||
export interface IProxyStats {
|
||||
/**
|
||||
* Get the current number of active connections
|
||||
*/
|
||||
getActiveConnections(): number;
|
||||
|
||||
/**
|
||||
* Get connection counts grouped by route name
|
||||
*/
|
||||
getConnectionsByRoute(): Map<string, number>;
|
||||
|
||||
/**
|
||||
* Get connection counts grouped by IP address
|
||||
*/
|
||||
getConnectionsByIP(): Map<string, number>;
|
||||
|
||||
/**
|
||||
* Get the total number of connections since proxy start
|
||||
*/
|
||||
getTotalConnections(): number;
|
||||
|
||||
/**
|
||||
* Get the current requests per second rate
|
||||
*/
|
||||
getRequestsPerSecond(): number;
|
||||
|
||||
/**
|
||||
* Get total throughput (bytes transferred)
|
||||
*/
|
||||
getThroughput(): { bytesIn: number; bytesOut: number };
|
||||
}
|
||||
|
||||
/**
|
||||
* Extended interface for additional metrics helpers
|
||||
*/
|
||||
export interface IProxyStatsExtended extends IProxyStats {
|
||||
/**
|
||||
* Get throughput rate (bytes per second) for last minute
|
||||
*/
|
||||
getThroughputRate(): { bytesInPerSec: number; bytesOutPerSec: number };
|
||||
|
||||
/**
|
||||
* Get top IPs by connection count
|
||||
*/
|
||||
getTopIPs(limit?: number): Array<{ ip: string; connections: number }>;
|
||||
|
||||
/**
|
||||
* Check if an IP has reached the connection limit
|
||||
*/
|
||||
isIPBlocked(ip: string, maxConnectionsPerIP: number): boolean;
|
||||
}
|
@ -23,6 +23,9 @@ export class RouteConnectionHandler {
|
||||
|
||||
// Cache for route contexts to avoid recreation
|
||||
private routeContextCache: Map<string, IRouteContext> = new Map();
|
||||
|
||||
// RxJS Subject for new connections
|
||||
public newConnectionSubject = new plugins.smartrx.rxjs.Subject<IConnectionRecord>();
|
||||
|
||||
constructor(
|
||||
settings: ISmartProxyOptions,
|
||||
@ -35,6 +38,7 @@ export class RouteConnectionHandler {
|
||||
) {
|
||||
this.settings = settings;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create a route context object for port and host mapping functions
|
||||
@ -110,6 +114,9 @@ export class RouteConnectionHandler {
|
||||
// Connection was rejected due to limit - socket already destroyed by connection manager
|
||||
return;
|
||||
}
|
||||
|
||||
// Emit new connection event
|
||||
this.newConnectionSubject.next(record);
|
||||
const connectionId = record.id;
|
||||
|
||||
// Apply socket optimizations (apply to underlying socket)
|
||||
@ -640,6 +647,9 @@ export class RouteConnectionHandler {
|
||||
): void {
|
||||
const connectionId = record.id;
|
||||
const action = route.action as IRouteAction;
|
||||
|
||||
// Store the route config in the connection record for metrics and other uses
|
||||
record.routeConfig = route;
|
||||
|
||||
// Check if this route uses NFTables for forwarding
|
||||
if (action.forwardingEngine === 'nftables') {
|
||||
@ -957,6 +967,9 @@ export class RouteConnectionHandler {
|
||||
): Promise<void> {
|
||||
const connectionId = record.id;
|
||||
|
||||
// Store the route config in the connection record for metrics and other uses
|
||||
record.routeConfig = route;
|
||||
|
||||
if (!route.action.socketHandler) {
|
||||
logger.log('error', 'socket-handler action missing socketHandler function', {
|
||||
connectionId,
|
||||
|
@ -27,6 +27,10 @@ import { Mutex } from './utils/mutex.js';
|
||||
// Import ACME state manager
|
||||
import { AcmeStateManager } from './acme-state-manager.js';
|
||||
|
||||
// Import metrics collector
|
||||
import { MetricsCollector } from './metrics-collector.js';
|
||||
import type { IProxyStats } from './models/metrics-types.js';
|
||||
|
||||
/**
|
||||
* SmartProxy - Pure route-based API
|
||||
*
|
||||
@ -47,13 +51,13 @@ export class SmartProxy extends plugins.EventEmitter {
|
||||
private isShuttingDown: boolean = false;
|
||||
|
||||
// Component managers
|
||||
private connectionManager: ConnectionManager;
|
||||
public connectionManager: ConnectionManager;
|
||||
private securityManager: SecurityManager;
|
||||
private tlsManager: TlsManager;
|
||||
private httpProxyBridge: HttpProxyBridge;
|
||||
private timeoutManager: TimeoutManager;
|
||||
public routeManager: RouteManager; // Made public for route management
|
||||
private routeConnectionHandler: RouteConnectionHandler;
|
||||
public routeConnectionHandler: RouteConnectionHandler; // Made public for metrics
|
||||
private nftablesManager: NFTablesManager;
|
||||
|
||||
// Certificate manager for ACME and static certificates
|
||||
@ -64,6 +68,9 @@ export class SmartProxy extends plugins.EventEmitter {
|
||||
private routeUpdateLock: any = null; // Will be initialized as AsyncMutex
|
||||
private acmeStateManager: AcmeStateManager;
|
||||
|
||||
// Metrics collector
|
||||
private metricsCollector: MetricsCollector;
|
||||
|
||||
// Track port usage across route updates
|
||||
private portUsageMap: Map<number, Set<string>> = new Map();
|
||||
|
||||
@ -204,6 +211,9 @@ export class SmartProxy extends plugins.EventEmitter {
|
||||
|
||||
// Initialize ACME state manager
|
||||
this.acmeStateManager = new AcmeStateManager();
|
||||
|
||||
// Initialize metrics collector with reference to this SmartProxy instance
|
||||
this.metricsCollector = new MetricsCollector(this);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -383,6 +393,9 @@ export class SmartProxy extends plugins.EventEmitter {
|
||||
logger.log('info', 'Starting certificate provisioning now that ports are ready', { component: 'certificate-manager' });
|
||||
await this.certManager.provisionAllCertificates();
|
||||
}
|
||||
|
||||
// Start the metrics collector now that all components are initialized
|
||||
this.metricsCollector.start();
|
||||
|
||||
// Set up periodic connection logging and inactivity checks
|
||||
this.connectionLogger = setInterval(() => {
|
||||
@ -508,6 +521,9 @@ export class SmartProxy extends plugins.EventEmitter {
|
||||
|
||||
// Clear ACME state manager
|
||||
this.acmeStateManager.clear();
|
||||
|
||||
// Stop metrics collector
|
||||
this.metricsCollector.stop();
|
||||
|
||||
logger.log('info', 'SmartProxy shutdown complete.');
|
||||
}
|
||||
@ -905,6 +921,15 @@ export class SmartProxy extends plugins.EventEmitter {
|
||||
return this.certManager.getCertificateStatus(routeName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get proxy statistics and metrics
|
||||
*
|
||||
* @returns IProxyStats interface with various metrics methods
|
||||
*/
|
||||
public getStats(): IProxyStats {
|
||||
return this.metricsCollector;
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates if a domain name is valid for certificate issuance
|
||||
*/
|
||||
|
Reference in New Issue
Block a user