diff --git a/ts/mail/delivery/classes.smtp.client.ts b/ts/mail/delivery/classes.smtp.client.ts index 244f057..e48204e 100644 --- a/ts/mail/delivery/classes.smtp.client.ts +++ b/ts/mail/delivery/classes.smtp.client.ts @@ -407,8 +407,8 @@ export class SmtpClient { // Clear previous extensions this.supportedExtensions.clear(); - // Send EHLO - const response = await this.sendCommand(`EHLO ${this.options.domain}`); + // Send EHLO - don't allow pipelining for this command + const response = await this.sendCommand(`EHLO ${this.options.domain}`, false); // Parse supported extensions const lines = response.split('\r\n'); @@ -420,7 +420,13 @@ export class SmtpClient { } } + // Check if server supports pipelining + this.supportsPipelining = this.supportedExtensions.has('PIPELINING'); + logger.log('debug', `Server supports extensions: ${Array.from(this.supportedExtensions).join(', ')}`); + if (this.supportsPipelining) { + logger.log('info', 'Server supports PIPELINING - will use for improved performance'); + } } /** @@ -661,20 +667,75 @@ export class SmtpClient { result.dkimSigned = true; } - // Send MAIL FROM + // Get envelope and recipients const envelope_from = email.getEnvelopeFrom() || email.from; - await this.sendCommand(`MAIL FROM:<${envelope_from}> SIZE=${this.getEmailSize(email)}`); - - // Send RCPT TO for each recipient const recipients = email.getAllRecipients(); - for (const recipient of recipients) { + // Check if we can use pipelining for MAIL FROM and RCPT TO commands + if (this.supportsPipelining && recipients.length > 0) { + logger.log('debug', 'Using SMTP pipelining for sending'); + + // Send MAIL FROM command first (always needed) + const mailFromCmd = `MAIL FROM:<${envelope_from}> SIZE=${this.getEmailSize(email)}`; + let mailFromResponse: string; + try { - await this.sendCommand(`RCPT TO:<${recipient}>`); - result.acceptedRecipients.push(recipient); + mailFromResponse = await this.sendCommand(mailFromCmd); + + if (!mailFromResponse.startsWith('250')) { + throw new MtaDeliveryError( + `MAIL FROM command failed: ${mailFromResponse}`, + { + data: { + command: mailFromCmd, + response: mailFromResponse + } + } + ); + } } catch (error) { - logger.log('warn', `Recipient ${recipient} rejected: ${error.message}`); - result.rejectedRecipients.push(recipient); + logger.log('error', `MAIL FROM failed: ${error.message}`); + throw error; + } + + // Pipeline all RCPT TO commands + const rcptPromises = recipients.map(recipient => { + return this.sendCommand(`RCPT TO:<${recipient}>`) + .then(response => { + if (response.startsWith('250')) { + result.acceptedRecipients.push(recipient); + return { recipient, accepted: true, response }; + } else { + result.rejectedRecipients.push(recipient); + logger.log('warn', `Recipient ${recipient} rejected: ${response}`); + return { recipient, accepted: false, response }; + } + }) + .catch(error => { + result.rejectedRecipients.push(recipient); + logger.log('warn', `Recipient ${recipient} rejected with error: ${error.message}`); + return { recipient, accepted: false, error: error.message }; + }); + }); + + // Wait for all RCPT TO commands to complete + await Promise.all(rcptPromises); + } else { + // Fall back to sequential commands if pipelining not supported + logger.log('debug', 'Using sequential SMTP commands for sending'); + + // Send MAIL FROM + await this.sendCommand(`MAIL FROM:<${envelope_from}> SIZE=${this.getEmailSize(email)}`); + + // Send RCPT TO for each recipient + for (const recipient of recipients) { + try { + await this.sendCommand(`RCPT TO:<${recipient}>`); + result.acceptedRecipients.push(recipient); + } catch (error) { + logger.log('warn', `Recipient ${recipient} rejected: ${error.message}`); + result.rejectedRecipients.push(recipient); + } } } @@ -705,7 +766,7 @@ export class SmtpClient { ); } - // Format email content (simplified for now) + // Format email content efficiently const emailContent = await this.getFormattedEmail(email); // Send email content @@ -923,7 +984,26 @@ export class SmtpClient { * Send SMTP command and wait for response * @param command SMTP command to send */ - private async sendCommand(command: string): Promise { + // Queue for command pipelining + private commandQueue: Array<{ + command: string; + resolve: (response: string) => void; + reject: (error: any) => void; + timeout: NodeJS.Timeout; + }> = []; + + // Flag to indicate if we're currently processing commands + private processingCommands = false; + + // Flag to indicate if server supports pipelining + private supportsPipelining = false; + + /** + * Send an SMTP command and wait for response + * @param command SMTP command to send + * @param allowPipelining Whether this command can be pipelined + */ + private async sendCommand(command: string, allowPipelining = true): Promise { if (!this.socket) { throw new MtaConnectionError( 'Not connected to server', @@ -946,6 +1026,12 @@ export class SmtpClient { return new Promise((resolve, reject) => { // Set up timeout for command const timeout = setTimeout(() => { + // Remove this command from the queue if it times out + const index = this.commandQueue.findIndex(item => item.command === command); + if (index !== -1) { + this.commandQueue.splice(index, 1); + } + reject(MtaTimeoutError.commandTimeout( command.split(' ')[0], this.options.host, @@ -953,35 +1039,168 @@ export class SmtpClient { )); }, this.options.commandTimeout); - // Send command - this.socket.write(command + '\r\n', (err) => { - if (err) { - clearTimeout(timeout); - reject(new MtaConnectionError( - `Failed to send command: ${err.message}`, - { - data: { - command: command.split(' ')[0], - error: err.message - } - } - )); - } + // Add command to the queue + this.commandQueue.push({ + command, + resolve, + reject, + timeout }); + // Process command queue if we can pipeline or if not currently processing commands + if ((this.supportsPipelining && allowPipelining) || !this.processingCommands) { + this.processCommandQueue(); + } + }); + } + + /** + * Process the command queue - either one by one or pipelined if supported + */ + private processCommandQueue(): void { + if (this.processingCommands || this.commandQueue.length === 0 || !this.socket) { + return; + } + + this.processingCommands = true; + + try { + // If pipelining is supported, send all commands at once + if (this.supportsPipelining) { + // Send all commands in queue at once + const commands = this.commandQueue.map(item => item.command).join('\r\n') + '\r\n'; + + this.socket.write(commands, (err) => { + if (err) { + // Handle write error for all commands + const error = new MtaConnectionError( + `Failed to send commands: ${err.message}`, + { + data: { + error: err.message + } + } + ); + + // Fail all pending commands + while (this.commandQueue.length > 0) { + const item = this.commandQueue.shift(); + clearTimeout(item.timeout); + item.reject(error); + } + + this.processingCommands = false; + } + }); + + // Process responses one by one in order + this.processResponses(); + } else { + // Process commands one by one if pipelining not supported + this.processNextCommand(); + } + } catch (error) { + logger.log('error', `Error processing command queue: ${error.message}`); + this.processingCommands = false; + } + } + + /** + * Process the next command in the queue (non-pipelined mode) + */ + private processNextCommand(): void { + if (this.commandQueue.length === 0 || !this.socket) { + this.processingCommands = false; + return; + } + + const currentCommand = this.commandQueue[0]; + + this.socket.write(currentCommand.command + '\r\n', (err) => { + if (err) { + // Handle write error + const error = new MtaConnectionError( + `Failed to send command: ${err.message}`, + { + data: { + command: currentCommand.command.split(' ')[0], + error: err.message + } + } + ); + + // Remove from queue + this.commandQueue.shift(); + clearTimeout(currentCommand.timeout); + currentCommand.reject(error); + + // Continue with next command + this.processNextCommand(); + return; + } + // Read response this.readResponse() .then((response) => { - clearTimeout(timeout); - resolve(response); + // Remove from queue and resolve + this.commandQueue.shift(); + clearTimeout(currentCommand.timeout); + currentCommand.resolve(response); + + // Process next command + this.processNextCommand(); }) .catch((err) => { - clearTimeout(timeout); - reject(err); + // Remove from queue and reject + this.commandQueue.shift(); + clearTimeout(currentCommand.timeout); + currentCommand.reject(err); + + // Process next command + this.processNextCommand(); }); }); } + /** + * Process responses for pipelined commands + */ + private async processResponses(): Promise { + try { + // Process responses for each command in order + while (this.commandQueue.length > 0) { + const currentCommand = this.commandQueue[0]; + + try { + // Wait for response + const response = await this.readResponse(); + + // Remove from queue and resolve + this.commandQueue.shift(); + clearTimeout(currentCommand.timeout); + currentCommand.resolve(response); + } catch (error) { + // Remove from queue and reject + this.commandQueue.shift(); + clearTimeout(currentCommand.timeout); + currentCommand.reject(error); + + // Stop processing if this is a critical error + if ( + error instanceof MtaConnectionError && + (error.message.includes('Connection closed') || error.message.includes('Not connected')) + ) { + break; + } + } + } + } catch (error) { + logger.log('error', `Error processing responses: ${error.message}`); + } finally { + this.processingCommands = false; + } + } + /** * Read response from the server */ @@ -999,37 +1218,45 @@ export class SmtpClient { } return new Promise((resolve, reject) => { - let responseData = ''; + // Use an array to collect response chunks instead of string concatenation + const responseChunks: Buffer[] = []; + + // Single function to clean up all listeners + const cleanupListeners = () => { + if (!this.socket) return; + this.socket.removeListener('data', onData); + this.socket.removeListener('error', onError); + this.socket.removeListener('close', onClose); + this.socket.removeListener('end', onEnd); + }; const onData = (data: Buffer) => { - responseData += data.toString(); + // Store buffer directly, avoiding unnecessary string conversion + responseChunks.push(data); + + // Convert to string only for response checking + const responseData = Buffer.concat(responseChunks).toString(); // Check if this is a complete response if (this.isCompleteResponse(responseData)) { // Clean up listeners - this.socket.removeListener('data', onData); - this.socket.removeListener('error', onError); - this.socket.removeListener('close', onClose); - this.socket.removeListener('end', onEnd); + cleanupListeners(); - logger.log('debug', `< ${responseData.trim()}`); + const trimmedResponse = responseData.trim(); + logger.log('debug', `< ${trimmedResponse}`); // Check if this is an error response if (this.isErrorResponse(responseData)) { const code = responseData.substring(0, 3); - reject(this.createErrorFromResponse(responseData, code)); + reject(this.createErrorFromResponse(trimmedResponse, code)); } else { - resolve(responseData.trim()); + resolve(trimmedResponse); } } }; const onError = (err: Error) => { - // Clean up listeners - this.socket.removeListener('data', onData); - this.socket.removeListener('error', onError); - this.socket.removeListener('close', onClose); - this.socket.removeListener('end', onEnd); + cleanupListeners(); reject(new MtaConnectionError( `Socket error while waiting for response: ${err.message}`, @@ -1042,12 +1269,9 @@ export class SmtpClient { }; const onClose = () => { - // Clean up listeners - this.socket.removeListener('data', onData); - this.socket.removeListener('error', onError); - this.socket.removeListener('close', onClose); - this.socket.removeListener('end', onEnd); + cleanupListeners(); + const responseData = Buffer.concat(responseChunks).toString(); reject(new MtaConnectionError( 'Connection closed while waiting for response', { @@ -1059,12 +1283,9 @@ export class SmtpClient { }; const onEnd = () => { - // Clean up listeners - this.socket.removeListener('data', onData); - this.socket.removeListener('error', onError); - this.socket.removeListener('close', onClose); - this.socket.removeListener('end', onEnd); + cleanupListeners(); + const responseData = Buffer.concat(responseChunks).toString(); reject(new MtaConnectionError( 'Connection ended while waiting for response', { diff --git a/ts/mail/delivery/classes.smtpserver.ts b/ts/mail/delivery/classes.smtpserver.ts index 637d4d6..a09b882 100644 --- a/ts/mail/delivery/classes.smtpserver.ts +++ b/ts/mail/delivery/classes.smtpserver.ts @@ -23,7 +23,11 @@ export class SMTPServer { private smtpServerOptions: ISmtpServerOptions; private server: plugins.net.Server; private sessions: Map; + private sessionTimeouts: Map; private hostname: string; + private sessionIdCounter: number = 0; + private connectionCount: number = 0; + private maxConnections: number = 100; // Default max connections constructor(emailServerRefArg: UnifiedEmailServer, optionsArg: ISmtpServerOptions) { console.log('SMTPServer instance is being created...'); @@ -31,21 +35,113 @@ export class SMTPServer { this.emailServerRef = emailServerRefArg; this.smtpServerOptions = optionsArg; this.sessions = new Map(); + this.sessionTimeouts = new Map(); this.hostname = optionsArg.hostname || 'mail.lossless.one'; + this.maxConnections = optionsArg.maxSize || 100; + + // Start session cleanup interval + setInterval(() => this.cleanupIdleSessions(), 30000); // Run every 30 seconds this.server = plugins.net.createServer((socket) => { + // Check if we've exceeded maximum connections + if (this.connectionCount >= this.maxConnections) { + logger.log('warn', `Connection limit reached (${this.maxConnections}), rejecting new connection`); + socket.write('421 Too many connections, try again later\r\n'); + socket.destroy(); + return; + } + this.handleNewConnection(socket); }); } + + /** + * Clean up idle sessions + * @private + */ + private cleanupIdleSessions(): void { + const now = Date.now(); + const sessionTimeout = this.smtpServerOptions.socketTimeout || 300000; // Default 5 minutes + + // Check all sessions for timeout + for (const [socket, session] of this.sessions.entries()) { + if (!session.lastActivity) continue; + + const idleTime = now - session.lastActivity; + if (idleTime > sessionTimeout) { + logger.log('info', `Session ${session.id} timed out after ${idleTime}ms of inactivity`); + + try { + // Send timeout message and end connection + this.sendResponse(socket, '421 Timeout - closing connection'); + socket.destroy(); + } catch (error) { + logger.log('error', `Error closing timed out session: ${error.message}`); + } + + // Clean up session + this.removeSession(socket); + } + } + } + + /** + * Create a new session ID + * @private + */ + private generateSessionId(): string { + return `${Date.now()}-${++this.sessionIdCounter}`; + } + + /** + * Properly remove a session and clean up resources + * @private + */ + private removeSession(socket: plugins.net.Socket | plugins.tls.TLSSocket): void { + const session = this.sessions.get(socket); + if (!session) return; + + // Clear session timeout if exists + const timeoutId = this.sessionTimeouts.get(session.id); + if (timeoutId) { + clearTimeout(timeoutId); + this.sessionTimeouts.delete(session.id); + } + + // Remove session from map + this.sessions.delete(socket); + + // Decrement connection count + this.connectionCount--; + + logger.log('debug', `Session ${session.id} removed, active connections: ${this.connectionCount}`); + } + + /** + * Update last activity timestamp for a session + * @private + */ + private updateSessionActivity(socket: plugins.net.Socket | plugins.tls.TLSSocket): void { + const session = this.sessions.get(socket); + if (!session) return; + + session.lastActivity = Date.now(); + } private async handleNewConnection(socket: plugins.net.Socket): Promise { const clientIp = socket.remoteAddress; const clientPort = socket.remotePort; console.log(`New connection from ${clientIp}:${clientPort}`); + // Increment connection count + this.connectionCount++; + + // Generate unique session ID + const sessionId = this.generateSessionId(); + // Initialize a new session this.sessions.set(socket, { - id: `${socket.remoteAddress}:${socket.remotePort}`, + id: sessionId, state: SmtpState.GREETING, clientHostname: '', mailFrom: '', @@ -56,6 +152,7 @@ export class SMTPServer { remoteAddress: socket.remoteAddress || '', secure: false, authenticated: false, + lastActivity: Date.now(), envelope: { mailFrom: { address: '', @@ -129,7 +226,29 @@ export class SMTPServer { // Send greeting this.sendResponse(socket, `220 ${this.hostname} ESMTP Service Ready`); + // Set session timeout + const sessionTimeout = setTimeout(() => { + logger.log('info', `Initial connection timeout for session ${sessionId}`); + this.sendResponse(socket, '421 Connection timeout'); + socket.destroy(); + this.removeSession(socket); + }, this.smtpServerOptions.connectionTimeout || 30000); + + // Store timeout reference + this.sessionTimeouts.set(sessionId, sessionTimeout); + socket.on('data', (data) => { + // Clear initial connection timeout on first data + const timeoutId = this.sessionTimeouts.get(sessionId); + if (timeoutId) { + clearTimeout(timeoutId); + this.sessionTimeouts.delete(sessionId); + } + + // Update last activity timestamp + this.updateSessionActivity(socket); + + // Process the data this.processData(socket, data); }); @@ -151,16 +270,21 @@ export class SMTPServer { details: { clientPort, state: SmtpState[session.state], - from: session.mailFrom || 'not set' + from: session.mailFrom || 'not set', + sessionId: session.id } }); } + + // Clean up session + this.removeSession(socket); }); socket.on('error', (err) => { const clientIp = socket.remoteAddress; const clientPort = socket.remotePort; - console.error(`Socket error: ${err.message}`); + const session = this.sessions.get(socket); + console.error(`Socket error for session ${session?.id}: ${err.message}`); // Log connection error as security event SecurityLogger.getInstance().logEvent({ @@ -172,18 +296,21 @@ export class SMTPServer { clientPort, error: err.message, errorCode: (err as any).code, - from: this.sessions.get(socket)?.mailFrom || 'not set' + from: session?.mailFrom || 'not set', + sessionId: session?.id } }); - this.sessions.delete(socket); + // Clean up session resources + this.removeSession(socket); socket.destroy(); }); socket.on('close', () => { const clientIp = socket.remoteAddress; const clientPort = socket.remotePort; - console.log(`Connection closed from ${clientIp}:${clientPort}`); + const session = this.sessions.get(socket); + console.log(`Connection closed for session ${session?.id} from ${clientIp}:${clientPort}`); // Log connection closure as security event SecurityLogger.getInstance().logEvent({ @@ -193,11 +320,13 @@ export class SMTPServer { ipAddress: clientIp, details: { clientPort, - sessionEnded: this.sessions.get(socket)?.connectionEnded || false + sessionId: session?.id, + sessionEnded: session?.connectionEnded || false } }); - this.sessions.delete(socket); + // Clean up session resources + this.removeSession(socket); }); } @@ -239,6 +368,9 @@ export class SMTPServer { private processCommand(socket: plugins.net.Socket | plugins.tls.TLSSocket, commandLine: string): void { const session = this.sessions.get(socket); if (!session || session.connectionEnded) return; + + // Update session activity timestamp + this.updateSessionActivity(socket); const [command, ...args] = commandLine.split(' '); const upperCommand = command.toUpperCase(); @@ -431,19 +563,32 @@ export class SMTPServer { const session = this.sessions.get(socket); if (!session) return; + // Initialize email data buffer if it doesn't exist + if (!session.emailDataChunks) { + session.emailDataChunks = []; + } + // Check for end of data marker if (data.endsWith('\r\n.\r\n')) { // Remove the end of data marker const emailData = data.slice(0, -5); - session.emailData += emailData; + + // Add final chunk + session.emailDataChunks.push(emailData); + + // Join chunks efficiently + session.emailData = session.emailDataChunks.join(''); + + // Free memory + session.emailDataChunks = undefined; session.state = SmtpState.FINISHED; // Save and process the email this.saveEmail(socket); this.sendResponse(socket, '250 OK: Message accepted for delivery'); } else { - // Accumulate the data - session.emailData += data; + // Accumulate the data as chunks + session.emailDataChunks.push(data); } } @@ -801,6 +946,4 @@ export class SMTPServer { const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/; return emailRegex.test(email); } - - // These methods are defined elsewhere in the class, duplicates removed } \ No newline at end of file diff --git a/ts/mail/delivery/interfaces.ts b/ts/mail/delivery/interfaces.ts index 4876b99..b4234ed 100644 --- a/ts/mail/delivery/interfaces.ts +++ b/ts/mail/delivery/interfaces.ts @@ -96,6 +96,11 @@ export interface ISmtpSession { */ emailData: string; + /** + * Chunks of email data for more efficient buffer management + */ + emailDataChunks?: string[]; + /** * Whether the connection is using TLS */ @@ -130,6 +135,11 @@ export interface ISmtpSession { * Email processing mode to use for this session */ processingMode?: EmailProcessingMode; + + /** + * Timestamp of last activity for session timeout tracking + */ + lastActivity?: number; } /**