From 9bf15ff756c25a7fe05d5c6e124d580db6f15280 Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Mon, 9 Jun 2025 15:02:36 +0000 Subject: [PATCH] feat(metrics): add comprehensive metrics collection system Implement real-time stats tracking including connection counts, request metrics, bandwidth usage, and route-specific monitoring. Adds MetricsCollector with observable streams for reactive monitoring integration. --- package.json | 1 + pnpm-lock.yaml | 13 +- readme.metrics.md | 591 ++++++++++++++++++ test/test.keepalive-support.node.ts | 250 ++++++++ test/test.metrics-collector.ts | 280 +++++++++ ts/plugins.ts | 2 + ts/proxies/smart-proxy/metrics-collector.ts | 285 +++++++++ ts/proxies/smart-proxy/models/index.ts | 1 + .../smart-proxy/models/metrics-types.ts | 54 ++ .../smart-proxy/route-connection-handler.ts | 13 + ts/proxies/smart-proxy/smart-proxy.ts | 29 +- 11 files changed, 1508 insertions(+), 11 deletions(-) create mode 100644 readme.metrics.md create mode 100644 test/test.keepalive-support.node.ts create mode 100644 test/test.metrics-collector.ts create mode 100644 ts/proxies/smart-proxy/metrics-collector.ts create mode 100644 ts/proxies/smart-proxy/models/metrics-types.ts diff --git a/package.json b/package.json index a79871b..2df112f 100644 --- a/package.json +++ b/package.json @@ -31,6 +31,7 @@ "@push.rocks/smartnetwork": "^4.0.2", "@push.rocks/smartpromise": "^4.2.3", "@push.rocks/smartrequest": "^2.1.0", + "@push.rocks/smartrx": "^3.0.10", "@push.rocks/smartstring": "^4.0.15", "@push.rocks/taskbuffer": "^3.1.7", "@tsclass/tsclass": "^9.2.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index e8e0048..160fef9 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -35,6 +35,9 @@ importers: '@push.rocks/smartrequest': specifier: ^2.1.0 version: 2.1.0 + '@push.rocks/smartrx': + specifier: ^3.0.10 + version: 3.0.10 '@push.rocks/smartstring': specifier: ^4.0.15 version: 4.0.15 @@ -977,9 +980,6 @@ packages: '@push.rocks/smartrx@3.0.10': resolution: {integrity: sha512-USjIYcsSfzn14cwOsxgq/bBmWDTTzy3ouWAnW5NdMyRRzEbmeNrvmy6TRqNeDlJ2PsYNTt1rr/zGUqvIy72ITg==} - '@push.rocks/smartrx@3.0.7': - resolution: {integrity: sha512-qCWy0s3RLAgGSnaw/Gu0BNaJ59CsI6RK5OJDCCqxc7P2X/S755vuLtnAR5/0dEjdhCHXHX9ytPZx+o9g/CNiyA==} - '@push.rocks/smarts3@2.2.5': resolution: {integrity: sha512-OZjD0jBCUTJCLnwraxBcyZ3he5buXf2OEM1zipiTBChA2EcKUZWKk/a6KR5WT+NlFCIIuB23UG+U+cxsIWM91Q==} @@ -6131,11 +6131,6 @@ snapshots: '@push.rocks/smartpromise': 4.2.3 rxjs: 7.8.2 - '@push.rocks/smartrx@3.0.7': - dependencies: - '@push.rocks/smartpromise': 4.2.3 - rxjs: 7.8.2 - '@push.rocks/smarts3@2.2.5': dependencies: '@push.rocks/smartbucket': 3.3.7 @@ -6301,7 +6296,7 @@ snapshots: '@push.rocks/smartenv': 5.0.12 '@push.rocks/smartjson': 5.0.20 '@push.rocks/smartpromise': 4.2.3 - '@push.rocks/smartrx': 3.0.7 + '@push.rocks/smartrx': 3.0.10 '@tempfix/idb': 8.0.3 fake-indexeddb: 5.0.2 diff --git a/readme.metrics.md b/readme.metrics.md new file mode 100644 index 0000000..633b660 --- /dev/null +++ b/readme.metrics.md @@ -0,0 +1,591 @@ +# SmartProxy Metrics Implementation Plan + +This document outlines the plan for implementing comprehensive metrics tracking in SmartProxy. + +## Overview + +The metrics system will provide real-time insights into proxy performance, connection statistics, and throughput data. The implementation will be efficient, thread-safe, and have minimal impact on proxy performance. + +**Key Design Decisions**: + +1. **On-demand computation**: Instead of maintaining duplicate state, the MetricsCollector computes metrics on-demand from existing data structures. + +2. **SmartProxy-centric architecture**: MetricsCollector receives the SmartProxy instance, providing access to all components: + - ConnectionManager for connection data + - RouteManager for route metadata + - Settings for configuration + - Future components without API changes + +This approach: +- Eliminates synchronization issues +- Reduces memory overhead +- Simplifies the implementation +- Guarantees metrics accuracy +- Leverages existing battle-tested components +- Provides flexibility for future enhancements + +## Metrics Interface + +```typescript +interface IProxyStats { + getActiveConnections(): number; + getConnectionsByRoute(): Map; + getConnectionsByIP(): Map; + getTotalConnections(): number; + getRequestsPerSecond(): number; + getThroughput(): { bytesIn: number, bytesOut: number }; +} +``` + +## Implementation Plan + +### 1. Create MetricsCollector Class + +**Location**: `/ts/proxies/smart-proxy/metrics-collector.ts` + +```typescript +import type { SmartProxy } from './smart-proxy.js'; + +export class MetricsCollector implements IProxyStats { + constructor( + private smartProxy: SmartProxy + ) {} + + // RPS tracking (the only state we need to maintain) + private requestTimestamps: number[] = []; + private readonly RPS_WINDOW_SIZE = 60000; // 1 minute window + + // All other metrics are computed on-demand from SmartProxy's components +} +``` + +### 2. Integration Points + +Since metrics are computed on-demand from ConnectionManager's records, we only need minimal integration: + +#### A. Request Tracking for RPS + +**File**: `/ts/proxies/smart-proxy/route-connection-handler.ts` + +```typescript +// In handleNewConnection when a new connection is accepted +this.metricsCollector.recordRequest(); +``` + +#### B. SmartProxy Component Access + +Through the SmartProxy instance, MetricsCollector can access: +- `smartProxy.connectionManager` - All active connections and their details +- `smartProxy.routeManager` - Route configurations and metadata +- `smartProxy.settings` - Configuration for thresholds and limits +- `smartProxy.servers` - Server instances and port information +- Any other components as needed for future metrics + +No additional hooks needed! + +### 3. Metric Implementations + +#### A. Active Connections + +```typescript +getActiveConnections(): number { + return this.smartProxy.connectionManager.getConnectionCount(); +} +``` + +#### B. Connections by Route + +```typescript +getConnectionsByRoute(): Map { + const routeCounts = new Map(); + + // Compute from active connections + for (const [_, record] of this.smartProxy.connectionManager.getConnections()) { + const routeName = record.routeName || 'unknown'; + const current = routeCounts.get(routeName) || 0; + routeCounts.set(routeName, current + 1); + } + + return routeCounts; +} +``` + +#### C. Connections by IP + +```typescript +getConnectionsByIP(): Map { + const ipCounts = new Map(); + + // Compute from active connections + for (const [_, record] of this.smartProxy.connectionManager.getConnections()) { + const ip = record.remoteIP; + const current = ipCounts.get(ip) || 0; + ipCounts.set(ip, current + 1); + } + + return ipCounts; +} + +// Additional helper methods for IP tracking +getTopIPs(limit: number = 10): Array<{ip: string, connections: number}> { + const ipCounts = this.getConnectionsByIP(); + const sorted = Array.from(ipCounts.entries()) + .sort((a, b) => b[1] - a[1]) + .slice(0, limit) + .map(([ip, connections]) => ({ ip, connections })); + + return sorted; +} + +isIPBlocked(ip: string, maxConnectionsPerIP: number): boolean { + const ipCounts = this.getConnectionsByIP(); + const currentConnections = ipCounts.get(ip) || 0; + return currentConnections >= maxConnectionsPerIP; +} +``` + +#### D. Total Connections + +```typescript +getTotalConnections(): number { + // Get from termination stats + const stats = this.smartProxy.connectionManager.getTerminationStats(); + let total = this.smartProxy.connectionManager.getConnectionCount(); // Add active connections + + // Add all terminated connections + for (const reason in stats.incoming) { + total += stats.incoming[reason]; + } + + return total; +} +``` + +#### E. Requests Per Second + +```typescript +getRequestsPerSecond(): number { + const now = Date.now(); + const windowStart = now - this.RPS_WINDOW_SIZE; + + // Clean old timestamps + this.requestTimestamps = this.requestTimestamps.filter(ts => ts > windowStart); + + // Calculate RPS based on window + const requestsInWindow = this.requestTimestamps.length; + return requestsInWindow / (this.RPS_WINDOW_SIZE / 1000); +} + +recordRequest(): void { + this.requestTimestamps.push(Date.now()); + + // Prevent unbounded growth + if (this.requestTimestamps.length > 10000) { + this.cleanupOldRequests(); + } +} +``` + +#### F. Throughput Tracking + +```typescript +getThroughput(): { bytesIn: number, bytesOut: number } { + let bytesIn = 0; + let bytesOut = 0; + + // Sum bytes from all active connections + for (const [_, record] of this.smartProxy.connectionManager.getConnections()) { + bytesIn += record.bytesReceived; + bytesOut += record.bytesSent; + } + + return { bytesIn, bytesOut }; +} + +// Get throughput rate (bytes per second) for last minute +getThroughputRate(): { bytesInPerSec: number, bytesOutPerSec: number } { + const now = Date.now(); + let recentBytesIn = 0; + let recentBytesOut = 0; + let connectionCount = 0; + + // Calculate bytes transferred in last minute from active connections + for (const [_, record] of this.smartProxy.connectionManager.getConnections()) { + const connectionAge = now - record.incomingStartTime; + if (connectionAge < 60000) { // Connection started within last minute + recentBytesIn += record.bytesReceived; + recentBytesOut += record.bytesSent; + connectionCount++; + } else { + // For older connections, estimate rate based on average + const rate = connectionAge / 60000; + recentBytesIn += record.bytesReceived / rate; + recentBytesOut += record.bytesSent / rate; + connectionCount++; + } + } + + return { + bytesInPerSec: Math.round(recentBytesIn / 60), + bytesOutPerSec: Math.round(recentBytesOut / 60) + }; +} +``` + +### 4. Performance Optimizations + +Since metrics are computed on-demand from existing data structures, performance optimizations are minimal: + +#### A. Caching for Frequent Queries + +```typescript +private cachedMetrics: { + timestamp: number; + connectionsByRoute?: Map; + connectionsByIP?: Map; +} = { timestamp: 0 }; + +private readonly CACHE_TTL = 1000; // 1 second cache + +getConnectionsByRoute(): Map { + const now = Date.now(); + + // Return cached value if fresh + if (this.cachedMetrics.connectionsByRoute && + now - this.cachedMetrics.timestamp < this.CACHE_TTL) { + return this.cachedMetrics.connectionsByRoute; + } + + // Compute fresh value + const routeCounts = new Map(); + for (const [_, record] of this.smartProxy.connectionManager.getConnections()) { + const routeName = record.routeName || 'unknown'; + const current = routeCounts.get(routeName) || 0; + routeCounts.set(routeName, current + 1); + } + + // Cache and return + this.cachedMetrics.connectionsByRoute = routeCounts; + this.cachedMetrics.timestamp = now; + return routeCounts; +} +``` + +#### B. RPS Cleanup + +```typescript +// Only cleanup needed is for RPS timestamps +private cleanupOldRequests(): void { + const cutoff = Date.now() - this.RPS_WINDOW_SIZE; + this.requestTimestamps = this.requestTimestamps.filter(ts => ts > cutoff); +} +``` + +### 5. SmartProxy Integration + +#### A. Add to SmartProxy Class + +```typescript +export class SmartProxy { + private metricsCollector: MetricsCollector; + + constructor(options: ISmartProxyOptions) { + // ... existing code ... + + // Pass SmartProxy instance to MetricsCollector + this.metricsCollector = new MetricsCollector(this); + } + + // Public API + public getStats(): IProxyStats { + return this.metricsCollector; + } +} +``` + +#### B. Configuration Options + +```typescript +interface ISmartProxyOptions { + // ... existing options ... + + metrics?: { + enabled?: boolean; // Default: true + rpsWindowSize?: number; // Default: 60000 (1 minute) + throughputWindowSize?: number; // Default: 60000 (1 minute) + cleanupInterval?: number; // Default: 60000 (1 minute) + }; +} +``` + +### 6. Advanced Metrics (Future Enhancement) + +```typescript +interface IAdvancedProxyStats extends IProxyStats { + // Latency metrics + getAverageLatency(): number; + getLatencyPercentiles(): { p50: number, p95: number, p99: number }; + + // Error metrics + getErrorRate(): number; + getErrorsByType(): Map; + + // Route-specific metrics + getRouteMetrics(routeName: string): IRouteMetrics; + + // Time-series data + getHistoricalMetrics(duration: number): IHistoricalMetrics; + + // Server/Port metrics (leveraging SmartProxy access) + getPortUtilization(): Map; + getCertificateExpiry(): Map; +} + +// Example implementation showing SmartProxy component access +getPortUtilization(): Map { + const portStats = new Map(); + + // Access servers through SmartProxy + for (const [port, server] of this.smartProxy.servers) { + const connections = Array.from(this.smartProxy.connectionManager.getConnections()) + .filter(([_, record]) => record.localPort === port).length; + + // Access route configuration through SmartProxy + const routes = this.smartProxy.routeManager.getRoutesForPort(port); + const maxConnections = routes[0]?.advanced?.maxConnections || + this.smartProxy.settings.defaults?.security?.maxConnections || + 10000; + + portStats.set(port, { connections, maxConnections }); + } + + return portStats; +} +``` + +### 7. HTTP Metrics Endpoint (Optional) + +```typescript +// Expose metrics via HTTP endpoint +class MetricsHttpHandler { + handleRequest(req: IncomingMessage, res: ServerResponse): void { + if (req.url === '/metrics') { + const stats = this.proxy.getStats(); + + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ + activeConnections: stats.getActiveConnections(), + totalConnections: stats.getTotalConnections(), + requestsPerSecond: stats.getRequestsPerSecond(), + throughput: stats.getThroughput(), + connectionsByRoute: Object.fromEntries(stats.getConnectionsByRoute()), + connectionsByIP: Object.fromEntries(stats.getConnectionsByIP()), + topIPs: stats.getTopIPs(20) + })); + } + } +} +``` + +### 8. Testing Strategy + +The simplified design makes testing much easier since we can mock the ConnectionManager's data: + +#### A. Unit Tests + +```typescript +// test/test.metrics-collector.ts +tap.test('MetricsCollector computes metrics correctly', async () => { + // Mock ConnectionManager with test data + const mockConnectionManager = { + getConnectionCount: () => 2, + getConnections: () => new Map([ + ['conn1', { remoteIP: '192.168.1.1', routeName: 'api', bytesReceived: 1000, bytesSent: 500 }], + ['conn2', { remoteIP: '192.168.1.1', routeName: 'web', bytesReceived: 2000, bytesSent: 1000 }] + ]), + getTerminationStats: () => ({ incoming: { normal: 10, timeout: 2 } }) + }; + + const collector = new MetricsCollector(mockConnectionManager as any); + + expect(collector.getActiveConnections()).toEqual(2); + expect(collector.getConnectionsByIP().get('192.168.1.1')).toEqual(2); + expect(collector.getTotalConnections()).toEqual(14); // 2 active + 12 terminated +}); +``` + +#### B. Integration Tests + +```typescript +// test/test.metrics-integration.ts +tap.test('SmartProxy provides accurate metrics', async () => { + const proxy = new SmartProxy({ /* config */ }); + await proxy.start(); + + // Create connections and verify metrics + const stats = proxy.getStats(); + expect(stats.getActiveConnections()).toEqual(0); +}); +``` + +#### C. Performance Tests + +```typescript +// test/test.metrics-performance.ts +tap.test('Metrics collection has minimal performance impact', async () => { + // Measure proxy performance with and without metrics + // Ensure overhead is < 1% +}); +``` + +### 9. Implementation Phases + +#### Phase 1: Core Metrics (Days 1-2) +- [ ] Create MetricsCollector class +- [ ] Implement all metric methods (reading from ConnectionManager) +- [ ] Add RPS tracking +- [ ] Add to SmartProxy with getStats() method + +#### Phase 2: Testing & Optimization (Days 3-4) +- [ ] Add comprehensive unit tests with mocked data +- [ ] Add integration tests with real proxy +- [ ] Implement caching for performance +- [ ] Add RPS cleanup mechanism + +#### Phase 3: Advanced Features (Days 5-7) +- [ ] Add HTTP metrics endpoint +- [ ] Implement Prometheus export format +- [ ] Add IP-based rate limiting helpers +- [ ] Create monitoring dashboard example + +**Note**: The simplified design reduces implementation time from 4 weeks to 1 week! + +### 10. Usage Examples + +```typescript +// Basic usage +const proxy = new SmartProxy({ + routes: [...], + metrics: { enabled: true } +}); + +await proxy.start(); + +// Get metrics +const stats = proxy.getStats(); +console.log(`Active connections: ${stats.getActiveConnections()}`); +console.log(`RPS: ${stats.getRequestsPerSecond()}`); +console.log(`Throughput: ${JSON.stringify(stats.getThroughput())}`); + +// Monitor specific routes +const routeConnections = stats.getConnectionsByRoute(); +for (const [route, count] of routeConnections) { + console.log(`Route ${route}: ${count} connections`); +} + +// Monitor connections by IP +const ipConnections = stats.getConnectionsByIP(); +for (const [ip, count] of ipConnections) { + console.log(`IP ${ip}: ${count} connections`); +} + +// Get top IPs by connection count +const topIPs = stats.getTopIPs(10); +console.log('Top 10 IPs:', topIPs); + +// Check if IP should be rate limited +if (stats.isIPBlocked('192.168.1.100', 100)) { + console.log('IP has too many connections'); +} +``` + +### 11. Monitoring Integration + +```typescript +// Export to monitoring systems +class PrometheusExporter { + export(stats: IProxyStats): string { + return ` +# HELP smartproxy_active_connections Current number of active connections +# TYPE smartproxy_active_connections gauge +smartproxy_active_connections ${stats.getActiveConnections()} + +# HELP smartproxy_total_connections Total connections since start +# TYPE smartproxy_total_connections counter +smartproxy_total_connections ${stats.getTotalConnections()} + +# HELP smartproxy_requests_per_second Current requests per second +# TYPE smartproxy_requests_per_second gauge +smartproxy_requests_per_second ${stats.getRequestsPerSecond()} + `; + } +} +``` + +### 12. Documentation + +- Add metrics section to main README +- Create metrics API documentation +- Add monitoring setup guide +- Provide dashboard configuration examples + +## Success Criteria + +1. **Performance**: Metrics collection adds < 1% overhead +2. **Accuracy**: All metrics are accurate within 1% margin +3. **Memory**: No memory leaks over 24-hour operation +4. **Thread Safety**: No race conditions under high load +5. **Usability**: Simple, intuitive API for accessing metrics + +## Privacy and Security Considerations + +### IP Address Tracking + +1. **Privacy Compliance**: + - Consider GDPR and other privacy regulations when storing IP addresses + - Implement configurable IP anonymization (e.g., mask last octet) + - Add option to disable IP tracking entirely + +2. **Security**: + - Use IP metrics for rate limiting and DDoS protection + - Implement automatic blocking for IPs exceeding connection limits + - Consider integration with IP reputation services + +3. **Implementation Options**: +```typescript +interface IMetricsOptions { + trackIPs?: boolean; // Default: true + anonymizeIPs?: boolean; // Default: false + maxConnectionsPerIP?: number; // Default: 100 + ipBlockDuration?: number; // Default: 3600000 (1 hour) +} +``` + +## Future Enhancements + +1. **Distributed Metrics**: Aggregate metrics across multiple proxy instances +2. **Historical Storage**: Store metrics in time-series database +3. **Alerting**: Built-in alerting based on metric thresholds +4. **Custom Metrics**: Allow users to define custom metrics +5. **GraphQL API**: Provide GraphQL endpoint for flexible metric queries +6. **IP Analytics**: + - Geographic distribution of connections + - Automatic anomaly detection for IP patterns + - Integration with threat intelligence feeds + +## Benefits of the Simplified Design + +By using a SmartProxy-centric architecture with on-demand computation: + +1. **Zero Synchronization Issues**: Metrics always reflect the true state +2. **Minimal Memory Overhead**: No duplicate data structures +3. **Simpler Implementation**: ~200 lines instead of ~1000 lines +4. **Easier Testing**: Can mock SmartProxy components +5. **Better Performance**: No overhead from state updates +6. **Guaranteed Accuracy**: Single source of truth +7. **Faster Development**: 1 week instead of 4 weeks +8. **Future Flexibility**: Access to all SmartProxy components without API changes +9. **Holistic Metrics**: Can correlate data across components (connections, routes, settings, certificates, etc.) +10. **Clean Architecture**: MetricsCollector is a true SmartProxy component, not an isolated module + +This approach leverages the existing, well-tested SmartProxy infrastructure while providing a clean, simple metrics API that can grow with the proxy's capabilities. \ No newline at end of file diff --git a/test/test.keepalive-support.node.ts b/test/test.keepalive-support.node.ts new file mode 100644 index 0000000..a4573b1 --- /dev/null +++ b/test/test.keepalive-support.node.ts @@ -0,0 +1,250 @@ +import { expect, tap } from '@git.zone/tstest/tapbundle'; +import * as net from 'net'; +import { SmartProxy } from '../ts/index.js'; +import * as plugins from '../ts/plugins.js'; + +tap.test('keepalive support - verify keepalive connections are properly handled', async (tools) => { + console.log('\n=== KeepAlive Support Test ==='); + console.log('Purpose: Verify that keepalive connections are not prematurely cleaned up'); + + // Create a simple echo backend + const echoBackend = net.createServer((socket) => { + socket.on('data', (data) => { + // Echo back received data + try { + socket.write(data); + } catch (err) { + // Ignore write errors during shutdown + } + }); + + socket.on('error', (err) => { + // Ignore errors from backend sockets + console.log(`Backend socket error (expected during cleanup): ${err.code}`); + }); + }); + + await new Promise((resolve) => { + echoBackend.listen(9998, () => { + console.log('✓ Echo backend started on port 9998'); + resolve(); + }); + }); + + // Test 1: Standard keepalive treatment + console.log('\n--- Test 1: Standard KeepAlive Treatment ---'); + + const proxy1 = new SmartProxy({ + routes: [{ + name: 'keepalive-route', + match: { ports: 8590 }, + action: { + type: 'forward', + target: { host: 'localhost', port: 9998 } + } + }], + keepAlive: true, + keepAliveTreatment: 'standard', + inactivityTimeout: 5000, // 5 seconds for faster testing + enableDetailedLogging: false, + }); + + await proxy1.start(); + console.log('✓ Proxy with standard keepalive started on port 8590'); + + // Create a keepalive connection + const client1 = net.connect(8590, 'localhost'); + + // Add error handler to prevent unhandled errors + client1.on('error', (err) => { + console.log(`Client1 error (expected during cleanup): ${err.code}`); + }); + + await new Promise((resolve) => { + client1.on('connect', () => { + console.log('Client connected'); + client1.setKeepAlive(true, 1000); + resolve(); + }); + }); + + // Send initial data + client1.write('Hello keepalive\n'); + + // Wait for echo + await new Promise((resolve) => { + client1.once('data', (data) => { + console.log(`Received echo: ${data.toString().trim()}`); + resolve(); + }); + }); + + // Check connection is marked as keepalive + const cm1 = (proxy1 as any).connectionManager; + const connections1 = cm1.getConnections(); + let keepAliveCount = 0; + + for (const [id, record] of connections1) { + if (record.hasKeepAlive) { + keepAliveCount++; + console.log(`KeepAlive connection ${id}: hasKeepAlive=${record.hasKeepAlive}`); + } + } + + expect(keepAliveCount).toEqual(1); + + // Wait to ensure it's not cleaned up prematurely + await plugins.smartdelay.delayFor(6000); + + const afterWaitCount1 = cm1.getConnectionCount(); + console.log(`Connections after 6s wait: ${afterWaitCount1}`); + expect(afterWaitCount1).toEqual(1); // Should still be connected + + // Send more data to keep it alive + client1.write('Still alive\n'); + + // Clean up test 1 + client1.destroy(); + await proxy1.stop(); + await plugins.smartdelay.delayFor(500); // Wait for port to be released + + // Test 2: Extended keepalive treatment + console.log('\n--- Test 2: Extended KeepAlive Treatment ---'); + + const proxy2 = new SmartProxy({ + routes: [{ + name: 'keepalive-extended', + match: { ports: 8591 }, + action: { + type: 'forward', + target: { host: 'localhost', port: 9998 } + } + }], + keepAlive: true, + keepAliveTreatment: 'extended', + keepAliveInactivityMultiplier: 6, + inactivityTimeout: 2000, // 2 seconds base, 12 seconds with multiplier + enableDetailedLogging: false, + }); + + await proxy2.start(); + console.log('✓ Proxy with extended keepalive started on port 8591'); + + const client2 = net.connect(8591, 'localhost'); + + // Add error handler to prevent unhandled errors + client2.on('error', (err) => { + console.log(`Client2 error (expected during cleanup): ${err.code}`); + }); + + await new Promise((resolve) => { + client2.on('connect', () => { + console.log('Client connected with extended timeout'); + client2.setKeepAlive(true, 1000); + resolve(); + }); + }); + + // Send initial data + client2.write('Extended keepalive\n'); + + // Check connection + const cm2 = (proxy2 as any).connectionManager; + await plugins.smartdelay.delayFor(1000); + + const connections2 = cm2.getConnections(); + for (const [id, record] of connections2) { + console.log(`Extended connection ${id}: hasKeepAlive=${record.hasKeepAlive}, treatment=extended`); + } + + // Wait 3 seconds (would timeout with standard treatment) + await plugins.smartdelay.delayFor(3000); + + const midWaitCount = cm2.getConnectionCount(); + console.log(`Connections after 3s (base timeout exceeded): ${midWaitCount}`); + expect(midWaitCount).toEqual(1); // Should still be connected due to extended treatment + + // Clean up test 2 + client2.destroy(); + await proxy2.stop(); + await plugins.smartdelay.delayFor(500); // Wait for port to be released + + // Test 3: Immortal keepalive treatment + console.log('\n--- Test 3: Immortal KeepAlive Treatment ---'); + + const proxy3 = new SmartProxy({ + routes: [{ + name: 'keepalive-immortal', + match: { ports: 8592 }, + action: { + type: 'forward', + target: { host: 'localhost', port: 9998 } + } + }], + keepAlive: true, + keepAliveTreatment: 'immortal', + inactivityTimeout: 1000, // 1 second - should be ignored for immortal + enableDetailedLogging: false, + }); + + await proxy3.start(); + console.log('✓ Proxy with immortal keepalive started on port 8592'); + + const client3 = net.connect(8592, 'localhost'); + + // Add error handler to prevent unhandled errors + client3.on('error', (err) => { + console.log(`Client3 error (expected during cleanup): ${err.code}`); + }); + + await new Promise((resolve) => { + client3.on('connect', () => { + console.log('Client connected with immortal treatment'); + client3.setKeepAlive(true, 1000); + resolve(); + }); + }); + + // Send initial data + client3.write('Immortal connection\n'); + + // Wait well beyond normal timeout + await plugins.smartdelay.delayFor(5000); + + const cm3 = (proxy3 as any).connectionManager; + const immortalCount = cm3.getConnectionCount(); + console.log(`Immortal connections after 5s inactivity: ${immortalCount}`); + expect(immortalCount).toEqual(1); // Should never timeout + + // Verify zombie detection doesn't affect immortal connections + console.log('\n--- Verifying zombie detection respects keepalive ---'); + + // Manually trigger inactivity check + cm3.performOptimizedInactivityCheck(); + + await plugins.smartdelay.delayFor(1000); + + const afterCheckCount = cm3.getConnectionCount(); + console.log(`Connections after manual inactivity check: ${afterCheckCount}`); + expect(afterCheckCount).toEqual(1); // Should still be alive + + // Clean up + client3.destroy(); + await proxy3.stop(); + + // Close backend and wait for it to fully close + await new Promise((resolve) => { + echoBackend.close(() => { + console.log('Echo backend closed'); + resolve(); + }); + }); + + console.log('\n✓ All keepalive tests passed:'); + console.log(' - Standard treatment works correctly'); + console.log(' - Extended treatment applies multiplier'); + console.log(' - Immortal treatment never times out'); + console.log(' - Zombie detection respects keepalive settings'); +}); + +tap.start(); \ No newline at end of file diff --git a/test/test.metrics-collector.ts b/test/test.metrics-collector.ts new file mode 100644 index 0000000..a861fcb --- /dev/null +++ b/test/test.metrics-collector.ts @@ -0,0 +1,280 @@ +import { expect, tap } from '@git.zone/tstest/tapbundle'; +import { SmartProxy } from '../ts/index.js'; +import * as net from 'net'; +import * as plugins from '../ts/plugins.js'; + +tap.test('MetricsCollector provides accurate metrics', async (tools) => { + console.log('\n=== MetricsCollector Test ==='); + + // Create a simple echo server for testing + const echoServer = net.createServer((socket) => { + socket.on('data', (data) => { + socket.write(data); + }); + socket.on('error', () => {}); // Ignore errors + }); + + await new Promise((resolve) => { + echoServer.listen(9995, () => { + console.log('✓ Echo server started on port 9995'); + resolve(); + }); + }); + + // Create SmartProxy with test routes + const proxy = new SmartProxy({ + routes: [ + { + name: 'test-route-1', + match: { ports: 8700 }, + action: { + type: 'forward', + target: { host: 'localhost', port: 9995 } + } + }, + { + name: 'test-route-2', + match: { ports: 8701 }, + action: { + type: 'forward', + target: { host: 'localhost', port: 9995 } + } + } + ], + enableDetailedLogging: true, + }); + + await proxy.start(); + console.log('✓ Proxy started on ports 8700 and 8701'); + + // Get stats interface + const stats = proxy.getStats(); + + // Test 1: Initial state + console.log('\n--- Test 1: Initial State ---'); + expect(stats.getActiveConnections()).toEqual(0); + expect(stats.getTotalConnections()).toEqual(0); + expect(stats.getRequestsPerSecond()).toEqual(0); + expect(stats.getConnectionsByRoute().size).toEqual(0); + expect(stats.getConnectionsByIP().size).toEqual(0); + + const throughput = stats.getThroughput(); + expect(throughput.bytesIn).toEqual(0); + expect(throughput.bytesOut).toEqual(0); + console.log('✓ Initial metrics are all zero'); + + // Test 2: Create connections and verify metrics + console.log('\n--- Test 2: Active Connections ---'); + const clients: net.Socket[] = []; + + // Create 3 connections to route 1 + for (let i = 0; i < 3; i++) { + const client = net.connect(8700, 'localhost'); + clients.push(client); + await new Promise((resolve) => { + client.on('connect', resolve); + client.on('error', () => resolve()); + }); + } + + // Create 2 connections to route 2 + for (let i = 0; i < 2; i++) { + const client = net.connect(8701, 'localhost'); + clients.push(client); + await new Promise((resolve) => { + client.on('connect', resolve); + client.on('error', () => resolve()); + }); + } + + // Wait for connections to be fully established and routed + await plugins.smartdelay.delayFor(300); + + // Verify connection counts + expect(stats.getActiveConnections()).toEqual(5); + expect(stats.getTotalConnections()).toEqual(5); + console.log(`✓ Active connections: ${stats.getActiveConnections()}`); + console.log(`✓ Total connections: ${stats.getTotalConnections()}`); + + // Test 3: Connections by route + console.log('\n--- Test 3: Connections by Route ---'); + const routeConnections = stats.getConnectionsByRoute(); + console.log('Route connections:', Array.from(routeConnections.entries())); + + // Check if we have the expected counts + let route1Count = 0; + let route2Count = 0; + for (const [routeName, count] of routeConnections) { + if (routeName === 'test-route-1') route1Count = count; + if (routeName === 'test-route-2') route2Count = count; + } + + expect(route1Count).toEqual(3); + expect(route2Count).toEqual(2); + console.log('✓ Route test-route-1 has 3 connections'); + console.log('✓ Route test-route-2 has 2 connections'); + + // Test 4: Connections by IP + console.log('\n--- Test 4: Connections by IP ---'); + const ipConnections = stats.getConnectionsByIP(); + // All connections are from localhost (127.0.0.1 or ::1) + let totalIPConnections = 0; + for (const [ip, count] of ipConnections) { + console.log(` IP ${ip}: ${count} connections`); + totalIPConnections += count; + } + expect(totalIPConnections).toEqual(5); + console.log('✓ Total connections by IP matches active connections'); + + // Test 5: RPS calculation + console.log('\n--- Test 5: Requests Per Second ---'); + const rps = stats.getRequestsPerSecond(); + console.log(` Current RPS: ${rps.toFixed(2)}`); + // We created 5 connections, so RPS should be > 0 + expect(rps).toBeGreaterThan(0); + console.log('✓ RPS is greater than 0'); + + // Test 6: Throughput + console.log('\n--- Test 6: Throughput ---'); + // Send some data through connections + for (const client of clients) { + if (!client.destroyed) { + client.write('Hello metrics!\n'); + } + } + + // Wait for data to be transmitted + await plugins.smartdelay.delayFor(100); + + const throughputAfter = stats.getThroughput(); + console.log(` Bytes in: ${throughputAfter.bytesIn}`); + console.log(` Bytes out: ${throughputAfter.bytesOut}`); + expect(throughputAfter.bytesIn).toBeGreaterThan(0); + expect(throughputAfter.bytesOut).toBeGreaterThan(0); + console.log('✓ Throughput shows bytes transferred'); + + // Test 7: Close some connections + console.log('\n--- Test 7: Connection Cleanup ---'); + // Close first 2 clients + clients[0].destroy(); + clients[1].destroy(); + + await plugins.smartdelay.delayFor(100); + + expect(stats.getActiveConnections()).toEqual(3); + expect(stats.getTotalConnections()).toEqual(5); // Total should remain the same + console.log(`✓ Active connections reduced to ${stats.getActiveConnections()}`); + console.log(`✓ Total connections still ${stats.getTotalConnections()}`); + + // Test 8: Helper methods + console.log('\n--- Test 8: Helper Methods ---'); + + // Test getTopIPs + const topIPs = (stats as any).getTopIPs(5); + expect(topIPs.length).toBeGreaterThan(0); + console.log('✓ getTopIPs returns IP list'); + + // Test isIPBlocked + const isBlocked = (stats as any).isIPBlocked('127.0.0.1', 10); + expect(isBlocked).toEqual(false); // Should not be blocked with limit of 10 + console.log('✓ isIPBlocked works correctly'); + + // Test throughput rate + const throughputRate = (stats as any).getThroughputRate(); + console.log(` Throughput rate: ${throughputRate.bytesInPerSec} bytes/sec in, ${throughputRate.bytesOutPerSec} bytes/sec out`); + console.log('✓ getThroughputRate calculates rates'); + + // Cleanup + console.log('\n--- Cleanup ---'); + for (const client of clients) { + if (!client.destroyed) { + client.destroy(); + } + } + + await proxy.stop(); + echoServer.close(); + + console.log('\n✓ All MetricsCollector tests passed'); +}); + +// Test with mock data for unit testing +tap.test('MetricsCollector unit test with mock data', async () => { + console.log('\n=== MetricsCollector Unit Test ==='); + + // Create a mock SmartProxy with mock ConnectionManager + const mockConnections = new Map([ + ['conn1', { + remoteIP: '192.168.1.1', + routeName: 'api', + bytesReceived: 1000, + bytesSent: 500, + incomingStartTime: Date.now() - 5000 + }], + ['conn2', { + remoteIP: '192.168.1.1', + routeName: 'web', + bytesReceived: 2000, + bytesSent: 1500, + incomingStartTime: Date.now() - 10000 + }], + ['conn3', { + remoteIP: '192.168.1.2', + routeName: 'api', + bytesReceived: 500, + bytesSent: 250, + incomingStartTime: Date.now() - 3000 + }] + ]); + + const mockSmartProxy = { + connectionManager: { + getConnectionCount: () => mockConnections.size, + getConnections: () => mockConnections, + getTerminationStats: () => ({ + incoming: { normal: 10, timeout: 2, error: 1 } + }) + } + }; + + // Import MetricsCollector directly + const { MetricsCollector } = await import('../ts/proxies/smart-proxy/metrics-collector.js'); + const metrics = new MetricsCollector(mockSmartProxy as any); + + // Test metrics calculation + console.log('\n--- Testing with Mock Data ---'); + + expect(metrics.getActiveConnections()).toEqual(3); + console.log(`✓ Active connections: ${metrics.getActiveConnections()}`); + + expect(metrics.getTotalConnections()).toEqual(16); // 3 active + 13 terminated + console.log(`✓ Total connections: ${metrics.getTotalConnections()}`); + + const routeConns = metrics.getConnectionsByRoute(); + expect(routeConns.get('api')).toEqual(2); + expect(routeConns.get('web')).toEqual(1); + console.log('✓ Connections by route calculated correctly'); + + const ipConns = metrics.getConnectionsByIP(); + expect(ipConns.get('192.168.1.1')).toEqual(2); + expect(ipConns.get('192.168.1.2')).toEqual(1); + console.log('✓ Connections by IP calculated correctly'); + + const throughput = metrics.getThroughput(); + expect(throughput.bytesIn).toEqual(3500); + expect(throughput.bytesOut).toEqual(2250); + console.log(`✓ Throughput: ${throughput.bytesIn} bytes in, ${throughput.bytesOut} bytes out`); + + // Test RPS tracking + metrics.recordRequest(); + metrics.recordRequest(); + metrics.recordRequest(); + + const rps = metrics.getRequestsPerSecond(); + expect(rps).toBeGreaterThan(0); + console.log(`✓ RPS tracking works: ${rps.toFixed(2)} req/sec`); + + console.log('\n✓ All unit tests passed'); +}); + +export default tap.start(); \ No newline at end of file diff --git a/ts/plugins.ts b/ts/plugins.ts index 815455b..b492488 100644 --- a/ts/plugins.ts +++ b/ts/plugins.ts @@ -30,6 +30,7 @@ import * as smartacmeHandlers from '@push.rocks/smartacme/dist_ts/handlers/index import * as smartlog from '@push.rocks/smartlog'; import * as smartlogDestinationLocal from '@push.rocks/smartlog/destination-local'; import * as taskbuffer from '@push.rocks/taskbuffer'; +import * as smartrx from '@push.rocks/smartrx'; export { lik, @@ -45,6 +46,7 @@ export { smartlog, smartlogDestinationLocal, taskbuffer, + smartrx, }; // third party scope diff --git a/ts/proxies/smart-proxy/metrics-collector.ts b/ts/proxies/smart-proxy/metrics-collector.ts new file mode 100644 index 0000000..38baa07 --- /dev/null +++ b/ts/proxies/smart-proxy/metrics-collector.ts @@ -0,0 +1,285 @@ +import * as plugins from '../../plugins.js'; +import type { SmartProxy } from './smart-proxy.js'; +import type { IProxyStats, IProxyStatsExtended } from './models/metrics-types.js'; +import { logger } from '../../core/utils/logger.js'; + +/** + * Collects and computes metrics for SmartProxy on-demand + */ +export class MetricsCollector implements IProxyStatsExtended { + // RPS tracking (the only state we need to maintain) + private requestTimestamps: number[] = []; + private readonly RPS_WINDOW_SIZE = 60000; // 1 minute window + + // Optional caching for performance + private cachedMetrics: { + timestamp: number; + connectionsByRoute?: Map; + connectionsByIP?: Map; + } = { timestamp: 0 }; + + private readonly CACHE_TTL = 1000; // 1 second cache + + // RxJS subscription for connection events + private connectionSubscription?: plugins.smartrx.rxjs.Subscription; + + constructor( + private smartProxy: SmartProxy + ) { + // Subscription will be set up in start() method + } + + /** + * Get the current number of active connections + */ + public getActiveConnections(): number { + return this.smartProxy.connectionManager.getConnectionCount(); + } + + /** + * Get connection counts grouped by route name + */ + public getConnectionsByRoute(): Map { + const now = Date.now(); + + // Return cached value if fresh + if (this.cachedMetrics.connectionsByRoute && + now - this.cachedMetrics.timestamp < this.CACHE_TTL) { + return new Map(this.cachedMetrics.connectionsByRoute); + } + + // Compute fresh value + const routeCounts = new Map(); + const connections = this.smartProxy.connectionManager.getConnections(); + + if (this.smartProxy.settings?.enableDetailedLogging) { + logger.log('debug', `MetricsCollector: Computing route connections`, { + totalConnections: connections.size, + component: 'metrics' + }); + } + + for (const [_, record] of connections) { + // Try different ways to get the route name + const routeName = (record as any).routeName || + record.routeConfig?.name || + (record.routeConfig as any)?.routeName || + 'unknown'; + + if (this.smartProxy.settings?.enableDetailedLogging) { + logger.log('debug', `MetricsCollector: Connection route info`, { + connectionId: record.id, + routeName, + hasRouteConfig: !!record.routeConfig, + routeConfigName: record.routeConfig?.name, + routeConfigKeys: record.routeConfig ? Object.keys(record.routeConfig) : [], + component: 'metrics' + }); + } + + const current = routeCounts.get(routeName) || 0; + routeCounts.set(routeName, current + 1); + } + + // Cache and return + this.cachedMetrics.connectionsByRoute = routeCounts; + this.cachedMetrics.timestamp = now; + return new Map(routeCounts); + } + + /** + * Get connection counts grouped by IP address + */ + public getConnectionsByIP(): Map { + const now = Date.now(); + + // Return cached value if fresh + if (this.cachedMetrics.connectionsByIP && + now - this.cachedMetrics.timestamp < this.CACHE_TTL) { + return new Map(this.cachedMetrics.connectionsByIP); + } + + // Compute fresh value + const ipCounts = new Map(); + for (const [_, record] of this.smartProxy.connectionManager.getConnections()) { + const ip = record.remoteIP; + const current = ipCounts.get(ip) || 0; + ipCounts.set(ip, current + 1); + } + + // Cache and return + this.cachedMetrics.connectionsByIP = ipCounts; + this.cachedMetrics.timestamp = now; + return new Map(ipCounts); + } + + /** + * Get the total number of connections since proxy start + */ + public getTotalConnections(): number { + // Get from termination stats + const stats = this.smartProxy.connectionManager.getTerminationStats(); + let total = this.smartProxy.connectionManager.getConnectionCount(); // Add active connections + + // Add all terminated connections + for (const reason in stats.incoming) { + total += stats.incoming[reason]; + } + + return total; + } + + /** + * Get the current requests per second rate + */ + public getRequestsPerSecond(): number { + const now = Date.now(); + const windowStart = now - this.RPS_WINDOW_SIZE; + + // Clean old timestamps + this.requestTimestamps = this.requestTimestamps.filter(ts => ts > windowStart); + + // Calculate RPS based on window + const requestsInWindow = this.requestTimestamps.length; + return requestsInWindow / (this.RPS_WINDOW_SIZE / 1000); + } + + /** + * Record a new request for RPS tracking + */ + public recordRequest(): void { + this.requestTimestamps.push(Date.now()); + + // Prevent unbounded growth + if (this.requestTimestamps.length > 10000) { + this.cleanupOldRequests(); + } + } + + /** + * Get total throughput (bytes transferred) + */ + public getThroughput(): { bytesIn: number; bytesOut: number } { + let bytesIn = 0; + let bytesOut = 0; + + // Sum bytes from all active connections + for (const [_, record] of this.smartProxy.connectionManager.getConnections()) { + bytesIn += record.bytesReceived; + bytesOut += record.bytesSent; + } + + return { bytesIn, bytesOut }; + } + + /** + * Get throughput rate (bytes per second) for last minute + */ + public getThroughputRate(): { bytesInPerSec: number; bytesOutPerSec: number } { + const now = Date.now(); + let recentBytesIn = 0; + let recentBytesOut = 0; + + // Calculate bytes transferred in last minute from active connections + for (const [_, record] of this.smartProxy.connectionManager.getConnections()) { + const connectionAge = now - record.incomingStartTime; + if (connectionAge < 60000) { // Connection started within last minute + recentBytesIn += record.bytesReceived; + recentBytesOut += record.bytesSent; + } else { + // For older connections, estimate rate based on average + const rate = connectionAge / 60000; + recentBytesIn += record.bytesReceived / rate; + recentBytesOut += record.bytesSent / rate; + } + } + + return { + bytesInPerSec: Math.round(recentBytesIn / 60), + bytesOutPerSec: Math.round(recentBytesOut / 60) + }; + } + + /** + * Get top IPs by connection count + */ + public getTopIPs(limit: number = 10): Array<{ ip: string; connections: number }> { + const ipCounts = this.getConnectionsByIP(); + const sorted = Array.from(ipCounts.entries()) + .sort((a, b) => b[1] - a[1]) + .slice(0, limit) + .map(([ip, connections]) => ({ ip, connections })); + + return sorted; + } + + /** + * Check if an IP has reached the connection limit + */ + public isIPBlocked(ip: string, maxConnectionsPerIP: number): boolean { + const ipCounts = this.getConnectionsByIP(); + const currentConnections = ipCounts.get(ip) || 0; + return currentConnections >= maxConnectionsPerIP; + } + + /** + * Clean up old request timestamps + */ + private cleanupOldRequests(): void { + const cutoff = Date.now() - this.RPS_WINDOW_SIZE; + this.requestTimestamps = this.requestTimestamps.filter(ts => ts > cutoff); + } + + /** + * Start the metrics collector and set up subscriptions + */ + public start(): void { + if (!this.smartProxy.routeConnectionHandler) { + throw new Error('MetricsCollector: RouteConnectionHandler not available'); + } + + // Subscribe to the newConnectionSubject from RouteConnectionHandler + this.connectionSubscription = this.smartProxy.routeConnectionHandler.newConnectionSubject.subscribe({ + next: (record) => { + this.recordRequest(); + + // Optional: Log connection details + if (this.smartProxy.settings?.enableDetailedLogging) { + logger.log('debug', `MetricsCollector: New connection recorded`, { + connectionId: record.id, + remoteIP: record.remoteIP, + routeName: record.routeConfig?.name || 'unknown', + component: 'metrics' + }); + } + }, + error: (err) => { + logger.log('error', `MetricsCollector: Error in connection subscription`, { + error: err.message, + component: 'metrics' + }); + } + }); + + logger.log('debug', 'MetricsCollector started', { component: 'metrics' }); + } + + /** + * Stop the metrics collector and clean up resources + */ + public stop(): void { + if (this.connectionSubscription) { + this.connectionSubscription.unsubscribe(); + this.connectionSubscription = undefined; + } + + logger.log('debug', 'MetricsCollector stopped', { component: 'metrics' }); + } + + /** + * Alias for stop() for backward compatibility + */ + public destroy(): void { + this.stop(); + } +} \ No newline at end of file diff --git a/ts/proxies/smart-proxy/models/index.ts b/ts/proxies/smart-proxy/models/index.ts index 4e31417..b62fcf1 100644 --- a/ts/proxies/smart-proxy/models/index.ts +++ b/ts/proxies/smart-proxy/models/index.ts @@ -4,3 +4,4 @@ // Export everything except IAcmeOptions from interfaces export type { ISmartProxyOptions, IConnectionRecord, TSmartProxyCertProvisionObject } from './interfaces.js'; export * from './route-types.js'; +export * from './metrics-types.js'; diff --git a/ts/proxies/smart-proxy/models/metrics-types.ts b/ts/proxies/smart-proxy/models/metrics-types.ts new file mode 100644 index 0000000..6dc2cb0 --- /dev/null +++ b/ts/proxies/smart-proxy/models/metrics-types.ts @@ -0,0 +1,54 @@ +/** + * Interface for proxy statistics and metrics + */ +export interface IProxyStats { + /** + * Get the current number of active connections + */ + getActiveConnections(): number; + + /** + * Get connection counts grouped by route name + */ + getConnectionsByRoute(): Map; + + /** + * Get connection counts grouped by IP address + */ + getConnectionsByIP(): Map; + + /** + * Get the total number of connections since proxy start + */ + getTotalConnections(): number; + + /** + * Get the current requests per second rate + */ + getRequestsPerSecond(): number; + + /** + * Get total throughput (bytes transferred) + */ + getThroughput(): { bytesIn: number; bytesOut: number }; +} + +/** + * Extended interface for additional metrics helpers + */ +export interface IProxyStatsExtended extends IProxyStats { + /** + * Get throughput rate (bytes per second) for last minute + */ + getThroughputRate(): { bytesInPerSec: number; bytesOutPerSec: number }; + + /** + * Get top IPs by connection count + */ + getTopIPs(limit?: number): Array<{ ip: string; connections: number }>; + + /** + * Check if an IP has reached the connection limit + */ + isIPBlocked(ip: string, maxConnectionsPerIP: number): boolean; +} \ No newline at end of file diff --git a/ts/proxies/smart-proxy/route-connection-handler.ts b/ts/proxies/smart-proxy/route-connection-handler.ts index bd1b67d..913840e 100644 --- a/ts/proxies/smart-proxy/route-connection-handler.ts +++ b/ts/proxies/smart-proxy/route-connection-handler.ts @@ -23,6 +23,9 @@ export class RouteConnectionHandler { // Cache for route contexts to avoid recreation private routeContextCache: Map = new Map(); + + // RxJS Subject for new connections + public newConnectionSubject = new plugins.smartrx.rxjs.Subject(); constructor( settings: ISmartProxyOptions, @@ -35,6 +38,7 @@ export class RouteConnectionHandler { ) { this.settings = settings; } + /** * Create a route context object for port and host mapping functions @@ -110,6 +114,9 @@ export class RouteConnectionHandler { // Connection was rejected due to limit - socket already destroyed by connection manager return; } + + // Emit new connection event + this.newConnectionSubject.next(record); const connectionId = record.id; // Apply socket optimizations (apply to underlying socket) @@ -640,6 +647,9 @@ export class RouteConnectionHandler { ): void { const connectionId = record.id; const action = route.action as IRouteAction; + + // Store the route config in the connection record for metrics and other uses + record.routeConfig = route; // Check if this route uses NFTables for forwarding if (action.forwardingEngine === 'nftables') { @@ -957,6 +967,9 @@ export class RouteConnectionHandler { ): Promise { const connectionId = record.id; + // Store the route config in the connection record for metrics and other uses + record.routeConfig = route; + if (!route.action.socketHandler) { logger.log('error', 'socket-handler action missing socketHandler function', { connectionId, diff --git a/ts/proxies/smart-proxy/smart-proxy.ts b/ts/proxies/smart-proxy/smart-proxy.ts index ee32066..94c9f84 100644 --- a/ts/proxies/smart-proxy/smart-proxy.ts +++ b/ts/proxies/smart-proxy/smart-proxy.ts @@ -27,6 +27,10 @@ import { Mutex } from './utils/mutex.js'; // Import ACME state manager import { AcmeStateManager } from './acme-state-manager.js'; +// Import metrics collector +import { MetricsCollector } from './metrics-collector.js'; +import type { IProxyStats } from './models/metrics-types.js'; + /** * SmartProxy - Pure route-based API * @@ -47,13 +51,13 @@ export class SmartProxy extends plugins.EventEmitter { private isShuttingDown: boolean = false; // Component managers - private connectionManager: ConnectionManager; + public connectionManager: ConnectionManager; private securityManager: SecurityManager; private tlsManager: TlsManager; private httpProxyBridge: HttpProxyBridge; private timeoutManager: TimeoutManager; public routeManager: RouteManager; // Made public for route management - private routeConnectionHandler: RouteConnectionHandler; + public routeConnectionHandler: RouteConnectionHandler; // Made public for metrics private nftablesManager: NFTablesManager; // Certificate manager for ACME and static certificates @@ -64,6 +68,9 @@ export class SmartProxy extends plugins.EventEmitter { private routeUpdateLock: any = null; // Will be initialized as AsyncMutex private acmeStateManager: AcmeStateManager; + // Metrics collector + private metricsCollector: MetricsCollector; + // Track port usage across route updates private portUsageMap: Map> = new Map(); @@ -204,6 +211,9 @@ export class SmartProxy extends plugins.EventEmitter { // Initialize ACME state manager this.acmeStateManager = new AcmeStateManager(); + + // Initialize metrics collector with reference to this SmartProxy instance + this.metricsCollector = new MetricsCollector(this); } /** @@ -383,6 +393,9 @@ export class SmartProxy extends plugins.EventEmitter { logger.log('info', 'Starting certificate provisioning now that ports are ready', { component: 'certificate-manager' }); await this.certManager.provisionAllCertificates(); } + + // Start the metrics collector now that all components are initialized + this.metricsCollector.start(); // Set up periodic connection logging and inactivity checks this.connectionLogger = setInterval(() => { @@ -508,6 +521,9 @@ export class SmartProxy extends plugins.EventEmitter { // Clear ACME state manager this.acmeStateManager.clear(); + + // Stop metrics collector + this.metricsCollector.stop(); logger.log('info', 'SmartProxy shutdown complete.'); } @@ -905,6 +921,15 @@ export class SmartProxy extends plugins.EventEmitter { return this.certManager.getCertificateStatus(routeName); } + /** + * Get proxy statistics and metrics + * + * @returns IProxyStats interface with various metrics methods + */ + public getStats(): IProxyStats { + return this.metricsCollector; + } + /** * Validates if a domain name is valid for certificate issuance */