/**
 * SMTP Data Handler
 * Responsible for processing email data during and after DATA command
 */

import * as plugins from '../../../plugins.js';
import * as fs from 'fs';
import * as path from 'path';
import { SmtpState } from './interfaces.js';
import type { ISmtpSession, ISmtpTransactionResult } from './interfaces.js';
import type { IDataHandler, ISessionManager } from './interfaces.js';
import { SmtpResponseCode, SMTP_PATTERNS, SMTP_DEFAULTS } from './constants.js';
import { SmtpLogger } from './utils/logging.js';
import { Email } from '../../core/classes.email.js';
import { UnifiedEmailServer } from '../../routing/classes.unified.email.server.js';

/**
 * Length of the longest end-of-data terminator the handler recognises
 * ("\r\n.\r\n"); only this many trailing characters need inspecting.
 */
const MAX_TERMINATOR_LENGTH = 5;

/**
 * A final line consisting solely of "." (CRLF or bare-LF line endings),
 * together with the line break that precedes it.
 */
const END_OF_DATA_MARKER = /(?:^|\r?\n)\.\r?\n$/;

/**
 * A line that starts with two dots, i.e. a dot-stuffed line
 * (RFC 5321, section 4.5.2). Works for CRLF and bare-LF line endings
 * and for the very first line of the message.
 */
const STUFFED_DOT = /(^|\n)\.\./g;

/** Empty line separating the header section from the body. */
const HEADER_BODY_SEPARATOR = /\r?\n\r?\n/;

/** A single line break, CRLF or bare LF. */
const LINE_BREAK = /\r?\n/;

/**
 * Extract a printable message from an arbitrary thrown value.
 */
