import { LifecycleComponent } from './lifecycle-component.js';
import { BinaryHeap } from './binary-heap.js';
import { AsyncMutex } from './async-utils.js';
import { EventEmitter } from 'events';

/**
 * A connection tracked by the pool together with its bookkeeping data.
 */
export interface IPooledConnection<T> {
  /** Pool-assigned identifier of the form `conn-<n>`. */
  id: string;
  /** The raw connection produced by `connectionFactory`. */
  connection: T;
  /** `Date.now()` at creation time. */
  createdAt: number;
  /** `Date.now()` at the most recent checkout or release. */
  lastUsedAt: number;
  /** Number of times the connection has been checked out. */
  useCount: number;
  /** True while the connection is checked out. */
  inUse: boolean;
  metadata?: any;
}

/**
 * Configuration options for the connection pool.
 *
 * NOTE(review): `acquireTimeout` receives a default in the constructor but is
 * not consulted anywhere in this file; queued requests use `queueTimeout`.
 */
export interface IConnectionPoolOptions<T> {
  minSize?: number;
  maxSize?: number;
  acquireTimeout?: number;
  idleTimeout?: number;
  maxUseCount?: number;
  validateOnAcquire?: boolean;
  validateOnReturn?: boolean;
  queueTimeout?: number;
  connectionFactory: () => Promise<T>;
  connectionValidator?: (connection: T) => Promise<boolean>;
  connectionDestroyer?: (connection: T) => Promise<void>;
  onConnectionError?: (error: Error, connection?: T) => void;
}

/**
 * A caller waiting in the priority queue for a connection.
 */
interface IAcquireRequest<T> {
  id: string;
  priority: number;
  timestamp: number;
  resolve: (connection: IPooledConnection<T>) => void;
  reject: (error: Error) => void;
  timeoutHandle?: NodeJS.Timeout;
}

/**
 * Connection pool with a priority wait queue, idle-connection maintenance and
 * lifecycle-managed timers.
 *
 * Events emitted on the internal emitter (subscribe with `on`): `create`,
 * `destroy`, `acquire`, `release`, `enqueue`, `dequeue`.
 */
export class EnhancedConnectionPool<T> extends LifecycleComponent {
  private readonly options: Required<
    Omit<
      IConnectionPoolOptions<T>,
      'connectionValidator' | 'connectionDestroyer' | 'onConnectionError'
    >
  > &
    Pick<
      IConnectionPoolOptions<T>,
      'connectionValidator' | 'connectionDestroyer' | 'onConnectionError'
    >;
  private readonly availableConnections: IPooledConnection<T>[] = [];
  private readonly activeConnections: Map<string, IPooledConnection<T>> = new Map();
  private readonly waitQueue: BinaryHeap<IAcquireRequest<T>>;
  private readonly mutex = new AsyncMutex();
  private readonly eventEmitter = new EventEmitter();
  private connectionIdCounter = 0;
  private requestIdCounter = 0;
  private isClosing = false;

  // Metrics
  private metrics = {
    connectionsCreated: 0,
    connectionsDestroyed: 0,
    connectionsAcquired: 0,
    connectionsReleased: 0,
    acquireTimeouts: 0,
    validationFailures: 0,
    queueHighWaterMark: 0,
  };

  /**
   * Build the pool, start the maintenance timer and begin warming up
   * `minSize` connections in the background.
   *
   * @param options Pool configuration; only `connectionFactory` is required.
   */
  constructor(options: IConnectionPoolOptions<T>) {
    super();

    this.options = {
      minSize: 0,
      maxSize: 10,
      acquireTimeout: 30000,
      idleTimeout: 300000, // 5 minutes
      maxUseCount: Infinity,
      validateOnAcquire: true,
      validateOnReturn: false,
      queueTimeout: 60000,
      ...options,
    };

    // Higher priority is extracted first; ties go to the older request.
    this.waitQueue = new BinaryHeap<IAcquireRequest<T>>(
      (a, b) => b.priority - a.priority || a.timestamp - b.timestamp,
      (item) => item.id
    );

    this.startMaintenance();

    // Fire-and-forget: every failure is handled inside initializeMinConnections.
    void this.initializeMinConnections();
  }

