2025-05-21 12:52:24 +00:00
/**
 * SMTP Connection Manager
 * Responsible for managing socket connections to the SMTP server
 */
import * as plugins from '../../../plugins.js' ;
2025-05-22 23:02:37 +00:00
import type { IConnectionManager , ISmtpServer } from './interfaces.js' ;
2025-05-22 09:22:55 +00:00
import { SmtpResponseCode , SMTP_DEFAULTS , SmtpState } from './constants.js' ;
2025-05-21 12:52:24 +00:00
import { SmtpLogger } from './utils/logging.js' ;
2025-05-22 09:22:55 +00:00
import { adaptiveLogger } from './utils/adaptive-logging.js' ;
2025-05-21 12:52:24 +00:00
import { getSocketDetails , formatMultilineResponse } from './utils/helpers.js' ;
/**
 * Manager for SMTP connections
 * Handles connection setup, event listeners, and lifecycle management
 * Provides resource management, connection tracking, and monitoring
 */
export class ConnectionManager implements IConnectionManager {
2025-05-22 23:02:37 +00:00
/**
 * SMTP server this manager belongs to; queried for options, sessions and the command handler
 */
private smtpServer: ISmtpServer;

/**
 * Sockets (plain or TLS) currently tracked as open connections
 */
private activeConnections = new Set<plugins.net.Socket | plugins.tls.TLSSocket>();

/**
 * Connection counters used for resource management and reported by the resource monitor
 */
private connectionStats = {
  totalConnections: 0,
  activeConnections: 0,
  peakConnections: 0,
  rejectedConnections: 0,
  closedConnections: 0,
  erroredConnections: 0,
  timedOutConnections: 0
};

/**
 * Per-IP bookkeeping for rate limiting: a connection count plus the
 * Date.now() timestamps of the first and the most recent connection
 */
private ipConnections = new Map<string, {
  count: number;
  firstConnection: number;
  lastConnection: number;
}>();

/**
 * Handle of the periodic resource check, or null when monitoring is not running
 */
private resourceCheckInterval: NodeJS.Timeout | null = null;

/**
 * Pending forced-destroy timers, tracked so they can be cleared
 */
private cleanupTimers = new Set<NodeJS.Timeout>();

/**
 * Effective settings: SMTP server options plus the resource-control limits
 */
private options: {
  hostname: string;
  maxConnections: number;
  socketTimeout: number;
  maxConnectionsPerIP: number;
  connectionRateLimit: number;
  connectionRateWindow: number;
  bufferSizeLimit: number;
  resourceCheckInterval: number;
};
/**
 * Creates a new connection manager and immediately starts resource monitoring
 * @param smtpServer - SMTP server instance
 */
constructor(smtpServer: ISmtpServer) {
  this.smtpServer = smtpServer;

  // Only these three settings are taken from the server options
  const { hostname, maxConnections, socketTimeout } = this.smtpServer.getOptions();

  // Resource-management defaults - adjusted for production scalability
  const resourceDefaults = {
    maxConnectionsPerIP: 50, // Increased to support high-concurrency scenarios
    connectionRateLimit: 200, // Increased for production load handling
    connectionRateWindow: 60 * 1000, // 60 seconds window
    bufferSizeLimit: 10 * 1024 * 1024, // 10 MB
    resourceCheckInterval: 30 * 1000 // 30 seconds
  };

  this.options = {
    // Falsy server values (including 0 and '') fall back to the SMTP defaults
    hostname: hostname || SMTP_DEFAULTS.HOSTNAME,
    maxConnections: maxConnections || SMTP_DEFAULTS.MAX_CONNECTIONS,
    socketTimeout: socketTimeout || SMTP_DEFAULTS.SOCKET_TIMEOUT,
    ...resourceDefaults
  };

  // Start resource monitoring
  this.startResourceMonitoring();
}
/**
 * (Re)start the periodic resource check.
 * Any interval that is already running is cleared first so that only one
 * timer is ever active.
 */
private startResourceMonitoring(): void {
  if (this.resourceCheckInterval) {
    clearInterval(this.resourceCheckInterval);
  }

  const runCheck = (): void => {
    this.monitorResourceUsage();
  };
  this.resourceCheckInterval = setInterval(runCheck, this.options.resourceCheckInterval);
}
/**
 * Log a snapshot of connection, memory and per-IP statistics, warn on
 * suspicious values, then run the IP rate-limit cleanup.
 */
private monitorResourceUsage(): void {
  const toMB = (bytes: number): number => Math.round(bytes / 1024 / 1024);

  // Memory figures, rounded to whole megabytes
  const { rss, heapTotal, heapUsed, external } = process.memoryUsage();
  const memoryUsageMB = {
    rss: toMB(rss),
    heapTotal: toMB(heapTotal),
    heapUsed: toMB(heapUsed),
    external: toMB(external)
  };

  // One pass over the IP table: IPs with any counted connection, and IPs
  // whose count is above half of the configured rate limit
  const highVolumeThreshold = this.options.connectionRateLimit / 2;
  let activeIPs = 0;
  let highVolumeIPs = 0;
  for (const { count } of this.ipConnections.values()) {
    if (count > 0) {
      activeIPs++;
    }
    if (count > highVolumeThreshold) {
      highVolumeIPs++;
    }
  }

  const stats = this.connectionStats;
  SmtpLogger.info('Resource usage stats', {
    connections: {
      active: this.activeConnections.size,
      total: stats.totalConnections,
      peak: stats.peakConnections,
      rejected: stats.rejectedConnections,
      closed: stats.closedConnections,
      errored: stats.erroredConnections,
      timedOut: stats.timedOutConnections
    },
    memory: memoryUsageMB,
    ipTracking: {
      uniqueIPs: this.ipConnections.size,
      activeIPs: activeIPs,
      highVolumeIPs: highVolumeIPs
    },
    resourceLimits: {
      maxConnections: this.options.maxConnections,
      maxConnectionsPerIP: this.options.maxConnectionsPerIP,
      connectionRateLimit: this.options.connectionRateLimit,
      bufferSizeLimit: toMB(this.options.bufferSizeLimit) + 'MB'
    }
  });

  // More than three high-volume IPs at once is treated as a possible DoS
  if (highVolumeIPs > 3) {
    SmtpLogger.warn(` Potential DoS detected: ${highVolumeIPs} IPs with high connection rates `);
  }

  // Over 500MB heap used
  if (memoryUsageMB.heapUsed > 500) {
    SmtpLogger.warn(` High memory usage detected: ${memoryUsageMB.heapUsed} MB heap used `);
  }

  // Clean up expired IP rate limits and validate resource tracking
  this.cleanupIpRateLimits();
}
/**
 * Age out per-IP rate-limit entries, then re-check connection tracking.
 *
 * Entries idle for more than two rate windows are removed; entries idle for
 * more than one window have their count reset; the rest count as active.
 */
private cleanupIpRateLimits(): void {
  const now = Date.now();
  const rateWindow = this.options.connectionRateWindow;
  const resetBefore = now - rateWindow;
  const evictBefore = resetBefore - rateWindow;

  let activeIps = 0;
  let removedEntries = 0;

  for (const [ip, data] of this.ipConnections) {
    // Idle for two full windows: drop the entry to prevent memory growth
    if (data.lastConnection < evictBefore) {
      this.ipConnections.delete(ip);
      removedEntries++;
      continue;
    }

    // Seen within the current window: still active
    if (data.lastConnection >= resetBefore) {
      activeIps++;
      continue;
    }

    // Idle for one window: keep the IP in the map but start over at zero
    if (data.count > 0) {
      this.ipConnections.set(ip, {
        count: 0,
        firstConnection: now,
        lastConnection: now
      });
    }
  }

  // Only report when entries were actually removed
  if (removedEntries > 0) {
    SmtpLogger.debug(` IP rate limit cleanup: removed ${removedEntries} stale entries, ${this.ipConnections.size} remaining, ${activeIps} active in current window `);
  }

  // Bring the active counter back in line with the tracked socket set
  const trackedSockets = this.activeConnections.size;
  if (trackedSockets > 0 && this.connectionStats.activeConnections !== trackedSockets) {
    SmtpLogger.warn(` Connection tracking inconsistency detected: stats.active= ${this.connectionStats.activeConnections} , actual= ${trackedSockets} `);
    this.connectionStats.activeConnections = trackedSockets;
  }

  // Validate and clean leaked resources if needed
  this.validateResourceTracking();
}
/**
 * Cross-check the connection bookkeeping and repair what can be repaired.
 * All findings are collected and logged together in a single warning.
 */
private validateResourceTracking(): void {
  const findings: Array<Record<string, unknown>> = [];

  // 1. The active counter must equal the size of the tracked socket set
  if (this.connectionStats.activeConnections !== this.activeConnections.size) {
    findings.push({
      issue: 'Active connection count mismatch',
      stats: this.connectionStats.activeConnections,
      actual: this.activeConnections.size,
      action: 'Auto-corrected'
    });
    this.connectionStats.activeConnections = this.activeConnections.size;
  }

  // 2. Destroyed sockets should not be tracked any more - drop them
  const destroyedSockets = Array.from(this.activeConnections).filter((tracked) => tracked.destroyed);
  for (const dead of destroyedSockets) {
    this.activeConnections.delete(dead);
  }
  if (destroyedSockets.length > 0) {
    findings.push({
      issue: 'Destroyed sockets in active list',
      count: destroyedSockets.length,
      action: 'Removed from tracking'
    });
    // Update active connections count after cleanup
    this.connectionStats.activeConnections = this.activeConnections.size;
  }

  // 3. More sessions than tracked sockets is reported but not repaired here
  const sessionCount = this.smtpServer.getSessionManager().getSessionCount();
  if (sessionCount > this.activeConnections.size) {
    findings.push({
      issue: 'Orphaned sessions',
      sessions: sessionCount,
      connections: this.activeConnections.size,
      action: 'Session cleanup recommended'
    });
  }

  if (findings.length > 0) {
    SmtpLogger.warn('Resource tracking inconsistencies detected and repaired', { inconsistencies: findings });
  }
}
/**
 * Handle a new plain-text connection with resource management.
 *
 * The connection is rejected when the client IP is rate limited, when the IP
 * already holds the maximum number of open connections, or when the global
 * connection limit is reached. Otherwise the socket is tracked and configured,
 * a session is created and the greeting is sent.
 *
 * Active/peak statistics are updated only after the socket has been accepted,
 * so rejected connections never count as active and cannot inflate the peak.
 *
 * @param socket - Client socket
 */
public async handleNewConnection(socket: plugins.net.Socket): Promise<void> {
  // Every attempt counts towards the total, accepted or not
  this.connectionStats.totalConnections++;

  // Get client IP
  const remoteAddress = socket.remoteAddress || '0.0.0.0';

  // Check rate limits by IP
  if (this.isIPRateLimited(remoteAddress)) {
    this.rejectConnection(socket, 'Rate limit exceeded');
    this.connectionStats.rejectedConnections++;
    return;
  }

  // Check per-IP connection limit
  if (this.hasReachedIPConnectionLimit(remoteAddress)) {
    this.rejectConnection(socket, 'Too many connections from this IP');
    this.connectionStats.rejectedConnections++;
    return;
  }

  // Check if maximum global connections reached
  if (this.hasReachedMaxConnections()) {
    this.rejectConnection(socket, 'Too many connections');
    this.connectionStats.rejectedConnections++;
    return;
  }

  // Accepted: track the socket, then derive active/peak from the real set size
  this.activeConnections.add(socket);
  this.connectionStats.activeConnections = this.activeConnections.size;
  if (this.connectionStats.activeConnections > this.connectionStats.peakConnections) {
    this.connectionStats.peakConnections = this.connectionStats.activeConnections;
  }

  // Set up socket options
  socket.setKeepAlive(true);
  socket.setTimeout(this.options.socketTimeout);
  socket.setNoDelay(true); // Disable Nagle's algorithm for better responsiveness

  // Track this IP connection
  this.trackIPConnection(remoteAddress);

  // Set up event handlers
  this.setupSocketEventHandlers(socket);

  // Create a session for this connection (second argument false = not secure)
  this.smtpServer.getSessionManager().createSession(socket, false);

  // Log the new connection and publish the current connection count
  adaptiveLogger.logConnection(socket, 'connect');
  adaptiveLogger.updateConnectionCount(this.connectionStats.activeConnections);

  // Send greeting
  this.sendGreeting(socket);
}
/**
 * Check if an IP has exceeded the rate limit.
 * An IP is limited when its current window (measured from its first recorded
 * connection) has not elapsed yet and its count has reached the limit.
 * @param ip - Client IP address
 * @returns True if rate limited
 */
private isIPRateLimited(ip: string): boolean {
  const now = Date.now();
  const record = this.ipConnections.get(ip);

  // No previous connections
  if (!record) {
    return false;
  }

  const elapsedMs = now - record.firstConnection;
  const windowStillOpen = elapsedMs <= this.options.connectionRateWindow;
  const limitReached = record.count >= this.options.connectionRateLimit;

  if (!(windowStillOpen && limitReached)) {
    return false;
  }

  SmtpLogger.warn(` Rate limit exceeded for IP ${ip} : ${record.count} connections in ${Math.round(elapsedMs / 1000)} s `);
  return true;
}
/**
 * Record an accepted connection from an IP in the rate-limit table.
 *
 * The rate window is anchored at `firstConnection`, which is also what
 * isIPRateLimited measures against. Once that window has elapsed a fresh
 * window is started with a count of 1. Resetting only after an idle gap since
 * the last connection would let a steadily connecting client keep its
 * original `firstConnection` forever: the limit would stop applying after the
 * first window and the count would grow without bound.
 *
 * @param ip - Client IP address
 */
private trackIPConnection(ip: string): void {
  const now = Date.now();
  const ipData = this.ipConnections.get(ip);

  // Unknown IP, or its window has run out: start a new window
  const windowExpired =
    ipData !== undefined &&
    now - ipData.firstConnection > this.options.connectionRateWindow;

  if (!ipData || windowExpired) {
    this.ipConnections.set(ip, {
      count: 1,
      firstConnection: now,
      lastConnection: now
    });
    return;
  }

  // Increment within the current window
  this.ipConnections.set(ip, {
    count: ipData.count + 1,
    firstConnection: ipData.firstConnection,
    lastConnection: now
  });
}
/**
 * Check if an IP has reached its connection limit.
 * Counts the tracked sockets whose remote address equals the given IP.
 * @param ip - Client IP address
 * @returns True if limit reached
 */
private hasReachedIPConnectionLimit(ip: string): boolean {
  const openFromIp = Array.from(this.activeConnections)
    .filter((tracked) => tracked.remoteAddress === ip)
    .length;

  return openFromIp >= this.options.maxConnectionsPerIP;
}
/**
 * Handle a new secure TLS connection with resource management.
 *
 * The connection is rejected when the client IP is rate limited, when the IP
 * already holds the maximum number of open connections, or when the global
 * connection limit is reached. Otherwise the socket is tracked and configured,
 * a secure session is created and the greeting is sent.
 *
 * Active/peak statistics are updated only after the socket has been accepted,
 * so rejected connections never count as active and cannot inflate the peak.
 *
 * @param socket - Client TLS socket
 */
public async handleNewSecureConnection(socket: plugins.tls.TLSSocket): Promise<void> {
  // Every attempt counts towards the total, accepted or not
  this.connectionStats.totalConnections++;

  // Get client IP
  const remoteAddress = socket.remoteAddress || '0.0.0.0';

  // Check rate limits by IP
  if (this.isIPRateLimited(remoteAddress)) {
    this.rejectConnection(socket, 'Rate limit exceeded');
    this.connectionStats.rejectedConnections++;
    return;
  }

  // Check per-IP connection limit
  if (this.hasReachedIPConnectionLimit(remoteAddress)) {
    this.rejectConnection(socket, 'Too many connections from this IP');
    this.connectionStats.rejectedConnections++;
    return;
  }

  // Check if maximum global connections reached
  if (this.hasReachedMaxConnections()) {
    this.rejectConnection(socket, 'Too many connections');
    this.connectionStats.rejectedConnections++;
    return;
  }

  // Accepted: track the socket, then derive active/peak from the real set size
  this.activeConnections.add(socket);
  this.connectionStats.activeConnections = this.activeConnections.size;
  if (this.connectionStats.activeConnections > this.connectionStats.peakConnections) {
    this.connectionStats.peakConnections = this.connectionStats.activeConnections;
  }

  // Set up socket options
  socket.setKeepAlive(true);
  socket.setTimeout(this.options.socketTimeout);
  socket.setNoDelay(true); // Disable Nagle's algorithm for better responsiveness

  // Track this IP connection
  this.trackIPConnection(remoteAddress);

  // Set up event handlers
  this.setupSocketEventHandlers(socket);

  // Create a session for this connection (second argument true = secure)
  this.smtpServer.getSessionManager().createSession(socket, true);

  // Log the new secure connection and publish the current connection count
  adaptiveLogger.logConnection(socket, 'connect');
  adaptiveLogger.updateConnectionCount(this.connectionStats.activeConnections);

  // Send greeting
  this.sendGreeting(socket);
}
/**
 * Set up event handlers for a socket with enhanced resource management.
 *
 * The first existing 'data', 'close', 'error' and 'timeout' listener (if any)
 * is removed, then fresh handlers are attached. Incoming data is forwarded as
 * raw chunks while the session is in DATA_RECEIVING state, and otherwise
 * line-buffered on CRLF with size limits applied.
 *
 * @param socket - Client socket
 */
public setupSocketEventHandlers(socket: plugins.net.Socket | plugins.tls.TLSSocket): void {
  type AnyListener = (...args: any[]) => void;

  // Snapshot the first listener of each managed event before removing any of them
  const managedEvents: string[] = ['data', 'close', 'error', 'timeout'];
  const previousListeners = managedEvents.map((eventName) => ({
    eventName,
    listener: socket.listeners(eventName)[0] as AnyListener
  }));
  for (const { eventName, listener } of previousListeners) {
    if (listener) {
      socket.removeListener(eventName, listener);
    }
  }

  const describeError = (err: unknown): string => (err instanceof Error ? err.message : String(err));

  // Warn, tell the client why, then drop the connection
  const disconnectWith = (warning: string, reply: string): void => {
    SmtpLogger.warn(warning);
    this.sendResponse(socket, reply);
    socket.destroy();
  };

  // Per-socket state shared by every 'data' callback of this socket
  let pending = ''; // received text not yet terminated by CRLF
  let commandBytes = 0; // bytes received while in command mode

  // Data event - process incoming data from the client with resource limits
  socket.on('data', async (data) => {
    try {
      // Get current session and update activity timestamp
      const session = this.smtpServer.getSessionManager().getSession(socket);
      if (session) {
        this.smtpServer.getSessionManager().updateSessionActivity(session);
      }

      // DATA mode: hand the raw chunk to the command handler, no line buffering
      if (session && session.state === SmtpState.DATA_RECEIVING) {
        try {
          const rawChunk = data.toString('utf8');
          // NOTE(review): the marker literal is reproduced with its surrounding
          // spaces exactly as written - confirm the command handler expects this form
          // Must be awaited to prevent async pile-up
          await this.smtpServer.getCommandHandler().processCommand(socket, ` __RAW_DATA__ ${rawChunk} `);
        } catch (dataError) {
          SmtpLogger.error(` Data handler error during DATA mode: ${describeError(dataError)} `);
          socket.destroy();
        }
        return;
      }

      // Command mode: line-buffered processing with memory guards
      commandBytes += data.length;

      // Guard on what is already buffered (checked before this chunk is appended)
      if (pending.length > this.options.bufferSizeLimit) {
        disconnectWith(
          ` Buffer size limit exceeded: ${pending.length} bytes for ${socket.remoteAddress} `,
          ` ${SmtpResponseCode.EXCEEDED_STORAGE} Message too large, disconnecting `
        );
        return;
      }

      // Impose a total transfer limit to prevent DoS
      if (commandBytes > this.options.bufferSizeLimit * 2) {
        disconnectWith(
          ` Total transfer limit exceeded: ${commandBytes} bytes for ${socket.remoteAddress} `,
          ` ${SmtpResponseCode.EXCEEDED_STORAGE} Transfer limit exceeded, disconnecting `
        );
        return;
      }

      // Decode with explicit encoding and append to the line buffer
      pending += data.toString('utf8');

      // Process complete lines
      for (
        let eol = pending.indexOf(SMTP_DEFAULTS.CRLF);
        eol !== -1;
        eol = pending.indexOf(SMTP_DEFAULTS.CRLF)
      ) {
        const line = pending.slice(0, eol);
        pending = pending.slice(eol + 2); // +2 to skip CRLF

        // 4KB line limit is reasonable for SMTP
        if (line.length > 4096) {
          disconnectWith(
            ` Line length limit exceeded: ${line.length} bytes for ${socket.remoteAddress} `,
            ` ${SmtpResponseCode.SYNTAX_ERROR} Line too long, disconnecting `
          );
          return;
        }

        // Empty lines are skipped
        if (line.length === 0) {
          continue;
        }

        try {
          // Must await processCommand to prevent async pile-up
          await this.smtpServer.getCommandHandler().processCommand(socket, line);
        } catch (error) {
          // Handle any errors in command processing
          SmtpLogger.error(` Command handler error: ${describeError(error)} `);
          this.sendResponse(socket, ` ${SmtpResponseCode.LOCAL_ERROR} Internal server error `);

          // Only errors whose message mentions 'fatal' or 'critical' close the connection
          const message = error instanceof Error ? error.message : '';
          if (message.includes('fatal') || message.includes('critical')) {
            socket.destroy();
            return;
          }
        }
      }

      // 10KB is a reasonable limit for a line without CRLF
      if (pending.length > 10240) {
        disconnectWith(
          ` Incomplete line too large: ${pending.length} bytes for ${socket.remoteAddress} `,
          ` ${SmtpResponseCode.SYNTAX_ERROR} Incomplete line too large, disconnecting `
        );
      }
    } catch (error) {
      // Handle any unexpected errors during data processing
      SmtpLogger.error(` Data handler error: ${describeError(error)} `);
      socket.destroy();
    }
  });

  // Drain event - resume a paused socket once its write buffer has emptied
  socket.on('drain', () => {
    if (!socket.isPaused()) {
      return;
    }
    socket.resume();
    SmtpLogger.debug(` Resumed socket for ${socket.remoteAddress} after drain `);
  });

  // Close event - clean up when connection is closed
  socket.on('close', (hadError) => {
    this.handleSocketClose(socket, hadError);
  });

  // Error event - handle socket errors
  socket.on('error', (err) => {
    this.handleSocketError(socket, err);
  });

  // Timeout event - handle socket timeouts
  socket.on('timeout', () => {
    this.handleSocketTimeout(socket);
  });
}
/**
 * Get the current connection count
 * @returns Number of sockets currently tracked as active
 */
public getConnectionCount(): number {
  const { size } = this.activeConnections;
  return size;
}
/**
 * Check if the server has reached the maximum number of connections
 * @returns True if the number of tracked sockets is at or above the configured maximum
 */
public hasReachedMaxConnections(): boolean {
  const limit = this.options.maxConnections;
  const open = this.activeConnections.size;
  return open >= limit;
}
/**
 * Close all active connections and stop resource monitoring.
 *
 * Each socket gets a service-closing notification, is ended gracefully and is
 * force-destroyed 100 ms later if it is still open. The monitoring interval
 * is stopped even when no connections are open, so that an idle manager does
 * not leave a timer running.
 */
public closeAllConnections(): void {
  const connectionCount = this.activeConnections.size;

  if (connectionCount > 0) {
    SmtpLogger.info(` Closing all connections (count: ${connectionCount} ) `);

    for (const socket of this.activeConnections) {
      try {
        // Send service closing notification
        this.sendServiceClosing(socket);

        // End the socket gracefully
        socket.end();

        // Force destroy after a short delay if not already destroyed
        const destroyTimer = setTimeout(() => {
          if (!socket.destroyed) {
            socket.destroy();
          }
          this.cleanupTimers.delete(destroyTimer);
        }, 100);
        this.cleanupTimers.add(destroyTimer);
      } catch (error) {
        SmtpLogger.error(` Error closing connection: ${error instanceof Error ? error.message : String(error)} `);

        // Force destroy on error
        try {
          socket.destroy();
        } catch (e) {
          // Ignore destroy errors
        }
      }
    }

    // Clear active connections
    this.activeConnections.clear();
  }

  // Keep the active counter in line with the (now empty) socket set
  this.connectionStats.activeConnections = this.activeConnections.size;

  // Stop resource monitoring to prevent hanging timers
  if (this.resourceCheckInterval) {
    clearInterval(this.resourceCheckInterval);
    this.resourceCheckInterval = null;
  }
}
/**
 * Handle socket close event.
 *
 * The socket is removed from tracking first and the active counter is then
 * taken from the real set size. This keeps the counter correct when the
 * socket was already removed earlier (for example by the error handler or by
 * closeAllConnections); computing `size - 1` up front would under-count in
 * that case and could go negative.
 *
 * @param socket - Client socket
 * @param hadError - Whether the socket was closed due to error
 */
private handleSocketClose(socket: plugins.net.Socket | plugins.tls.TLSSocket, hadError: boolean): void {
  try {
    // Update connection statistics
    this.connectionStats.closedConnections++;
    this.activeConnections.delete(socket);
    this.connectionStats.activeConnections = this.activeConnections.size;

    // Get socket details for logging
    const socketDetails = getSocketDetails(socket);
    const socketId = ` ${socketDetails.remoteAddress} : ${socketDetails.remotePort} `;

    // Log with appropriate level based on whether there was an error
    if (hadError) {
      SmtpLogger.warn(` Socket closed with error: ${socketId} `);
    } else {
      SmtpLogger.debug(` Socket closed normally: ${socketId} `);
    }

    // Get the session before removing it
    const session = this.smtpServer.getSessionManager().getSession(socket);

    // Remove from session manager
    this.smtpServer.getSessionManager().removeSession(socket);

    // Cancel any timeout ID stored in the session
    if (session?.dataTimeoutId) {
      clearTimeout(session.dataTimeoutId);
    }

    // Log connection close with session details if available
    adaptiveLogger.logConnection(socket, 'close', session);

    // Update adaptive logger with new connection count
    adaptiveLogger.updateConnectionCount(this.connectionStats.activeConnections);
  } catch (error) {
    // Handle any unexpected errors during cleanup
    SmtpLogger.error(` Error in handleSocketClose: ${error instanceof Error ? error.message : String(error)} `);

    // Ensure the socket is untracked and the counter matches even if cleanup failed
    this.activeConnections.delete(socket);
    this.connectionStats.activeConnections = this.activeConnections.size;
  }
}
/**
 * Handle socket error event.
 * Logs the error with session context, cancels the session's data timeout,
 * destroys the socket if needed and removes it from connection and session
 * tracking.
 * @param socket - Client socket
 * @param error - Error object
 */
private handleSocketError(socket: plugins.net.Socket | plugins.tls.TLSSocket, error: Error): void {
  // Destroy the socket unless that has already happened
  const destroyIfOpen = (): void => {
    if (!socket.destroyed) {
      socket.destroy();
    }
  };

  try {
    // Update connection statistics
    this.connectionStats.erroredConnections++;

    // Socket details and session give the log entries their context
    const { remoteAddress, remotePort } = getSocketDetails(socket);
    const socketId = ` ${remoteAddress} : ${remotePort} `;
    const session = this.smtpServer.getSessionManager().getSession(socket);

    // Detailed error logging with context information
    SmtpLogger.error(` Socket error for ${socketId} : ${error.message} `, {
      errorCode: (error as NodeJS.ErrnoException).code,
      errorStack: error.stack,
      sessionId: session?.id,
      sessionState: session?.state,
      remoteAddress: remoteAddress,
      remotePort: remotePort
    });

    // Log the error for connection tracking using adaptive logger
    adaptiveLogger.logConnection(socket, 'error', session, error);

    // Cancel any timeout ID stored in the session
    if (session && session.dataTimeoutId) {
      clearTimeout(session.dataTimeoutId);
    }

    // Close the socket if not already closed
    destroyIfOpen();

    // Remove from active connections (cleanup after error)
    this.activeConnections.delete(socket);

    // Remove from session manager
    this.smtpServer.getSessionManager().removeSession(socket);
  } catch (handlerError) {
    // Meta-error handling (errors in the error handler)
    const reason = handlerError instanceof Error ? handlerError.message : String(handlerError);
    SmtpLogger.error(` Error in handleSocketError: ${reason} `);

    // Ensure socket is destroyed and removed from active connections
    destroyIfOpen();
    this.activeConnections.delete(socket);
  }
}
/**
 * Handle socket timeout event
 *
 * Counts the timeout in the connection statistics, logs it (with session
 * context when a session is registered for the socket), notifies the client
 * and then ends the socket. A forced destroy is scheduled as a fallback in
 * case the graceful end never completes; the fallback timer is tracked in
 * `cleanupTimers` and cancelled as soon as the socket emits 'close'.
 * Finally the socket is removed from the active set and the session manager.
 *
 * @param socket - Client socket
 */
private handleSocketTimeout(socket: plugins.net.Socket | plugins.tls.TLSSocket): void {
  // Grace period granted to socket.end() before the socket is destroyed
  const forceDestroyDelayMs = 5000;

  try {
    // Update connection statistics
    this.connectionStats.timedOutConnections++;

    // Get socket details for context
    const socketDetails = getSocketDetails(socket);
    const socketId = `${socketDetails.remoteAddress}:${socketDetails.remotePort}`;

    // Get the session
    const session = this.smtpServer.getSessionManager().getSession(socket);

    if (session) {
      // Timing information for better debugging
      const idleTime = session.lastActivity ? Date.now() - session.lastActivity : 'unknown';

      // Log the timeout with extended details
      SmtpLogger.warn(`Socket timeout from ${session.remoteAddress}`, {
        sessionId: session.id,
        remoteAddress: session.remoteAddress,
        state: session.state,
        timeout: this.options.socketTimeout,
        idleTime: idleTime,
        emailState: session.envelope?.mailFrom ? 'has-sender' : 'no-sender',
        recipientCount: session.envelope?.rcptTo?.length ?? 0
      });

      // Cancel any timeout ID stored in the session
      if (session.dataTimeoutId) {
        clearTimeout(session.dataTimeoutId);
      }

      // Send timeout notification to client
      this.sendResponse(socket, `${SmtpResponseCode.SERVICE_NOT_AVAILABLE} Connection timeout - closing connection`);
    } else {
      // Log timeout without session context
      SmtpLogger.warn(`Socket timeout without session from ${socketId}`);
    }

    // Close the socket gracefully
    try {
      socket.end();

      // Forced close in case socket.end() doesn't close the connection
      const timeoutDestroyTimer = setTimeout(() => {
        // Drop the bookkeeping entry first so it is removed even if destroy() throws
        this.cleanupTimers.delete(timeoutDestroyTimer);
        if (!socket.destroyed) {
          SmtpLogger.warn(`Forcing destroy of timed out socket: ${socketId}`);
          socket.destroy();
        }
      }, forceDestroyDelayMs);
      this.cleanupTimers.add(timeoutDestroyTimer);

      // Once the socket has closed the fallback is pointless: cancel it so the
      // timer and its reference to the socket do not linger for the grace period
      socket.once('close', () => {
        clearTimeout(timeoutDestroyTimer);
        this.cleanupTimers.delete(timeoutDestroyTimer);
      });
    } catch (error) {
      SmtpLogger.error(`Error ending timed out socket: ${error instanceof Error ? error.message : String(error)}`);
      // Ensure socket is destroyed even if end() fails
      if (!socket.destroyed) {
        socket.destroy();
      }
    }

    // Clean up resources
    this.activeConnections.delete(socket);
    this.smtpServer.getSessionManager().removeSession(socket);
  } catch (handlerError) {
    // Handle any unexpected errors during timeout handling
    SmtpLogger.error(`Error in handleSocketTimeout: ${handlerError instanceof Error ? handlerError.message : String(handlerError)}`);
    // Ensure socket is destroyed and removed from tracking
    if (!socket.destroyed) {
      socket.destroy();
    }
    this.activeConnections.delete(socket);
  }
}
/**
 * Reject a connection
 *
 * Logs the rejection, sends a SERVICE_NOT_AVAILABLE reply containing the
 * configured hostname and the reason, then ends the socket. If ending the
 * socket throws, the socket is destroyed so it is not left open.
 *
 * @param socket - Client socket
 * @param reason - Reason for rejection
 */
private rejectConnection(socket: plugins.net.Socket | plugins.tls.TLSSocket, reason: string): void {
  // Log the rejection
  const socketDetails = getSocketDetails(socket);
  SmtpLogger.warn(`Connection rejected from ${socketDetails.remoteAddress}:${socketDetails.remotePort}: ${reason}`);

  // Send rejection message
  this.sendResponse(socket, `${SmtpResponseCode.SERVICE_NOT_AVAILABLE} ${this.options.hostname} Service temporarily unavailable - ${reason}`);

  // Close the socket
  try {
    socket.end();
  } catch (error) {
    SmtpLogger.error(`Error ending rejected socket: ${error instanceof Error ? error.message : String(error)}`);
    // Same fallback as the timeout handler: never leave a rejected socket open
    if (!socket.destroyed) {
      socket.destroy();
    }
  }
}
/**
 * Send greeting message
 *
 * Writes the SERVICE_READY banner, which carries the configured hostname,
 * to the client through sendResponse().
 *
 * @param socket - Client socket
 */
private sendGreeting(socket: plugins.net.Socket | plugins.tls.TLSSocket): void {
  this.sendResponse(
    socket,
    `${SmtpResponseCode.SERVICE_READY} ${this.options.hostname} ESMTP service ready`
  );
}
/**
 * Send service closing notification
 *
 * Writes the SERVICE_CLOSING reply, which carries the configured hostname,
 * to the client through sendResponse().
 *
 * @param socket - Client socket
 */
private sendServiceClosing(socket: plugins.net.Socket | plugins.tls.TLSSocket): void {
  this.sendResponse(
    socket,
    `${SmtpResponseCode.SERVICE_CLOSING} ${this.options.hostname} Service closing transmission channel`
  );
}
/**
 * Send response to client
 *
 * The response is written followed by SMTP_DEFAULTS.CRLF. Nothing is written
 * when the socket is destroyed, not in the 'open' ready state, or not
 * writable; that case is only logged at debug level. If the write throws,
 * the failure is logged and the socket is destroyed.
 *
 * @param socket - Client socket
 * @param response - Response to send
 */
private sendResponse(socket: plugins.net.Socket | plugins.tls.TLSSocket, response: string): void {
  // Only write to sockets that are alive, open and writable
  const canWrite = !socket.destroyed && socket.readyState === 'open' && socket.writable;
  if (!canWrite) {
    SmtpLogger.debug(`Skipping response to closed/destroyed socket: ${response}`, {
      remoteAddress: socket.remoteAddress,
      remotePort: socket.remotePort,
      destroyed: socket.destroyed,
      readyState: socket.readyState,
      writable: socket.writable
    });
    return;
  }

  try {
    socket.write(`${response}${SMTP_DEFAULTS.CRLF}`);
    adaptiveLogger.logResponse(response, socket);
  } catch (error) {
    // Normalise the thrown value once, then log it and drop the connection
    const failure = error instanceof Error ? error : new Error(String(error));
    SmtpLogger.error(`Error sending response: ${failure.message}`, {
      response,
      remoteAddress: socket.remoteAddress,
      remotePort: socket.remotePort,
      error: failure
    });
    socket.destroy();
  }
}
2025-05-22 23:02:37 +00:00
2025-05-23 00:06:07 +00:00
/**
 * Handle a new connection (interface requirement)
 *
 * Dispatches to handleNewSecureConnection() for secure sockets and to
 * handleNewConnection() otherwise. The handler's result is not awaited.
 *
 * @param socket - Client socket
 * @param secure - Whether the socket should be treated as a TLS socket
 */
public async handleConnection(socket: plugins.net.Socket | plugins.tls.TLSSocket, secure: boolean): Promise<void> {
  if (!secure) {
    this.handleNewConnection(socket as plugins.net.Socket);
    return;
  }
  this.handleNewSecureConnection(socket as plugins.tls.TLSSocket);
}
/**
 * Check if accepting new connections (interface requirement)
 *
 * @returns the negation of hasReachedMaxConnections()
 */
public canAcceptConnection(): boolean {
  const atCapacity = this.hasReachedMaxConnections();
  return !atCapacity;
}
2025-05-22 23:02:37 +00:00
/**
 * Clean up resources
 *
 * Stops the resource monitoring interval, cancels every tracked cleanup
 * timer, delegates to closeAllConnections(), empties the connection and
 * per-IP tracking collections, and resets all connection counters to zero.
 */
public destroy(): void {
  // Stop resource monitoring
  if (this.resourceCheckInterval) {
    clearInterval(this.resourceCheckInterval);
    this.resourceCheckInterval = null;
  }

  // Cancel every pending cleanup timer, then forget them
  this.cleanupTimers.forEach((pendingTimer) => {
    clearTimeout(pendingTimer);
  });
  this.cleanupTimers.clear();

  // Close all active connections
  this.closeAllConnections();

  // Drop tracking state
  this.activeConnections.clear();
  this.ipConnections.clear();

  // Start the counters from scratch
  this.connectionStats = {
    totalConnections: 0,
    activeConnections: 0,
    peakConnections: 0,
    rejectedConnections: 0,
    closedConnections: 0,
    erroredConnections: 0,
    timedOutConnections: 0
  };

  SmtpLogger.debug('ConnectionManager destroyed');
}
2025-05-21 12:52:24 +00:00
}