function errorMessageOf(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * Normalise an arbitrary thrown value to an Error instance for logging.
 */
function toError(error: unknown): Error {
  return error instanceof Error ? error : new Error(String(error));
}

/**
 * Handles SMTP DATA command and email data processing
 */
export class DataHandler implements IDataHandler {
  /**
   * Session manager instance
   */
  private sessionManager: ISessionManager;

  /**
   * Email server reference
   */
  private emailServer: UnifiedEmailServer;

  /**
   * SMTP server options
   */
  private options: {
    size: number;
    tempDir?: string;
    hostname?: string;
  };

  /**
   * Creates a new data handler
   * @param sessionManager - Session manager instance
   * @param emailServer - Email server reference
   * @param options - Data handler options; `size` and `hostname` fall back to
   *   SMTP_DEFAULTS when missing or falsy
   */
  constructor(
    sessionManager: ISessionManager,
    emailServer: UnifiedEmailServer,
    options: {
      size?: number;
      tempDir?: string;
      hostname?: string;
    } = {}
  ) {
    this.sessionManager = sessionManager;
    this.emailServer = emailServer;

    this.options = {
      size: options.size || SMTP_DEFAULTS.MAX_MESSAGE_SIZE,
      tempDir: options.tempDir,
      hostname: options.hostname || SMTP_DEFAULTS.HOSTNAME
    };

    // Create temp directory if specified and doesn't exist
    if (this.options.tempDir) {
      try {
        if (!fs.existsSync(this.options.tempDir)) {
          fs.mkdirSync(this.options.tempDir, { recursive: true });
        }
      } catch (error) {
        SmtpLogger.error(`Failed to create temp directory: ${errorMessageOf(error)}`, {
          error: toError(error),
          tempDir: this.options.tempDir
        });
        // Disable saving to disk: saveEmail() is a no-op without a tempDir
        this.options.tempDir = undefined;
      }
    }
  }

  /**
   * Process incoming email data
   *
   * Appends the chunk to the session, (re)arms the DATA timeout, enforces the
   * configured size limit and, once the end-of-data marker is seen, hands the
   * message over to handleEndOfData().
   *
   * @param socket - Client socket
   * @param data - Data chunk
   * @returns Promise that resolves when the data is processed
   */
  public async processEmailData(socket: plugins.net.Socket | plugins.tls.TLSSocket, data: string): Promise<void> {
    // Get the session for this socket
    const session = this.sessionManager.getSession(socket);
    if (!session) {
      this.sendResponse(socket, `${SmtpResponseCode.LOCAL_ERROR} Internal server error - session not found`);
      return;
    }

    // Clear any existing timeout and set a new one
    if (session.dataTimeoutId) {
      clearTimeout(session.dataTimeoutId);
    }

    session.dataTimeoutId = setTimeout(() => {
      if (session.state === SmtpState.DATA_RECEIVING) {
        SmtpLogger.warn(`DATA timeout for session ${session.id}`, { sessionId: session.id });
        this.sendResponse(socket, `${SmtpResponseCode.LOCAL_ERROR} Data timeout`);
        this.resetSession(session);
      }
    }, SMTP_DEFAULTS.DATA_TIMEOUT);

    // Update activity timestamp
    this.sessionManager.updateSessionActivity(session);

    // Store data in chunks for better memory efficiency
    if (!session.emailDataChunks) {
      session.emailDataChunks = [];
    }
    session.emailDataChunks.push(data);

    // Check if we've reached the max size (measured in string length units)
    let totalSize = 0;
    for (const chunk of session.emailDataChunks) {
      totalSize += chunk.length;
    }

    if (totalSize > this.options.size) {
      SmtpLogger.warn(`Message size exceeds limit for session ${session.id}`, {
        sessionId: session.id,
        size: totalSize,
        limit: this.options.size
      });

      this.sendResponse(socket, `${SmtpResponseCode.EXCEEDED_STORAGE} Message too big, size limit is ${this.options.size} bytes`);
      this.resetSession(session);
      return;
    }

    if (DataHandler.isEndOfData(session.emailDataChunks, data)) {
      SmtpLogger.debug(`End of data marker found for session ${session.id}`, { sessionId: session.id });

      // End of data marker found
      await this.handleEndOfData(socket, session);
    }
  }

  /**
   * Process a complete email
   *
   * Rebuilds session.emailData from the received chunks, parses it and
   * dispatches on session.processingMode (defaults to 'mta'). Never throws:
   * failures are reported through the returned result.
   *
   * @param session - SMTP session
   * @returns Promise that resolves with the result of the transaction
   */
  public async processEmail(session: ISmtpSession): Promise<ISmtpTransactionResult> {
    // Combine all chunks, remove the end of data marker and undo dot-stuffing
    this.assembleEmailData(session);

    try {
      // Parse email into Email object
      const email = await this.parseEmail(session);

      // Process the email based on the processing mode
      const processingMode = session.processingMode || 'mta';

      let result: ISmtpTransactionResult = {
        success: false,
        error: 'Email processing failed'
      };

      switch (processingMode) {
        case 'mta':
          // Process through the MTA system
          try {
            SmtpLogger.debug(`Processing email in MTA mode for session ${session.id}`, {
              sessionId: session.id,
              messageId: email.getMessageId()
            });

            // Generate a message ID since queueEmail is not available
            const messageId = `${Date.now()}-${Math.floor(Math.random() * 1000000)}@${this.options.hostname || 'mail.example.com'}`;

            // In a full implementation, the email would be queued to the delivery system
            // await this.emailServer.queueEmail(email);

            result = {
              success: true,
              messageId,
              email
            };
          } catch (error) {
            SmtpLogger.error(`Failed to queue email: ${errorMessageOf(error)}`, {
              sessionId: session.id,
              error: toError(error)
            });

            result = {
              success: false,
              error: `Failed to queue email: ${errorMessageOf(error)}`
            };
          }
          break;

        case 'forward':
          // Forward email to another server
          SmtpLogger.debug(`Processing email in FORWARD mode for session ${session.id}`, {
            sessionId: session.id,
            messageId: email.getMessageId()
          });

          // Forward logic would be implemented here
          result = {
            success: true,
            messageId: email.getMessageId(),
            email
          };
          break;

        case 'process':
          // Process the email immediately
          SmtpLogger.debug(`Processing email in PROCESS mode for session ${session.id}`, {
            sessionId: session.id,
            messageId: email.getMessageId()
          });

          // Direct processing logic would be implemented here
          result = {
            success: true,
            messageId: email.getMessageId(),
            email
          };
          break;

        default:
          SmtpLogger.warn(`Unknown processing mode: ${processingMode}`, { sessionId: session.id });
          result = {
            success: false,
            error: `Unknown processing mode: ${processingMode}`
          };
      }

      return result;
    } catch (error) {
      SmtpLogger.error(`Failed to parse email: ${errorMessageOf(error)}`, {
        sessionId: session.id,
        error: toError(error)
      });

      return {
        success: false,
        error: `Failed to parse email: ${errorMessageOf(error)}`
      };
    }
  }

  /**
   * Save an email to disk
   *
   * Writes session.emailData to `<tempDir>/<sessionId>-<timestamp>.eml`.
   * Does nothing when no temp directory is configured; write failures are
   * logged, not thrown.
   *
   * @param session - SMTP session
   */
  public saveEmail(session: ISmtpSession): void {
    if (!this.options.tempDir) {
      return;
    }

    try {
      const timestamp = Date.now();
      const filename = `${session.id}-${timestamp}.eml`;
      const filePath = path.join(this.options.tempDir, filename);

      fs.writeFileSync(filePath, session.emailData);

      SmtpLogger.debug(`Saved email to disk: ${filePath}`, {
        sessionId: session.id,
        filePath
      });
    } catch (error) {
      SmtpLogger.error(`Failed to save email to disk: ${errorMessageOf(error)}`, {
        sessionId: session.id,
        error: toError(error)
      });
    }
  }

  /**
   * Parse an email into an Email object
   *
   * Splits session.emailData at the first empty line (CRLF or bare LF) into
   * headers and body. Folded header lines are unfolded with a single space.
   * When a header name occurs more than once, the last occurrence wins.
   *
   * @param session - SMTP session
   * @returns Promise that resolves with the parsed Email object
   */
  public async parseEmail(session: ISmtpSession): Promise<Email> {
    // Parse raw email text to extract headers
    const rawData = session.emailData;
    const separator = HEADER_BODY_SEPARATOR.exec(rawData);

    if (!separator) {
      // No headers/body separation, create basic email
      return new Email({
        from: session.envelope.mailFrom.address,
        to: session.envelope.rcptTo.map(r => r.address),
        subject: 'Received via SMTP',
        text: rawData
      });
    }

    // Extract headers and body
    const headersText = rawData.substring(0, separator.index);
    const bodyText = rawData.substring(separator.index + separator[0].length); // Skip the blank-line separator

    // Parse headers
    const headers: Record<string, string> = {};
    const headerLines = headersText.split(LINE_BREAK);

    let currentHeader = '';

    for (const line of headerLines) {
      // Check if this is a continuation of a previous header
      if (line.startsWith(' ') || line.startsWith('\t')) {
        if (currentHeader) {
          headers[currentHeader] += ' ' + line.trim();
        }
        continue;
      }

      // This is a new header
      const separatorIndex = line.indexOf(':');
      if (separatorIndex !== -1) {
        const name = line.substring(0, separatorIndex).trim().toLowerCase();
        const value = line.substring(separatorIndex + 1).trim();
        headers[name] = value;
        currentHeader = name;
      }
    }

    // Extract common headers, falling back to the SMTP envelope
    const subject = headers['subject'] || 'No Subject';
    const from = headers['from'] || session.envelope.mailFrom.address;
    const to = headers['to'] || session.envelope.rcptTo.map(r => r.address).join(', ');
    const messageId = headers['message-id'] || `<${Date.now()}.${Math.random().toString(36).substring(2)}@${this.options.hostname}>`;

    // Create email object
    const email = new Email({
      from: from,
      to: to.split(',').map(addr => addr.trim()),
      subject: subject,
      text: bodyText,
      // Add original session envelope data for accurate routing as headers
      headers: {
        'X-Original-Mail-From': session.envelope.mailFrom.address,
        'X-Original-Rcpt-To': session.envelope.rcptTo.map(r => r.address).join(', '),
        'Message-Id': messageId
      }
    });

    // Add received header
    const timestamp = new Date().toUTCString();
    const receivedHeader = `from ${session.clientHostname || 'unknown'} (${session.remoteAddress}) by ${this.options.hostname} with ESMTP id ${session.id}; ${timestamp}`;
    email.addHeader('Received', receivedHeader);

    // Add all original headers (the ones above were already passed to the constructor)
    for (const [name, value] of Object.entries(headers)) {
      if (!['from', 'to', 'subject', 'message-id'].includes(name)) {
        email.addHeader(name, value);
      }
    }

    return email;
  }

  /**
   * Decide whether the data received so far ends with an end-of-data marker
   *
   * Accepts "\r\n.\r\n", "\n.\r\n", "\r\n.\n" and "\n.\n" at the end of the
   * accumulated data, or a chunk that is exactly "." or ".\r\n". Only the
   * last few characters are inspected, so the marker is still found when it
   * is split across chunks without joining the whole message.
   *
   * @param chunks - All chunks received so far, including `data`
   * @param data - The chunk that was just received
   * @returns Whether the message is complete
   */
  private static isEndOfData(chunks: readonly string[], data: string): boolean {
    if (data === '.\r\n' || data === '.') {
      return true;
    }

    const tail = DataHandler.tailOfChunks(chunks, MAX_TERMINATOR_LENGTH);

    // "\r\n.\r\n" and "\r\n.\n" also end with one of these two suffixes
    return tail.endsWith('\n.\r\n') || tail.endsWith('\n.\n');
  }

  /**
   * Return the last `length` characters of the concatenated chunks
   * @param chunks - Chunks in arrival order
   * @param length - Maximum number of trailing characters to return
   * @returns The trailing characters (shorter if less data is available)
   */
  private static tailOfChunks(chunks: readonly string[], length: number): string {
    let tail = '';

    for (let i = chunks.length - 1; i >= 0 && tail.length < length; i--) {
      const chunk = chunks[i] ?? '';
      // Take only as much of this chunk as is still missing
      tail = chunk.slice(-(length - tail.length)) + tail;
    }

    return tail;
  }

  /**
   * Join the chunks and remove exactly one trailing end-of-data marker
   *
   * A final "." line is removed together with the line break before it. If
   * there is no such line but the last chunk itself is "." or ".\r\n" (which
   * processEmailData treats as end of data), that chunk is dropped. A period
   * that merely ends the last line of content is left untouched.
   *
   * @param chunks - Chunks in arrival order
   * @returns The message without its terminator
   */
  private static stripEndOfDataMarker(chunks: readonly string[]): string {
    const joined = chunks.join('');
    const stripped = joined.replace(END_OF_DATA_MARKER, '');

    if (stripped.length !== joined.length) {
      return stripped;
    }

    const lastChunk = chunks.length > 0 ? chunks[chunks.length - 1] : undefined;
    if (lastChunk === '.' || lastChunk === '.\r\n') {
      return joined.slice(0, joined.length - lastChunk.length);
    }

    return joined;
  }

  /**
   * Rebuild session.emailData from session.emailDataChunks
   *
   * The result depends only on the chunks, so calling this repeatedly is safe.
   *
   * @param session - SMTP session
   */
  private assembleEmailData(session: ISmtpSession): void {
    const withoutMarker = DataHandler.stripEndOfDataMarker(session.emailDataChunks || []);

    // Remove dot-stuffing (RFC 5321, section 4.5.2). Must happen after the
    // marker is stripped, otherwise a stuffed ".." line would turn into ".".
    session.emailData = withoutMarker.replace(STUFFED_DOT, '$1.');
  }

  /**
   * Handle end of data marker received
   *
   * Clears the DATA timeout, marks the session FINISHED, optionally saves the
   * message, processes it, replies to the client and resets the session for
   * the next transaction.
   *
   * @param socket - Client socket
   * @param session - SMTP session
   */
  private async handleEndOfData(socket: plugins.net.Socket | plugins.tls.TLSSocket, session: ISmtpSession): Promise<void> {
    // Clear the data timeout
    if (session.dataTimeoutId) {
      clearTimeout(session.dataTimeoutId);
      session.dataTimeoutId = undefined;
    }

    try {
      // Update session state
      this.sessionManager.updateSessionState(session, SmtpState.FINISHED);

      // session.emailData is only populated from the chunks here; without
      // this step saveEmail() would write stale or empty content
      this.assembleEmailData(session);

      // Optionally save email to disk
      this.saveEmail(session);

      // Process the email
      const result = await this.processEmail(session);

      if (result.success) {
        // Send success response
        this.sendResponse(socket, `${SmtpResponseCode.OK} OK message queued as ${result.messageId}`);
      } else {
        // Send error response
        this.sendResponse(socket, `${SmtpResponseCode.TRANSACTION_FAILED} Failed to process email: ${result.error}`);
      }

      // Reset session for new transaction
      this.resetSession(session);
    } catch (error) {
      SmtpLogger.error(`Error processing email: ${errorMessageOf(error)}`, {
        sessionId: session.id,
        error: toError(error)
      });

      this.sendResponse(socket, `${SmtpResponseCode.LOCAL_ERROR} Error processing email: ${errorMessageOf(error)}`);
      this.resetSession(session);
    }
  }

  /**
   * Reset session after email processing
   *
   * Clears the DATA timeout and all per-message fields, then moves the
   * session back to the AFTER_EHLO state.
   *
   * @param session - SMTP session
   */
  private resetSession(session: ISmtpSession): void {
    // Clear any data timeout
    if (session.dataTimeoutId) {
      clearTimeout(session.dataTimeoutId);
      session.dataTimeoutId = undefined;
    }

    // Reset data fields but keep authentication state
    session.mailFrom = '';
    session.rcptTo = [];
    session.emailData = '';
    session.emailDataChunks = [];
    session.envelope = {
      mailFrom: { address: '', args: {} },
      rcptTo: []
    };

    // Reset state to after EHLO
    this.sessionManager.updateSessionState(session, SmtpState.AFTER_EHLO);
  }

  /**
   * Send a response to the client
   *
   * Appends SMTP_DEFAULTS.CRLF to the response. Synchronous write errors are
   * either retried (see handleSocketError) or logged; they are never thrown.
   *
   * @param socket - Client socket
   * @param response - Response message
   */
  private sendResponse(socket: plugins.net.Socket | plugins.tls.TLSSocket, response: string): void {
    try {
      socket.write(`${response}${SMTP_DEFAULTS.CRLF}`);
      SmtpLogger.logResponse(response, socket);
    } catch (error) {
      // Attempt to recover from specific transient errors
      if (this.isRecoverableSocketError(error)) {
        this.handleSocketError(socket, error, response);
      } else {
        // Log error for non-recoverable errors
        SmtpLogger.error(`Error sending response: ${errorMessageOf(error)}`, {
          response,
          remoteAddress: socket.remoteAddress,
          remotePort: socket.remotePort,
          error: toError(error)
        });
      }
    }
  }

  /**
   * Check if a socket error is potentially recoverable
   * @param error - The error that occurred
   * @returns Whether the error is potentially recoverable
   */
  private isRecoverableSocketError(error: unknown): boolean {
    const recoverableErrorCodes = [
      'EPIPE', // Broken pipe
      'ECONNRESET', // Connection reset by peer
      'ETIMEDOUT', // Connection timed out
      'ECONNABORTED' // Connection aborted
    ];

    if (!(error instanceof Error)) {
      return false;
    }

    const code = (error as { code?: unknown }).code;
    return typeof code === 'string' && recoverableErrorCodes.includes(code);
  }

  /**
   * Handle recoverable socket errors with retry logic
   *
   * Logs the error and, if the socket is still usable, retries the write once
   * after 100 ms. The socket is destroyed when it is no longer writable or
   * when the retry fails.
   *
   * @param socket - Client socket
   * @param error - The error that occurred
   * @param response - The response that failed to send
   */
  private handleSocketError(socket: plugins.net.Socket | plugins.tls.TLSSocket, error: unknown, response: string): void {
    // Get the session for this socket
    const session = this.sessionManager.getSession(socket);
    if (!session) {
      SmtpLogger.error(`Session not found when handling socket error`);
      if (!socket.destroyed) {
        socket.destroy();
      }
      return;
    }

    // Get error details for logging
    const errorMessage = errorMessageOf(error);
    const errorCode = error instanceof Error && 'code' in error
      ? (error as { code?: unknown }).code
      : 'UNKNOWN';

    SmtpLogger.warn(`Recoverable socket error during data handling (${errorCode}): ${errorMessage}`, {
      sessionId: session.id,
      remoteAddress: session.remoteAddress,
      error: toError(error)
    });

    // Check if socket is already destroyed
    if (socket.destroyed) {
      SmtpLogger.info(`Socket already destroyed, cannot retry data operation`);
      return;
    }

    // Check if socket is writeable
    if (!socket.writable) {
      SmtpLogger.info(`Socket no longer writable, aborting data recovery attempt`);
      if (!socket.destroyed) {
        socket.destroy();
      }
      return;
    }

    // Attempt to retry the write operation after a short delay
    setTimeout(() => {
      try {
        if (!socket.destroyed && socket.writable) {
          socket.write(`${response}${SMTP_DEFAULTS.CRLF}`);
          SmtpLogger.info(`Successfully retried data send operation after error`);
        } else {
          SmtpLogger.warn(`Socket no longer available for data retry`);
          if (!socket.destroyed) {
            socket.destroy();
          }
        }
      } catch (retryError) {
        SmtpLogger.error(`Data retry attempt failed: ${errorMessageOf(retryError)}`);
        if (!socket.destroyed) {
          socket.destroy();
        }
      }
    }, 100); // Short delay before retry
  }
}