  /**
   * Create `minSize` connections in parallel and add them to the available
   * list. Factory failures are reported through `onConnectionError`.
   */
  private async initializeMinConnections(): Promise<void> {
    const warmUps = Array.from({ length: this.options.minSize }, async () => {
      try {
        const conn = await this.createConnection();
        if (this.isClosing) {
          // The pool was closed while the factory was running; do not leak
          // a connection that cleanup will no longer see.
          await this.destroyConnection(conn);
          return;
        }
        this.availableConnections.push(conn);
      } catch (err) {
        this.reportError(err);
      }
    });

    await Promise.all(warmUps);
  }

  /**
   * Schedule the idle-connection sweep every 30 seconds.
   */
  private startMaintenance(): void {
    this.setInterval(() => {
      // Catch here so a failed sweep never becomes an unhandled rejection.
      void this.performMaintenance().catch((err: unknown) => this.reportError(err));
    }, 30000);
  }

  /**
   * Destroy available connections that have been idle longer than
   * `idleTimeout`, never shrinking the available list below `minSize`.
   */
  private async performMaintenance(): Promise<void> {
    await this.mutex.runExclusive(async () => {
      const now = Date.now();
      const toRemove: IPooledConnection<T>[] = [];

      // Walk backwards so splice() does not disturb indices still to visit.
      for (let i = this.availableConnections.length - 1; i >= 0; i--) {
        if (this.availableConnections.length <= this.options.minSize) {
          break;
        }

        const conn = this.availableConnections[i];
        if (now - conn.lastUsedAt > this.options.idleTimeout) {
          toRemove.push(conn);
          this.availableConnections.splice(i, 1);
        }
      }

      for (const conn of toRemove) {
        await this.destroyConnection(conn);
      }
    });
  }

  /**
   * Acquire a connection from the pool.
   *
   * Order of attempts: reuse an available connection, create a new one if
   * the pool is below `maxSize`, otherwise wait in the priority queue.
   *
   * @param priority Higher values are served first when queued.
   * @param timeout Queue wait in milliseconds; falsy values fall back to
   *   `queueTimeout`.
   * @returns The checked-out connection.
   * @throws Error if the pool is closing or the queue wait times out.
   */
  public async acquire(priority: number = 0, timeout?: number): Promise<IPooledConnection<T>> {
    if (this.isClosing) {
      throw new Error('Connection pool is closing');
    }

    // The queued promise is wrapped in an object so the async callback does
    // not adopt it. Returning it directly would keep the callback (and,
    // assuming runExclusive awaits its callback - TODO confirm in
    // async-utils, the lock) pending until a connection arrives, while
    // release() needs that same lock to deliver one.
    const outcome = await this.mutex.runExclusive(
      async (): Promise<{ pending: IPooledConnection<T> | Promise<IPooledConnection<T>> }> => {
        // Closing may have started while this call was waiting for the lock.
        if (this.isClosing) {
          throw new Error('Connection pool is closing');
        }

        const reused = await this.tryAcquireConnection();
        if (reused) {
          return { pending: reused };
        }

        const totalConnections =
          this.availableConnections.length + this.activeConnections.size;
        if (totalConnections < this.options.maxSize) {
          try {
            const created = await this.createConnection();
            return { pending: this.checkoutConnection(created) };
          } catch (err) {
            // Report, then fall through to the queue.
            this.reportError(err);
          }
        }

        return { pending: this.queueAcquireRequest(priority, timeout) };
      }
    );

    // Awaited outside the lock.
    return outcome.pending;
  }

  /**
   * Take connections off the front of the available list until one is
   * usable. Connections over `maxUseCount` or failing validation are
   * destroyed.
   *
   * @returns A checked-out connection, or null if none was usable.
   */
  private async tryAcquireConnection(): Promise<IPooledConnection<T> | null> {
    while (this.availableConnections.length > 0) {
      const connection = this.availableConnections.shift()!;

      if (connection.useCount >= this.options.maxUseCount) {
        await this.destroyConnection(connection);
        continue;
      }

      if (this.options.validateOnAcquire && !(await this.isValid(connection))) {
        this.metrics.validationFailures++;
        await this.destroyConnection(connection);
        continue;
      }

      return this.checkoutConnection(connection);
    }

    return null;
  }

