- Deleted event-utils.ts which contained deprecated Port80Handler and its subscribers. - Updated index.ts to remove the export of event-utils. - Refactored ConnectionManager to extend LifecycleComponent for better resource management. - Added BinaryHeap implementation for efficient priority queue operations. - Introduced EnhancedConnectionPool for managing pooled connections with lifecycle management. - Implemented LifecycleComponent to manage timers and event listeners automatically. - Added comprehensive tests for BinaryHeap and LifecycleComponent to ensure functionality.
420 lines
12 KiB
TypeScript
420 lines
12 KiB
TypeScript
import { LifecycleComponent } from './lifecycle-component.js';
|
|
import { BinaryHeap } from './binary-heap.js';
|
|
import { AsyncMutex } from './async-utils.js';
|
|
import { EventEmitter } from 'events';
|
|
|
|
/**
|
|
* Interface for pooled connection
|
|
*/
|
|
export interface IPooledConnection<T> {
|
|
id: string;
|
|
connection: T;
|
|
createdAt: number;
|
|
lastUsedAt: number;
|
|
useCount: number;
|
|
inUse: boolean;
|
|
metadata?: any;
|
|
}
|
|
|
|
/**
|
|
* Configuration options for the connection pool
|
|
*/
|
|
export interface IConnectionPoolOptions<T> {
|
|
minSize?: number;
|
|
maxSize?: number;
|
|
acquireTimeout?: number;
|
|
idleTimeout?: number;
|
|
maxUseCount?: number;
|
|
validateOnAcquire?: boolean;
|
|
validateOnReturn?: boolean;
|
|
queueTimeout?: number;
|
|
connectionFactory: () => Promise<T>;
|
|
connectionValidator?: (connection: T) => Promise<boolean>;
|
|
connectionDestroyer?: (connection: T) => Promise<void>;
|
|
onConnectionError?: (error: Error, connection?: T) => void;
|
|
}
|
|
|
|
/**
|
|
* Interface for queued acquire request
|
|
*/
|
|
interface IAcquireRequest<T> {
|
|
id: string;
|
|
priority: number;
|
|
timestamp: number;
|
|
resolve: (connection: IPooledConnection<T>) => void;
|
|
reject: (error: Error) => void;
|
|
timeoutHandle?: NodeJS.Timeout;
|
|
}
|
|
|
|
/**
|
|
* Enhanced connection pool with priority queue, backpressure, and lifecycle management
|
|
*/
|
|
export class EnhancedConnectionPool<T> extends LifecycleComponent {
|
|
private readonly options: Required<Omit<IConnectionPoolOptions<T>, 'connectionValidator' | 'connectionDestroyer' | 'onConnectionError'>> & Pick<IConnectionPoolOptions<T>, 'connectionValidator' | 'connectionDestroyer' | 'onConnectionError'>;
|
|
private readonly availableConnections: IPooledConnection<T>[] = [];
|
|
private readonly activeConnections: Map<string, IPooledConnection<T>> = new Map();
|
|
private readonly waitQueue: BinaryHeap<IAcquireRequest<T>>;
|
|
private readonly mutex = new AsyncMutex();
|
|
private readonly eventEmitter = new EventEmitter();
|
|
|
|
private connectionIdCounter = 0;
|
|
private requestIdCounter = 0;
|
|
private isClosing = false;
|
|
|
|
// Metrics
|
|
private metrics = {
|
|
connectionsCreated: 0,
|
|
connectionsDestroyed: 0,
|
|
connectionsAcquired: 0,
|
|
connectionsReleased: 0,
|
|
acquireTimeouts: 0,
|
|
validationFailures: 0,
|
|
queueHighWaterMark: 0,
|
|
};
|
|
|
|
constructor(options: IConnectionPoolOptions<T>) {
|
|
super();
|
|
|
|
this.options = {
|
|
minSize: 0,
|
|
maxSize: 10,
|
|
acquireTimeout: 30000,
|
|
idleTimeout: 300000, // 5 minutes
|
|
maxUseCount: Infinity,
|
|
validateOnAcquire: true,
|
|
validateOnReturn: false,
|
|
queueTimeout: 60000,
|
|
...options,
|
|
};
|
|
|
|
// Initialize priority queue (higher priority = extracted first)
|
|
this.waitQueue = new BinaryHeap<IAcquireRequest<T>>(
|
|
(a, b) => b.priority - a.priority || a.timestamp - b.timestamp,
|
|
(item) => item.id
|
|
);
|
|
|
|
// Start maintenance cycle
|
|
this.startMaintenance();
|
|
|
|
// Initialize minimum connections
|
|
this.initializeMinConnections();
|
|
}
|
|
|
|
/**
|
|
* Initialize minimum number of connections
|
|
*/
|
|
private async initializeMinConnections(): Promise<void> {
|
|
const promises: Promise<void>[] = [];
|
|
|
|
for (let i = 0; i < this.options.minSize; i++) {
|
|
promises.push(
|
|
this.createConnection()
|
|
.then(conn => {
|
|
this.availableConnections.push(conn);
|
|
})
|
|
.catch(err => {
|
|
if (this.options.onConnectionError) {
|
|
this.options.onConnectionError(err);
|
|
}
|
|
})
|
|
);
|
|
}
|
|
|
|
await Promise.all(promises);
|
|
}
|
|
|
|
/**
|
|
* Start maintenance timer for idle connection cleanup
|
|
*/
|
|
private startMaintenance(): void {
|
|
this.setInterval(() => {
|
|
this.performMaintenance();
|
|
}, 30000); // Every 30 seconds
|
|
}
|
|
|
|
/**
|
|
* Perform maintenance tasks
|
|
*/
|
|
private async performMaintenance(): Promise<void> {
|
|
await this.mutex.runExclusive(async () => {
|
|
const now = Date.now();
|
|
const toRemove: IPooledConnection<T>[] = [];
|
|
|
|
// Check for idle connections beyond minimum size
|
|
for (let i = this.availableConnections.length - 1; i >= 0; i--) {
|
|
const conn = this.availableConnections[i];
|
|
|
|
// Keep minimum connections
|
|
if (this.availableConnections.length <= this.options.minSize) {
|
|
break;
|
|
}
|
|
|
|
// Remove idle connections
|
|
if (now - conn.lastUsedAt > this.options.idleTimeout) {
|
|
toRemove.push(conn);
|
|
this.availableConnections.splice(i, 1);
|
|
}
|
|
}
|
|
|
|
// Destroy idle connections
|
|
for (const conn of toRemove) {
|
|
await this.destroyConnection(conn);
|
|
}
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Acquire a connection from the pool
|
|
*/
|
|
public async acquire(priority: number = 0, timeout?: number): Promise<IPooledConnection<T>> {
|
|
if (this.isClosing) {
|
|
throw new Error('Connection pool is closing');
|
|
}
|
|
|
|
return this.mutex.runExclusive(async () => {
|
|
// Try to get an available connection
|
|
const connection = await this.tryAcquireConnection();
|
|
if (connection) {
|
|
return connection;
|
|
}
|
|
|
|
// Check if we can create a new connection
|
|
const totalConnections = this.availableConnections.length + this.activeConnections.size;
|
|
if (totalConnections < this.options.maxSize) {
|
|
try {
|
|
const newConnection = await this.createConnection();
|
|
return this.checkoutConnection(newConnection);
|
|
} catch (err) {
|
|
// Fall through to queue if creation fails
|
|
}
|
|
}
|
|
|
|
// Add to wait queue
|
|
return this.queueAcquireRequest(priority, timeout);
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Try to acquire an available connection
|
|
*/
|
|
private async tryAcquireConnection(): Promise<IPooledConnection<T> | null> {
|
|
while (this.availableConnections.length > 0) {
|
|
const connection = this.availableConnections.shift()!;
|
|
|
|
// Check if connection exceeded max use count
|
|
if (connection.useCount >= this.options.maxUseCount) {
|
|
await this.destroyConnection(connection);
|
|
continue;
|
|
}
|
|
|
|
// Validate connection if required
|
|
if (this.options.validateOnAcquire && this.options.connectionValidator) {
|
|
try {
|
|
const isValid = await this.options.connectionValidator(connection.connection);
|
|
if (!isValid) {
|
|
this.metrics.validationFailures++;
|
|
await this.destroyConnection(connection);
|
|
continue;
|
|
}
|
|
} catch (err) {
|
|
this.metrics.validationFailures++;
|
|
await this.destroyConnection(connection);
|
|
continue;
|
|
}
|
|
}
|
|
|
|
return this.checkoutConnection(connection);
|
|
}
|
|
|
|
return null;
|
|
}
|
|
|
|
/**
|
|
* Checkout a connection for use
|
|
*/
|
|
private checkoutConnection(connection: IPooledConnection<T>): IPooledConnection<T> {
|
|
connection.inUse = true;
|
|
connection.lastUsedAt = Date.now();
|
|
connection.useCount++;
|
|
|
|
this.activeConnections.set(connection.id, connection);
|
|
this.metrics.connectionsAcquired++;
|
|
|
|
this.eventEmitter.emit('acquire', connection);
|
|
return connection;
|
|
}
|
|
|
|
/**
|
|
* Queue an acquire request
|
|
*/
|
|
private queueAcquireRequest(priority: number, timeout?: number): Promise<IPooledConnection<T>> {
|
|
return new Promise<IPooledConnection<T>>((resolve, reject) => {
|
|
const request: IAcquireRequest<T> = {
|
|
id: `req-${this.requestIdCounter++}`,
|
|
priority,
|
|
timestamp: Date.now(),
|
|
resolve,
|
|
reject,
|
|
};
|
|
|
|
// Set timeout
|
|
const timeoutMs = timeout || this.options.queueTimeout;
|
|
request.timeoutHandle = this.setTimeout(() => {
|
|
if (this.waitQueue.extractByKey(request.id)) {
|
|
this.metrics.acquireTimeouts++;
|
|
reject(new Error(`Connection acquire timeout after ${timeoutMs}ms`));
|
|
}
|
|
}, timeoutMs);
|
|
|
|
this.waitQueue.insert(request);
|
|
this.metrics.queueHighWaterMark = Math.max(
|
|
this.metrics.queueHighWaterMark,
|
|
this.waitQueue.size
|
|
);
|
|
|
|
this.eventEmitter.emit('enqueue', { queueSize: this.waitQueue.size });
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Release a connection back to the pool
|
|
*/
|
|
public async release(connection: IPooledConnection<T>): Promise<void> {
|
|
return this.mutex.runExclusive(async () => {
|
|
if (!connection.inUse || !this.activeConnections.has(connection.id)) {
|
|
throw new Error('Connection is not active');
|
|
}
|
|
|
|
this.activeConnections.delete(connection.id);
|
|
connection.inUse = false;
|
|
connection.lastUsedAt = Date.now();
|
|
this.metrics.connectionsReleased++;
|
|
|
|
// Check if connection should be destroyed
|
|
if (connection.useCount >= this.options.maxUseCount) {
|
|
await this.destroyConnection(connection);
|
|
return;
|
|
}
|
|
|
|
// Validate on return if required
|
|
if (this.options.validateOnReturn && this.options.connectionValidator) {
|
|
try {
|
|
const isValid = await this.options.connectionValidator(connection.connection);
|
|
if (!isValid) {
|
|
await this.destroyConnection(connection);
|
|
return;
|
|
}
|
|
} catch (err) {
|
|
await this.destroyConnection(connection);
|
|
return;
|
|
}
|
|
}
|
|
|
|
// Check if there are waiting requests
|
|
const request = this.waitQueue.extract();
|
|
if (request) {
|
|
this.clearTimeout(request.timeoutHandle!);
|
|
request.resolve(this.checkoutConnection(connection));
|
|
this.eventEmitter.emit('dequeue', { queueSize: this.waitQueue.size });
|
|
} else {
|
|
// Return to available pool
|
|
this.availableConnections.push(connection);
|
|
this.eventEmitter.emit('release', connection);
|
|
}
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Create a new connection
|
|
*/
|
|
private async createConnection(): Promise<IPooledConnection<T>> {
|
|
const rawConnection = await this.options.connectionFactory();
|
|
|
|
const connection: IPooledConnection<T> = {
|
|
id: `conn-${this.connectionIdCounter++}`,
|
|
connection: rawConnection,
|
|
createdAt: Date.now(),
|
|
lastUsedAt: Date.now(),
|
|
useCount: 0,
|
|
inUse: false,
|
|
};
|
|
|
|
this.metrics.connectionsCreated++;
|
|
this.eventEmitter.emit('create', connection);
|
|
|
|
return connection;
|
|
}
|
|
|
|
/**
|
|
* Destroy a connection
|
|
*/
|
|
private async destroyConnection(connection: IPooledConnection<T>): Promise<void> {
|
|
try {
|
|
if (this.options.connectionDestroyer) {
|
|
await this.options.connectionDestroyer(connection.connection);
|
|
}
|
|
|
|
this.metrics.connectionsDestroyed++;
|
|
this.eventEmitter.emit('destroy', connection);
|
|
} catch (err) {
|
|
if (this.options.onConnectionError) {
|
|
this.options.onConnectionError(err as Error, connection.connection);
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Get current pool statistics
|
|
*/
|
|
public getStats() {
|
|
return {
|
|
available: this.availableConnections.length,
|
|
active: this.activeConnections.size,
|
|
waiting: this.waitQueue.size,
|
|
total: this.availableConnections.length + this.activeConnections.size,
|
|
...this.metrics,
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Subscribe to pool events
|
|
*/
|
|
public on(event: string, listener: Function): void {
|
|
this.addEventListener(this.eventEmitter, event, listener);
|
|
}
|
|
|
|
/**
|
|
* Close the pool and cleanup resources
|
|
*/
|
|
protected async onCleanup(): Promise<void> {
|
|
this.isClosing = true;
|
|
|
|
// Clear the wait queue
|
|
while (!this.waitQueue.isEmpty()) {
|
|
const request = this.waitQueue.extract();
|
|
if (request) {
|
|
this.clearTimeout(request.timeoutHandle!);
|
|
request.reject(new Error('Connection pool is closing'));
|
|
}
|
|
}
|
|
|
|
// Wait for active connections to be released (with timeout)
|
|
const timeout = 30000;
|
|
const startTime = Date.now();
|
|
|
|
while (this.activeConnections.size > 0 && Date.now() - startTime < timeout) {
|
|
await new Promise(resolve => setTimeout(resolve, 100));
|
|
}
|
|
|
|
// Destroy all connections
|
|
const allConnections = [
|
|
...this.availableConnections,
|
|
...this.activeConnections.values(),
|
|
];
|
|
|
|
await Promise.all(allConnections.map(conn => this.destroyConnection(conn)));
|
|
|
|
this.availableConnections.length = 0;
|
|
this.activeConnections.clear();
|
|
}
|
|
} |