289 lines
8.5 KiB
TypeScript
289 lines
8.5 KiB
TypeScript
/**
|
|
* SMTP Client Connection Manager
|
|
* Connection pooling and lifecycle management
|
|
*/
|
|
|
|
import * as net from 'node:net';
|
|
import * as tls from 'node:tls';
|
|
import { EventEmitter } from 'node:events';
|
|
import { DEFAULTS, CONNECTION_STATES } from './constants.js';
|
|
import type {
|
|
ISmtpClientOptions,
|
|
ISmtpConnection,
|
|
IConnectionPoolStatus,
|
|
ConnectionState
|
|
} from './interfaces.js';
|
|
import { logConnection, logDebug } from './utils/logging.js';
|
|
import { generateConnectionId } from './utils/helpers.js';
|
|
|
|
export class ConnectionManager extends EventEmitter {
|
|
private options: ISmtpClientOptions;
|
|
private connections: Map<string, ISmtpConnection> = new Map();
|
|
private pendingConnections: Set<string> = new Set();
|
|
private idleTimeout: NodeJS.Timeout | null = null;
|
|
|
|
constructor(options: ISmtpClientOptions) {
|
|
super();
|
|
this.options = options;
|
|
this.setupIdleCleanup();
|
|
}
|
|
|
|
/**
|
|
* Get or create a connection
|
|
*/
|
|
public async getConnection(): Promise<ISmtpConnection> {
|
|
// Try to reuse an idle connection if pooling is enabled
|
|
if (this.options.pool) {
|
|
const idleConnection = this.findIdleConnection();
|
|
if (idleConnection) {
|
|
const connectionId = this.getConnectionId(idleConnection) || 'unknown';
|
|
logDebug('Reusing idle connection', this.options, { connectionId });
|
|
return idleConnection;
|
|
}
|
|
|
|
// Check if we can create a new connection
|
|
if (this.getActiveConnectionCount() >= (this.options.maxConnections || DEFAULTS.MAX_CONNECTIONS)) {
|
|
throw new Error('Maximum number of connections reached');
|
|
}
|
|
}
|
|
|
|
return this.createConnection();
|
|
}
|
|
|
|
/**
|
|
* Create a new connection
|
|
*/
|
|
public async createConnection(): Promise<ISmtpConnection> {
|
|
const connectionId = generateConnectionId();
|
|
|
|
try {
|
|
this.pendingConnections.add(connectionId);
|
|
logConnection('connecting', this.options, { connectionId });
|
|
|
|
const socket = await this.establishSocket();
|
|
const connection: ISmtpConnection = {
|
|
socket,
|
|
state: CONNECTION_STATES.CONNECTED as ConnectionState,
|
|
options: this.options,
|
|
secure: this.options.secure || false,
|
|
createdAt: new Date(),
|
|
lastActivity: new Date(),
|
|
messageCount: 0
|
|
};
|
|
|
|
this.setupSocketHandlers(socket, connectionId);
|
|
this.connections.set(connectionId, connection);
|
|
this.pendingConnections.delete(connectionId);
|
|
|
|
logConnection('connected', this.options, { connectionId });
|
|
this.emit('connection', connection);
|
|
|
|
return connection;
|
|
} catch (error) {
|
|
this.pendingConnections.delete(connectionId);
|
|
logConnection('error', this.options, { connectionId, error });
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Release a connection back to the pool or close it
|
|
*/
|
|
public releaseConnection(connection: ISmtpConnection): void {
|
|
const connectionId = this.getConnectionId(connection);
|
|
|
|
if (!connectionId || !this.connections.has(connectionId)) {
|
|
return;
|
|
}
|
|
|
|
if (this.options.pool && this.shouldReuseConnection(connection)) {
|
|
// Return to pool
|
|
connection.state = CONNECTION_STATES.READY as ConnectionState;
|
|
connection.lastActivity = new Date();
|
|
logDebug('Connection returned to pool', this.options, { connectionId });
|
|
} else {
|
|
// Close connection
|
|
this.closeConnection(connection);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Close a specific connection
|
|
*/
|
|
public closeConnection(connection: ISmtpConnection): void {
|
|
const connectionId = this.getConnectionId(connection);
|
|
|
|
if (connectionId) {
|
|
this.connections.delete(connectionId);
|
|
}
|
|
|
|
connection.state = CONNECTION_STATES.CLOSING as ConnectionState;
|
|
|
|
try {
|
|
if (!connection.socket.destroyed) {
|
|
connection.socket.destroy();
|
|
}
|
|
} catch (error) {
|
|
logDebug('Error closing connection', this.options, { error });
|
|
}
|
|
|
|
logConnection('disconnected', this.options, { connectionId });
|
|
this.emit('disconnect', connection);
|
|
}
|
|
|
|
/**
|
|
* Close all connections
|
|
*/
|
|
public closeAllConnections(): void {
|
|
logDebug('Closing all connections', this.options);
|
|
|
|
for (const connection of this.connections.values()) {
|
|
this.closeConnection(connection);
|
|
}
|
|
|
|
this.connections.clear();
|
|
this.pendingConnections.clear();
|
|
|
|
if (this.idleTimeout) {
|
|
clearInterval(this.idleTimeout);
|
|
this.idleTimeout = null;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Get connection pool status
|
|
*/
|
|
public getPoolStatus(): IConnectionPoolStatus {
|
|
const total = this.connections.size;
|
|
const active = Array.from(this.connections.values())
|
|
.filter(conn => conn.state === CONNECTION_STATES.BUSY).length;
|
|
const idle = total - active;
|
|
const pending = this.pendingConnections.size;
|
|
|
|
return { total, active, idle, pending };
|
|
}
|
|
|
|
/**
|
|
* Update connection activity timestamp
|
|
*/
|
|
public updateActivity(connection: ISmtpConnection): void {
|
|
connection.lastActivity = new Date();
|
|
}
|
|
|
|
private async establishSocket(): Promise<net.Socket | tls.TLSSocket> {
|
|
return new Promise((resolve, reject) => {
|
|
const timeout = this.options.connectionTimeout || DEFAULTS.CONNECTION_TIMEOUT;
|
|
let socket: net.Socket | tls.TLSSocket;
|
|
|
|
if (this.options.secure) {
|
|
// Direct TLS connection
|
|
socket = tls.connect({
|
|
host: this.options.host,
|
|
port: this.options.port,
|
|
...this.options.tls
|
|
});
|
|
} else {
|
|
// Plain connection
|
|
socket = new net.Socket();
|
|
socket.connect(this.options.port, this.options.host);
|
|
}
|
|
|
|
const timeoutHandler = setTimeout(() => {
|
|
socket.destroy();
|
|
reject(new Error(`Connection timeout after ${timeout}ms`));
|
|
}, timeout);
|
|
|
|
// For TLS connections, we need to wait for 'secureConnect' instead of 'connect'
|
|
const successEvent = this.options.secure ? 'secureConnect' : 'connect';
|
|
|
|
socket.once(successEvent, () => {
|
|
clearTimeout(timeoutHandler);
|
|
resolve(socket);
|
|
});
|
|
|
|
socket.once('error', (error) => {
|
|
clearTimeout(timeoutHandler);
|
|
reject(error);
|
|
});
|
|
});
|
|
}
|
|
|
|
private setupSocketHandlers(socket: net.Socket | tls.TLSSocket, connectionId: string): void {
|
|
const socketTimeout = this.options.socketTimeout || DEFAULTS.SOCKET_TIMEOUT;
|
|
|
|
socket.setTimeout(socketTimeout);
|
|
|
|
socket.on('timeout', () => {
|
|
logDebug('Socket timeout', this.options, { connectionId });
|
|
socket.destroy();
|
|
});
|
|
|
|
socket.on('error', (error) => {
|
|
logConnection('error', this.options, { connectionId, error });
|
|
this.connections.delete(connectionId);
|
|
});
|
|
|
|
socket.on('close', () => {
|
|
this.connections.delete(connectionId);
|
|
logDebug('Socket closed', this.options, { connectionId });
|
|
});
|
|
}
|
|
|
|
private findIdleConnection(): ISmtpConnection | null {
|
|
for (const connection of this.connections.values()) {
|
|
if (connection.state === CONNECTION_STATES.READY) {
|
|
return connection;
|
|
}
|
|
}
|
|
return null;
|
|
}
|
|
|
|
private shouldReuseConnection(connection: ISmtpConnection): boolean {
|
|
const maxMessages = this.options.maxMessages || DEFAULTS.MAX_MESSAGES;
|
|
const maxAge = 300000; // 5 minutes
|
|
const age = Date.now() - connection.createdAt.getTime();
|
|
|
|
return connection.messageCount < maxMessages &&
|
|
age < maxAge &&
|
|
!connection.socket.destroyed;
|
|
}
|
|
|
|
private getActiveConnectionCount(): number {
|
|
return this.connections.size + this.pendingConnections.size;
|
|
}
|
|
|
|
private getConnectionId(connection: ISmtpConnection): string | null {
|
|
for (const [id, conn] of this.connections.entries()) {
|
|
if (conn === connection) {
|
|
return id;
|
|
}
|
|
}
|
|
return null;
|
|
}
|
|
|
|
private setupIdleCleanup(): void {
|
|
if (!this.options.pool) {
|
|
return;
|
|
}
|
|
|
|
const cleanupInterval = DEFAULTS.POOL_IDLE_TIMEOUT;
|
|
|
|
this.idleTimeout = setInterval(() => {
|
|
const now = Date.now();
|
|
const connectionsToClose: ISmtpConnection[] = [];
|
|
|
|
for (const connection of this.connections.values()) {
|
|
const idleTime = now - connection.lastActivity.getTime();
|
|
|
|
if (connection.state === CONNECTION_STATES.READY && idleTime > cleanupInterval) {
|
|
connectionsToClose.push(connection);
|
|
}
|
|
}
|
|
|
|
for (const connection of connectionsToClose) {
|
|
logDebug('Closing idle connection', this.options);
|
|
this.closeConnection(connection);
|
|
}
|
|
}, cleanupInterval);
|
|
}
|
|
} |