  /**
   * Run the configured validator. A throwing validator counts as invalid;
   * a missing validator counts as valid.
   */
  private async isValid(connection: IPooledConnection<T>): Promise<boolean> {
    if (!this.options.connectionValidator) {
      return true;
    }
    try {
      return await this.options.connectionValidator(connection.connection);
    } catch (err) {
      return false;
    }
  }

  /**
   * Mark a connection as in use, record it as active and emit `acquire`.
   */
  private checkoutConnection(connection: IPooledConnection<T>): IPooledConnection<T> {
    connection.inUse = true;
    connection.lastUsedAt = Date.now();
    connection.useCount++;

    this.activeConnections.set(connection.id, connection);
    this.metrics.connectionsAcquired++;

    this.eventEmitter.emit('acquire', connection);
    return connection;
  }

  /**
   * Put a request on the wait queue and arm its timeout.
   *
   * @returns A promise settled by `release`, by the timeout, or by cleanup.
   */
  private queueAcquireRequest(priority: number, timeout?: number): Promise<IPooledConnection<T>> {
    return new Promise<IPooledConnection<T>>((resolve, reject) => {
      const request: IAcquireRequest<T> = {
        id: `req-${this.requestIdCounter++}`,
        priority,
        timestamp: Date.now(),
        resolve,
        reject,
      };

      const timeoutMs = timeout || this.options.queueTimeout;
      request.timeoutHandle = this.setTimeout(() => {
        // Reject only if the request is still queued; it may already have
        // been served.
        if (this.waitQueue.extractByKey(request.id)) {
          this.metrics.acquireTimeouts++;
          reject(new Error(`Connection acquire timeout after ${timeoutMs}ms`));
        }
      }, timeoutMs);

      this.waitQueue.insert(request);
      this.metrics.queueHighWaterMark = Math.max(
        this.metrics.queueHighWaterMark,
        this.waitQueue.size
      );

      this.eventEmitter.emit('enqueue', { queueSize: this.waitQueue.size });
    });
  }

  /**
   * Release a connection back to the pool.
   *
   * The connection is destroyed if it reached `maxUseCount` or fails
   * return-validation; otherwise it goes to the highest-priority waiter or
   * back onto the available list.
   *
   * @throws Error if the connection is not currently checked out.
   */
  public async release(connection: IPooledConnection<T>): Promise<void> {
    return this.mutex.runExclusive(async () => {
      if (!connection.inUse || !this.activeConnections.has(connection.id)) {
        throw new Error('Connection is not active');
      }

      this.activeConnections.delete(connection.id);
      connection.inUse = false;
      connection.lastUsedAt = Date.now();
      this.metrics.connectionsReleased++;

      if (connection.useCount >= this.options.maxUseCount) {
        await this.retireAndBackfill(connection);
        return;
      }

      if (this.options.validateOnReturn && !(await this.isValid(connection))) {
        this.metrics.validationFailures++;
        await this.retireAndBackfill(connection);
        return;
      }

      const request = this.waitQueue.extract();
      if (request) {
        if (request.timeoutHandle) {
          this.clearTimeout(request.timeoutHandle);
        }
        request.resolve(this.checkoutConnection(connection));
        this.eventEmitter.emit('dequeue', { queueSize: this.waitQueue.size });
      } else {
        this.availableConnections.push(connection);
        this.eventEmitter.emit('release', connection);
      }
    });
  }

