update
This commit is contained in:
286
ts/mail/delivery/smtpclient/connection-manager.ts
Normal file
286
ts/mail/delivery/smtpclient/connection-manager.ts
Normal file
@ -0,0 +1,286 @@
|
||||
/**
|
||||
* SMTP Client Connection Manager
|
||||
* Connection pooling and lifecycle management
|
||||
*/
|
||||
|
||||
import * as net from 'node:net';
|
||||
import * as tls from 'node:tls';
|
||||
import { EventEmitter } from 'node:events';
|
||||
import { DEFAULTS, CONNECTION_STATES } from './constants.js';
|
||||
import type {
|
||||
ISmtpClientOptions,
|
||||
ISmtpConnection,
|
||||
IConnectionPoolStatus,
|
||||
ConnectionState
|
||||
} from './interfaces.js';
|
||||
import { logConnection, logDebug } from './utils/logging.js';
|
||||
import { generateConnectionId } from './utils/helpers.js';
|
||||
|
||||
export class ConnectionManager extends EventEmitter {
|
||||
private options: ISmtpClientOptions;
|
||||
private connections: Map<string, ISmtpConnection> = new Map();
|
||||
private pendingConnections: Set<string> = new Set();
|
||||
private idleTimeout: NodeJS.Timeout | null = null;
|
||||
|
||||
constructor(options: ISmtpClientOptions) {
|
||||
super();
|
||||
this.options = options;
|
||||
this.setupIdleCleanup();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get or create a connection
|
||||
*/
|
||||
public async getConnection(): Promise<ISmtpConnection> {
|
||||
// Try to reuse an idle connection if pooling is enabled
|
||||
if (this.options.pool) {
|
||||
const idleConnection = this.findIdleConnection();
|
||||
if (idleConnection) {
|
||||
const connectionId = this.getConnectionId(idleConnection) || 'unknown';
|
||||
logDebug('Reusing idle connection', this.options, { connectionId });
|
||||
return idleConnection;
|
||||
}
|
||||
|
||||
// Check if we can create a new connection
|
||||
if (this.getActiveConnectionCount() >= (this.options.maxConnections || DEFAULTS.MAX_CONNECTIONS)) {
|
||||
throw new Error('Maximum number of connections reached');
|
||||
}
|
||||
}
|
||||
|
||||
return this.createConnection();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new connection
|
||||
*/
|
||||
public async createConnection(): Promise<ISmtpConnection> {
|
||||
const connectionId = generateConnectionId();
|
||||
|
||||
try {
|
||||
this.pendingConnections.add(connectionId);
|
||||
logConnection('connecting', this.options, { connectionId });
|
||||
|
||||
const socket = await this.establishSocket();
|
||||
const connection: ISmtpConnection = {
|
||||
socket,
|
||||
state: CONNECTION_STATES.CONNECTED as ConnectionState,
|
||||
options: this.options,
|
||||
secure: this.options.secure || false,
|
||||
createdAt: new Date(),
|
||||
lastActivity: new Date(),
|
||||
messageCount: 0
|
||||
};
|
||||
|
||||
this.setupSocketHandlers(socket, connectionId);
|
||||
this.connections.set(connectionId, connection);
|
||||
this.pendingConnections.delete(connectionId);
|
||||
|
||||
logConnection('connected', this.options, { connectionId });
|
||||
this.emit('connection', connection);
|
||||
|
||||
return connection;
|
||||
} catch (error) {
|
||||
this.pendingConnections.delete(connectionId);
|
||||
logConnection('error', this.options, { connectionId, error });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Release a connection back to the pool or close it
|
||||
*/
|
||||
public releaseConnection(connection: ISmtpConnection): void {
|
||||
const connectionId = this.getConnectionId(connection);
|
||||
|
||||
if (!connectionId || !this.connections.has(connectionId)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.options.pool && this.shouldReuseConnection(connection)) {
|
||||
// Return to pool
|
||||
connection.state = CONNECTION_STATES.READY as ConnectionState;
|
||||
connection.lastActivity = new Date();
|
||||
logDebug('Connection returned to pool', this.options, { connectionId });
|
||||
} else {
|
||||
// Close connection
|
||||
this.closeConnection(connection);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Close a specific connection
|
||||
*/
|
||||
public closeConnection(connection: ISmtpConnection): void {
|
||||
const connectionId = this.getConnectionId(connection);
|
||||
|
||||
if (connectionId) {
|
||||
this.connections.delete(connectionId);
|
||||
}
|
||||
|
||||
connection.state = CONNECTION_STATES.CLOSING as ConnectionState;
|
||||
|
||||
try {
|
||||
if (!connection.socket.destroyed) {
|
||||
connection.socket.destroy();
|
||||
}
|
||||
} catch (error) {
|
||||
logDebug('Error closing connection', this.options, { error });
|
||||
}
|
||||
|
||||
logConnection('disconnected', this.options, { connectionId });
|
||||
this.emit('disconnect', connection);
|
||||
}
|
||||
|
||||
/**
|
||||
* Close all connections
|
||||
*/
|
||||
public closeAllConnections(): void {
|
||||
logDebug('Closing all connections', this.options);
|
||||
|
||||
for (const connection of this.connections.values()) {
|
||||
this.closeConnection(connection);
|
||||
}
|
||||
|
||||
this.connections.clear();
|
||||
this.pendingConnections.clear();
|
||||
|
||||
if (this.idleTimeout) {
|
||||
clearInterval(this.idleTimeout);
|
||||
this.idleTimeout = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get connection pool status
|
||||
*/
|
||||
public getPoolStatus(): IConnectionPoolStatus {
|
||||
const total = this.connections.size;
|
||||
const active = Array.from(this.connections.values())
|
||||
.filter(conn => conn.state === CONNECTION_STATES.BUSY).length;
|
||||
const idle = total - active;
|
||||
const pending = this.pendingConnections.size;
|
||||
|
||||
return { total, active, idle, pending };
|
||||
}
|
||||
|
||||
/**
|
||||
* Update connection activity timestamp
|
||||
*/
|
||||
public updateActivity(connection: ISmtpConnection): void {
|
||||
connection.lastActivity = new Date();
|
||||
}
|
||||
|
||||
private async establishSocket(): Promise<net.Socket | tls.TLSSocket> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const timeout = this.options.connectionTimeout || DEFAULTS.CONNECTION_TIMEOUT;
|
||||
let socket: net.Socket | tls.TLSSocket;
|
||||
|
||||
if (this.options.secure) {
|
||||
// Direct TLS connection
|
||||
socket = tls.connect({
|
||||
host: this.options.host,
|
||||
port: this.options.port,
|
||||
...this.options.tls
|
||||
});
|
||||
} else {
|
||||
// Plain connection
|
||||
socket = new net.Socket();
|
||||
socket.connect(this.options.port, this.options.host);
|
||||
}
|
||||
|
||||
const timeoutHandler = setTimeout(() => {
|
||||
socket.destroy();
|
||||
reject(new Error(`Connection timeout after ${timeout}ms`));
|
||||
}, timeout);
|
||||
|
||||
socket.once('connect', () => {
|
||||
clearTimeout(timeoutHandler);
|
||||
resolve(socket);
|
||||
});
|
||||
|
||||
socket.once('error', (error) => {
|
||||
clearTimeout(timeoutHandler);
|
||||
reject(error);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
private setupSocketHandlers(socket: net.Socket | tls.TLSSocket, connectionId: string): void {
|
||||
const socketTimeout = this.options.socketTimeout || DEFAULTS.SOCKET_TIMEOUT;
|
||||
|
||||
socket.setTimeout(socketTimeout);
|
||||
|
||||
socket.on('timeout', () => {
|
||||
logDebug('Socket timeout', this.options, { connectionId });
|
||||
socket.destroy();
|
||||
});
|
||||
|
||||
socket.on('error', (error) => {
|
||||
logConnection('error', this.options, { connectionId, error });
|
||||
this.connections.delete(connectionId);
|
||||
});
|
||||
|
||||
socket.on('close', () => {
|
||||
this.connections.delete(connectionId);
|
||||
logDebug('Socket closed', this.options, { connectionId });
|
||||
});
|
||||
}
|
||||
|
||||
private findIdleConnection(): ISmtpConnection | null {
|
||||
for (const connection of this.connections.values()) {
|
||||
if (connection.state === CONNECTION_STATES.READY) {
|
||||
return connection;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private shouldReuseConnection(connection: ISmtpConnection): boolean {
|
||||
const maxMessages = this.options.maxMessages || DEFAULTS.MAX_MESSAGES;
|
||||
const maxAge = 300000; // 5 minutes
|
||||
const age = Date.now() - connection.createdAt.getTime();
|
||||
|
||||
return connection.messageCount < maxMessages &&
|
||||
age < maxAge &&
|
||||
!connection.socket.destroyed;
|
||||
}
|
||||
|
||||
private getActiveConnectionCount(): number {
|
||||
return this.connections.size + this.pendingConnections.size;
|
||||
}
|
||||
|
||||
private getConnectionId(connection: ISmtpConnection): string | null {
|
||||
for (const [id, conn] of this.connections.entries()) {
|
||||
if (conn === connection) {
|
||||
return id;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private setupIdleCleanup(): void {
|
||||
if (!this.options.pool) {
|
||||
return;
|
||||
}
|
||||
|
||||
const cleanupInterval = DEFAULTS.POOL_IDLE_TIMEOUT;
|
||||
|
||||
this.idleTimeout = setInterval(() => {
|
||||
const now = Date.now();
|
||||
const connectionsToClose: ISmtpConnection[] = [];
|
||||
|
||||
for (const connection of this.connections.values()) {
|
||||
const idleTime = now - connection.lastActivity.getTime();
|
||||
|
||||
if (connection.state === CONNECTION_STATES.READY && idleTime > cleanupInterval) {
|
||||
connectionsToClose.push(connection);
|
||||
}
|
||||
}
|
||||
|
||||
for (const connection of connectionsToClose) {
|
||||
logDebug('Closing idle connection', this.options);
|
||||
this.closeConnection(connection);
|
||||
}
|
||||
}, cleanupInterval);
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user