From 829ae0d6a34aa1b413eabf5ce593d7546f6c101d Mon Sep 17 00:00:00 2001 From: Philipp Kunz Date: Sat, 31 May 2025 18:01:09 +0000 Subject: [PATCH] fix(refactor): remove deprecated Port80Handler and related utilities - Deleted event-utils.ts which contained deprecated Port80Handler and its subscribers. - Updated index.ts to remove the export of event-utils. - Refactored ConnectionManager to extend LifecycleComponent for better resource management. - Added BinaryHeap implementation for efficient priority queue operations. - Introduced EnhancedConnectionPool for managing pooled connections with lifecycle management. - Implemented LifecycleComponent to manage timers and event listeners automatically. - Added comprehensive tests for BinaryHeap and LifecycleComponent to ensure functionality. --- readme.hints.md | 13 +- readme.plan.md | 11 +- test/core/utils/test.binary-heap.ts | 206 +++++++++ test/core/utils/test.lifecycle-component.ts | 252 +++++++++++ ts/common/eventUtils.ts | 34 -- ts/common/types.ts | 91 ---- ts/core/utils/binary-heap.ts | 225 ++++++++++ ts/core/utils/enhanced-connection-pool.ts | 420 +++++++++++++++++++ ts/core/utils/event-system.ts | 376 ----------------- ts/core/utils/event-utils.ts | 25 -- ts/core/utils/index.ts | 5 +- ts/core/utils/lifecycle-component.ts | 213 ++++++++++ ts/proxies/smart-proxy/connection-manager.ts | 42 +- 13 files changed, 1354 insertions(+), 559 deletions(-) create mode 100644 test/core/utils/test.binary-heap.ts create mode 100644 test/core/utils/test.lifecycle-component.ts delete mode 100644 ts/common/eventUtils.ts delete mode 100644 ts/common/types.ts create mode 100644 ts/core/utils/binary-heap.ts create mode 100644 ts/core/utils/enhanced-connection-pool.ts delete mode 100644 ts/core/utils/event-system.ts delete mode 100644 ts/core/utils/event-utils.ts create mode 100644 ts/core/utils/lifecycle-component.ts diff --git a/readme.hints.md b/readme.hints.md index 3b22728..55bf36d 100644 --- a/readme.hints.md +++ b/readme.hints.md @@ -401,7 +401,16 @@ const routes: IRouteConfig[] = [{ - ✅ Migrated nftables-proxy filesystem operations to async (except stopSync for exit handlers) - ✅ All tests passing for new utilities -### 6. Next Steps (Remaining Phases) -- **Phase 2**: Implement LifecycleComponent for resource management +### 6. Phase 2 Progress Status +🔨 **Phase 2 IN PROGRESS** - Resource Lifecycle Management: +- ✅ Created LifecycleComponent base class for automatic resource cleanup +- ✅ Created BinaryHeap data structure for priority queue operations +- ✅ Created EnhancedConnectionPool with backpressure and health checks +- ✅ Cleaned up legacy code (removed ts/common/, event-utils.ts, event-system.ts) +- 📋 TODO: Migrate existing components to extend LifecycleComponent +- 📋 TODO: Add integration tests for resource management + +### 7. Next Steps (Remaining Work) +- **Phase 2 (cont)**: Migrate components to use LifecycleComponent - **Phase 3**: Add worker threads for CPU-intensive operations - **Phase 4**: Performance monitoring dashboard \ No newline at end of file diff --git a/readme.plan.md b/readme.plan.md index 805d49f..d79799f 100644 --- a/readme.plan.md +++ b/readme.plan.md @@ -186,7 +186,7 @@ Phase 2.2 (Connection Pool) ───────────────── - **Phase 3** optimizations depend on stable async foundation - **Phase 4** monitoring requires all components to be instrumented -## Phase 2: Resource Management (Week 2) +## Phase 2: Resource Management (Week 2) 🔨 IN PROGRESS ### 2.1 Timer Lifecycle Management @@ -1178,13 +1178,14 @@ export const PerformanceFlags = { - [ ] Performance test to verify event loop improvements ### Phase 2: Resource Management -- [ ] Implement LifecycleComponent base class +- [x] Implement LifecycleComponent base class - [ ] Migrate components to extend LifecycleComponent -- [ ] Implement BinaryHeap data structure -- [ ] Create EnhancedConnectionPool with queue support -- [ ] Add connection validation and health checks +- [x] Implement BinaryHeap data structure +- [x] Create EnhancedConnectionPool with queue support +- [x] Add connection validation and health checks - [ ] Implement proper timer cleanup in all components - [ ] Add integration tests for resource management +- [x] Clean up legacy code (removed ts/common/, event-utils.ts, event-system.ts) ### Phase 3: Performance Optimizations - [ ] Implement HashUtils for efficient object hashing diff --git a/test/core/utils/test.binary-heap.ts b/test/core/utils/test.binary-heap.ts new file mode 100644 index 0000000..bfd78d5 --- /dev/null +++ b/test/core/utils/test.binary-heap.ts @@ -0,0 +1,206 @@ +import { tap, expect } from '@git.zone/tstest/tapbundle'; +import { BinaryHeap } from '../../../ts/core/utils/binary-heap.js'; + +interface TestItem { + id: string; + priority: number; + value: string; +} + +tap.test('should create empty heap', async () => { + const heap = new BinaryHeap((a, b) => a - b); + + expect(heap.size).toEqual(0); + expect(heap.isEmpty()).toBeTrue(); + expect(heap.peek()).toBeUndefined(); +}); + +tap.test('should insert and extract in correct order', async () => { + const heap = new BinaryHeap((a, b) => a - b); + + heap.insert(5); + heap.insert(3); + heap.insert(7); + heap.insert(1); + heap.insert(9); + heap.insert(4); + + expect(heap.size).toEqual(6); + + // Extract in ascending order + expect(heap.extract()).toEqual(1); + expect(heap.extract()).toEqual(3); + expect(heap.extract()).toEqual(4); + expect(heap.extract()).toEqual(5); + expect(heap.extract()).toEqual(7); + expect(heap.extract()).toEqual(9); + expect(heap.extract()).toBeUndefined(); +}); + +tap.test('should work with custom objects and comparator', async () => { + const heap = new BinaryHeap( + (a, b) => a.priority - b.priority, + (item) => item.id + ); + + heap.insert({ id: 'a', priority: 5, value: 'five' }); + heap.insert({ id: 'b', priority: 2, value: 'two' }); + heap.insert({ id: 'c', priority: 8, value: 'eight' }); + heap.insert({ id: 'd', priority: 1, value: 'one' }); + + const first = heap.extract(); + expect(first?.priority).toEqual(1); + expect(first?.value).toEqual('one'); + + const second = heap.extract(); + expect(second?.priority).toEqual(2); + expect(second?.value).toEqual('two'); +}); + +tap.test('should support reverse order (max heap)', async () => { + const heap = new BinaryHeap((a, b) => b - a); + + heap.insert(5); + heap.insert(3); + heap.insert(7); + heap.insert(1); + heap.insert(9); + + // Extract in descending order + expect(heap.extract()).toEqual(9); + expect(heap.extract()).toEqual(7); + expect(heap.extract()).toEqual(5); +}); + +tap.test('should extract by predicate', async () => { + const heap = new BinaryHeap((a, b) => a.priority - b.priority); + + heap.insert({ id: 'a', priority: 5, value: 'five' }); + heap.insert({ id: 'b', priority: 2, value: 'two' }); + heap.insert({ id: 'c', priority: 8, value: 'eight' }); + + const extracted = heap.extractIf(item => item.id === 'b'); + expect(extracted?.id).toEqual('b'); + expect(heap.size).toEqual(2); + + // Should not find it again + const notFound = heap.extractIf(item => item.id === 'b'); + expect(notFound).toBeUndefined(); +}); + +tap.test('should extract by key', async () => { + const heap = new BinaryHeap( + (a, b) => a.priority - b.priority, + (item) => item.id + ); + + heap.insert({ id: 'a', priority: 5, value: 'five' }); + heap.insert({ id: 'b', priority: 2, value: 'two' }); + heap.insert({ id: 'c', priority: 8, value: 'eight' }); + + expect(heap.hasKey('b')).toBeTrue(); + + const extracted = heap.extractByKey('b'); + expect(extracted?.id).toEqual('b'); + expect(heap.size).toEqual(2); + expect(heap.hasKey('b')).toBeFalse(); + + // Should not find it again + const notFound = heap.extractByKey('b'); + expect(notFound).toBeUndefined(); +}); + +tap.test('should throw when using key operations without extractKey', async () => { + const heap = new BinaryHeap((a, b) => a.priority - b.priority); + + heap.insert({ id: 'a', priority: 5, value: 'five' }); + + let error: Error | null = null; + try { + heap.extractByKey('a'); + } catch (e: any) { + error = e; + } + + expect(error).not.toBeNull(); + expect(error?.message).toContain('extractKey function must be provided'); +}); + +tap.test('should handle duplicates correctly', async () => { + const heap = new BinaryHeap((a, b) => a - b); + + heap.insert(5); + heap.insert(5); + heap.insert(5); + heap.insert(3); + heap.insert(7); + + expect(heap.size).toEqual(5); + expect(heap.extract()).toEqual(3); + expect(heap.extract()).toEqual(5); + expect(heap.extract()).toEqual(5); + expect(heap.extract()).toEqual(5); + expect(heap.extract()).toEqual(7); +}); + +tap.test('should convert to array without modifying heap', async () => { + const heap = new BinaryHeap((a, b) => a - b); + + heap.insert(5); + heap.insert(3); + heap.insert(7); + + const array = heap.toArray(); + expect(array).toContain(3); + expect(array).toContain(5); + expect(array).toContain(7); + expect(array.length).toEqual(3); + + // Heap should still be intact + expect(heap.size).toEqual(3); + expect(heap.extract()).toEqual(3); +}); + +tap.test('should clear the heap', async () => { + const heap = new BinaryHeap( + (a, b) => a.priority - b.priority, + (item) => item.id + ); + + heap.insert({ id: 'a', priority: 5, value: 'five' }); + heap.insert({ id: 'b', priority: 2, value: 'two' }); + + expect(heap.size).toEqual(2); + expect(heap.hasKey('a')).toBeTrue(); + + heap.clear(); + + expect(heap.size).toEqual(0); + expect(heap.isEmpty()).toBeTrue(); + expect(heap.hasKey('a')).toBeFalse(); +}); + +tap.test('should handle complex extraction patterns', async () => { + const heap = new BinaryHeap((a, b) => a - b); + + // Insert numbers 1-10 in random order + [8, 3, 5, 9, 1, 7, 4, 10, 2, 6].forEach(n => heap.insert(n)); + + // Extract some in order + expect(heap.extract()).toEqual(1); + expect(heap.extract()).toEqual(2); + + // Insert more + heap.insert(0); + heap.insert(1.5); + + // Continue extracting + expect(heap.extract()).toEqual(0); + expect(heap.extract()).toEqual(1.5); + expect(heap.extract()).toEqual(3); + + // Verify remaining size (10 - 2 extracted + 2 inserted - 3 extracted = 7) + expect(heap.size).toEqual(7); +}); + +tap.start(); \ No newline at end of file diff --git a/test/core/utils/test.lifecycle-component.ts b/test/core/utils/test.lifecycle-component.ts new file mode 100644 index 0000000..2955e97 --- /dev/null +++ b/test/core/utils/test.lifecycle-component.ts @@ -0,0 +1,252 @@ +import { tap, expect } from '@git.zone/tstest/tapbundle'; +import { LifecycleComponent } from '../../../ts/core/utils/lifecycle-component.js'; +import { EventEmitter } from 'events'; + +// Test implementation of LifecycleComponent +class TestComponent extends LifecycleComponent { + public timerCallCount = 0; + public intervalCallCount = 0; + public cleanupCalled = false; + public testEmitter = new EventEmitter(); + public listenerCallCount = 0; + + constructor() { + super(); + this.setupTimers(); + this.setupListeners(); + } + + private setupTimers() { + // Set up a timeout + this.setTimeout(() => { + this.timerCallCount++; + }, 100); + + // Set up an interval + this.setInterval(() => { + this.intervalCallCount++; + }, 50); + } + + private setupListeners() { + this.addEventListener(this.testEmitter, 'test-event', () => { + this.listenerCallCount++; + }); + } + + protected async onCleanup(): Promise { + this.cleanupCalled = true; + } + + // Expose protected methods for testing + public testSetTimeout(handler: Function, timeout: number): NodeJS.Timeout { + return this.setTimeout(handler, timeout); + } + + public testSetInterval(handler: Function, interval: number): NodeJS.Timeout { + return this.setInterval(handler, interval); + } + + public testClearTimeout(timer: NodeJS.Timeout): void { + return this.clearTimeout(timer); + } + + public testClearInterval(timer: NodeJS.Timeout): void { + return this.clearInterval(timer); + } + + public testAddEventListener(target: any, event: string, handler: Function): void { + return this.addEventListener(target, event, handler); + } + + public testIsShuttingDown(): boolean { + return this.isShuttingDownState(); + } +} + +tap.test('should manage timers properly', async () => { + const component = new TestComponent(); + + // Wait for timers to fire + await new Promise(resolve => setTimeout(resolve, 200)); + + expect(component.timerCallCount).toEqual(1); + expect(component.intervalCallCount).toBeGreaterThan(2); + + await component.cleanup(); +}); + +tap.test('should manage event listeners properly', async () => { + const component = new TestComponent(); + + // Emit events + component.testEmitter.emit('test-event'); + component.testEmitter.emit('test-event'); + + expect(component.listenerCallCount).toEqual(2); + + // Cleanup and verify listeners are removed + await component.cleanup(); + + component.testEmitter.emit('test-event'); + expect(component.listenerCallCount).toEqual(2); // Should not increase +}); + +tap.test('should prevent timer execution after cleanup', async () => { + const component = new TestComponent(); + + let laterCallCount = 0; + component.testSetTimeout(() => { + laterCallCount++; + }, 100); + + // Cleanup immediately + await component.cleanup(); + + // Wait for timer that would have fired + await new Promise(resolve => setTimeout(resolve, 150)); + + expect(laterCallCount).toEqual(0); +}); + +tap.test('should handle child components', async () => { + class ParentComponent extends LifecycleComponent { + public child: TestComponent; + + constructor() { + super(); + this.child = new TestComponent(); + this.registerChildComponent(this.child); + } + } + + const parent = new ParentComponent(); + + // Wait for child timers + await new Promise(resolve => setTimeout(resolve, 100)); + + expect(parent.child.timerCallCount).toEqual(1); + + // Cleanup parent should cleanup child + await parent.cleanup(); + + expect(parent.child.cleanupCalled).toBeTrue(); + expect(parent.child.testIsShuttingDown()).toBeTrue(); +}); + +tap.test('should handle multiple cleanup calls gracefully', async () => { + const component = new TestComponent(); + + // Call cleanup multiple times + const promises = [ + component.cleanup(), + component.cleanup(), + component.cleanup() + ]; + + await Promise.all(promises); + + // Should only clean up once + expect(component.cleanupCalled).toBeTrue(); +}); + +tap.test('should clear specific timers', async () => { + const component = new TestComponent(); + + let callCount = 0; + const timer = component.testSetTimeout(() => { + callCount++; + }, 100); + + // Clear the timer + component.testClearTimeout(timer); + + // Wait and verify it didn't fire + await new Promise(resolve => setTimeout(resolve, 150)); + + expect(callCount).toEqual(0); + + await component.cleanup(); +}); + +tap.test('should clear specific intervals', async () => { + const component = new TestComponent(); + + let callCount = 0; + const interval = component.testSetInterval(() => { + callCount++; + }, 50); + + // Let it run a bit + await new Promise(resolve => setTimeout(resolve, 120)); + + const countBeforeClear = callCount; + expect(countBeforeClear).toBeGreaterThan(1); + + // Clear the interval + component.testClearInterval(interval); + + // Wait and verify it stopped + await new Promise(resolve => setTimeout(resolve, 100)); + + expect(callCount).toEqual(countBeforeClear); + + await component.cleanup(); +}); + +tap.test('should handle once event listeners', async () => { + const component = new TestComponent(); + const emitter = new EventEmitter(); + + let callCount = 0; + const handler = () => { + callCount++; + }; + + component.testAddEventListener(emitter, 'once-event', handler, { once: true }); + + // Check listener count before emit + const beforeCount = emitter.listenerCount('once-event'); + expect(beforeCount).toEqual(1); + + // Emit once - the listener should fire and auto-remove + emitter.emit('once-event'); + expect(callCount).toEqual(1); + + // Check listener was auto-removed + const afterCount = emitter.listenerCount('once-event'); + expect(afterCount).toEqual(0); + + // Emit again - should not increase count + emitter.emit('once-event'); + expect(callCount).toEqual(1); + + await component.cleanup(); +}); + +tap.test('should not create timers when shutting down', async () => { + const component = new TestComponent(); + + // Start cleanup + const cleanupPromise = component.cleanup(); + + // Try to create timers during shutdown + let timerFired = false; + let intervalFired = false; + + component.testSetTimeout(() => { + timerFired = true; + }, 10); + + component.testSetInterval(() => { + intervalFired = true; + }, 10); + + await cleanupPromise; + await new Promise(resolve => setTimeout(resolve, 50)); + + expect(timerFired).toBeFalse(); + expect(intervalFired).toBeFalse(); +}); + +tap.start(); \ No newline at end of file diff --git a/ts/common/eventUtils.ts b/ts/common/eventUtils.ts deleted file mode 100644 index 5415e07..0000000 --- a/ts/common/eventUtils.ts +++ /dev/null @@ -1,34 +0,0 @@ -// Port80Handler removed - use SmartCertManager instead -import { Port80HandlerEvents } from './types.js'; -import type { ICertificateData, ICertificateFailure, ICertificateExpiring } from './types.js'; - -/** - * Subscribers callback definitions for Port80Handler events - */ -export interface Port80HandlerSubscribers { - onCertificateIssued?: (data: ICertificateData) => void; - onCertificateRenewed?: (data: ICertificateData) => void; - onCertificateFailed?: (data: ICertificateFailure) => void; - onCertificateExpiring?: (data: ICertificateExpiring) => void; -} - -/** - * Subscribes to Port80Handler events based on provided callbacks - */ -export function subscribeToPort80Handler( - handler: any, - subscribers: Port80HandlerSubscribers -): void { - if (subscribers.onCertificateIssued) { - handler.on(Port80HandlerEvents.CERTIFICATE_ISSUED, subscribers.onCertificateIssued); - } - if (subscribers.onCertificateRenewed) { - handler.on(Port80HandlerEvents.CERTIFICATE_RENEWED, subscribers.onCertificateRenewed); - } - if (subscribers.onCertificateFailed) { - handler.on(Port80HandlerEvents.CERTIFICATE_FAILED, subscribers.onCertificateFailed); - } - if (subscribers.onCertificateExpiring) { - handler.on(Port80HandlerEvents.CERTIFICATE_EXPIRING, subscribers.onCertificateExpiring); - } -} \ No newline at end of file diff --git a/ts/common/types.ts b/ts/common/types.ts deleted file mode 100644 index e04febd..0000000 --- a/ts/common/types.ts +++ /dev/null @@ -1,91 +0,0 @@ -import * as plugins from '../plugins.js'; - -/** - * Shared types for certificate management and domain options - */ - -/** - * Domain forwarding configuration - */ -export interface IForwardConfig { - ip: string; - port: number; -} - -/** - * Domain configuration options - */ -export interface IDomainOptions { - domainName: string; - sslRedirect: boolean; // if true redirects the request to port 443 - acmeMaintenance: boolean; // tries to always have a valid cert for this domain - forward?: IForwardConfig; // forwards all http requests to that target - acmeForward?: IForwardConfig; // forwards letsencrypt requests to this config -} - -/** - * Certificate data that can be emitted via events or set from outside - */ -export interface ICertificateData { - domain: string; - certificate: string; - privateKey: string; - expiryDate: Date; -} - -/** - * Events emitted by the Port80Handler - */ -export enum Port80HandlerEvents { - CERTIFICATE_ISSUED = 'certificate-issued', - CERTIFICATE_RENEWED = 'certificate-renewed', - CERTIFICATE_FAILED = 'certificate-failed', - CERTIFICATE_EXPIRING = 'certificate-expiring', - MANAGER_STARTED = 'manager-started', - MANAGER_STOPPED = 'manager-stopped', - REQUEST_FORWARDED = 'request-forwarded', -} - -/** - * Certificate failure payload type - */ -export interface ICertificateFailure { - domain: string; - error: string; - isRenewal: boolean; -} - -/** - * Certificate expiry payload type - */ -export interface ICertificateExpiring { - domain: string; - expiryDate: Date; - daysRemaining: number; -} -/** - * Forwarding configuration for specific domains in ACME setup - */ -export interface IDomainForwardConfig { - domain: string; - forwardConfig?: IForwardConfig; - acmeForwardConfig?: IForwardConfig; - sslRedirect?: boolean; -} - -/** - * Unified ACME configuration options used across proxies and handlers - */ -export interface IAcmeOptions { - accountEmail?: string; // Email for Let's Encrypt account - enabled?: boolean; // Whether ACME is enabled - port?: number; // Port to listen on for ACME challenges (default: 80) - useProduction?: boolean; // Use production environment (default: staging) - httpsRedirectPort?: number; // Port to redirect HTTP requests to HTTPS (default: 443) - renewThresholdDays?: number; // Days before expiry to renew certificates - renewCheckIntervalHours?: number; // How often to check for renewals (in hours) - autoRenew?: boolean; // Whether to automatically renew certificates - certificateStore?: string; // Directory to store certificates - skipConfiguredCerts?: boolean; // Skip domains with existing certificates - domainForwards?: IDomainForwardConfig[]; // Domain-specific forwarding configs -} diff --git a/ts/core/utils/binary-heap.ts b/ts/core/utils/binary-heap.ts new file mode 100644 index 0000000..1f6efbf --- /dev/null +++ b/ts/core/utils/binary-heap.ts @@ -0,0 +1,225 @@ +/** + * A binary heap implementation for efficient priority queue operations + * Supports O(log n) insert and extract operations + */ +export class BinaryHeap { + private heap: T[] = []; + private keyMap?: Map; // For efficient key-based lookups + + constructor( + private compareFn: (a: T, b: T) => number, + private extractKey?: (item: T) => string + ) { + if (extractKey) { + this.keyMap = new Map(); + } + } + + /** + * Get the current size of the heap + */ + public get size(): number { + return this.heap.length; + } + + /** + * Check if the heap is empty + */ + public isEmpty(): boolean { + return this.heap.length === 0; + } + + /** + * Peek at the top element without removing it + */ + public peek(): T | undefined { + return this.heap[0]; + } + + /** + * Insert a new item into the heap + * O(log n) time complexity + */ + public insert(item: T): void { + const index = this.heap.length; + this.heap.push(item); + + if (this.keyMap && this.extractKey) { + const key = this.extractKey(item); + this.keyMap.set(key, index); + } + + this.bubbleUp(index); + } + + /** + * Extract the top element from the heap + * O(log n) time complexity + */ + public extract(): T | undefined { + if (this.heap.length === 0) return undefined; + if (this.heap.length === 1) { + const item = this.heap.pop()!; + if (this.keyMap && this.extractKey) { + this.keyMap.delete(this.extractKey(item)); + } + return item; + } + + const result = this.heap[0]; + const lastItem = this.heap.pop()!; + this.heap[0] = lastItem; + + if (this.keyMap && this.extractKey) { + this.keyMap.delete(this.extractKey(result)); + this.keyMap.set(this.extractKey(lastItem), 0); + } + + this.bubbleDown(0); + return result; + } + + /** + * Extract an element that matches the predicate + * O(n) time complexity for search, O(log n) for extraction + */ + public extractIf(predicate: (item: T) => boolean): T | undefined { + const index = this.heap.findIndex(predicate); + if (index === -1) return undefined; + + return this.extractAt(index); + } + + /** + * Extract an element by its key (if extractKey was provided) + * O(log n) time complexity + */ + public extractByKey(key: string): T | undefined { + if (!this.keyMap || !this.extractKey) { + throw new Error('extractKey function must be provided to use key-based extraction'); + } + + const index = this.keyMap.get(key); + if (index === undefined) return undefined; + + return this.extractAt(index); + } + + /** + * Check if a key exists in the heap + * O(1) time complexity + */ + public hasKey(key: string): boolean { + if (!this.keyMap) return false; + return this.keyMap.has(key); + } + + /** + * Get all elements as an array (does not modify heap) + * O(n) time complexity + */ + public toArray(): T[] { + return [...this.heap]; + } + + /** + * Clear the heap + */ + public clear(): void { + this.heap = []; + if (this.keyMap) { + this.keyMap.clear(); + } + } + + /** + * Extract element at specific index + */ + private extractAt(index: number): T { + const item = this.heap[index]; + + if (this.keyMap && this.extractKey) { + this.keyMap.delete(this.extractKey(item)); + } + + if (index === this.heap.length - 1) { + this.heap.pop(); + return item; + } + + const lastItem = this.heap.pop()!; + this.heap[index] = lastItem; + + if (this.keyMap && this.extractKey) { + this.keyMap.set(this.extractKey(lastItem), index); + } + + // Try bubbling up first + const parentIndex = Math.floor((index - 1) / 2); + if (parentIndex >= 0 && this.compareFn(this.heap[index], this.heap[parentIndex]) < 0) { + this.bubbleUp(index); + } else { + this.bubbleDown(index); + } + + return item; + } + + /** + * Bubble up element at given index to maintain heap property + */ + private bubbleUp(index: number): void { + while (index > 0) { + const parentIndex = Math.floor((index - 1) / 2); + + if (this.compareFn(this.heap[index], this.heap[parentIndex]) >= 0) { + break; + } + + this.swap(index, parentIndex); + index = parentIndex; + } + } + + /** + * Bubble down element at given index to maintain heap property + */ + private bubbleDown(index: number): void { + const length = this.heap.length; + + while (true) { + const leftChild = 2 * index + 1; + const rightChild = 2 * index + 2; + let smallest = index; + + if (leftChild < length && + this.compareFn(this.heap[leftChild], this.heap[smallest]) < 0) { + smallest = leftChild; + } + + if (rightChild < length && + this.compareFn(this.heap[rightChild], this.heap[smallest]) < 0) { + smallest = rightChild; + } + + if (smallest === index) break; + + this.swap(index, smallest); + index = smallest; + } + } + + /** + * Swap two elements in the heap + */ + private swap(i: number, j: number): void { + const temp = this.heap[i]; + this.heap[i] = this.heap[j]; + this.heap[j] = temp; + + if (this.keyMap && this.extractKey) { + this.keyMap.set(this.extractKey(this.heap[i]), i); + this.keyMap.set(this.extractKey(this.heap[j]), j); + } + } +} \ No newline at end of file diff --git a/ts/core/utils/enhanced-connection-pool.ts b/ts/core/utils/enhanced-connection-pool.ts new file mode 100644 index 0000000..93cf328 --- /dev/null +++ b/ts/core/utils/enhanced-connection-pool.ts @@ -0,0 +1,420 @@ +import { LifecycleComponent } from './lifecycle-component.js'; +import { BinaryHeap } from './binary-heap.js'; +import { AsyncMutex } from './async-utils.js'; +import { EventEmitter } from 'events'; + +/** + * Interface for pooled connection + */ +export interface IPooledConnection { + id: string; + connection: T; + createdAt: number; + lastUsedAt: number; + useCount: number; + inUse: boolean; + metadata?: any; +} + +/** + * Configuration options for the connection pool + */ +export interface IConnectionPoolOptions { + minSize?: number; + maxSize?: number; + acquireTimeout?: number; + idleTimeout?: number; + maxUseCount?: number; + validateOnAcquire?: boolean; + validateOnReturn?: boolean; + queueTimeout?: number; + connectionFactory: () => Promise; + connectionValidator?: (connection: T) => Promise; + connectionDestroyer?: (connection: T) => Promise; + onConnectionError?: (error: Error, connection?: T) => void; +} + +/** + * Interface for queued acquire request + */ +interface IAcquireRequest { + id: string; + priority: number; + timestamp: number; + resolve: (connection: IPooledConnection) => void; + reject: (error: Error) => void; + timeoutHandle?: NodeJS.Timeout; +} + +/** + * Enhanced connection pool with priority queue, backpressure, and lifecycle management + */ +export class EnhancedConnectionPool extends LifecycleComponent { + private readonly options: Required, 'connectionValidator' | 'connectionDestroyer' | 'onConnectionError'>> & Pick, 'connectionValidator' | 'connectionDestroyer' | 'onConnectionError'>; + private readonly availableConnections: IPooledConnection[] = []; + private readonly activeConnections: Map> = new Map(); + private readonly waitQueue: BinaryHeap>; + private readonly mutex = new AsyncMutex(); + private readonly eventEmitter = new EventEmitter(); + + private connectionIdCounter = 0; + private requestIdCounter = 0; + private isClosing = false; + + // Metrics + private metrics = { + connectionsCreated: 0, + connectionsDestroyed: 0, + connectionsAcquired: 0, + connectionsReleased: 0, + acquireTimeouts: 0, + validationFailures: 0, + queueHighWaterMark: 0, + }; + + constructor(options: IConnectionPoolOptions) { + super(); + + this.options = { + minSize: 0, + maxSize: 10, + acquireTimeout: 30000, + idleTimeout: 300000, // 5 minutes + maxUseCount: Infinity, + validateOnAcquire: true, + validateOnReturn: false, + queueTimeout: 60000, + ...options, + }; + + // Initialize priority queue (higher priority = extracted first) + this.waitQueue = new BinaryHeap>( + (a, b) => b.priority - a.priority || a.timestamp - b.timestamp, + (item) => item.id + ); + + // Start maintenance cycle + this.startMaintenance(); + + // Initialize minimum connections + this.initializeMinConnections(); + } + + /** + * Initialize minimum number of connections + */ + private async initializeMinConnections(): Promise { + const promises: Promise[] = []; + + for (let i = 0; i < this.options.minSize; i++) { + promises.push( + this.createConnection() + .then(conn => { + this.availableConnections.push(conn); + }) + .catch(err => { + if (this.options.onConnectionError) { + this.options.onConnectionError(err); + } + }) + ); + } + + await Promise.all(promises); + } + + /** + * Start maintenance timer for idle connection cleanup + */ + private startMaintenance(): void { + this.setInterval(() => { + this.performMaintenance(); + }, 30000); // Every 30 seconds + } + + /** + * Perform maintenance tasks + */ + private async performMaintenance(): Promise { + await this.mutex.runExclusive(async () => { + const now = Date.now(); + const toRemove: IPooledConnection[] = []; + + // Check for idle connections beyond minimum size + for (let i = this.availableConnections.length - 1; i >= 0; i--) { + const conn = this.availableConnections[i]; + + // Keep minimum connections + if (this.availableConnections.length <= this.options.minSize) { + break; + } + + // Remove idle connections + if (now - conn.lastUsedAt > this.options.idleTimeout) { + toRemove.push(conn); + this.availableConnections.splice(i, 1); + } + } + + // Destroy idle connections + for (const conn of toRemove) { + await this.destroyConnection(conn); + } + }); + } + + /** + * Acquire a connection from the pool + */ + public async acquire(priority: number = 0, timeout?: number): Promise> { + if (this.isClosing) { + throw new Error('Connection pool is closing'); + } + + return this.mutex.runExclusive(async () => { + // Try to get an available connection + const connection = await this.tryAcquireConnection(); + if (connection) { + return connection; + } + + // Check if we can create a new connection + const totalConnections = this.availableConnections.length + this.activeConnections.size; + if (totalConnections < this.options.maxSize) { + try { + const newConnection = await this.createConnection(); + return this.checkoutConnection(newConnection); + } catch (err) { + // Fall through to queue if creation fails + } + } + + // Add to wait queue + return this.queueAcquireRequest(priority, timeout); + }); + } + + /** + * Try to acquire an available connection + */ + private async tryAcquireConnection(): Promise | null> { + while (this.availableConnections.length > 0) { + const connection = this.availableConnections.shift()!; + + // Check if connection exceeded max use count + if (connection.useCount >= this.options.maxUseCount) { + await this.destroyConnection(connection); + continue; + } + + // Validate connection if required + if (this.options.validateOnAcquire && this.options.connectionValidator) { + try { + const isValid = await this.options.connectionValidator(connection.connection); + if (!isValid) { + this.metrics.validationFailures++; + await this.destroyConnection(connection); + continue; + } + } catch (err) { + this.metrics.validationFailures++; + await this.destroyConnection(connection); + continue; + } + } + + return this.checkoutConnection(connection); + } + + return null; + } + + /** + * Checkout a connection for use + */ + private checkoutConnection(connection: IPooledConnection): IPooledConnection { + connection.inUse = true; + connection.lastUsedAt = Date.now(); + connection.useCount++; + + this.activeConnections.set(connection.id, connection); + this.metrics.connectionsAcquired++; + + this.eventEmitter.emit('acquire', connection); + return connection; + } + + /** + * Queue an acquire request + */ + private queueAcquireRequest(priority: number, timeout?: number): Promise> { + return new Promise>((resolve, reject) => { + const request: IAcquireRequest = { + id: `req-${this.requestIdCounter++}`, + priority, + timestamp: Date.now(), + resolve, + reject, + }; + + // Set timeout + const timeoutMs = timeout || this.options.queueTimeout; + request.timeoutHandle = this.setTimeout(() => { + if (this.waitQueue.extractByKey(request.id)) { + this.metrics.acquireTimeouts++; + reject(new Error(`Connection acquire timeout after ${timeoutMs}ms`)); + } + }, timeoutMs); + + this.waitQueue.insert(request); + this.metrics.queueHighWaterMark = Math.max( + this.metrics.queueHighWaterMark, + this.waitQueue.size + ); + + this.eventEmitter.emit('enqueue', { queueSize: this.waitQueue.size }); + }); + } + + /** + * Release a connection back to the pool + */ + public async release(connection: IPooledConnection): Promise { + return this.mutex.runExclusive(async () => { + if (!connection.inUse || !this.activeConnections.has(connection.id)) { + throw new Error('Connection is not active'); + } + + this.activeConnections.delete(connection.id); + connection.inUse = false; + connection.lastUsedAt = Date.now(); + this.metrics.connectionsReleased++; + + // Check if connection should be destroyed + if (connection.useCount >= this.options.maxUseCount) { + await this.destroyConnection(connection); + return; + } + + // Validate on return if required + if (this.options.validateOnReturn && this.options.connectionValidator) { + try { + const isValid = await this.options.connectionValidator(connection.connection); + if (!isValid) { + await this.destroyConnection(connection); + return; + } + } catch (err) { + await this.destroyConnection(connection); + return; + } + } + + // Check if there are waiting requests + const request = this.waitQueue.extract(); + if (request) { + this.clearTimeout(request.timeoutHandle!); + request.resolve(this.checkoutConnection(connection)); + this.eventEmitter.emit('dequeue', { queueSize: this.waitQueue.size }); + } else { + // Return to available pool + this.availableConnections.push(connection); + this.eventEmitter.emit('release', connection); + } + }); + } + + /** + * Create a new connection + */ + private async createConnection(): Promise> { + const rawConnection = await this.options.connectionFactory(); + + const connection: IPooledConnection = { + id: `conn-${this.connectionIdCounter++}`, + connection: rawConnection, + createdAt: Date.now(), + lastUsedAt: Date.now(), + useCount: 0, + inUse: false, + }; + + this.metrics.connectionsCreated++; + this.eventEmitter.emit('create', connection); + + return connection; + } + + /** + * Destroy a connection + */ + private async destroyConnection(connection: IPooledConnection): Promise { + try { + if (this.options.connectionDestroyer) { + await this.options.connectionDestroyer(connection.connection); + } + + this.metrics.connectionsDestroyed++; + this.eventEmitter.emit('destroy', connection); + } catch (err) { + if (this.options.onConnectionError) { + this.options.onConnectionError(err as Error, connection.connection); + } + } + } + + /** + * Get current pool statistics + */ + public getStats() { + return { + available: this.availableConnections.length, + active: this.activeConnections.size, + waiting: this.waitQueue.size, + total: this.availableConnections.length + this.activeConnections.size, + ...this.metrics, + }; + } + + /** + * Subscribe to pool events + */ + public on(event: string, listener: Function): void { + this.addEventListener(this.eventEmitter, event, listener); + } + + /** + * Close the pool and cleanup resources + */ + protected async onCleanup(): Promise { + this.isClosing = true; + + // Clear the wait queue + while (!this.waitQueue.isEmpty()) { + const request = this.waitQueue.extract(); + if (request) { + this.clearTimeout(request.timeoutHandle!); + request.reject(new Error('Connection pool is closing')); + } + } + + // Wait for active connections to be released (with timeout) + const timeout = 30000; + const startTime = Date.now(); + + while (this.activeConnections.size > 0 && Date.now() - startTime < timeout) { + await new Promise(resolve => setTimeout(resolve, 100)); + } + + // Destroy all connections + const allConnections = [ + ...this.availableConnections, + ...this.activeConnections.values(), + ]; + + await Promise.all(allConnections.map(conn => this.destroyConnection(conn))); + + this.availableConnections.length = 0; + this.activeConnections.clear(); + } +} \ No newline at end of file diff --git a/ts/core/utils/event-system.ts b/ts/core/utils/event-system.ts deleted file mode 100644 index ae2f7ba..0000000 --- a/ts/core/utils/event-system.ts +++ /dev/null @@ -1,376 +0,0 @@ -import * as plugins from '../../plugins.js'; -import type { - ICertificateData, - ICertificateFailure, - ICertificateExpiring -} from '../models/common-types.js'; -import type { IRouteConfig } from '../../proxies/smart-proxy/models/route-types.js'; -import { Port80HandlerEvents } from '../models/common-types.js'; - -/** - * Standardized event names used throughout the system - */ -export enum ProxyEvents { - // Certificate events - CERTIFICATE_ISSUED = 'certificate:issued', - CERTIFICATE_RENEWED = 'certificate:renewed', - CERTIFICATE_FAILED = 'certificate:failed', - CERTIFICATE_EXPIRING = 'certificate:expiring', - - // Component lifecycle events - COMPONENT_STARTED = 'component:started', - COMPONENT_STOPPED = 'component:stopped', - - // Connection events - CONNECTION_ESTABLISHED = 'connection:established', - CONNECTION_CLOSED = 'connection:closed', - CONNECTION_ERROR = 'connection:error', - - // Request events - REQUEST_RECEIVED = 'request:received', - REQUEST_COMPLETED = 'request:completed', - REQUEST_ERROR = 'request:error', - - // Route events - ROUTE_MATCHED = 'route:matched', - ROUTE_UPDATED = 'route:updated', - ROUTE_ERROR = 'route:error', - - // Security events - SECURITY_BLOCKED = 'security:blocked', - SECURITY_BREACH_ATTEMPT = 'security:breach-attempt', - - // TLS events - TLS_HANDSHAKE_STARTED = 'tls:handshake-started', - TLS_HANDSHAKE_COMPLETED = 'tls:handshake-completed', - TLS_HANDSHAKE_FAILED = 'tls:handshake-failed' -} - -/** - * Component types for event metadata - */ -export enum ComponentType { - SMART_PROXY = 'smart-proxy', - NETWORK_PROXY = 'network-proxy', - NFTABLES_PROXY = 'nftables-proxy', - PORT80_HANDLER = 'port80-handler', - CERTIFICATE_MANAGER = 'certificate-manager', - ROUTE_MANAGER = 'route-manager', - CONNECTION_MANAGER = 'connection-manager', - TLS_MANAGER = 'tls-manager', - SECURITY_MANAGER = 'security-manager' -} - -/** - * Base event data interface - */ -export interface IEventData { - timestamp: number; - componentType: ComponentType; - componentId?: string; -} - -/** - * Certificate event data - */ -export interface ICertificateEventData extends IEventData, ICertificateData { - isRenewal?: boolean; - source?: string; -} - -/** - * Certificate failure event data - */ -export interface ICertificateFailureEventData extends IEventData, ICertificateFailure {} - -/** - * Certificate expiring event data - */ -export interface ICertificateExpiringEventData extends IEventData, ICertificateExpiring {} - -/** - * Component lifecycle event data - */ -export interface IComponentEventData extends IEventData { - name: string; - version?: string; -} - -/** - * Connection event data - */ -export interface IConnectionEventData extends IEventData { - connectionId: string; - clientIp: string; - serverIp?: string; - port: number; - isTls?: boolean; - domain?: string; -} - -/** - * Request event data - */ -export interface IRequestEventData extends IEventData { - connectionId: string; - requestId: string; - method?: string; - path?: string; - statusCode?: number; - duration?: number; - routeId?: string; - routeName?: string; -} - -/** - * Route event data - */ -export interface IRouteEventData extends IEventData { - route: IRouteConfig; - context?: any; -} - -/** - * Security event data - */ -export interface ISecurityEventData extends IEventData { - clientIp: string; - reason: string; - routeId?: string; - routeName?: string; -} - -/** - * TLS event data - */ -export interface ITlsEventData extends IEventData { - connectionId: string; - domain?: string; - clientIp: string; - tlsVersion?: string; - cipherSuite?: string; - sniHostname?: string; -} - -/** - * Logger interface for event system - */ -export interface IEventLogger { - info: (message: string, ...args: any[]) => void; - warn: (message: string, ...args: any[]) => void; - error: (message: string, ...args: any[]) => void; - debug?: (message: string, ...args: any[]) => void; -} - -/** - * Event handler type - */ -export type EventHandler = (data: T) => void; - -/** - * Helper class to standardize event emission and handling - * across all system components - */ -export class EventSystem { - private emitter: plugins.EventEmitter; - private componentType: ComponentType; - private componentId: string; - private logger?: IEventLogger; - - constructor( - componentType: ComponentType, - componentId: string = '', - logger?: IEventLogger - ) { - this.emitter = new plugins.EventEmitter(); - this.componentType = componentType; - this.componentId = componentId; - this.logger = logger; - } - - /** - * Emit a certificate issued event - */ - public emitCertificateIssued(data: Omit): void { - const eventData: ICertificateEventData = { - ...data, - timestamp: Date.now(), - componentType: this.componentType, - componentId: this.componentId - }; - - this.logger?.info?.(`Certificate issued for ${data.domain}`); - this.emitter.emit(ProxyEvents.CERTIFICATE_ISSUED, eventData); - } - - /** - * Emit a certificate renewed event - */ - public emitCertificateRenewed(data: Omit): void { - const eventData: ICertificateEventData = { - ...data, - timestamp: Date.now(), - componentType: this.componentType, - componentId: this.componentId - }; - - this.logger?.info?.(`Certificate renewed for ${data.domain}`); - this.emitter.emit(ProxyEvents.CERTIFICATE_RENEWED, eventData); - } - - /** - * Emit a certificate failed event - */ - public emitCertificateFailed(data: Omit): void { - const eventData: ICertificateFailureEventData = { - ...data, - timestamp: Date.now(), - componentType: this.componentType, - componentId: this.componentId - }; - - this.logger?.error?.(`Certificate issuance failed for ${data.domain}: ${data.error}`); - this.emitter.emit(ProxyEvents.CERTIFICATE_FAILED, eventData); - } - - /** - * Emit a certificate expiring event - */ - public emitCertificateExpiring(data: Omit): void { - const eventData: ICertificateExpiringEventData = { - ...data, - timestamp: Date.now(), - componentType: this.componentType, - componentId: this.componentId - }; - - this.logger?.warn?.(`Certificate expiring for ${data.domain} in ${data.daysRemaining} days`); - this.emitter.emit(ProxyEvents.CERTIFICATE_EXPIRING, eventData); - } - - /** - * Emit a component started event - */ - public emitComponentStarted(name: string, version?: string): void { - const eventData: IComponentEventData = { - name, - version, - timestamp: Date.now(), - componentType: this.componentType, - componentId: this.componentId - }; - - this.logger?.info?.(`Component ${name} started${version ? ` (v${version})` : ''}`); - this.emitter.emit(ProxyEvents.COMPONENT_STARTED, eventData); - } - - /** - * Emit a component stopped event - */ - public emitComponentStopped(name: string): void { - const eventData: IComponentEventData = { - name, - timestamp: Date.now(), - componentType: this.componentType, - componentId: this.componentId - }; - - this.logger?.info?.(`Component ${name} stopped`); - this.emitter.emit(ProxyEvents.COMPONENT_STOPPED, eventData); - } - - /** - * Emit a connection established event - */ - public emitConnectionEstablished(data: Omit): void { - const eventData: IConnectionEventData = { - ...data, - timestamp: Date.now(), - componentType: this.componentType, - componentId: this.componentId - }; - - this.logger?.debug?.(`Connection ${data.connectionId} established from ${data.clientIp} on port ${data.port}`); - this.emitter.emit(ProxyEvents.CONNECTION_ESTABLISHED, eventData); - } - - /** - * Emit a connection closed event - */ - public emitConnectionClosed(data: Omit): void { - const eventData: IConnectionEventData = { - ...data, - timestamp: Date.now(), - componentType: this.componentType, - componentId: this.componentId - }; - - this.logger?.debug?.(`Connection ${data.connectionId} closed`); - this.emitter.emit(ProxyEvents.CONNECTION_CLOSED, eventData); - } - - /** - * Emit a route matched event - */ - public emitRouteMatched(data: Omit): void { - const eventData: IRouteEventData = { - ...data, - timestamp: Date.now(), - componentType: this.componentType, - componentId: this.componentId - }; - - this.logger?.debug?.(`Route matched: ${data.route.name || data.route.id || 'unnamed'}`); - this.emitter.emit(ProxyEvents.ROUTE_MATCHED, eventData); - } - - /** - * Subscribe to an event - */ - public on(event: ProxyEvents, handler: EventHandler): void { - this.emitter.on(event, handler); - } - - /** - * Subscribe to an event once - */ - public once(event: ProxyEvents, handler: EventHandler): void { - this.emitter.once(event, handler); - } - - /** - * Unsubscribe from an event - */ - public off(event: ProxyEvents, handler: EventHandler): void { - this.emitter.off(event, handler); - } - - /** - * Map Port80Handler events to standard proxy events - */ - public subscribePort80HandlerEvents(handler: any): void { - handler.on(Port80HandlerEvents.CERTIFICATE_ISSUED, (data: ICertificateData) => { - this.emitCertificateIssued({ - ...data, - isRenewal: false, - source: 'port80handler' - }); - }); - - handler.on(Port80HandlerEvents.CERTIFICATE_RENEWED, (data: ICertificateData) => { - this.emitCertificateRenewed({ - ...data, - isRenewal: true, - source: 'port80handler' - }); - }); - - handler.on(Port80HandlerEvents.CERTIFICATE_FAILED, (data: ICertificateFailure) => { - this.emitCertificateFailed(data); - }); - - handler.on(Port80HandlerEvents.CERTIFICATE_EXPIRING, (data: ICertificateExpiring) => { - this.emitCertificateExpiring(data); - }); - } -} \ No newline at end of file diff --git a/ts/core/utils/event-utils.ts b/ts/core/utils/event-utils.ts deleted file mode 100644 index 93703ce..0000000 --- a/ts/core/utils/event-utils.ts +++ /dev/null @@ -1,25 +0,0 @@ -// Port80Handler has been removed - use SmartCertManager instead -import { Port80HandlerEvents } from '../models/common-types.js'; - -// Re-export for backward compatibility -export { Port80HandlerEvents }; - -/** - * @deprecated Use SmartCertManager instead - */ -export interface IPort80HandlerSubscribers { - onCertificateIssued?: (data: any) => void; - onCertificateRenewed?: (data: any) => void; - onCertificateFailed?: (data: any) => void; - onCertificateExpiring?: (data: any) => void; -} - -/** - * @deprecated Use SmartCertManager instead - */ -export function subscribeToPort80Handler( - handler: any, - subscribers: IPort80HandlerSubscribers -): void { - console.warn('subscribeToPort80Handler is deprecated - use SmartCertManager instead'); -} \ No newline at end of file diff --git a/ts/core/utils/index.ts b/ts/core/utils/index.ts index 771022f..a79f145 100644 --- a/ts/core/utils/index.ts +++ b/ts/core/utils/index.ts @@ -2,7 +2,6 @@ * Core utility functions */ -export * from './event-utils.js'; export * from './validation-utils.js'; export * from './ip-utils.js'; export * from './template-utils.js'; @@ -10,8 +9,10 @@ export * from './route-manager.js'; export * from './route-utils.js'; export * from './security-utils.js'; export * from './shared-security-manager.js'; -export * from './event-system.js'; export * from './websocket-utils.js'; export * from './logger.js'; export * from './async-utils.js'; export * from './fs-utils.js'; +export * from './lifecycle-component.js'; +export * from './binary-heap.js'; +export * from './enhanced-connection-pool.js'; diff --git a/ts/core/utils/lifecycle-component.ts b/ts/core/utils/lifecycle-component.ts new file mode 100644 index 0000000..bc5dba0 --- /dev/null +++ b/ts/core/utils/lifecycle-component.ts @@ -0,0 +1,213 @@ +/** + * Base class for components that need proper resource lifecycle management + * Provides automatic cleanup of timers and event listeners to prevent memory leaks + */ +export abstract class LifecycleComponent { + private timers: Set = new Set(); + private intervals: Set = new Set(); + private listeners: Array<{ + target: any; + event: string; + handler: Function; + once?: boolean; + }> = []; + private childComponents: Set = new Set(); + protected isShuttingDown = false; + private cleanupPromise?: Promise; + + /** + * Create a managed setTimeout that will be automatically cleaned up + */ + protected setTimeout(handler: Function, timeout: number): NodeJS.Timeout { + if (this.isShuttingDown) { + // Return a dummy timer if shutting down + return setTimeout(() => {}, 0); + } + + const wrappedHandler = () => { + this.timers.delete(timer); + if (!this.isShuttingDown) { + handler(); + } + }; + + const timer = setTimeout(wrappedHandler, timeout); + this.timers.add(timer); + return timer; + } + + /** + * Create a managed setInterval that will be automatically cleaned up + */ + protected setInterval(handler: Function, interval: number): NodeJS.Timeout { + if (this.isShuttingDown) { + // Return a dummy timer if shutting down + return setInterval(() => {}, interval); + } + + const wrappedHandler = () => { + if (!this.isShuttingDown) { + handler(); + } + }; + + const timer = setInterval(wrappedHandler, interval); + this.intervals.add(timer); + + // Allow process to exit even with timer + if (typeof timer.unref === 'function') { + timer.unref(); + } + + return timer; + } + + /** + * Clear a managed timeout + */ + protected clearTimeout(timer: NodeJS.Timeout): void { + clearTimeout(timer); + this.timers.delete(timer); + } + + /** + * Clear a managed interval + */ + protected clearInterval(timer: NodeJS.Timeout): void { + clearInterval(timer); + this.intervals.delete(timer); + } + + /** + * Add a managed event listener that will be automatically removed on cleanup + */ + protected addEventListener( + target: any, + event: string, + handler: Function, + options?: { once?: boolean } + ): void { + if (this.isShuttingDown) { + return; + } + + // Support both EventEmitter and DOM-style event targets + if (typeof target.on === 'function') { + if (options?.once) { + target.once(event, handler); + } else { + target.on(event, handler); + } + } else if (typeof target.addEventListener === 'function') { + target.addEventListener(event, handler, options); + } else { + throw new Error('Target must support on() or addEventListener()'); + } + + this.listeners.push({ + target, + event, + handler, + once: options?.once + }); + } + + /** + * Remove a specific event listener + */ + protected removeEventListener(target: any, event: string, handler: Function): void { + // Remove from target + if (typeof target.removeListener === 'function') { + target.removeListener(event, handler); + } else if (typeof target.removeEventListener === 'function') { + target.removeEventListener(event, handler); + } + + // Remove from our tracking + const index = this.listeners.findIndex( + l => l.target === target && l.event === event && l.handler === handler + ); + if (index !== -1) { + this.listeners.splice(index, 1); + } + } + + /** + * Register a child component that should be cleaned up when this component is cleaned up + */ + protected registerChildComponent(component: LifecycleComponent): void { + this.childComponents.add(component); + } + + /** + * Unregister a child component + */ + protected unregisterChildComponent(component: LifecycleComponent): void { + this.childComponents.delete(component); + } + + /** + * Override this method to implement component-specific cleanup logic + */ + protected async onCleanup(): Promise { + // Override in subclasses + } + + /** + * Clean up all managed resources + */ + public async cleanup(): Promise { + // Return existing cleanup promise if already cleaning up + if (this.cleanupPromise) { + return this.cleanupPromise; + } + + this.cleanupPromise = this.performCleanup(); + return this.cleanupPromise; + } + + private async performCleanup(): Promise { + this.isShuttingDown = true; + + // First, clean up child components + const childCleanupPromises: Promise[] = []; + for (const child of this.childComponents) { + childCleanupPromises.push(child.cleanup()); + } + await Promise.all(childCleanupPromises); + this.childComponents.clear(); + + // Clear all timers + for (const timer of this.timers) { + clearTimeout(timer); + } + this.timers.clear(); + + // Clear all intervals + for (const timer of this.intervals) { + clearInterval(timer); + } + this.intervals.clear(); + + // Remove all event listeners + for (const { target, event, handler } of this.listeners) { + // All listeners need to be removed, including 'once' listeners that might not have fired + if (typeof target.removeListener === 'function') { + target.removeListener(event, handler); + } else if (typeof target.removeEventListener === 'function') { + target.removeEventListener(event, handler); + } + } + this.listeners = []; + + // Call subclass cleanup + await this.onCleanup(); + } + + /** + * Check if the component is shutting down + */ + protected isShuttingDownState(): boolean { + return this.isShuttingDown; + } +} \ No newline at end of file diff --git a/ts/proxies/smart-proxy/connection-manager.ts b/ts/proxies/smart-proxy/connection-manager.ts index 20e7d3d..9c777ae 100644 --- a/ts/proxies/smart-proxy/connection-manager.ts +++ b/ts/proxies/smart-proxy/connection-manager.ts @@ -3,11 +3,12 @@ import type { IConnectionRecord, ISmartProxyOptions } from './models/interfaces. import { SecurityManager } from './security-manager.js'; import { TimeoutManager } from './timeout-manager.js'; import { logger } from '../../core/utils/logger.js'; +import { LifecycleComponent } from '../../core/utils/lifecycle-component.js'; /** * Manages connection lifecycle, tracking, and cleanup with performance optimizations */ -export class ConnectionManager { +export class ConnectionManager extends LifecycleComponent { private connectionRecords: Map = new Map(); private terminationStats: { incoming: Record; @@ -16,7 +17,6 @@ export class ConnectionManager { // Performance optimization: Track connections needing inactivity check private nextInactivityCheck: Map = new Map(); - private inactivityCheckTimer: NodeJS.Timeout | null = null; // Connection limits private readonly maxConnections: number; @@ -31,6 +31,8 @@ export class ConnectionManager { private securityManager: SecurityManager, private timeoutManager: TimeoutManager ) { + super(); + // Set reasonable defaults for connection limits this.maxConnections = settings.defaults.security.maxConnections @@ -134,12 +136,10 @@ export class ConnectionManager { */ private startInactivityCheckTimer(): void { // Check every 30 seconds for connections that need inactivity check - this.inactivityCheckTimer = setInterval(() => { + this.setInterval(() => { this.performOptimizedInactivityCheck(); }, 30000); - - // Allow process to exit even with timer - this.inactivityCheckTimer.unref(); + // Note: LifecycleComponent's setInterval already calls unref() } /** @@ -196,11 +196,9 @@ export class ConnectionManager { this.processCleanupQueue(); } else if (!this.cleanupTimer) { // Otherwise, schedule batch processing - this.cleanupTimer = setTimeout(() => { + this.cleanupTimer = this.setTimeout(() => { this.processCleanupQueue(); }, 100); - - this.cleanupTimer.unref(); } } @@ -209,7 +207,7 @@ export class ConnectionManager { */ private processCleanupQueue(): void { if (this.cleanupTimer) { - clearTimeout(this.cleanupTimer); + this.clearTimeout(this.cleanupTimer); this.cleanupTimer = null; } @@ -225,11 +223,9 @@ export class ConnectionManager { // If there are more in queue, schedule next batch if (this.cleanupQueue.size > 0) { - this.cleanupTimer = setTimeout(() => { + this.cleanupTimer = this.setTimeout(() => { this.processCleanupQueue(); }, 10); - - this.cleanupTimer.unref(); } } @@ -531,17 +527,15 @@ export class ConnectionManager { /** * Clear all connections (for shutdown) */ - public clearConnections(): void { - // Stop timers - if (this.inactivityCheckTimer) { - clearInterval(this.inactivityCheckTimer); - this.inactivityCheckTimer = null; - } - - if (this.cleanupTimer) { - clearTimeout(this.cleanupTimer); - this.cleanupTimer = null; - } + public async clearConnections(): Promise { + // Delegate to LifecycleComponent's cleanup + await this.cleanup(); + } + + /** + * Override LifecycleComponent's onCleanup method + */ + protected async onCleanup(): Promise { // Process connections in batches to avoid blocking const connections = Array.from(this.connectionRecords.values());