1061 lines
		
	
	
		
			38 KiB
		
	
	
	
		
			TypeScript
		
	
	
	
	
	
			
		
		
	
	
			1061 lines
		
	
	
		
			38 KiB
		
	
	
	
		
			TypeScript
		
	
	
	
	
	
| /**
 | |
|  * SMTP Connection Manager
 | |
|  * Responsible for managing socket connections to the SMTP server
 | |
|  */
 | |
| 
 | |
| import * as plugins from '../../../plugins.ts';
 | |
| import type { IConnectionManager, ISmtpServer } from './interfaces.ts';
 | |
| import { SmtpResponseCode, SMTP_DEFAULTS, SmtpState } from './constants.ts';
 | |
| import { SmtpLogger } from './utils/logging.ts';
 | |
| import { adaptiveLogger } from './utils/adaptive-logging.ts';
 | |
| import { getSocketDetails, formatMultilineResponse } from './utils/helpers.ts';
 | |
| 
 | |
| /**
 | |
|  * Manager for SMTP connections
 | |
|  * Handles connection setup, event listeners, and lifecycle management
 | |
|  * Provides resource management, connection tracking, and monitoring
 | |
|  */
 | |
| export class ConnectionManager implements IConnectionManager {
 | |
|   /**
 | |
|    * Reference to the SMTP server instance
 | |
|    */
 | |
|   private smtpServer: ISmtpServer;
 | |
|   
 | |
|   /**
 | |
|    * Set of active socket connections
 | |
|    */
 | |
|   private activeConnections: Set<plugins.net.Socket | plugins.tls.TLSSocket> = new Set();
 | |
|   
 | |
|   /**
 | |
|    * Connection tracking for resource management
 | |
|    */
 | |
|   private connectionStats = {
 | |
|     totalConnections: 0,
 | |
|     activeConnections: 0,
 | |
|     peakConnections: 0,
 | |
|     rejectedConnections: 0,
 | |
|     closedConnections: 0,
 | |
|     erroredConnections: 0,
 | |
|     timedOutConnections: 0
 | |
|   };
 | |
| 
 | |
|   /**
 | |
|    * Per-IP connection tracking for rate limiting
 | |
|    */
 | |
|   private ipConnections: Map<string, {
 | |
|     count: number;
 | |
|     firstConnection: number;
 | |
|     lastConnection: number;
 | |
|   }> = new Map();
 | |
|   
 | |
|   /**
 | |
|    * Resource monitoring interval
 | |
|    */
 | |
|   private resourceCheckInterval: NodeJS.Timeout | null = null;
 | |
|   
 | |
|   /**
 | |
|    * Track cleanup timers so we can clear them
 | |
|    */
 | |
|   private cleanupTimers: Set<NodeJS.Timeout> = new Set();
 | |
|   
 | |
|   /**
 | |
|    * SMTP server options with enhanced resource controls
 | |
|    */
 | |
|   private options: {
 | |
|     hostname: string;
 | |
|     maxConnections: number;
 | |
|     socketTimeout: number;
 | |
|     maxConnectionsPerIP: number;
 | |
|     connectionRateLimit: number;
 | |
|     connectionRateWindow: number;
 | |
|     bufferSizeLimit: number;
 | |
|     resourceCheckInterval: number;
 | |
|   };
 | |
|   
 | |
|   /**
 | |
|    * Creates a new connection manager with enhanced resource management
 | |
|    * @param smtpServer - SMTP server instance
 | |
|    */
 | |
|   constructor(smtpServer: ISmtpServer) {
 | |
|     this.smtpServer = smtpServer;
 | |
|     
 | |
|     // Get options from server
 | |
|     const serverOptions = this.smtpServer.getOptions();
 | |
|     
 | |
|     // Default values for resource management - adjusted for production scalability
 | |
|     const DEFAULT_MAX_CONNECTIONS_PER_IP = 50; // Increased to support high-concurrency scenarios
 | |
|     const DEFAULT_CONNECTION_RATE_LIMIT = 200; // Increased for production load handling
 | |
|     const DEFAULT_CONNECTION_RATE_WINDOW = 60 * 1000; // 60 seconds window
 | |
|     const DEFAULT_BUFFER_SIZE_LIMIT = 10 * 1024 * 1024; // 10 MB
 | |
|     const DEFAULT_RESOURCE_CHECK_INTERVAL = 30 * 1000; // 30 seconds
 | |
|     
 | |
|     this.options = {
 | |
|       hostname: serverOptions.hostname || SMTP_DEFAULTS.HOSTNAME,
 | |
|       maxConnections: serverOptions.maxConnections || SMTP_DEFAULTS.MAX_CONNECTIONS,
 | |
|       socketTimeout: serverOptions.socketTimeout || SMTP_DEFAULTS.SOCKET_TIMEOUT,
 | |
|       maxConnectionsPerIP: DEFAULT_MAX_CONNECTIONS_PER_IP,
 | |
|       connectionRateLimit: DEFAULT_CONNECTION_RATE_LIMIT,
 | |
|       connectionRateWindow: DEFAULT_CONNECTION_RATE_WINDOW,
 | |
|       bufferSizeLimit: DEFAULT_BUFFER_SIZE_LIMIT,
 | |
|       resourceCheckInterval: DEFAULT_RESOURCE_CHECK_INTERVAL
 | |
|     };
 | |
|     
 | |
|     // Start resource monitoring
 | |
|     this.startResourceMonitoring();
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Start resource monitoring interval to check resource usage
 | |
|    */
 | |
|   private startResourceMonitoring(): void {
 | |
|     // Clear any existing interval
 | |
|     if (this.resourceCheckInterval) {
 | |
|       clearInterval(this.resourceCheckInterval);
 | |
|     }
 | |
|     
 | |
|     // Set up new interval
 | |
|     this.resourceCheckInterval = setInterval(() => {
 | |
|       this.monitorResourceUsage();
 | |
|     }, this.options.resourceCheckInterval);
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Monitor resource usage and log statistics
 | |
|    */
 | |
|   private monitorResourceUsage(): void {
 | |
|     // Calculate memory usage
 | |
|     const memoryUsage = process.memoryUsage();
 | |
|     const memoryUsageMB = {
 | |
|       rss: Math.round(memoryUsage.rss / 1024 / 1024),
 | |
|       heapTotal: Math.round(memoryUsage.heapTotal / 1024 / 1024),
 | |
|       heapUsed: Math.round(memoryUsage.heapUsed / 1024 / 1024),
 | |
|       external: Math.round(memoryUsage.external / 1024 / 1024)
 | |
|     };
 | |
|     
 | |
|     // Calculate connection rate metrics
 | |
|     const activeIPs = Array.from(this.ipConnections.entries())
 | |
|       .filter(([_, data]) => data.count > 0).length;
 | |
|       
 | |
|     const highVolumeIPs = Array.from(this.ipConnections.entries())
 | |
|       .filter(([_, data]) => data.count > this.options.connectionRateLimit / 2).length;
 | |
|     
 | |
|     // Log resource usage with more detailed metrics
 | |
|     SmtpLogger.info('Resource usage stats', {
 | |
|       connections: {
 | |
|         active: this.activeConnections.size,
 | |
|         total: this.connectionStats.totalConnections,
 | |
|         peak: this.connectionStats.peakConnections,
 | |
|         rejected: this.connectionStats.rejectedConnections,
 | |
|         closed: this.connectionStats.closedConnections,
 | |
|         errored: this.connectionStats.erroredConnections,
 | |
|         timedOut: this.connectionStats.timedOutConnections
 | |
|       },
 | |
|       memory: memoryUsageMB,
 | |
|       ipTracking: {
 | |
|         uniqueIPs: this.ipConnections.size,
 | |
|         activeIPs: activeIPs,
 | |
|         highVolumeIPs: highVolumeIPs
 | |
|       },
 | |
|       resourceLimits: {
 | |
|         maxConnections: this.options.maxConnections,
 | |
|         maxConnectionsPerIP: this.options.maxConnectionsPerIP,
 | |
|         connectionRateLimit: this.options.connectionRateLimit,
 | |
|         bufferSizeLimit: Math.round(this.options.bufferSizeLimit / 1024 / 1024) + 'MB'
 | |
|       }
 | |
|     });
 | |
|     
 | |
|     // Check for potential DoS conditions
 | |
|     if (highVolumeIPs > 3) {
 | |
|       SmtpLogger.warn(`Potential DoS detected: ${highVolumeIPs} IPs with high connection rates`);
 | |
|     }
 | |
|     
 | |
|     // Assess memory usage trends
 | |
|     if (memoryUsageMB.heapUsed > 500) { // Over 500MB heap used
 | |
|       SmtpLogger.warn(`High memory usage detected: ${memoryUsageMB.heapUsed}MB heap used`);
 | |
|     }
 | |
|     
 | |
|     // Clean up expired IP rate limits and validate resource tracking
 | |
|     this.cleanupIpRateLimits();
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Clean up expired IP rate limits and perform additional resource monitoring
 | |
|    */
 | |
|   private cleanupIpRateLimits(): void {
 | |
|     const now = Date.now();
 | |
|     const windowThreshold = now - this.options.connectionRateWindow;
 | |
|     let activeIps = 0;
 | |
|     let removedEntries = 0;
 | |
|     
 | |
|     // Iterate through IP connections and manage entries
 | |
|     for (const [ip, data] of this.ipConnections.entries()) {
 | |
|       // If the last connection was before the window threshold + one extra window, remove the entry
 | |
|       if (data.lastConnection < windowThreshold - this.options.connectionRateWindow) {
 | |
|         // Remove stale entries to prevent memory growth
 | |
|         this.ipConnections.delete(ip);
 | |
|         removedEntries++;
 | |
|       } 
 | |
|       // If last connection was before the window threshold, reset the count
 | |
|       else if (data.lastConnection < windowThreshold) {
 | |
|         if (data.count > 0) {
 | |
|           // Reset but keep the IP in the map with a zero count
 | |
|           this.ipConnections.set(ip, { 
 | |
|             count: 0, 
 | |
|             firstConnection: now, 
 | |
|             lastConnection: now 
 | |
|           });
 | |
|         }
 | |
|       } else {
 | |
|         // This IP is still active within the current window
 | |
|         activeIps++;
 | |
|       }
 | |
|     }
 | |
|     
 | |
|     // Log cleanup activity if significant changes occurred
 | |
|     if (removedEntries > 0) {
 | |
|       SmtpLogger.debug(`IP rate limit cleanup: removed ${removedEntries} stale entries, ${this.ipConnections.size} remaining, ${activeIps} active in current window`);
 | |
|     }
 | |
|     
 | |
|     // Check for memory leaks in connection tracking
 | |
|     if (this.activeConnections.size > 0 && this.connectionStats.activeConnections !== this.activeConnections.size) {
 | |
|       SmtpLogger.warn(`Connection tracking inconsistency detected: stats.active=${this.connectionStats.activeConnections}, actual=${this.activeConnections.size}`);
 | |
|       // Fix the inconsistency
 | |
|       this.connectionStats.activeConnections = this.activeConnections.size;
 | |
|     }
 | |
|     
 | |
|     // Validate and clean leaked resources if needed
 | |
|     this.validateResourceTracking();
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Validate and repair resource tracking to prevent leaks
 | |
|    */
 | |
|   private validateResourceTracking(): void {
 | |
|     // Prepare a detailed report if inconsistencies are found
 | |
|     const inconsistenciesFound = [];
 | |
|     
 | |
|     // 1. Check active connections count matches activeConnections set size
 | |
|     if (this.connectionStats.activeConnections !== this.activeConnections.size) {
 | |
|       inconsistenciesFound.push({
 | |
|         issue: 'Active connection count mismatch',
 | |
|         stats: this.connectionStats.activeConnections,
 | |
|         actual: this.activeConnections.size,
 | |
|         action: 'Auto-corrected'
 | |
|       });
 | |
|       this.connectionStats.activeConnections = this.activeConnections.size;
 | |
|     }
 | |
|     
 | |
|     // 2. Check for destroyed sockets in active connections
 | |
|     let destroyedSocketsCount = 0;
 | |
|     const socketsToRemove: Array<plugins.net.Socket | plugins.tls.TLSSocket> = [];
 | |
|     
 | |
|     for (const socket of this.activeConnections) {
 | |
|       if (socket.destroyed) {
 | |
|         destroyedSocketsCount++;
 | |
|         socketsToRemove.push(socket);
 | |
|       }
 | |
|     }
 | |
|     
 | |
|     // Remove destroyed sockets from tracking
 | |
|     for (const socket of socketsToRemove) {
 | |
|       this.activeConnections.delete(socket);
 | |
|       // Also ensure all listeners are removed
 | |
|       try {
 | |
|         socket.removeAllListeners();
 | |
|       } catch {
 | |
|         // Ignore errors from removeAllListeners
 | |
|       }
 | |
|     }
 | |
|     
 | |
|     if (destroyedSocketsCount > 0) {
 | |
|       inconsistenciesFound.push({
 | |
|         issue: 'Destroyed sockets in active list',
 | |
|         count: destroyedSocketsCount,
 | |
|         action: 'Removed from tracking'
 | |
|       });
 | |
|       // Update active connections count after cleanup
 | |
|       this.connectionStats.activeConnections = this.activeConnections.size;
 | |
|     }
 | |
|     
 | |
|     // 3. Check for sessions without corresponding active connections
 | |
|     const sessionCount = this.smtpServer.getSessionManager().getSessionCount();
 | |
|     if (sessionCount > this.activeConnections.size) {
 | |
|       inconsistenciesFound.push({
 | |
|         issue: 'Orphaned sessions',
 | |
|         sessions: sessionCount,
 | |
|         connections: this.activeConnections.size,
 | |
|         action: 'Session cleanup recommended'
 | |
|       });
 | |
|     }
 | |
|     
 | |
|     // If any inconsistencies found, log a detailed report
 | |
|     if (inconsistenciesFound.length > 0) {
 | |
|       SmtpLogger.warn('Resource tracking inconsistencies detected and repaired', { inconsistencies: inconsistenciesFound });
 | |
|     }
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Handle a new connection with resource management
 | |
|    * @param socket - Client socket
 | |
|    */
 | |
|   public async handleNewConnection(socket: plugins.net.Socket): Promise<void> {
 | |
|     // Update connection stats
 | |
|     this.connectionStats.totalConnections++;
 | |
|     this.connectionStats.activeConnections = this.activeConnections.size + 1;
 | |
|     
 | |
|     if (this.connectionStats.activeConnections > this.connectionStats.peakConnections) {
 | |
|       this.connectionStats.peakConnections = this.connectionStats.activeConnections;
 | |
|     }
 | |
|     
 | |
|     // Get client IP
 | |
|     const remoteAddress = socket.remoteAddress || '0.0.0.0';
 | |
|     
 | |
|     // Use UnifiedRateLimiter for connection rate limiting
 | |
|     const emailServer = this.smtpServer.getEmailServer();
 | |
|     const rateLimiter = emailServer.getRateLimiter();
 | |
|     
 | |
|     // Check connection limit with UnifiedRateLimiter
 | |
|     const connectionResult = rateLimiter.recordConnection(remoteAddress);
 | |
|     if (!connectionResult.allowed) {
 | |
|       this.rejectConnection(socket, connectionResult.reason || 'Rate limit exceeded');
 | |
|       this.connectionStats.rejectedConnections++;
 | |
|       return;
 | |
|     }
 | |
|     
 | |
|     // Still track IP connections locally for cleanup purposes
 | |
|     this.trackIPConnection(remoteAddress);
 | |
|     
 | |
|     // Check if maximum global connections reached
 | |
|     if (this.hasReachedMaxConnections()) {
 | |
|       this.rejectConnection(socket, 'Too many connections');
 | |
|       this.connectionStats.rejectedConnections++;
 | |
|       return;
 | |
|     }
 | |
|     
 | |
|     // Add socket to active connections
 | |
|     this.activeConnections.add(socket);
 | |
|     
 | |
|     // Set up socket options
 | |
|     socket.setKeepAlive(true);
 | |
|     socket.setTimeout(this.options.socketTimeout);
 | |
|     
 | |
|     // Explicitly set socket buffer sizes to prevent memory issues
 | |
|     socket.setNoDelay(true); // Disable Nagle's algorithm for better responsiveness
 | |
|     
 | |
|     // Set limits on socket buffer size if supported by Node.ts version
 | |
|     try {
 | |
|       // Here we set reasonable buffer limits to prevent memory exhaustion attacks
 | |
|       const highWaterMark = 64 * 1024; // 64 KB
 | |
|       // Note: Socket high water mark methods can't be set directly in newer Node.ts versions
 | |
|       // These would need to be set during socket creation or with a different API
 | |
|     } catch (error) {
 | |
|       // Ignore errors from older Node.ts versions that don't support these methods
 | |
|       SmtpLogger.debug(`Could not set socket buffer limits: ${error instanceof Error ? error.message : String(error)}`);
 | |
|     }
 | |
|     
 | |
|     // Set up event handlers
 | |
|     this.setupSocketEventHandlers(socket);
 | |
|     
 | |
|     // Create a session for this connection
 | |
|     this.smtpServer.getSessionManager().createSession(socket, false);
 | |
|     
 | |
|     // Log the new connection using adaptive logger
 | |
|     const socketDetails = getSocketDetails(socket);
 | |
|     adaptiveLogger.logConnection(socket, 'connect');
 | |
|     
 | |
|     // Update adaptive logger with current connection count
 | |
|     adaptiveLogger.updateConnectionCount(this.connectionStats.activeConnections);
 | |
|     
 | |
|     // Send greeting
 | |
|     this.sendGreeting(socket);
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Check if an IP has exceeded the rate limit
 | |
|    * @param ip - Client IP address
 | |
|    * @returns True if rate limited
 | |
|    */
 | |
|   private isIPRateLimited(ip: string): boolean {
 | |
|     const now = Date.now();
 | |
|     const ipData = this.ipConnections.get(ip);
 | |
|     
 | |
|     if (!ipData) {
 | |
|       return false; // No previous connections
 | |
|     }
 | |
|     
 | |
|     // Check if we're within the rate window
 | |
|     const isWithinWindow = now - ipData.firstConnection <= this.options.connectionRateWindow;
 | |
|     
 | |
|     // If within window and count exceeds limit, rate limit is applied
 | |
|     if (isWithinWindow && ipData.count >= this.options.connectionRateLimit) {
 | |
|       SmtpLogger.warn(`Rate limit exceeded for IP ${ip}: ${ipData.count} connections in ${Math.round((now - ipData.firstConnection) / 1000)}s`);
 | |
|       return true;
 | |
|     }
 | |
|     
 | |
|     return false;
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Track a new connection from an IP
 | |
|    * @param ip - Client IP address
 | |
|    */
 | |
|   private trackIPConnection(ip: string): void {
 | |
|     const now = Date.now();
 | |
|     const ipData = this.ipConnections.get(ip);
 | |
|     
 | |
|     if (!ipData) {
 | |
|       // First connection from this IP
 | |
|       this.ipConnections.set(ip, {
 | |
|         count: 1,
 | |
|         firstConnection: now,
 | |
|         lastConnection: now
 | |
|       });
 | |
|     } else {
 | |
|       // Check if we need to reset the window
 | |
|       if (now - ipData.lastConnection > this.options.connectionRateWindow) {
 | |
|         // Reset the window
 | |
|         this.ipConnections.set(ip, {
 | |
|           count: 1,
 | |
|           firstConnection: now,
 | |
|           lastConnection: now
 | |
|         });
 | |
|       } else {
 | |
|         // Increment within the current window
 | |
|         this.ipConnections.set(ip, {
 | |
|           count: ipData.count + 1,
 | |
|           firstConnection: ipData.firstConnection,
 | |
|           lastConnection: now
 | |
|         });
 | |
|       }
 | |
|     }
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Check if an IP has reached its connection limit
 | |
|    * @param ip - Client IP address
 | |
|    * @returns True if limit reached
 | |
|    */
 | |
|   private hasReachedIPConnectionLimit(ip: string): boolean {
 | |
|     let ipConnectionCount = 0;
 | |
|     
 | |
|     // Count active connections from this IP
 | |
|     for (const socket of this.activeConnections) {
 | |
|       if (socket.remoteAddress === ip) {
 | |
|         ipConnectionCount++;
 | |
|       }
 | |
|     }
 | |
|     
 | |
|     return ipConnectionCount >= this.options.maxConnectionsPerIP;
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Handle a new secure TLS connection with resource management
 | |
|    * @param socket - Client TLS socket
 | |
|    */
 | |
|   public async handleNewSecureConnection(socket: plugins.tls.TLSSocket): Promise<void> {
 | |
|     // Update connection stats
 | |
|     this.connectionStats.totalConnections++;
 | |
|     this.connectionStats.activeConnections = this.activeConnections.size + 1;
 | |
|     
 | |
|     if (this.connectionStats.activeConnections > this.connectionStats.peakConnections) {
 | |
|       this.connectionStats.peakConnections = this.connectionStats.activeConnections;
 | |
|     }
 | |
|     
 | |
|     // Get client IP
 | |
|     const remoteAddress = socket.remoteAddress || '0.0.0.0';
 | |
|     
 | |
|     // Use UnifiedRateLimiter for connection rate limiting
 | |
|     const emailServer = this.smtpServer.getEmailServer();
 | |
|     const rateLimiter = emailServer.getRateLimiter();
 | |
|     
 | |
|     // Check connection limit with UnifiedRateLimiter
 | |
|     const connectionResult = rateLimiter.recordConnection(remoteAddress);
 | |
|     if (!connectionResult.allowed) {
 | |
|       this.rejectConnection(socket, connectionResult.reason || 'Rate limit exceeded');
 | |
|       this.connectionStats.rejectedConnections++;
 | |
|       return;
 | |
|     }
 | |
|     
 | |
|     // Still track IP connections locally for cleanup purposes
 | |
|     this.trackIPConnection(remoteAddress);
 | |
|     
 | |
|     // Check if maximum global connections reached
 | |
|     if (this.hasReachedMaxConnections()) {
 | |
|       this.rejectConnection(socket, 'Too many connections');
 | |
|       this.connectionStats.rejectedConnections++;
 | |
|       return;
 | |
|     }
 | |
|     
 | |
|     // Add socket to active connections
 | |
|     this.activeConnections.add(socket);
 | |
|     
 | |
|     // Set up socket options
 | |
|     socket.setKeepAlive(true);
 | |
|     socket.setTimeout(this.options.socketTimeout);
 | |
|     
 | |
|     // Explicitly set socket buffer sizes to prevent memory issues
 | |
|     socket.setNoDelay(true); // Disable Nagle's algorithm for better responsiveness
 | |
|     
 | |
|     // Set limits on socket buffer size if supported by Node.ts version
 | |
|     try {
 | |
|       // Here we set reasonable buffer limits to prevent memory exhaustion attacks
 | |
|       const highWaterMark = 64 * 1024; // 64 KB
 | |
|       // Note: Socket high water mark methods can't be set directly in newer Node.ts versions
 | |
|       // These would need to be set during socket creation or with a different API
 | |
|     } catch (error) {
 | |
|       // Ignore errors from older Node.ts versions that don't support these methods
 | |
|       SmtpLogger.debug(`Could not set socket buffer limits: ${error instanceof Error ? error.message : String(error)}`);
 | |
|     }
 | |
|     
 | |
|     // Set up event handlers
 | |
|     this.setupSocketEventHandlers(socket);
 | |
|     
 | |
|     // Create a session for this connection
 | |
|     this.smtpServer.getSessionManager().createSession(socket, true);
 | |
|     
 | |
|     // Log the new secure connection using adaptive logger
 | |
|     adaptiveLogger.logConnection(socket, 'connect');
 | |
|     
 | |
|     // Update adaptive logger with current connection count
 | |
|     adaptiveLogger.updateConnectionCount(this.connectionStats.activeConnections);
 | |
|     
 | |
|     // Send greeting
 | |
|     this.sendGreeting(socket);
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Set up event handlers for a socket with enhanced resource management
 | |
|    * @param socket - Client socket
 | |
|    */
 | |
|   public setupSocketEventHandlers(socket: plugins.net.Socket | plugins.tls.TLSSocket): void {
 | |
|     // Store existing socket event handlers before adding new ones
 | |
|     const existingDataHandler = socket.listeners('data')[0] as (...args: any[]) => void;
 | |
|     const existingCloseHandler = socket.listeners('close')[0] as (...args: any[]) => void;
 | |
|     const existingErrorHandler = socket.listeners('error')[0] as (...args: any[]) => void;
 | |
|     const existingTimeoutHandler = socket.listeners('timeout')[0] as (...args: any[]) => void;
 | |
|     
 | |
|     // Remove existing event handlers if they exist
 | |
|     if (existingDataHandler) socket.removeListener('data', existingDataHandler);
 | |
|     if (existingCloseHandler) socket.removeListener('close', existingCloseHandler);
 | |
|     if (existingErrorHandler) socket.removeListener('error', existingErrorHandler);
 | |
|     if (existingTimeoutHandler) socket.removeListener('timeout', existingTimeoutHandler);
 | |
|     
 | |
|     // Data event - process incoming data from the client with resource limits
 | |
|     let buffer = '';
 | |
|     let totalBytesReceived = 0;
 | |
|     
 | |
|     socket.on('data', async (data) => {
 | |
|       try {
 | |
|         // Get current session and update activity timestamp
 | |
|         const session = this.smtpServer.getSessionManager().getSession(socket);
 | |
|         if (session) {
 | |
|           this.smtpServer.getSessionManager().updateSessionActivity(session);
 | |
|         }
 | |
|         
 | |
|         // Check if we're in DATA receiving mode - handle differently
 | |
|         if (session && session.state === SmtpState.DATA_RECEIVING) {
 | |
|           // In DATA mode, pass raw chunks directly to command handler with special marker
 | |
|           // Don't line-buffer large email content
 | |
|           try {
 | |
|             const dataString = data.toString('utf8');
 | |
|             // Use a special prefix to indicate this is raw data, not a command line
 | |
|             // CRITICAL FIX: Must await to prevent async pile-up
 | |
|             await this.smtpServer.getCommandHandler().processCommand(socket, `__RAW_DATA__${dataString}`);
 | |
|             return;
 | |
|           } catch (dataError) {
 | |
|             SmtpLogger.error(`Data handler error during DATA mode: ${dataError instanceof Error ? dataError.message : String(dataError)}`);
 | |
|             socket.destroy();
 | |
|             return;
 | |
|           }
 | |
|         }
 | |
|         
 | |
|         // For command mode, continue with line-buffered processing
 | |
|         // Check buffer size limits to prevent memory attacks
 | |
|         totalBytesReceived += data.length;
 | |
|         
 | |
|         if (buffer.length > this.options.bufferSizeLimit) {
 | |
|           // Buffer is too large, reject the connection
 | |
|           SmtpLogger.warn(`Buffer size limit exceeded: ${buffer.length} bytes for ${socket.remoteAddress}`);
 | |
|           this.sendResponse(socket, `${SmtpResponseCode.EXCEEDED_STORAGE} Message too large, disconnecting`);
 | |
|           socket.destroy();
 | |
|           return;
 | |
|         }
 | |
|         
 | |
|         // Impose a total transfer limit to prevent DoS
 | |
|         if (totalBytesReceived > this.options.bufferSizeLimit * 2) {
 | |
|           SmtpLogger.warn(`Total transfer limit exceeded: ${totalBytesReceived} bytes for ${socket.remoteAddress}`);
 | |
|           this.sendResponse(socket, `${SmtpResponseCode.EXCEEDED_STORAGE} Transfer limit exceeded, disconnecting`);
 | |
|           socket.destroy();
 | |
|           return;
 | |
|         }
 | |
|         
 | |
|         // Convert buffer to string safely with explicit encoding
 | |
|         const dataString = data.toString('utf8');
 | |
|         
 | |
|         // Buffer incoming data
 | |
|         buffer += dataString;
 | |
|         
 | |
|         // Process complete lines
 | |
|         let lineEndPos;
 | |
|         while ((lineEndPos = buffer.indexOf(SMTP_DEFAULTS.CRLF)) !== -1) {
 | |
|           // Extract a complete line
 | |
|           const line = buffer.substring(0, lineEndPos);
 | |
|           buffer = buffer.substring(lineEndPos + 2); // +2 to skip CRLF
 | |
|           
 | |
|           // Check line length to prevent extremely long lines
 | |
|           if (line.length > 4096) { // 4KB line limit is reasonable for SMTP
 | |
|             SmtpLogger.warn(`Line length limit exceeded: ${line.length} bytes for ${socket.remoteAddress}`);
 | |
|             this.sendResponse(socket, `${SmtpResponseCode.SYNTAX_ERROR} Line too long, disconnecting`);
 | |
|             socket.destroy();
 | |
|             return;
 | |
|           }
 | |
|           
 | |
|           // Process non-empty lines
 | |
|           if (line.length > 0) {
 | |
|             try {
 | |
|               // CRITICAL FIX: Must await processCommand to prevent async pile-up
 | |
|               // This was causing the busy loop with high CPU usage when many empty lines were processed
 | |
|               await this.smtpServer.getCommandHandler().processCommand(socket, line);
 | |
|             } catch (error) {
 | |
|               // Handle any errors in command processing
 | |
|               SmtpLogger.error(`Command handler error: ${error instanceof Error ? error.message : String(error)}`);
 | |
|               this.sendResponse(socket, `${SmtpResponseCode.LOCAL_ERROR} Internal server error`);
 | |
|               
 | |
|               // If there's a severe error, close the connection
 | |
|               if (error instanceof Error && 
 | |
|                   (error.message.includes('fatal') || error.message.includes('critical'))) {
 | |
|                 socket.destroy();
 | |
|                 return;
 | |
|               }
 | |
|             }
 | |
|           }
 | |
|         }
 | |
|         
 | |
|         // If buffer is getting too large without CRLF, it might be a DoS attempt
 | |
|         if (buffer.length > 10240) { // 10KB is a reasonable limit for a line without CRLF
 | |
|           SmtpLogger.warn(`Incomplete line too large: ${buffer.length} bytes for ${socket.remoteAddress}`);
 | |
|           this.sendResponse(socket, `${SmtpResponseCode.SYNTAX_ERROR} Incomplete line too large, disconnecting`);
 | |
|           socket.destroy();
 | |
|         }
 | |
|       } catch (error) {
 | |
|         // Handle any unexpected errors during data processing
 | |
|         SmtpLogger.error(`Data handler error: ${error instanceof Error ? error.message : String(error)}`);
 | |
|         socket.destroy();
 | |
|       }
 | |
|     });
 | |
|     
 | |
|     // Add drain event handler to manage flow control
 | |
|     socket.on('drain', () => {
 | |
|       // Socket buffer has been emptied, resume data flow if needed
 | |
|       if (socket.isPaused()) {
 | |
|         socket.resume();
 | |
|         SmtpLogger.debug(`Resumed socket for ${socket.remoteAddress} after drain`);
 | |
|       }
 | |
|     });
 | |
|     
 | |
|     // Close event - clean up when connection is closed
 | |
|     socket.on('close', (hadError) => {
 | |
|       this.handleSocketClose(socket, hadError);
 | |
|     });
 | |
|     
 | |
|     // Error event - handle socket errors
 | |
|     socket.on('error', (err) => {
 | |
|       this.handleSocketError(socket, err);
 | |
|     });
 | |
|     
 | |
|     // Timeout event - handle socket timeouts
 | |
|     socket.on('timeout', () => {
 | |
|       this.handleSocketTimeout(socket);
 | |
|     });
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Get the current connection count
 | |
|    * @returns Number of active connections
 | |
|    */
 | |
|   public getConnectionCount(): number {
 | |
|     return this.activeConnections.size;
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Check if the server has reached the maximum number of connections
 | |
|    * @returns True if max connections reached
 | |
|    */
 | |
|   public hasReachedMaxConnections(): boolean {
 | |
|     return this.activeConnections.size >= this.options.maxConnections;
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Close all active connections
 | |
|    */
 | |
|   public closeAllConnections(): void {
 | |
|     const connectionCount = this.activeConnections.size;
 | |
|     if (connectionCount === 0) {
 | |
|       return;
 | |
|     }
 | |
|     
 | |
|     SmtpLogger.info(`Closing all connections (count: ${connectionCount})`);
 | |
|     
 | |
|     for (const socket of this.activeConnections) {
 | |
|       try {
 | |
|         // Send service closing notification
 | |
|         this.sendServiceClosing(socket);
 | |
|         
 | |
|         // End the socket gracefully
 | |
|         socket.end();
 | |
|         
 | |
|         // Force destroy after a short delay if not already destroyed
 | |
|         const destroyTimer = setTimeout(() => {
 | |
|           if (!socket.destroyed) {
 | |
|             socket.destroy();
 | |
|           }
 | |
|           this.cleanupTimers.delete(destroyTimer);
 | |
|         }, 100);
 | |
|         this.cleanupTimers.add(destroyTimer);
 | |
|       } catch (error) {
 | |
|         SmtpLogger.error(`Error closing connection: ${error instanceof Error ? error.message : String(error)}`);
 | |
|         // Force destroy on error
 | |
|         try {
 | |
|           socket.destroy();
 | |
|         } catch (e) {
 | |
|           // Ignore destroy errors
 | |
|         }
 | |
|       }
 | |
|     }
 | |
|     
 | |
|     // Clear active connections
 | |
|     this.activeConnections.clear();
 | |
|     
 | |
|     // Stop resource monitoring to prevent hanging timers
 | |
|     if (this.resourceCheckInterval) {
 | |
|       clearInterval(this.resourceCheckInterval);
 | |
|       this.resourceCheckInterval = null;
 | |
|     }
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Handle socket close event
 | |
|    * @param socket - Client socket
 | |
|    * @param hadError - Whether the socket was closed due to error
 | |
|    */
 | |
|   private handleSocketClose(socket: plugins.net.Socket | plugins.tls.TLSSocket, hadError: boolean): void {
 | |
|     try {
 | |
|       // Update connection statistics
 | |
|       this.connectionStats.closedConnections++;
 | |
|       this.connectionStats.activeConnections = this.activeConnections.size - 1;
 | |
|       
 | |
|       // Get socket details for logging
 | |
|       const socketDetails = getSocketDetails(socket);
 | |
|       const socketId = `${socketDetails.remoteAddress}:${socketDetails.remotePort}`;
 | |
|       
 | |
|       // Log with appropriate level based on whether there was an error
 | |
|       if (hadError) {
 | |
|         SmtpLogger.warn(`Socket closed with error: ${socketId}`);
 | |
|       } else {
 | |
|         SmtpLogger.debug(`Socket closed normally: ${socketId}`);
 | |
|       }
 | |
|       
 | |
|       // Get the session before removing it
 | |
|       const session = this.smtpServer.getSessionManager().getSession(socket);
 | |
|       
 | |
|       // Remove from active connections
 | |
|       this.activeConnections.delete(socket);
 | |
|       
 | |
|       // Remove from session manager
 | |
|       this.smtpServer.getSessionManager().removeSession(socket);
 | |
|       
 | |
|       // Cancel any timeout ID stored in the session
 | |
|       if (session?.dataTimeoutId) {
 | |
|         clearTimeout(session.dataTimeoutId);
 | |
|       }
 | |
|       
 | |
|       // Remove all event listeners to prevent memory leaks
 | |
|       socket.removeAllListeners();
 | |
|       
 | |
|       // Log connection close with session details if available
 | |
|       adaptiveLogger.logConnection(socket, 'close', session);
 | |
|       
 | |
|       // Update adaptive logger with new connection count
 | |
|       adaptiveLogger.updateConnectionCount(this.connectionStats.activeConnections);
 | |
|     } catch (error) {
 | |
|       // Handle any unexpected errors during cleanup
 | |
|       SmtpLogger.error(`Error in handleSocketClose: ${error instanceof Error ? error.message : String(error)}`);
 | |
|       
 | |
|       // Ensure socket is removed from active connections even if an error occurs
 | |
|       this.activeConnections.delete(socket);
 | |
|       
 | |
|       // Always try to remove all listeners even on error
 | |
|       try {
 | |
|         socket.removeAllListeners();
 | |
|       } catch {
 | |
|         // Ignore errors from removeAllListeners
 | |
|       }
 | |
|     }
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Handle socket error event
 | |
|    * @param socket - Client socket
 | |
|    * @param error - Error object
 | |
|    */
 | |
|   private handleSocketError(socket: plugins.net.Socket | plugins.tls.TLSSocket, error: Error): void {
 | |
|     try {
 | |
|       // Update connection statistics
 | |
|       this.connectionStats.erroredConnections++;
 | |
|       
 | |
|       // Get socket details for context
 | |
|       const socketDetails = getSocketDetails(socket);
 | |
|       const socketId = `${socketDetails.remoteAddress}:${socketDetails.remotePort}`;
 | |
|       
 | |
|       // Get the session
 | |
|       const session = this.smtpServer.getSessionManager().getSession(socket);
 | |
|       
 | |
|       // Detailed error logging with context information
 | |
|       SmtpLogger.error(`Socket error for ${socketId}: ${error.message}`, {
 | |
|         errorCode: (error as any).code,
 | |
|         errorStack: error.stack,
 | |
|         sessionId: session?.id,
 | |
|         sessionState: session?.state,
 | |
|         remoteAddress: socketDetails.remoteAddress,
 | |
|         remotePort: socketDetails.remotePort
 | |
|       });
 | |
|       
 | |
|       // Log the error for connection tracking using adaptive logger
 | |
|       adaptiveLogger.logConnection(socket, 'error', session, error);
 | |
|       
 | |
|       // Cancel any timeout ID stored in the session
 | |
|       if (session?.dataTimeoutId) {
 | |
|         clearTimeout(session.dataTimeoutId);
 | |
|       }
 | |
|       
 | |
|       // Close the socket if not already closed
 | |
|       if (!socket.destroyed) {
 | |
|         socket.destroy();
 | |
|       }
 | |
|       
 | |
|       // Remove from active connections (cleanup after error)
 | |
|       this.activeConnections.delete(socket);
 | |
|       
 | |
|       // Remove from session manager
 | |
|       this.smtpServer.getSessionManager().removeSession(socket);
 | |
|     } catch (handlerError) {
 | |
|       // Meta-error handling (errors in the error handler)
 | |
|       SmtpLogger.error(`Error in handleSocketError: ${handlerError instanceof Error ? handlerError.message : String(handlerError)}`);
 | |
|       
 | |
|       // Ensure socket is destroyed and removed from active connections
 | |
|       if (!socket.destroyed) {
 | |
|         socket.destroy();
 | |
|       }
 | |
|       this.activeConnections.delete(socket);
 | |
|     }
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Handle socket timeout event
 | |
|    * @param socket - Client socket
 | |
|    */
 | |
|   private handleSocketTimeout(socket: plugins.net.Socket | plugins.tls.TLSSocket): void {
 | |
|     try {
 | |
|       // Update connection statistics
 | |
|       this.connectionStats.timedOutConnections++;
 | |
|       
 | |
|       // Get socket details for context
 | |
|       const socketDetails = getSocketDetails(socket);
 | |
|       const socketId = `${socketDetails.remoteAddress}:${socketDetails.remotePort}`;
 | |
|       
 | |
|       // Get the session
 | |
|       const session = this.smtpServer.getSessionManager().getSession(socket);
 | |
|       
 | |
|       // Get timing information for better debugging
 | |
|       const now = Date.now();
 | |
|       const idleTime = session?.lastActivity ? now - session.lastActivity : 'unknown';
 | |
|       
 | |
|       if (session) {
 | |
|         // Log the timeout with extended details
 | |
|         SmtpLogger.warn(`Socket timeout from ${session.remoteAddress}`, {
 | |
|           sessionId: session.id,
 | |
|           remoteAddress: session.remoteAddress,
 | |
|           state: session.state,
 | |
|           timeout: this.options.socketTimeout,
 | |
|           idleTime: idleTime,
 | |
|           emailState: session.envelope?.mailFrom ? 'has-sender' : 'no-sender',
 | |
|           recipientCount: session.envelope?.rcptTo?.length || 0
 | |
|         });
 | |
|         
 | |
|         // Cancel any timeout ID stored in the session
 | |
|         if (session.dataTimeoutId) {
 | |
|           clearTimeout(session.dataTimeoutId);
 | |
|         }
 | |
|         
 | |
|         // Send timeout notification to client
 | |
|         this.sendResponse(socket, `${SmtpResponseCode.SERVICE_NOT_AVAILABLE} Connection timeout - closing connection`);
 | |
|       } else {
 | |
|         // Log timeout without session context
 | |
|         SmtpLogger.warn(`Socket timeout without session from ${socketId}`);
 | |
|       }
 | |
|       
 | |
|       // Close the socket gracefully
 | |
|       try {
 | |
|         socket.end();
 | |
|         
 | |
|         // Set a forced close timeout in case socket.end() doesn't close the connection
 | |
|         const timeoutDestroyTimer = setTimeout(() => {
 | |
|           if (!socket.destroyed) {
 | |
|             SmtpLogger.warn(`Forcing destroy of timed out socket: ${socketId}`);
 | |
|             socket.destroy();
 | |
|           }
 | |
|           this.cleanupTimers.delete(timeoutDestroyTimer);
 | |
|         }, 5000); // 5 second grace period for socket to end properly
 | |
|         this.cleanupTimers.add(timeoutDestroyTimer);
 | |
|       } catch (error) {
 | |
|         SmtpLogger.error(`Error ending timed out socket: ${error instanceof Error ? error.message : String(error)}`);
 | |
|         
 | |
|         // Ensure socket is destroyed even if end() fails
 | |
|         if (!socket.destroyed) {
 | |
|           socket.destroy();
 | |
|         }
 | |
|       }
 | |
|       
 | |
|       // Clean up resources
 | |
|       this.activeConnections.delete(socket);
 | |
|       this.smtpServer.getSessionManager().removeSession(socket);
 | |
|     } catch (handlerError) {
 | |
|       // Handle any unexpected errors during timeout handling
 | |
|       SmtpLogger.error(`Error in handleSocketTimeout: ${handlerError instanceof Error ? handlerError.message : String(handlerError)}`);
 | |
|       
 | |
|       // Ensure socket is destroyed and removed from tracking
 | |
|       if (!socket.destroyed) {
 | |
|         socket.destroy();
 | |
|       }
 | |
|       this.activeConnections.delete(socket);
 | |
|     }
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Reject a connection
 | |
|    * @param socket - Client socket
 | |
|    * @param reason - Reason for rejection
 | |
|    */
 | |
|   private rejectConnection(socket: plugins.net.Socket | plugins.tls.TLSSocket, reason: string): void {
 | |
|     // Log the rejection
 | |
|     const socketDetails = getSocketDetails(socket);
 | |
|     SmtpLogger.warn(`Connection rejected from ${socketDetails.remoteAddress}:${socketDetails.remotePort}: ${reason}`);
 | |
|     
 | |
|     // Send rejection message
 | |
|     this.sendResponse(socket, `${SmtpResponseCode.SERVICE_NOT_AVAILABLE} ${this.options.hostname} Service temporarily unavailable - ${reason}`);
 | |
|     
 | |
|     // Close the socket
 | |
|     try {
 | |
|       socket.end();
 | |
|     } catch (error) {
 | |
|       SmtpLogger.error(`Error ending rejected socket: ${error instanceof Error ? error.message : String(error)}`);
 | |
|     }
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Send greeting message
 | |
|    * @param socket - Client socket
 | |
|    */
 | |
|   private sendGreeting(socket: plugins.net.Socket | plugins.tls.TLSSocket): void {
 | |
|     const greeting = `${SmtpResponseCode.SERVICE_READY} ${this.options.hostname} ESMTP service ready`;
 | |
|     this.sendResponse(socket, greeting);
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Send service closing notification
 | |
|    * @param socket - Client socket
 | |
|    */
 | |
|   private sendServiceClosing(socket: plugins.net.Socket | plugins.tls.TLSSocket): void {
 | |
|     const message = `${SmtpResponseCode.SERVICE_CLOSING} ${this.options.hostname} Service closing transmission channel`;
 | |
|     this.sendResponse(socket, message);
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Send response to client
 | |
|    * @param socket - Client socket
 | |
|    * @param response - Response to send
 | |
|    */
 | |
|   private sendResponse(socket: plugins.net.Socket | plugins.tls.TLSSocket, response: string): void {
 | |
|     // Check if socket is still writable before attempting to write
 | |
|     if (socket.destroyed || socket.readyState !== 'open' || !socket.writable) {
 | |
|       SmtpLogger.debug(`Skipping response to closed/destroyed socket: ${response}`, {
 | |
|         remoteAddress: socket.remoteAddress,
 | |
|         remotePort: socket.remotePort,
 | |
|         destroyed: socket.destroyed,
 | |
|         readyState: socket.readyState,
 | |
|         writable: socket.writable
 | |
|       });
 | |
|       return;
 | |
|     }
 | |
|     
 | |
|     try {
 | |
|       socket.write(`${response}${SMTP_DEFAULTS.CRLF}`);
 | |
|       adaptiveLogger.logResponse(response, socket);
 | |
|     } catch (error) {
 | |
|       // Log error and destroy socket
 | |
|       SmtpLogger.error(`Error sending response: ${error instanceof Error ? error.message : String(error)}`, {
 | |
|         response,
 | |
|         remoteAddress: socket.remoteAddress,
 | |
|         remotePort: socket.remotePort,
 | |
|         error: error instanceof Error ? error : new Error(String(error))
 | |
|       });
 | |
|       
 | |
|       socket.destroy();
 | |
|     }
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Handle a new connection (interface requirement)
 | |
|    */
 | |
|   public async handleConnection(socket: plugins.net.Socket | plugins.tls.TLSSocket, secure: boolean): Promise<void> {
 | |
|     if (secure) {
 | |
|       this.handleNewSecureConnection(socket as plugins.tls.TLSSocket);
 | |
|     } else {
 | |
|       this.handleNewConnection(socket as plugins.net.Socket);
 | |
|     }
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Check if accepting new connections (interface requirement)
 | |
|    */
 | |
|   public canAcceptConnection(): boolean {
 | |
|     return !this.hasReachedMaxConnections();
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Clean up resources
 | |
|    */
 | |
|   public destroy(): void {
 | |
|     // Clear resource monitoring interval
 | |
|     if (this.resourceCheckInterval) {
 | |
|       clearInterval(this.resourceCheckInterval);
 | |
|       this.resourceCheckInterval = null;
 | |
|     }
 | |
|     
 | |
|     // Clear all cleanup timers
 | |
|     for (const timer of this.cleanupTimers) {
 | |
|       clearTimeout(timer);
 | |
|     }
 | |
|     this.cleanupTimers.clear();
 | |
|     
 | |
|     // Close all active connections
 | |
|     this.closeAllConnections();
 | |
|     
 | |
|     // Clear maps
 | |
|     this.activeConnections.clear();
 | |
|     this.ipConnections.clear();
 | |
|     
 | |
|     // Reset connection stats
 | |
|     this.connectionStats = {
 | |
|       totalConnections: 0,
 | |
|       activeConnections: 0,
 | |
|       peakConnections: 0,
 | |
|       rejectedConnections: 0,
 | |
|       closedConnections: 0,
 | |
|       erroredConnections: 0,
 | |
|       timedOutConnections: 0
 | |
|     };
 | |
|     
 | |
|     SmtpLogger.debug('ConnectionManager destroyed');
 | |
|   }
 | |
| } |