initial
This commit is contained in:
		
							
								
								
									
										289
									
								
								ts/mail/delivery/smtpclient/connection-manager.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										289
									
								
								ts/mail/delivery/smtpclient/connection-manager.ts
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,289 @@ | ||||
| /** | ||||
|  * SMTP Client Connection Manager | ||||
|  * Connection pooling and lifecycle management | ||||
|  */ | ||||
|  | ||||
| import * as net from 'node:net'; | ||||
| import * as tls from 'node:tls'; | ||||
| import { EventEmitter } from 'node:events'; | ||||
| import { DEFAULTS, CONNECTION_STATES } from './constants.ts'; | ||||
| import type {  | ||||
|   ISmtpClientOptions,  | ||||
|   ISmtpConnection,  | ||||
|   IConnectionPoolStatus, | ||||
|   ConnectionState  | ||||
| } from './interfaces.ts'; | ||||
| import { logConnection, logDebug } from './utils/logging.ts'; | ||||
| import { generateConnectionId } from './utils/helpers.ts'; | ||||
|  | ||||
| export class ConnectionManager extends EventEmitter { | ||||
|   private options: ISmtpClientOptions; | ||||
|   private connections: Map<string, ISmtpConnection> = new Map(); | ||||
|   private pendingConnections: Set<string> = new Set(); | ||||
|   private idleTimeout: NodeJS.Timeout | null = null; | ||||
|    | ||||
|   constructor(options: ISmtpClientOptions) { | ||||
|     super(); | ||||
|     this.options = options; | ||||
|     this.setupIdleCleanup(); | ||||
|   } | ||||
|    | ||||
|   /** | ||||
|    * Get or create a connection | ||||
|    */ | ||||
|   public async getConnection(): Promise<ISmtpConnection> { | ||||
|     // Try to reuse an idle connection if pooling is enabled | ||||
|     if (this.options.pool) { | ||||
|       const idleConnection = this.findIdleConnection(); | ||||
|       if (idleConnection) { | ||||
|         const connectionId = this.getConnectionId(idleConnection) || 'unknown'; | ||||
|         logDebug('Reusing idle connection', this.options, { connectionId }); | ||||
|         return idleConnection; | ||||
|       } | ||||
|        | ||||
|       // Check if we can create a new connection | ||||
|       if (this.getActiveConnectionCount() >= (this.options.maxConnections || DEFAULTS.MAX_CONNECTIONS)) { | ||||
|         throw new Error('Maximum number of connections reached'); | ||||
|       } | ||||
|     } | ||||
|      | ||||
|     return this.createConnection(); | ||||
|   } | ||||
|    | ||||
|   /** | ||||
|    * Create a new connection | ||||
|    */ | ||||
|   public async createConnection(): Promise<ISmtpConnection> { | ||||
|     const connectionId = generateConnectionId(); | ||||
|      | ||||
|     try { | ||||
|       this.pendingConnections.add(connectionId); | ||||
|       logConnection('connecting', this.options, { connectionId }); | ||||
|        | ||||
|       const socket = await this.establishSocket(); | ||||
|       const connection: ISmtpConnection = { | ||||
|         socket, | ||||
|         state: CONNECTION_STATES.CONNECTED as ConnectionState, | ||||
|         options: this.options, | ||||
|         secure: this.options.secure || false, | ||||
|         createdAt: new Date(), | ||||
|         lastActivity: new Date(), | ||||
|         messageCount: 0 | ||||
|       }; | ||||
|        | ||||
|       this.setupSocketHandlers(socket, connectionId); | ||||
|       this.connections.set(connectionId, connection); | ||||
|       this.pendingConnections.delete(connectionId); | ||||
|        | ||||
|       logConnection('connected', this.options, { connectionId }); | ||||
|       this.emit('connection', connection); | ||||
|        | ||||
|       return connection; | ||||
|     } catch (error) { | ||||
|       this.pendingConnections.delete(connectionId); | ||||
|       logConnection('error', this.options, { connectionId, error }); | ||||
|       throw error; | ||||
|     } | ||||
|   } | ||||
|    | ||||
|   /** | ||||
|    * Release a connection back to the pool or close it | ||||
|    */ | ||||
|   public releaseConnection(connection: ISmtpConnection): void { | ||||
|     const connectionId = this.getConnectionId(connection); | ||||
|      | ||||
|     if (!connectionId || !this.connections.has(connectionId)) { | ||||
|       return; | ||||
|     } | ||||
|      | ||||
|     if (this.options.pool && this.shouldReuseConnection(connection)) { | ||||
|       // Return to pool | ||||
|       connection.state = CONNECTION_STATES.READY as ConnectionState; | ||||
|       connection.lastActivity = new Date(); | ||||
|       logDebug('Connection returned to pool', this.options, { connectionId }); | ||||
|     } else { | ||||
|       // Close connection | ||||
|       this.closeConnection(connection); | ||||
|     } | ||||
|   } | ||||
|    | ||||
|   /** | ||||
|    * Close a specific connection | ||||
|    */ | ||||
|   public closeConnection(connection: ISmtpConnection): void { | ||||
|     const connectionId = this.getConnectionId(connection); | ||||
|      | ||||
|     if (connectionId) { | ||||
|       this.connections.delete(connectionId); | ||||
|     } | ||||
|      | ||||
|     connection.state = CONNECTION_STATES.CLOSING as ConnectionState; | ||||
|      | ||||
|     try { | ||||
|       if (!connection.socket.destroyed) { | ||||
|         connection.socket.destroy(); | ||||
|       } | ||||
|     } catch (error) { | ||||
|       logDebug('Error closing connection', this.options, { error }); | ||||
|     } | ||||
|      | ||||
|     logConnection('disconnected', this.options, { connectionId }); | ||||
|     this.emit('disconnect', connection); | ||||
|   } | ||||
|    | ||||
|   /** | ||||
|    * Close all connections | ||||
|    */ | ||||
|   public closeAllConnections(): void { | ||||
|     logDebug('Closing all connections', this.options); | ||||
|      | ||||
|     for (const connection of this.connections.values()) { | ||||
|       this.closeConnection(connection); | ||||
|     } | ||||
|      | ||||
|     this.connections.clear(); | ||||
|     this.pendingConnections.clear(); | ||||
|      | ||||
|     if (this.idleTimeout) { | ||||
|       clearInterval(this.idleTimeout); | ||||
|       this.idleTimeout = null; | ||||
|     } | ||||
|   } | ||||
|    | ||||
|   /** | ||||
|    * Get connection pool status | ||||
|    */ | ||||
|   public getPoolStatus(): IConnectionPoolStatus { | ||||
|     const total = this.connections.size; | ||||
|     const active = Array.from(this.connections.values()) | ||||
|       .filter(conn => conn.state === CONNECTION_STATES.BUSY).length; | ||||
|     const idle = total - active; | ||||
|     const pending = this.pendingConnections.size; | ||||
|      | ||||
|     return { total, active, idle, pending }; | ||||
|   } | ||||
|    | ||||
|   /** | ||||
|    * Update connection activity timestamp | ||||
|    */ | ||||
|   public updateActivity(connection: ISmtpConnection): void { | ||||
|     connection.lastActivity = new Date(); | ||||
|   } | ||||
|    | ||||
|   private async establishSocket(): Promise<net.Socket | tls.TLSSocket> { | ||||
|     return new Promise((resolve, reject) => { | ||||
|       const timeout = this.options.connectionTimeout || DEFAULTS.CONNECTION_TIMEOUT; | ||||
|       let socket: net.Socket | tls.TLSSocket; | ||||
|        | ||||
|       if (this.options.secure) { | ||||
|         // Direct TLS connection | ||||
|         socket = tls.connect({ | ||||
|           host: this.options.host, | ||||
|           port: this.options.port, | ||||
|           ...this.options.tls | ||||
|         }); | ||||
|       } else { | ||||
|         // Plain connection | ||||
|         socket = new net.Socket(); | ||||
|         socket.connect(this.options.port, this.options.host); | ||||
|       } | ||||
|        | ||||
|       const timeoutHandler = setTimeout(() => { | ||||
|         socket.destroy(); | ||||
|         reject(new Error(`Connection timeout after ${timeout}ms`)); | ||||
|       }, timeout); | ||||
|        | ||||
|       // For TLS connections, we need to wait for 'secureConnect' instead of 'connect' | ||||
|       const successEvent = this.options.secure ? 'secureConnect' : 'connect'; | ||||
|        | ||||
|       socket.once(successEvent, () => { | ||||
|         clearTimeout(timeoutHandler); | ||||
|         resolve(socket); | ||||
|       }); | ||||
|        | ||||
|       socket.once('error', (error) => { | ||||
|         clearTimeout(timeoutHandler); | ||||
|         reject(error); | ||||
|       }); | ||||
|     }); | ||||
|   } | ||||
|    | ||||
|   private setupSocketHandlers(socket: net.Socket | tls.TLSSocket, connectionId: string): void { | ||||
|     const socketTimeout = this.options.socketTimeout || DEFAULTS.SOCKET_TIMEOUT; | ||||
|      | ||||
|     socket.setTimeout(socketTimeout); | ||||
|      | ||||
|     socket.on('timeout', () => { | ||||
|       logDebug('Socket timeout', this.options, { connectionId }); | ||||
|       socket.destroy(); | ||||
|     }); | ||||
|      | ||||
|     socket.on('error', (error) => { | ||||
|       logConnection('error', this.options, { connectionId, error }); | ||||
|       this.connections.delete(connectionId); | ||||
|     }); | ||||
|      | ||||
|     socket.on('close', () => { | ||||
|       this.connections.delete(connectionId); | ||||
|       logDebug('Socket closed', this.options, { connectionId }); | ||||
|     }); | ||||
|   } | ||||
|    | ||||
|   private findIdleConnection(): ISmtpConnection | null { | ||||
|     for (const connection of this.connections.values()) { | ||||
|       if (connection.state === CONNECTION_STATES.READY) { | ||||
|         return connection; | ||||
|       } | ||||
|     } | ||||
|     return null; | ||||
|   } | ||||
|    | ||||
|   private shouldReuseConnection(connection: ISmtpConnection): boolean { | ||||
|     const maxMessages = this.options.maxMessages || DEFAULTS.MAX_MESSAGES; | ||||
|     const maxAge = 300000; // 5 minutes | ||||
|     const age = Date.now() - connection.createdAt.getTime(); | ||||
|      | ||||
|     return connection.messageCount < maxMessages &&  | ||||
|            age < maxAge &&  | ||||
|            !connection.socket.destroyed; | ||||
|   } | ||||
|    | ||||
|   private getActiveConnectionCount(): number { | ||||
|     return this.connections.size + this.pendingConnections.size; | ||||
|   } | ||||
|    | ||||
|   private getConnectionId(connection: ISmtpConnection): string | null { | ||||
|     for (const [id, conn] of this.connections.entries()) { | ||||
|       if (conn === connection) { | ||||
|         return id; | ||||
|       } | ||||
|     } | ||||
|     return null; | ||||
|   } | ||||
|    | ||||
|   private setupIdleCleanup(): void { | ||||
|     if (!this.options.pool) { | ||||
|       return; | ||||
|     } | ||||
|      | ||||
|     const cleanupInterval = DEFAULTS.POOL_IDLE_TIMEOUT; | ||||
|      | ||||
|     this.idleTimeout = setInterval(() => { | ||||
|       const now = Date.now(); | ||||
|       const connectionsToClose: ISmtpConnection[] = []; | ||||
|        | ||||
|       for (const connection of this.connections.values()) { | ||||
|         const idleTime = now - connection.lastActivity.getTime(); | ||||
|          | ||||
|         if (connection.state === CONNECTION_STATES.READY && idleTime > cleanupInterval) { | ||||
|           connectionsToClose.push(connection); | ||||
|         } | ||||
|       } | ||||
|        | ||||
|       for (const connection of connectionsToClose) { | ||||
|         logDebug('Closing idle connection', this.options); | ||||
|         this.closeConnection(connection); | ||||
|       } | ||||
|     }, cleanupInterval); | ||||
|   } | ||||
| } | ||||
		Reference in New Issue
	
	Block a user