289 lines
		
	
	
		
			8.5 KiB
		
	
	
	
		
			TypeScript
		
	
	
	
	
	
		
		
			
		
	
	
			289 lines
		
	
	
		
			8.5 KiB
		
	
	
	
		
			TypeScript
		
	
	
	
	
	
|  | /** | ||
|  |  * SMTP Client Connection Manager | ||
|  |  * Connection pooling and lifecycle management | ||
|  |  */ | ||
|  | 
 | ||
|  | import * as net from 'node:net'; | ||
|  | import * as tls from 'node:tls'; | ||
|  | import { EventEmitter } from 'node:events'; | ||
|  | import { DEFAULTS, CONNECTION_STATES } from './constants.ts'; | ||
|  | import type {  | ||
|  |   ISmtpClientOptions,  | ||
|  |   ISmtpConnection,  | ||
|  |   IConnectionPoolStatus, | ||
|  |   ConnectionState  | ||
|  | } from './interfaces.ts'; | ||
|  | import { logConnection, logDebug } from './utils/logging.ts'; | ||
|  | import { generateConnectionId } from './utils/helpers.ts'; | ||
|  | 
 | ||
|  | export class ConnectionManager extends EventEmitter { | ||
|  |   private options: ISmtpClientOptions; | ||
|  |   private connections: Map<string, ISmtpConnection> = new Map(); | ||
|  |   private pendingConnections: Set<string> = new Set(); | ||
|  |   private idleTimeout: NodeJS.Timeout | null = null; | ||
|  |    | ||
|  |   constructor(options: ISmtpClientOptions) { | ||
|  |     super(); | ||
|  |     this.options = options; | ||
|  |     this.setupIdleCleanup(); | ||
|  |   } | ||
|  |    | ||
|  |   /** | ||
|  |    * Get or create a connection | ||
|  |    */ | ||
|  |   public async getConnection(): Promise<ISmtpConnection> { | ||
|  |     // Try to reuse an idle connection if pooling is enabled
 | ||
|  |     if (this.options.pool) { | ||
|  |       const idleConnection = this.findIdleConnection(); | ||
|  |       if (idleConnection) { | ||
|  |         const connectionId = this.getConnectionId(idleConnection) || 'unknown'; | ||
|  |         logDebug('Reusing idle connection', this.options, { connectionId }); | ||
|  |         return idleConnection; | ||
|  |       } | ||
|  |        | ||
|  |       // Check if we can create a new connection
 | ||
|  |       if (this.getActiveConnectionCount() >= (this.options.maxConnections || DEFAULTS.MAX_CONNECTIONS)) { | ||
|  |         throw new Error('Maximum number of connections reached'); | ||
|  |       } | ||
|  |     } | ||
|  |      | ||
|  |     return this.createConnection(); | ||
|  |   } | ||
|  |    | ||
|  |   /** | ||
|  |    * Create a new connection | ||
|  |    */ | ||
|  |   public async createConnection(): Promise<ISmtpConnection> { | ||
|  |     const connectionId = generateConnectionId(); | ||
|  |      | ||
|  |     try { | ||
|  |       this.pendingConnections.add(connectionId); | ||
|  |       logConnection('connecting', this.options, { connectionId }); | ||
|  |        | ||
|  |       const socket = await this.establishSocket(); | ||
|  |       const connection: ISmtpConnection = { | ||
|  |         socket, | ||
|  |         state: CONNECTION_STATES.CONNECTED as ConnectionState, | ||
|  |         options: this.options, | ||
|  |         secure: this.options.secure || false, | ||
|  |         createdAt: new Date(), | ||
|  |         lastActivity: new Date(), | ||
|  |         messageCount: 0 | ||
|  |       }; | ||
|  |        | ||
|  |       this.setupSocketHandlers(socket, connectionId); | ||
|  |       this.connections.set(connectionId, connection); | ||
|  |       this.pendingConnections.delete(connectionId); | ||
|  |        | ||
|  |       logConnection('connected', this.options, { connectionId }); | ||
|  |       this.emit('connection', connection); | ||
|  |        | ||
|  |       return connection; | ||
|  |     } catch (error) { | ||
|  |       this.pendingConnections.delete(connectionId); | ||
|  |       logConnection('error', this.options, { connectionId, error }); | ||
|  |       throw error; | ||
|  |     } | ||
|  |   } | ||
|  |    | ||
|  |   /** | ||
|  |    * Release a connection back to the pool or close it | ||
|  |    */ | ||
|  |   public releaseConnection(connection: ISmtpConnection): void { | ||
|  |     const connectionId = this.getConnectionId(connection); | ||
|  |      | ||
|  |     if (!connectionId || !this.connections.has(connectionId)) { | ||
|  |       return; | ||
|  |     } | ||
|  |      | ||
|  |     if (this.options.pool && this.shouldReuseConnection(connection)) { | ||
|  |       // Return to pool
 | ||
|  |       connection.state = CONNECTION_STATES.READY as ConnectionState; | ||
|  |       connection.lastActivity = new Date(); | ||
|  |       logDebug('Connection returned to pool', this.options, { connectionId }); | ||
|  |     } else { | ||
|  |       // Close connection
 | ||
|  |       this.closeConnection(connection); | ||
|  |     } | ||
|  |   } | ||
|  |    | ||
|  |   /** | ||
|  |    * Close a specific connection | ||
|  |    */ | ||
|  |   public closeConnection(connection: ISmtpConnection): void { | ||
|  |     const connectionId = this.getConnectionId(connection); | ||
|  |      | ||
|  |     if (connectionId) { | ||
|  |       this.connections.delete(connectionId); | ||
|  |     } | ||
|  |      | ||
|  |     connection.state = CONNECTION_STATES.CLOSING as ConnectionState; | ||
|  |      | ||
|  |     try { | ||
|  |       if (!connection.socket.destroyed) { | ||
|  |         connection.socket.destroy(); | ||
|  |       } | ||
|  |     } catch (error) { | ||
|  |       logDebug('Error closing connection', this.options, { error }); | ||
|  |     } | ||
|  |      | ||
|  |     logConnection('disconnected', this.options, { connectionId }); | ||
|  |     this.emit('disconnect', connection); | ||
|  |   } | ||
|  |    | ||
|  |   /** | ||
|  |    * Close all connections | ||
|  |    */ | ||
|  |   public closeAllConnections(): void { | ||
|  |     logDebug('Closing all connections', this.options); | ||
|  |      | ||
|  |     for (const connection of this.connections.values()) { | ||
|  |       this.closeConnection(connection); | ||
|  |     } | ||
|  |      | ||
|  |     this.connections.clear(); | ||
|  |     this.pendingConnections.clear(); | ||
|  |      | ||
|  |     if (this.idleTimeout) { | ||
|  |       clearInterval(this.idleTimeout); | ||
|  |       this.idleTimeout = null; | ||
|  |     } | ||
|  |   } | ||
|  |    | ||
|  |   /** | ||
|  |    * Get connection pool status | ||
|  |    */ | ||
|  |   public getPoolStatus(): IConnectionPoolStatus { | ||
|  |     const total = this.connections.size; | ||
|  |     const active = Array.from(this.connections.values()) | ||
|  |       .filter(conn => conn.state === CONNECTION_STATES.BUSY).length; | ||
|  |     const idle = total - active; | ||
|  |     const pending = this.pendingConnections.size; | ||
|  |      | ||
|  |     return { total, active, idle, pending }; | ||
|  |   } | ||
|  |    | ||
|  |   /** | ||
|  |    * Update connection activity timestamp | ||
|  |    */ | ||
|  |   public updateActivity(connection: ISmtpConnection): void { | ||
|  |     connection.lastActivity = new Date(); | ||
|  |   } | ||
|  |    | ||
|  |   private async establishSocket(): Promise<net.Socket | tls.TLSSocket> { | ||
|  |     return new Promise((resolve, reject) => { | ||
|  |       const timeout = this.options.connectionTimeout || DEFAULTS.CONNECTION_TIMEOUT; | ||
|  |       let socket: net.Socket | tls.TLSSocket; | ||
|  |        | ||
|  |       if (this.options.secure) { | ||
|  |         // Direct TLS connection
 | ||
|  |         socket = tls.connect({ | ||
|  |           host: this.options.host, | ||
|  |           port: this.options.port, | ||
|  |           ...this.options.tls | ||
|  |         }); | ||
|  |       } else { | ||
|  |         // Plain connection
 | ||
|  |         socket = new net.Socket(); | ||
|  |         socket.connect(this.options.port, this.options.host); | ||
|  |       } | ||
|  |        | ||
|  |       const timeoutHandler = setTimeout(() => { | ||
|  |         socket.destroy(); | ||
|  |         reject(new Error(`Connection timeout after ${timeout}ms`)); | ||
|  |       }, timeout); | ||
|  |        | ||
|  |       // For TLS connections, we need to wait for 'secureConnect' instead of 'connect'
 | ||
|  |       const successEvent = this.options.secure ? 'secureConnect' : 'connect'; | ||
|  |        | ||
|  |       socket.once(successEvent, () => { | ||
|  |         clearTimeout(timeoutHandler); | ||
|  |         resolve(socket); | ||
|  |       }); | ||
|  |        | ||
|  |       socket.once('error', (error) => { | ||
|  |         clearTimeout(timeoutHandler); | ||
|  |         reject(error); | ||
|  |       }); | ||
|  |     }); | ||
|  |   } | ||
|  |    | ||
|  |   private setupSocketHandlers(socket: net.Socket | tls.TLSSocket, connectionId: string): void { | ||
|  |     const socketTimeout = this.options.socketTimeout || DEFAULTS.SOCKET_TIMEOUT; | ||
|  |      | ||
|  |     socket.setTimeout(socketTimeout); | ||
|  |      | ||
|  |     socket.on('timeout', () => { | ||
|  |       logDebug('Socket timeout', this.options, { connectionId }); | ||
|  |       socket.destroy(); | ||
|  |     }); | ||
|  |      | ||
|  |     socket.on('error', (error) => { | ||
|  |       logConnection('error', this.options, { connectionId, error }); | ||
|  |       this.connections.delete(connectionId); | ||
|  |     }); | ||
|  |      | ||
|  |     socket.on('close', () => { | ||
|  |       this.connections.delete(connectionId); | ||
|  |       logDebug('Socket closed', this.options, { connectionId }); | ||
|  |     }); | ||
|  |   } | ||
|  |    | ||
|  |   private findIdleConnection(): ISmtpConnection | null { | ||
|  |     for (const connection of this.connections.values()) { | ||
|  |       if (connection.state === CONNECTION_STATES.READY) { | ||
|  |         return connection; | ||
|  |       } | ||
|  |     } | ||
|  |     return null; | ||
|  |   } | ||
|  |    | ||
|  |   private shouldReuseConnection(connection: ISmtpConnection): boolean { | ||
|  |     const maxMessages = this.options.maxMessages || DEFAULTS.MAX_MESSAGES; | ||
|  |     const maxAge = 300000; // 5 minutes
 | ||
|  |     const age = Date.now() - connection.createdAt.getTime(); | ||
|  |      | ||
|  |     return connection.messageCount < maxMessages &&  | ||
|  |            age < maxAge &&  | ||
|  |            !connection.socket.destroyed; | ||
|  |   } | ||
|  |    | ||
|  |   private getActiveConnectionCount(): number { | ||
|  |     return this.connections.size + this.pendingConnections.size; | ||
|  |   } | ||
|  |    | ||
|  |   private getConnectionId(connection: ISmtpConnection): string | null { | ||
|  |     for (const [id, conn] of this.connections.entries()) { | ||
|  |       if (conn === connection) { | ||
|  |         return id; | ||
|  |       } | ||
|  |     } | ||
|  |     return null; | ||
|  |   } | ||
|  |    | ||
|  |   private setupIdleCleanup(): void { | ||
|  |     if (!this.options.pool) { | ||
|  |       return; | ||
|  |     } | ||
|  |      | ||
|  |     const cleanupInterval = DEFAULTS.POOL_IDLE_TIMEOUT; | ||
|  |      | ||
|  |     this.idleTimeout = setInterval(() => { | ||
|  |       const now = Date.now(); | ||
|  |       const connectionsToClose: ISmtpConnection[] = []; | ||
|  |        | ||
|  |       for (const connection of this.connections.values()) { | ||
|  |         const idleTime = now - connection.lastActivity.getTime(); | ||
|  |          | ||
|  |         if (connection.state === CONNECTION_STATES.READY && idleTime > cleanupInterval) { | ||
|  |           connectionsToClose.push(connection); | ||
|  |         } | ||
|  |       } | ||
|  |        | ||
|  |       for (const connection of connectionsToClose) { | ||
|  |         logDebug('Closing idle connection', this.options); | ||
|  |         this.closeConnection(connection); | ||
|  |       } | ||
|  |     }, cleanupInterval); | ||
|  |   } | ||
|  | } |