/**
 * SMTP Connection Manager
 * Responsible for managing socket connections to the SMTP server
 */
|
|
|
|
import * as plugins from '../../../plugins.js';
|
|
import type { IConnectionManager, ISmtpServer } from './interfaces.js';
|
|
import { SmtpResponseCode, SMTP_DEFAULTS, SmtpState } from './constants.js';
|
|
import { SmtpLogger } from './utils/logging.js';
|
|
import { adaptiveLogger } from './utils/adaptive-logging.js';
|
|
import { getSocketDetails, formatMultilineResponse } from './utils/helpers.js';
|
|
|
|
/**
 * Manager for SMTP connections
 * Handles connection setup, event listeners, and lifecycle management
 * Provides resource management, connection tracking, and monitoring
 */
|
|
export class ConnectionManager implements IConnectionManager {
|
|
/**
|
|
* Reference to the SMTP server instance
|
|
*/
|
|
private smtpServer: ISmtpServer;

/**
 * Sockets (plain or TLS) that have been admitted and are still tracked
 */
private activeConnections = new Set<plugins.net.Socket | plugins.tls.TLSSocket>();

/**
 * Running counters reported by the periodic resource monitor
 */
private connectionStats = {
  totalConnections: 0,
  activeConnections: 0,
  peakConnections: 0,
  rejectedConnections: 0,
  closedConnections: 0,
  erroredConnections: 0,
  timedOutConnections: 0
};

/**
 * Per-IP connection counters used for rate limiting, keyed by remote address.
 * Both timestamps are Date.now() values.
 */
private ipConnections = new Map<string, {
  count: number;
  firstConnection: number;
  lastConnection: number;
}>();

/**
 * Handle of the periodic resource-monitoring timer, or null when it is not running
 */
private resourceCheckInterval: NodeJS.Timeout | null = null;

/**
 * Pending delayed-destroy timers, tracked so they can be cleared
 */
private cleanupTimers = new Set<NodeJS.Timeout>();

/**
 * Effective limits and timeouts; populated by the constructor
 */
private options: {
  hostname: string;
  maxConnections: number;
  socketTimeout: number;
  maxConnectionsPerIP: number;
  connectionRateLimit: number;
  connectionRateWindow: number;
  bufferSizeLimit: number;
  resourceCheckInterval: number;
};
|
|
|
|
/**
|
|
* Creates a new connection manager with enhanced resource management
|
|
* @param smtpServer - SMTP server instance
|
|
*/
|
|
constructor(smtpServer: ISmtpServer) {
  this.smtpServer = smtpServer;

  // Only these three settings come from the server configuration; a missing
  // (or otherwise falsy) value falls back to the corresponding SMTP default.
  const { hostname, maxConnections, socketTimeout } = this.smtpServer.getOptions();

  this.options = {
    hostname: hostname || SMTP_DEFAULTS.HOSTNAME,
    maxConnections: maxConnections || SMTP_DEFAULTS.MAX_CONNECTIONS,
    socketTimeout: socketTimeout || SMTP_DEFAULTS.SOCKET_TIMEOUT,
    // The remaining resource limits are fixed values chosen here.
    maxConnectionsPerIP: 50, // simultaneous connections allowed per IP
    connectionRateLimit: 200, // tracked connections per IP per rate window
    connectionRateWindow: 60 * 1000, // 60 seconds
    bufferSizeLimit: 10 * 1024 * 1024, // 10 MB
    resourceCheckInterval: 30 * 1000 // 30 seconds
  };

  // Periodic monitoring begins as soon as the manager exists.
  this.startResourceMonitoring();
}
|
|
|
|
/**
|
|
* Start resource monitoring interval to check resource usage
|
|
*/
|
|
private startResourceMonitoring(): void {
  // Never run two monitors at once: drop a previously scheduled timer first.
  const previousTimer = this.resourceCheckInterval;
  if (previousTimer) {
    clearInterval(previousTimer);
  }

  const runCheck = (): void => {
    this.monitorResourceUsage();
  };
  this.resourceCheckInterval = setInterval(runCheck, this.options.resourceCheckInterval);
}
|
|
|
|
/**
|
|
* Monitor resource usage and log statistics
|
|
*/
|
|
private monitorResourceUsage(): void {
  // Process memory figures, rounded to whole megabytes.
  const toMB = (bytes: number): number => Math.round(bytes / 1024 / 1024);
  const { rss, heapTotal, heapUsed, external } = process.memoryUsage();
  const memoryUsageMB = {
    rss: toMB(rss),
    heapTotal: toMB(heapTotal),
    heapUsed: toMB(heapUsed),
    external: toMB(external)
  };

  // One pass over the per-IP table: IPs with a non-zero count, and IPs whose
  // count exceeds half of the per-IP rate limit.
  const highVolumeThreshold = this.options.connectionRateLimit / 2;
  let activeIPs = 0;
  let highVolumeIPs = 0;
  for (const { count } of this.ipConnections.values()) {
    if (count > 0) {
      activeIPs++;
    }
    if (count > highVolumeThreshold) {
      highVolumeIPs++;
    }
  }

  const stats = this.connectionStats;
  SmtpLogger.info('Resource usage stats', {
    connections: {
      active: this.activeConnections.size,
      total: stats.totalConnections,
      peak: stats.peakConnections,
      rejected: stats.rejectedConnections,
      closed: stats.closedConnections,
      errored: stats.erroredConnections,
      timedOut: stats.timedOutConnections
    },
    memory: memoryUsageMB,
    ipTracking: {
      uniqueIPs: this.ipConnections.size,
      activeIPs: activeIPs,
      highVolumeIPs: highVolumeIPs
    },
    resourceLimits: {
      maxConnections: this.options.maxConnections,
      maxConnectionsPerIP: this.options.maxConnectionsPerIP,
      connectionRateLimit: this.options.connectionRateLimit,
      bufferSizeLimit: `${toMB(this.options.bufferSizeLimit)}MB`
    }
  });

  // More than three high-volume IPs at once is reported as a possible DoS.
  if (highVolumeIPs > 3) {
    SmtpLogger.warn(`Potential DoS detected: ${highVolumeIPs} IPs with high connection rates`);
  }

  // Warn once the used heap exceeds 500 MB.
  if (memoryUsageMB.heapUsed > 500) {
    SmtpLogger.warn(`High memory usage detected: ${memoryUsageMB.heapUsed}MB heap used`);
  }

  // Finish the cycle by expiring rate-limit entries and validating tracking.
  this.cleanupIpRateLimits();
}
|
|
|
|
/**
|
|
* Clean up expired IP rate limits and perform additional resource monitoring
|
|
*/
|
|
private cleanupIpRateLimits(): void {
  const now = Date.now();
  const rateWindow = this.options.connectionRateWindow;
  const resetCutoff = now - rateWindow; // idle longer than one window
  const evictCutoff = resetCutoff - rateWindow; // idle longer than two windows

  let stillActive = 0;
  let evicted = 0;

  for (const [ip, entry] of this.ipConnections) {
    // Idle for more than two windows: forget the IP so the map cannot grow forever.
    if (entry.lastConnection < evictCutoff) {
      this.ipConnections.delete(ip);
      evicted++;
      continue;
    }

    // Seen within the current window: counts as active, nothing to change.
    if (entry.lastConnection >= resetCutoff) {
      stillActive++;
      continue;
    }

    // Idle for more than one window: keep the IP but restart its counter.
    if (entry.count > 0) {
      this.ipConnections.set(ip, {
        count: 0,
        firstConnection: now,
        lastConnection: now
      });
    }
  }

  // Only log when something was actually evicted.
  if (evicted > 0) {
    SmtpLogger.debug(`IP rate limit cleanup: removed ${evicted} stale entries, ${this.ipConnections.size} remaining, ${stillActive} active in current window`);
  }

  // Re-synchronise the active counter with the tracked socket set when they drift apart.
  const trackedCount = this.activeConnections.size;
  if (trackedCount > 0 && this.connectionStats.activeConnections !== trackedCount) {
    SmtpLogger.warn(`Connection tracking inconsistency detected: stats.active=${this.connectionStats.activeConnections}, actual=${trackedCount}`);
    this.connectionStats.activeConnections = trackedCount;
  }

  // Deeper consistency checks (destroyed sockets, orphaned sessions).
  this.validateResourceTracking();
}
|
|
|
|
/**
|
|
* Validate and repair resource tracking to prevent leaks
|
|
*/
|
|
private validateResourceTracking(): void {
  // Findings are collected and logged together at the end.
  const report: Array<Record<string, unknown>> = [];

  // 1. The active counter must equal the number of tracked sockets.
  const statsCount = this.connectionStats.activeConnections;
  const trackedCount = this.activeConnections.size;
  if (statsCount !== trackedCount) {
    report.push({
      issue: 'Active connection count mismatch',
      stats: statsCount,
      actual: trackedCount,
      action: 'Auto-corrected'
    });
    this.connectionStats.activeConnections = trackedCount;
  }

  // 2. Destroyed sockets must not remain in the tracked set.
  const destroyedSockets = Array.from(this.activeConnections).filter((socket) => socket.destroyed);
  for (const socket of destroyedSockets) {
    this.activeConnections.delete(socket);
  }
  if (destroyedSockets.length > 0) {
    report.push({
      issue: 'Destroyed sockets in active list',
      count: destroyedSockets.length,
      action: 'Removed from tracking'
    });
    this.connectionStats.activeConnections = this.activeConnections.size;
  }

  // 3. More sessions than sockets means sessions were left behind. This is
  //    only reported here; no session is removed.
  const sessionCount = this.smtpServer.getSessionManager().getSessionCount();
  if (sessionCount > this.activeConnections.size) {
    report.push({
      issue: 'Orphaned sessions',
      sessions: sessionCount,
      connections: this.activeConnections.size,
      action: 'Session cleanup recommended'
    });
  }

  if (report.length > 0) {
    SmtpLogger.warn('Resource tracking inconsistencies detected and repaired', { inconsistencies: report });
  }
}
|
|
|
|
/**
|
|
* Handle a new connection with resource management
|
|
* @param socket - Client socket
|
|
*/
|
|
public async handleNewConnection(socket: plugins.net.Socket): Promise<void> {
  // Every attempt counts toward the lifetime total, whether admitted or not.
  this.connectionStats.totalConnections++;

  // Sockets without a remote address are all bucketed under 0.0.0.0.
  const remoteAddress = socket.remoteAddress || '0.0.0.0';

  // Admission checks, in order: per-IP rate, per-IP concurrency, global limit.
  let rejectionReason: string | null = null;
  if (this.isIPRateLimited(remoteAddress)) {
    rejectionReason = 'Rate limit exceeded';
  } else if (this.hasReachedIPConnectionLimit(remoteAddress)) {
    rejectionReason = 'Too many connections from this IP';
  } else if (this.hasReachedMaxConnections()) {
    rejectionReason = 'Too many connections';
  }

  if (rejectionReason !== null) {
    this.rejectConnection(socket, rejectionReason);
    this.connectionStats.rejectedConnections++;
    return;
  }

  // Admit the socket. The active/peak counters are updated only now, so a
  // rejected connection can neither inflate the peak nor leave the active
  // counter out of step with the tracked socket set.
  this.activeConnections.add(socket);
  this.connectionStats.activeConnections = this.activeConnections.size;
  if (this.connectionStats.activeConnections > this.connectionStats.peakConnections) {
    this.connectionStats.peakConnections = this.connectionStats.activeConnections;
  }

  // Socket tuning: keep-alive probes, idle timeout, and no Nagle batching so
  // short SMTP replies are sent immediately.
  socket.setKeepAlive(true);
  socket.setTimeout(this.options.socketTimeout);
  socket.setNoDelay(true);

  // Count this connection against the IP's rate window.
  this.trackIPConnection(remoteAddress);

  // Install data/close/error/timeout handling.
  this.setupSocketEventHandlers(socket);

  // Create the (non-secure) session for this connection.
  this.smtpServer.getSessionManager().createSession(socket, false);

  // Report the connection and the new active count to the adaptive logger.
  adaptiveLogger.logConnection(socket, 'connect');
  adaptiveLogger.updateConnectionCount(this.connectionStats.activeConnections);

  // Send the greeting banner.
  this.sendGreeting(socket);
}
|
|
|
|
/**
|
|
* Check if an IP has exceeded the rate limit
|
|
* @param ip - Client IP address
|
|
* @returns True if rate limited
|
|
*/
|
|
private isIPRateLimited(ip: string): boolean {
  const entry = this.ipConnections.get(ip);

  // An IP that is not being tracked cannot be over its limit.
  if (!entry) {
    return false;
  }

  // The limit only applies while the window opened by the first tracked
  // connection is still running.
  const elapsedMs = Date.now() - entry.firstConnection;
  const windowExpired = elapsedMs > this.options.connectionRateWindow;
  if (windowExpired || entry.count < this.options.connectionRateLimit) {
    return false;
  }

  SmtpLogger.warn(`Rate limit exceeded for IP ${ip}: ${entry.count} connections in ${Math.round(elapsedMs / 1000)}s`);
  return true;
}
|
|
|
|
/**
|
|
* Track a new connection from an IP
|
|
* @param ip - Client IP address
|
|
*/
|
|
private trackIPConnection(ip: string): void {
  const now = Date.now();
  const ipData = this.ipConnections.get(ip);

  // Open a fresh window when the IP is unknown or its current window has run
  // out. Expiry is measured from the window's FIRST connection, which is the
  // same reference isIPRateLimited() uses. Measuring from the last connection
  // meant a steadily connecting IP never reset: its window looked expired to
  // the limiter, so it was never limited again while its count grew forever.
  if (!ipData || now - ipData.firstConnection > this.options.connectionRateWindow) {
    this.ipConnections.set(ip, {
      count: 1,
      firstConnection: now,
      lastConnection: now
    });
    return;
  }

  // Still inside the current window: count the connection and keep the start.
  this.ipConnections.set(ip, {
    count: ipData.count + 1,
    firstConnection: ipData.firstConnection,
    lastConnection: now
  });
}
|
|
|
|
/**
|
|
* Check if an IP has reached its connection limit
|
|
* @param ip - Client IP address
|
|
* @returns True if limit reached
|
|
*/
|
|
private hasReachedIPConnectionLimit(ip: string): boolean {
  // Count the tracked sockets whose remote address equals this IP.
  const openFromIp = Array.from(this.activeConnections)
    .filter((socket) => socket.remoteAddress === ip)
    .length;

  return openFromIp >= this.options.maxConnectionsPerIP;
}
|
|
|
|
/**
|
|
* Handle a new secure TLS connection with resource management
|
|
* @param socket - Client TLS socket
|
|
*/
|
|
public async handleNewSecureConnection(socket: plugins.tls.TLSSocket): Promise<void> {
  // Every attempt counts toward the lifetime total, whether admitted or not.
  this.connectionStats.totalConnections++;

  // Sockets without a remote address are all bucketed under 0.0.0.0.
  const remoteAddress = socket.remoteAddress || '0.0.0.0';

  // Admission checks, in order: per-IP rate, per-IP concurrency, global limit.
  let rejectionReason: string | null = null;
  if (this.isIPRateLimited(remoteAddress)) {
    rejectionReason = 'Rate limit exceeded';
  } else if (this.hasReachedIPConnectionLimit(remoteAddress)) {
    rejectionReason = 'Too many connections from this IP';
  } else if (this.hasReachedMaxConnections()) {
    rejectionReason = 'Too many connections';
  }

  if (rejectionReason !== null) {
    this.rejectConnection(socket, rejectionReason);
    this.connectionStats.rejectedConnections++;
    return;
  }

  // Admit the socket. The active/peak counters are updated only now, so a
  // rejected connection can neither inflate the peak nor leave the active
  // counter out of step with the tracked socket set.
  this.activeConnections.add(socket);
  this.connectionStats.activeConnections = this.activeConnections.size;
  if (this.connectionStats.activeConnections > this.connectionStats.peakConnections) {
    this.connectionStats.peakConnections = this.connectionStats.activeConnections;
  }

  // Socket tuning: keep-alive probes, idle timeout, and no Nagle batching so
  // short SMTP replies are sent immediately.
  socket.setKeepAlive(true);
  socket.setTimeout(this.options.socketTimeout);
  socket.setNoDelay(true);

  // Count this connection against the IP's rate window.
  this.trackIPConnection(remoteAddress);

  // Install data/close/error/timeout handling.
  this.setupSocketEventHandlers(socket);

  // Create the session, flagged as secure.
  this.smtpServer.getSessionManager().createSession(socket, true);

  // Report the connection and the new active count to the adaptive logger.
  adaptiveLogger.logConnection(socket, 'connect');
  adaptiveLogger.updateConnectionCount(this.connectionStats.activeConnections);

  // Send the greeting banner.
  this.sendGreeting(socket);
}
|
|
|
|
/**
|
|
* Set up event handlers for a socket with enhanced resource management
|
|
* @param socket - Client socket
|
|
*/
|
|
public setupSocketEventHandlers(socket: plugins.net.Socket | plugins.tls.TLSSocket): void {
  // Remove the first previously registered listener of each kind so this
  // method can be called again on the same socket without stacking handlers.
  for (const eventName of ['data', 'close', 'error', 'timeout']) {
    const staleListener = socket.listeners(eventName)[0] as ((...args: any[]) => void) | undefined;
    if (staleListener) {
      socket.removeListener(eventName, staleListener);
    }
  }

  // Command-mode state: unterminated line remainder and total command bytes.
  let buffer = '';
  let totalBytesReceived = 0;
  const crlfLength = SMTP_DEFAULTS.CRLF.length;

  // Chunk queue state. Node keeps emitting 'data' while an async listener is
  // suspended on an await, so chunks are chained on one promise to guarantee
  // they are processed strictly one after another, in arrival order.
  let chunkQueue: Promise<void> = Promise.resolve();
  let pendingBytes = 0;
  let pausedForBacklog = false;
  const maxPendingBytes = 1024 * 1024; // pause reading above 1 MB of queued input

  /**
   * Process one received chunk: forward it raw while the session is in DATA
   * mode, otherwise split it into CRLF-terminated command lines.
   * All errors are handled here; the returned promise is not expected to reject.
   */
  const processChunk = async (data: Buffer): Promise<void> => {
    try {
      // The socket may have been destroyed while this chunk was waiting.
      if (socket.destroyed) {
        return;
      }

      // Look up the session and refresh its activity timestamp.
      const session = this.smtpServer.getSessionManager().getSession(socket);
      if (session) {
        this.smtpServer.getSessionManager().updateSessionActivity(session);
      }

      // DATA mode: hand the raw chunk to the command handler, marked with a
      // prefix, instead of line-buffering potentially large message content.
      // NOTE(review): each chunk is decoded on its own, so a multi-byte UTF-8
      // character split across two chunks is not reassembled — confirm
      // whether 8-bit message bodies need a streaming decoder here.
      if (session && session.state === SmtpState.DATA_RECEIVING) {
        try {
          const dataString = data.toString('utf8');
          await this.smtpServer.getCommandHandler().processCommand(socket, `__RAW_DATA__${dataString}`);
        } catch (dataError) {
          SmtpLogger.error(`Data handler error during DATA mode: ${dataError instanceof Error ? dataError.message : String(dataError)}`);
          socket.destroy();
        }
        return;
      }

      // Command mode: enforce size limits before buffering.
      totalBytesReceived += data.length;

      if (buffer.length > this.options.bufferSizeLimit) {
        SmtpLogger.warn(`Buffer size limit exceeded: ${buffer.length} bytes for ${socket.remoteAddress}`);
        this.sendResponse(socket, `${SmtpResponseCode.EXCEEDED_STORAGE} Message too large, disconnecting`);
        socket.destroy();
        return;
      }

      // Cap the total number of command-mode bytes over the connection's life.
      if (totalBytesReceived > this.options.bufferSizeLimit * 2) {
        SmtpLogger.warn(`Total transfer limit exceeded: ${totalBytesReceived} bytes for ${socket.remoteAddress}`);
        this.sendResponse(socket, `${SmtpResponseCode.EXCEEDED_STORAGE} Transfer limit exceeded, disconnecting`);
        socket.destroy();
        return;
      }

      buffer += data.toString('utf8');

      // Consume every complete CRLF-terminated line currently buffered.
      let lineEndPos: number;
      while ((lineEndPos = buffer.indexOf(SMTP_DEFAULTS.CRLF)) !== -1) {
        const line = buffer.substring(0, lineEndPos);
        buffer = buffer.substring(lineEndPos + crlfLength);

        // Reject over-long command lines (4 KB limit).
        if (line.length > 4096) {
          SmtpLogger.warn(`Line length limit exceeded: ${line.length} bytes for ${socket.remoteAddress}`);
          this.sendResponse(socket, `${SmtpResponseCode.SYNTAX_ERROR} Line too long, disconnecting`);
          socket.destroy();
          return;
        }

        // Empty lines are skipped.
        if (line.length === 0) {
          continue;
        }

        try {
          // Awaited so commands on one connection run one at a time.
          await this.smtpServer.getCommandHandler().processCommand(socket, line);
        } catch (error) {
          SmtpLogger.error(`Command handler error: ${error instanceof Error ? error.message : String(error)}`);
          this.sendResponse(socket, `${SmtpResponseCode.LOCAL_ERROR} Internal server error`);

          // Errors flagged as fatal/critical end the connection.
          if (error instanceof Error &&
              (error.message.includes('fatal') || error.message.includes('critical'))) {
            socket.destroy();
            return;
          }
        }
      }

      // A large remainder without any CRLF is treated as abuse (10 KB limit).
      if (buffer.length > 10240) {
        SmtpLogger.warn(`Incomplete line too large: ${buffer.length} bytes for ${socket.remoteAddress}`);
        this.sendResponse(socket, `${SmtpResponseCode.SYNTAX_ERROR} Incomplete line too large, disconnecting`);
        socket.destroy();
      }
    } catch (error) {
      // Anything unexpected while processing ends the connection.
      SmtpLogger.error(`Data handler error: ${error instanceof Error ? error.message : String(error)}`);
      socket.destroy();
    }
  };

  // Data event - enqueue the chunk; apply backpressure when the queue grows.
  socket.on('data', (data) => {
    pendingBytes += data.length;
    if (pendingBytes > maxPendingBytes) {
      pausedForBacklog = true;
      socket.pause();
    }

    chunkQueue = chunkQueue
      .then(() => processChunk(data))
      .catch((queueError) => {
        // Defensive: keeps the chain alive if processChunk ever rejects.
        SmtpLogger.error(`Data handler error: ${queueError instanceof Error ? queueError.message : String(queueError)}`);
      })
      .then(() => {
        pendingBytes -= data.length;
        // Resume only a pause that was applied here, once the backlog shrank.
        if (pausedForBacklog && pendingBytes <= maxPendingBytes / 2) {
          pausedForBacklog = false;
          if (!socket.destroyed) {
            socket.resume();
          }
        }
      });
  });

  // Drain event - the write buffer emptied; resume reading if it was paused,
  // unless the pause is the backlog backpressure applied above.
  socket.on('drain', () => {
    if (socket.isPaused() && !pausedForBacklog) {
      socket.resume();
      SmtpLogger.debug(`Resumed socket for ${socket.remoteAddress} after drain`);
    }
  });

  // Close event - clean up when the connection is closed.
  socket.on('close', (hadError) => {
    this.handleSocketClose(socket, hadError);
  });

  // Error event - handle socket errors.
  socket.on('error', (err) => {
    this.handleSocketError(socket, err);
  });

  // Timeout event - handle idle timeouts.
  socket.on('timeout', () => {
    this.handleSocketTimeout(socket);
  });
}
|
|
|
|
/**
|
|
* Get the current connection count
|
|
* @returns Number of active connections
|
|
*/
|
|
public getConnectionCount(): number {
  // The tracked socket set is the source of truth for the active count.
  const { size } = this.activeConnections;
  return size;
}
|
|
|
|
/**
|
|
* Check if the server has reached the maximum number of connections
|
|
* @returns True if max connections reached
|
|
*/
|
|
public hasReachedMaxConnections(): boolean {
  // At capacity as soon as the tracked count equals the configured maximum.
  const remainingSlots = this.options.maxConnections - this.activeConnections.size;
  return remainingSlots <= 0;
}
|
|
|
|
/**
|
|
* Close all active connections
|
|
*/
|
|
public closeAllConnections(): void {
  // Stop resource monitoring first, even when there is nothing to close.
  // Previously an idle manager returned early and left the interval running.
  if (this.resourceCheckInterval) {
    clearInterval(this.resourceCheckInterval);
    this.resourceCheckInterval = null;
  }

  const connectionCount = this.activeConnections.size;
  if (connectionCount === 0) {
    return;
  }

  SmtpLogger.info(`Closing all connections (count: ${connectionCount})`);

  // Iterate over a snapshot so handlers that untrack sockets cannot interfere.
  for (const socket of Array.from(this.activeConnections)) {
    try {
      // Tell the client the service is closing, then end gracefully.
      this.sendServiceClosing(socket);
      socket.end();

      // Force-destroy shortly afterwards if the socket has not closed by itself.
      const destroyTimer = setTimeout(() => {
        if (!socket.destroyed) {
          socket.destroy();
        }
        this.cleanupTimers.delete(destroyTimer);
      }, 100);
      this.cleanupTimers.add(destroyTimer);
    } catch (error) {
      SmtpLogger.error(`Error closing connection: ${error instanceof Error ? error.message : String(error)}`);

      // Graceful close failed: destroy outright, ignoring secondary errors.
      try {
        socket.destroy();
      } catch (e) {
        // Ignore destroy errors
      }
    }
  }

  // Nothing is tracked any more; keep the counter in step with the set.
  this.activeConnections.clear();
  this.connectionStats.activeConnections = 0;
}
|
|
|
|
/**
|
|
* Handle socket close event
|
|
* @param socket - Client socket
|
|
* @param hadError - Whether the socket was closed due to error
|
|
*/
|
|
private handleSocketClose(socket: plugins.net.Socket | plugins.tls.TLSSocket, hadError: boolean): void {
  try {
    this.connectionStats.closedConnections++;

    // Untrack first, then take the count from the set itself. The socket may
    // already have been removed by the error/timeout handlers or by
    // closeAllConnections(), in which case "size - 1" under-counted and could
    // even go negative.
    this.activeConnections.delete(socket);
    this.connectionStats.activeConnections = this.activeConnections.size;

    // Identify the peer for logging.
    const socketDetails = getSocketDetails(socket);
    const socketId = `${socketDetails.remoteAddress}:${socketDetails.remotePort}`;

    // Closes caused by an error are logged as warnings, clean closes at debug level.
    if (hadError) {
      SmtpLogger.warn(`Socket closed with error: ${socketId}`);
    } else {
      SmtpLogger.debug(`Socket closed normally: ${socketId}`);
    }

    // Fetch the session before removing it; it is still needed below.
    const session = this.smtpServer.getSessionManager().getSession(socket);
    this.smtpServer.getSessionManager().removeSession(socket);

    // Cancel a pending data timeout stored on the session.
    if (session?.dataTimeoutId) {
      clearTimeout(session.dataTimeoutId);
    }

    // Report the close and the new active count to the adaptive logger.
    adaptiveLogger.logConnection(socket, 'close', session);
    adaptiveLogger.updateConnectionCount(this.connectionStats.activeConnections);
  } catch (error) {
    SmtpLogger.error(`Error in handleSocketClose: ${error instanceof Error ? error.message : String(error)}`);

    // Whatever failed, the socket must not stay tracked or counted.
    this.activeConnections.delete(socket);
    this.connectionStats.activeConnections = this.activeConnections.size;
  }
}
|
|
|
|
/**
|
|
* Handle socket error event
|
|
* @param socket - Client socket
|
|
* @param error - Error object
|
|
*/
|
|
private handleSocketError(socket: plugins.net.Socket | plugins.tls.TLSSocket, error: Error): void {
  try {
    this.connectionStats.erroredConnections++;

    // Peer identity and session provide context for the log entry.
    const { remoteAddress, remotePort } = getSocketDetails(socket);
    const session = this.smtpServer.getSessionManager().getSession(socket);

    SmtpLogger.error(`Socket error for ${remoteAddress}:${remotePort}: ${error.message}`, {
      errorCode: (error as any).code,
      errorStack: error.stack,
      sessionId: session?.id,
      sessionState: session?.state,
      remoteAddress: remoteAddress,
      remotePort: remotePort
    });

    // Record the error with the adaptive logger as well.
    adaptiveLogger.logConnection(socket, 'error', session, error);

    // Cancel a pending data timeout stored on the session.
    if (session?.dataTimeoutId) {
      clearTimeout(session.dataTimeoutId);
    }

    // Tear the connection down and drop all tracking for it.
    if (!socket.destroyed) {
      socket.destroy();
    }
    this.activeConnections.delete(socket);
    this.smtpServer.getSessionManager().removeSession(socket);
  } catch (handlerError) {
    // The error handler itself failed: log it and still release the socket.
    const detail = handlerError instanceof Error ? handlerError.message : String(handlerError);
    SmtpLogger.error(`Error in handleSocketError: ${detail}`);

    if (!socket.destroyed) {
      socket.destroy();
    }
    this.activeConnections.delete(socket);
  }
}
|
|
|
|
/**
|
|
* Handle socket timeout event
|
|
* @param socket - Client socket
|
|
*/
|
|
private handleSocketTimeout(socket: plugins.net.Socket | plugins.tls.TLSSocket): void {
  const destroyIfOpen = (): void => {
    if (!socket.destroyed) {
      socket.destroy();
    }
  };

  try {
    this.connectionStats.timedOutConnections++;

    const { remoteAddress, remotePort } = getSocketDetails(socket);
    const socketId = `${remoteAddress}:${remotePort}`;
    const session = this.smtpServer.getSessionManager().getSession(socket);

    if (!session) {
      // No session context available for this socket.
      SmtpLogger.warn(`Socket timeout without session from ${socketId}`);
    } else {
      // Idle time is measured from the session's last recorded activity.
      const idleTime = session.lastActivity ? Date.now() - session.lastActivity : 'unknown';

      SmtpLogger.warn(`Socket timeout from ${session.remoteAddress}`, {
        sessionId: session.id,
        remoteAddress: session.remoteAddress,
        state: session.state,
        timeout: this.options.socketTimeout,
        idleTime: idleTime,
        emailState: session.envelope?.mailFrom ? 'has-sender' : 'no-sender',
        recipientCount: session.envelope?.rcptTo?.length || 0
      });

      // Cancel a pending data timeout stored on the session.
      if (session.dataTimeoutId) {
        clearTimeout(session.dataTimeoutId);
      }

      // Tell the client why the connection is going away.
      this.sendResponse(socket, `${SmtpResponseCode.SERVICE_NOT_AVAILABLE} Connection timeout - closing connection`);
    }

    try {
      // Graceful close first; force-destroy after a 5 second grace period.
      socket.end();

      const timeoutDestroyTimer = setTimeout(() => {
        if (!socket.destroyed) {
          SmtpLogger.warn(`Forcing destroy of timed out socket: ${socketId}`);
          socket.destroy();
        }
        this.cleanupTimers.delete(timeoutDestroyTimer);
      }, 5000);
      this.cleanupTimers.add(timeoutDestroyTimer);
    } catch (error) {
      SmtpLogger.error(`Error ending timed out socket: ${error instanceof Error ? error.message : String(error)}`);

      // end() failed, so destroy directly.
      destroyIfOpen();
    }

    // Drop all tracking for the timed-out connection.
    this.activeConnections.delete(socket);
    this.smtpServer.getSessionManager().removeSession(socket);
  } catch (handlerError) {
    // The timeout handler itself failed: log it and still release the socket.
    SmtpLogger.error(`Error in handleSocketTimeout: ${handlerError instanceof Error ? handlerError.message : String(handlerError)}`);

    destroyIfOpen();
    this.activeConnections.delete(socket);
  }
}
|
|
|
|
/**
|
|
* Reject a connection
|
|
* @param socket - Client socket
|
|
* @param reason - Reason for rejection
|
|
*/
|
|
private rejectConnection(socket: plugins.net.Socket | plugins.tls.TLSSocket, reason: string): void {
  // Refuses a connection: logs the rejection, sends a SERVICE_NOT_AVAILABLE
  // reply carrying the reason, then closes the socket.
  //
  // socket - client socket being turned away
  // reason - human-readable explanation, included in the log line and the reply
  const socketDetails = getSocketDetails(socket);
  const socketId = `${socketDetails.remoteAddress}:${socketDetails.remotePort}`;
  SmtpLogger.warn(`Connection rejected from ${socketId}: ${reason}`);

  // Send rejection message (sendResponse skips the write if the socket is no longer writable)
  this.sendResponse(socket, `${SmtpResponseCode.SERVICE_NOT_AVAILABLE} ${this.options.hostname} Service temporarily unavailable - ${reason}`);

  // Close the socket gracefully first
  try {
    socket.end();

    // end() only half-closes the connection; a peer that never finishes the
    // close would otherwise keep the socket open. Arm a forced destroy as a
    // fallback, the same way the socket timeout handler does.
    if (!socket.destroyed) {
      const rejectDestroyTimer = setTimeout(() => {
        this.cleanupTimers.delete(rejectDestroyTimer);
        if (!socket.destroyed) {
          SmtpLogger.warn(`Forcing destroy of rejected socket: ${socketId}`);
          socket.destroy();
        }
      }, 5000); // 5 second grace period for socket to end properly
      this.cleanupTimers.add(rejectDestroyTimer);

      // Once the socket has closed on its own the fallback is unnecessary;
      // drop the timer so rejected connections do not accumulate pending timers.
      socket.once('close', () => {
        clearTimeout(rejectDestroyTimer);
        this.cleanupTimers.delete(rejectDestroyTimer);
      });
    }
  } catch (error) {
    SmtpLogger.error(`Error ending rejected socket: ${error instanceof Error ? error.message : String(error)}`);

    // Ensure socket is destroyed even if end() fails, so it cannot leak
    if (!socket.destroyed) {
      socket.destroy();
    }
  }
}
|
|
|
|
/**
|
|
* Send greeting message
|
|
* @param socket - Client socket
|
|
*/
|
|
private sendGreeting(socket: plugins.net.Socket | plugins.tls.TLSSocket): void {
  // Builds the SERVICE_READY banner from the configured hostname and hands it
  // to sendResponse for delivery to the client.
  const { hostname } = this.options;
  this.sendResponse(socket, `${SmtpResponseCode.SERVICE_READY} ${hostname} ESMTP service ready`);
}
|
|
|
|
/**
|
|
* Send service closing notification
|
|
* @param socket - Client socket
|
|
*/
|
|
private sendServiceClosing(socket: plugins.net.Socket | plugins.tls.TLSSocket): void {
  // Builds the SERVICE_CLOSING notice from the configured hostname and hands
  // it to sendResponse for delivery to the client.
  const { hostname } = this.options;
  this.sendResponse(socket, `${SmtpResponseCode.SERVICE_CLOSING} ${hostname} Service closing transmission channel`);
}
|
|
|
|
/**
|
|
* Send response to client
|
|
* @param socket - Client socket
|
|
* @param response - Response to send
|
|
*/
|
|
private sendResponse(socket: plugins.net.Socket | plugins.tls.TLSSocket, response: string): void {
  // Writes one response line (terminated with SMTP_DEFAULTS.CRLF) to the client.
  // If the socket cannot be written to, the response is dropped with a debug
  // log entry. If the write throws, the error is logged and the socket destroyed.
  const canWrite = !socket.destroyed && socket.readyState === 'open' && socket.writable;

  if (!canWrite) {
    // Nothing to deliver to: record the socket state for diagnostics and bail out
    SmtpLogger.debug(`Skipping response to closed/destroyed socket: ${response}`, {
      remoteAddress: socket.remoteAddress,
      remotePort: socket.remotePort,
      destroyed: socket.destroyed,
      readyState: socket.readyState,
      writable: socket.writable
    });
    return;
  }

  try {
    const wireLine = `${response}${SMTP_DEFAULTS.CRLF}`;
    socket.write(wireLine);
    adaptiveLogger.logResponse(response, socket);
  } catch (sendError) {
    // Normalise whatever was thrown into an Error so message and object agree
    const failure = sendError instanceof Error ? sendError : new Error(String(sendError));

    SmtpLogger.error(`Error sending response: ${failure.message}`, {
      response,
      remoteAddress: socket.remoteAddress,
      remotePort: socket.remotePort,
      error: failure
    });

    // A socket that failed mid-write is torn down rather than reused
    socket.destroy();
  }
}
|
|
|
|
/**
|
|
* Handle a new connection (interface requirement)
|
|
*/
|
|
public async handleConnection(socket: plugins.net.Socket | plugins.tls.TLSSocket, secure: boolean): Promise<void> {
  // Dispatches the socket to the plain or TLS connection handler depending on
  // the `secure` flag. The handler's result is not awaited.
  if (!secure) {
    this.handleNewConnection(socket as plugins.net.Socket);
    return;
  }

  this.handleNewSecureConnection(socket as plugins.tls.TLSSocket);
}
|
|
|
|
/**
|
|
* Check if accepting new connections (interface requirement)
|
|
*/
|
|
public canAcceptConnection(): boolean {
  // True exactly when hasReachedMaxConnections() reports false.
  const atCapacity = this.hasReachedMaxConnections();
  return !atCapacity;
}
|
|
|
|
/**
|
|
* Clean up resources
|
|
*/
|
|
public destroy(): void {
  // Tears the manager down: stops the resource monitor, cancels pending
  // cleanup timers, closes all connections, empties the tracking collections
  // and zeroes the statistics.

  // Stop the periodic resource check, if one is running
  const monitor = this.resourceCheckInterval;
  if (monitor) {
    clearInterval(monitor);
    this.resourceCheckInterval = null;
  }

  // Cancel every tracked cleanup timer, then forget them
  this.cleanupTimers.forEach((pending) => clearTimeout(pending));
  this.cleanupTimers.clear();

  // Close all active connections before dropping the tracking state
  this.closeAllConnections();

  this.activeConnections.clear();
  this.ipConnections.clear();

  // Start the counters over from zero
  this.connectionStats = {
    totalConnections: 0,
    activeConnections: 0,
    peakConnections: 0,
    rejectedConnections: 0,
    closedConnections: 0,
    erroredConnections: 0,
    timedOutConnections: 0
  };

  SmtpLogger.debug('ConnectionManager destroyed');
}
|
|
} |