  /**
   * Destroy a released connection, then use the freed slot to serve a
   * queued waiter. Must be called while holding the mutex.
   */
  private async retireAndBackfill(connection: IPooledConnection<T>): Promise<void> {
    await this.destroyConnection(connection);

    if (this.isClosing || this.waitQueue.isEmpty()) {
      return;
    }
    const totalConnections = this.availableConnections.length + this.activeConnections.size;
    if (totalConnections >= this.options.maxSize) {
      return;
    }

    let replacement: IPooledConnection<T>;
    try {
      replacement = await this.createConnection();
    } catch (err) {
      // Waiters stay queued; their own timeouts still apply.
      this.reportError(err);
      return;
    }

    if (this.isClosing) {
      await this.destroyConnection(replacement);
      return;
    }

    // The waiter seen earlier may have timed out while the factory ran.
    const request = this.waitQueue.extract();
    if (request) {
      if (request.timeoutHandle) {
        this.clearTimeout(request.timeoutHandle);
      }
      request.resolve(this.checkoutConnection(replacement));
      this.eventEmitter.emit('dequeue', { queueSize: this.waitQueue.size });
    } else {
      this.availableConnections.push(replacement);
    }
  }

  /**
   * Call the factory and wrap the result in a fresh pool record.
   * Emits `create`. Factory rejections propagate to the caller.
   */
  private async createConnection(): Promise<IPooledConnection<T>> {
    const rawConnection = await this.options.connectionFactory();

    const connection: IPooledConnection<T> = {
      id: `conn-${this.connectionIdCounter++}`,
      connection: rawConnection,
      createdAt: Date.now(),
      lastUsedAt: Date.now(),
      useCount: 0,
      inUse: false,
    };

    this.metrics.connectionsCreated++;
    this.eventEmitter.emit('create', connection);

    return connection;
  }

  /**
   * Run the configured destroyer and emit `destroy`. A failing destroyer is
   * reported through `onConnectionError` and never thrown.
   */
  private async destroyConnection(connection: IPooledConnection<T>): Promise<void> {
    try {
      if (this.options.connectionDestroyer) {
        await this.options.connectionDestroyer(connection.connection);
      }

      this.metrics.connectionsDestroyed++;
      this.eventEmitter.emit('destroy', connection);
    } catch (err) {
      this.reportError(err, connection.connection);
    }
  }

  /**
   * Forward an error to `onConnectionError` if one is configured. Non-Error
   * values are wrapped so the handler always receives an `Error`.
   */
  private reportError(err: unknown, connection?: T): void {
    if (!this.options.onConnectionError) {
      return;
    }
    const error = err instanceof Error ? err : new Error(String(err));
    this.options.onConnectionError(error, connection);
  }

  /**
   * Get current pool statistics: live counts plus cumulative metrics.
   */
  public getStats() {
    return {
      available: this.availableConnections.length,
      active: this.activeConnections.size,
      waiting: this.waitQueue.size,
      total: this.availableConnections.length + this.activeConnections.size,
      ...this.metrics,
    };
  }

  /**
   * Subscribe to pool events. Registration goes through the base class's
   * `addEventListener`.
   */
  public on(event: string, listener: Function): void {
    this.addEventListener(this.eventEmitter, event, listener);
  }

  /**
   * Close the pool: reject all queued requests, wait up to 30 seconds for
   * active connections to be released, then destroy every connection.
   */
  protected async onCleanup(): Promise<void> {
    this.isClosing = true;

    while (!this.waitQueue.isEmpty()) {
      const request = this.waitQueue.extract();
      if (request) {
        if (request.timeoutHandle) {
          this.clearTimeout(request.timeoutHandle);
        }
        request.reject(new Error('Connection pool is closing'));
      }
    }

    // Poll every 100 ms until callers hand their connections back.
    const timeout = 30000;
    const startTime = Date.now();

    while (this.activeConnections.size > 0 && Date.now() - startTime < timeout) {
      await new Promise<void>((resolve) => {
        const timer = setTimeout(resolve, 100);
        // Do not let the poll timer keep the process alive.
        if (typeof timer.unref === 'function') {
          timer.unref();
        }
      });
    }

    // Anything still active after the grace period is destroyed as well.
    const allConnections = [
      ...this.availableConnections,
      ...this.activeConnections.values(),
    ];

    await Promise.all(allConnections.map((conn) => this.destroyConnection(conn)));

    this.availableConnections.length = 0;
    this.activeConnections.clear();
  }
}