import * as plugins from '../../plugins.js';
import { EventEmitter } from 'node:events';
import * as net from 'node:net';
import * as tls from 'node:tls';
import { logger } from '../../logger.js';
import { SecurityLogger, SecurityLogLevel, SecurityEventType } from '../../security/index.js';
import { UnifiedDeliveryQueue } from './classes.delivery.queue.js';

/** Length of one rate-limiting window in milliseconds. */
const RATE_LIMIT_WINDOW_MS = 60000;

/**
 * Delivery status enumeration
 */
export var DeliveryStatus;
(function (DeliveryStatus) {
    DeliveryStatus["PENDING"] = "pending";
    DeliveryStatus["DELIVERING"] = "delivering";
    DeliveryStatus["DELIVERED"] = "delivered";
    DeliveryStatus["DEFERRED"] = "deferred";
    DeliveryStatus["FAILED"] = "failed";
})(DeliveryStatus || (DeliveryStatus = {}));

/**
 * Tell whether the buffered text ends with the final line of an SMTP reply.
 * Continuation lines have the form "250-text"; the final line is "250 text"
 * (or a bare three-digit code) and must be newline-terminated.
 * @param text Everything received so far for the current reply
 * @returns True when the reply is complete
 */
function isCompleteSmtpReply(text) {
    if (!text.endsWith('\n')) {
        return false;
    }
    const lines = text.trimEnd().split(/\r?\n/);
    return /^\d{3}(?: |$)/.test(lines[lines.length - 1]);
}

/**
 * Handles delivery for all email processing modes.
 *
 * Emits: 'started', 'stopped', 'statsUpdated', 'deliveryStart',
 * 'deliverySuccess' and 'deliveryFailed'.
 */
export class MultiModeDeliverySystem extends EventEmitter {
    queue;
    options;
    stats;
    deliveryTimes = [];
    activeDeliveries = new Set();
    running = false;
    throttled = false;
    rateLimitLastCheck = Date.now();
    rateLimitCounter = 0;
    emailServer;
    /**
     * Create a new multi-mode delivery system
     * @param queue Unified delivery queue
     * @param options Delivery options (all optional; defaults are applied below)
     * @param emailServer Optional reference to unified email server for SmtpClient access
     */
    constructor(queue, options = {}, emailServer) {
        super();
        this.queue = queue;
        this.emailServer = emailServer;
        // Tolerate an explicit null as well as an omitted argument
        const opts = options ?? {};
        // Set default options
        this.options = {
            connectionPoolSize: opts.connectionPoolSize || 10,
            socketTimeout: opts.socketTimeout || 30000, // 30 seconds
            concurrentDeliveries: opts.concurrentDeliveries || 10,
            sendTimeout: opts.sendTimeout || 60000, // 1 minute
            verifyCertificates: opts.verifyCertificates !== false, // Default to true
            tlsMinVersion: opts.tlsMinVersion || 'TLSv1.2',
            forwardHandler: opts.forwardHandler || {
                deliver: this.handleForwardDelivery.bind(this)
            },
            deliveryHandler: opts.deliveryHandler || {
                deliver: this.handleMtaDelivery.bind(this)
            },
            processHandler: opts.processHandler || {
                deliver: this.handleProcessDelivery.bind(this)
            },
            globalRateLimit: opts.globalRateLimit || 100, // 100 per minute
            perPatternRateLimit: opts.perPatternRateLimit || {},
            processBounces: opts.processBounces !== false, // Default to true
            bounceHandler: opts.bounceHandler || null,
            onDeliveryStart: opts.onDeliveryStart || (async () => { }),
            onDeliverySuccess: opts.onDeliverySuccess || (async () => { }),
            onDeliveryFailed: opts.onDeliveryFailed || (async () => { })
        };
        // Initialize statistics
        this.stats = {
            activeDeliveries: 0,
            totalSuccessful: 0,
            totalFailed: 0,
            avgDeliveryTime: 0,
            byMode: {
                forward: { successful: 0, failed: 0 },
                mta: { successful: 0, failed: 0 },
                process: { successful: 0, failed: 0 }
            },
            rateLimiting: {
                currentRate: 0,
                globalLimit: this.options.globalRateLimit,
                throttled: 0
            }
        };
        // Set up event listeners. processItems is async, so catch its rejection
        // here: an EventEmitter does not observe the promise a listener returns.
        this.queue.on('itemsReady', (items) => {
            this.processItems(items).catch(err => {
                logger.log('error', `Failed to process ready items: ${err.message}`);
            });
        });
    }
    /**
     * Start the delivery system. Emits 'started'; a no-op (with a warning)
     * when already running.
     */
    async start() {
        logger.log('info', 'Starting MultiModeDeliverySystem');
        if (this.running) {
            logger.log('warn', 'MultiModeDeliverySystem is already running');
            return;
        }
        this.running = true;
        // Emit started event
        this.emit('started');
        logger.log('info', 'MultiModeDeliverySystem started successfully');
    }
    /**
     * Stop the delivery system. Waits up to 30 seconds for active deliveries
     * to drain, then emits 'stopped'.
     */
    async stop() {
        logger.log('info', 'Stopping MultiModeDeliverySystem');
        if (!this.running) {
            logger.log('warn', 'MultiModeDeliverySystem is already stopped');
            return;
        }
        this.running = false;
        // Wait for active deliveries to complete
        if (this.activeDeliveries.size > 0) {
            logger.log('info', `Waiting for ${this.activeDeliveries.size} active deliveries to complete`);
            // Poll once per second, giving up after a maximum of 30 seconds
            await new Promise(resolve => {
                const checkInterval = setInterval(() => {
                    if (this.activeDeliveries.size === 0) {
                        clearInterval(checkInterval);
                        clearTimeout(forceTimeout);
                        resolve();
                    }
                }, 1000);
                // Force resolve after 30 seconds
                const forceTimeout = setTimeout(() => {
                    clearInterval(checkInterval);
                    resolve();
                }, 30000);
            });
        }
        // Emit stopped event
        this.emit('stopped');
        logger.log('info', 'MultiModeDeliverySystem stopped successfully');
    }
    /**
     * Process ready items from the queue
     * @param items Queue items ready for processing
     */
    async processItems(items) {
        if (!this.running) {
            return;
        }
        // Check if we're already at max concurrent deliveries
        if (this.activeDeliveries.size >= this.options.concurrentDeliveries) {
            logger.log('debug', `Already at max concurrent deliveries (${this.activeDeliveries.size})`);
            return;
        }
        // Check rate limiting
        if (this.checkRateLimit()) {
            logger.log('debug', 'Rate limit exceeded, throttling deliveries');
            return;
        }
        // Calculate how many more deliveries we can start
        const availableSlots = this.options.concurrentDeliveries - this.activeDeliveries.size;
        const itemsToProcess = items.slice(0, availableSlots);
        if (itemsToProcess.length === 0) {
            return;
        }
        logger.log('info', `Processing ${itemsToProcess.length} items for delivery`);
        // Process each item
        for (const item of itemsToProcess) {
            // Another processItems call may have run while we awaited below,
            // so re-check capacity and skip items that are already in flight.
            if (this.activeDeliveries.size >= this.options.concurrentDeliveries) {
                break;
            }
            if (this.activeDeliveries.has(item.id)) {
                continue;
            }
            // Reserve the slot BEFORE awaiting so overlapping calls see it
            this.activeDeliveries.add(item.id);
            this.stats.activeDeliveries = this.activeDeliveries.size;
            try {
                // Mark as processing
                await this.queue.markProcessing(item.id);
            }
            catch (err) {
                // Release the reserved slot; the item was never started
                this.activeDeliveries.delete(item.id);
                this.stats.activeDeliveries = this.activeDeliveries.size;
                logger.log('error', `Failed to mark item ${item.id} as processing: ${err.message}`);
                continue;
            }
            // Deliver asynchronously
            this.deliverItem(item).catch(err => {
                logger.log('error', `Unhandled error in delivery: ${err.message}`);
            });
        }
        // Update statistics
        this.emit('statsUpdated', this.stats);
    }
    /**
     * Deliver an item from the queue.
     *
     * A failure of the delivery itself (start hook, handler, markDelivered)
     * is recorded as a failed delivery. Errors raised by success-side
     * bookkeeping (hook, listeners) propagate to the caller instead, so an
     * item the queue already recorded as delivered is never re-marked failed.
     * @param item Queue item to deliver
     */
    async deliverItem(item) {
        const startTime = Date.now();
        try {
            let result;
            try {
                result = await this.performDelivery(item);
            }
            catch (error) {
                await this.recordDeliveryFailure(item, error, Date.now() - startTime);
                return;
            }
            await this.recordDeliverySuccess(item, result, Date.now() - startTime);
        }
        finally {
            // Remove from active deliveries
            this.activeDeliveries.delete(item.id);
            this.stats.activeDeliveries = this.activeDeliveries.size;
            // Update statistics
            this.emit('statsUpdated', this.stats);
        }
    }
    /**
     * Run the start hook, dispatch to the handler for the item's mode and
     * mark the item as delivered in the queue.
     * @param item Queue item to deliver
     * @returns Whatever the mode handler returned
     * @throws Error for an unknown processing mode, or whatever the hook,
     *         handler or queue throws
     */
    async performDelivery(item) {
        // Call delivery start hook
        await this.options.onDeliveryStart(item);
        // Emit delivery start event
        this.emit('deliveryStart', item);
        logger.log('info', `Starting delivery of item ${item.id}, mode: ${item.processingMode}`);
        // Choose the appropriate handler based on mode
        let result;
        switch (item.processingMode) {
            case 'forward':
                result = await this.options.forwardHandler.deliver(item);
                break;
            case 'mta':
                result = await this.options.deliveryHandler.deliver(item);
                break;
            case 'process':
                result = await this.options.processHandler.deliver(item);
                break;
            default:
                throw new Error(`Unknown processing mode: ${item.processingMode}`);
        }
        // Mark as delivered
        await this.queue.markDelivered(item.id);
        return result;
    }
    /**
     * Update statistics, run the success hook and emit/log a successful delivery.
     * @param item Delivered queue item
     * @param result Handler result
     * @param deliveryTime Elapsed delivery time in milliseconds
     */
    async recordDeliverySuccess(item, result, deliveryTime) {
        // Update statistics
        this.stats.totalSuccessful++;
        this.stats.byMode[item.processingMode].successful++;
        this.deliveryTimes.push(deliveryTime);
        this.updateDeliveryTimeStats();
        // Call delivery success hook
        await this.options.onDeliverySuccess(item, result);
        // Emit delivery success event
        this.emit('deliverySuccess', item, result);
        logger.log('info', `Item ${item.id} delivered successfully in ${deliveryTime}ms`);
        SecurityLogger.getInstance().logEvent({
            level: SecurityLogLevel.INFO,
            type: SecurityEventType.EMAIL_DELIVERY,
            message: 'Email delivery successful',
            details: {
                itemId: item.id,
                mode: item.processingMode,
                routeName: item.route?.name || 'unknown',
                deliveryTime
            },
            success: true
        });
    }
    /**
     * Mark the item failed, update statistics, run the failure hook, hand the
     * failure to the bounce handler (when enabled) and emit/log the failure.
     * @param item Queue item whose delivery failed
     * @param error The error that aborted the delivery
     * @param deliveryTime Elapsed attempt time in milliseconds
     */
    async recordDeliveryFailure(item, error, deliveryTime) {
        const message = error?.message ?? String(error);
        // Mark as failed
        await this.queue.markFailed(item.id, message);
        // Update statistics. The mode may be unknown (that is one of the ways
        // to get here), so only touch a per-mode bucket that actually exists.
        this.stats.totalFailed++;
        if (Object.prototype.hasOwnProperty.call(this.stats.byMode, item.processingMode)) {
            this.stats.byMode[item.processingMode].failed++;
        }
        // Call delivery failed hook
        await this.options.onDeliveryFailed(item, message);
        // Process as bounce if enabled and we have a bounce handler
        if (this.options.processBounces && this.options.bounceHandler) {
            try {
                const email = item.processingResult;
                // Only the first recipient is reported; multiple recipients
                // would need more sophisticated parsing of the failure.
                const recipient = email.to.length > 0 ? email.to[0] : '';
                if (recipient) {
                    logger.log('info', `Processing delivery failure as bounce for recipient ${recipient}`);
                    // Process SMTP failure through bounce handler
                    await this.options.bounceHandler.processSmtpFailure(recipient, message, {
                        sender: email.from,
                        originalEmailId: item.id,
                        headers: email.headers
                    });
                    logger.log('info', `Bounce record created for failed delivery to ${recipient}`);
                }
            }
            catch (bounceError) {
                logger.log('error', `Failed to process bounce: ${bounceError.message}`);
            }
        }
        // Emit delivery failed event
        this.emit('deliveryFailed', item, error);
        logger.log('error', `Item ${item.id} delivery failed: ${message}`);
        SecurityLogger.getInstance().logEvent({
            level: SecurityLogLevel.ERROR,
            type: SecurityEventType.EMAIL_DELIVERY,
            message: 'Email delivery failed',
            details: {
                itemId: item.id,
                mode: item.processingMode,
                routeName: item.route?.name || 'unknown',
                error: message,
                deliveryTime
            },
            success: false
        });
    }
    /**
     * Default handler for forward mode delivery. Uses the email server's
     * SmtpClient when an email server was supplied, otherwise falls back to
     * the raw-socket implementation.
     * @param item Queue item
     * @returns Summary of the forward (target, recipient count, message id)
     * @throws Error when no target host is configured or sending fails
     */
    async handleForwardDelivery(item) {
        logger.log('info', `Forward delivery for item ${item.id}`);
        const email = item.processingResult;
        const route = item.route;
        // Get target server information
        const targetServer = route?.action.forward?.host;
        const targetPort = route?.action.forward?.port || 25;
        const useTls = false; // TLS configuration can be enhanced later
        if (!targetServer) {
            throw new Error('No target server configured for forward mode');
        }
        logger.log('info', `Forwarding email to ${targetServer}:${targetPort}, TLS: ${useTls}`);
        try {
            // Get SMTP client from email server if available
            if (!this.emailServer) {
                // Fall back to raw socket implementation if no email server
                logger.log('warn', 'No email server available, falling back to raw socket implementation');
                return this.handleForwardDeliveryLegacy(item);
            }
            // Get SMTP client from UnifiedEmailServer
            const smtpClient = this.emailServer.getSmtpClient(targetServer, targetPort);
            // Apply DKIM signing if configured in the route
            if (item.route?.action.options?.mtaOptions?.dkimSign) {
                await this.applyDkimSigning(email, item.route.action.options.mtaOptions);
            }
            // Send the email using SmtpClient
            const result = await smtpClient.sendMail(email);
            if (result.success) {
                logger.log('info', `Email forwarded successfully to ${targetServer}:${targetPort}`);
                return {
                    targetServer: targetServer,
                    targetPort: targetPort,
                    recipients: result.acceptedRecipients.length,
                    messageId: result.messageId,
                    rejectedRecipients: result.rejectedRecipients
                };
            }
            else {
                throw new Error(result.error?.message || 'Failed to forward email');
            }
        }
        catch (error) {
            logger.log('error', `Failed to forward email: ${error.message}`);
            throw error;
        }
    }
    /**
     * Legacy forward delivery using raw sockets (fallback)
     * @param item Queue item
     * @returns Summary of the forward (target and recipient count)
     * @throws Error when no target host is configured or the SMTP dialogue fails
     */
    async handleForwardDeliveryLegacy(item) {
        const email = item.processingResult;
        const route = item.route;
        // Get target server information
        const targetServer = route?.action.forward?.host;
        const targetPort = route?.action.forward?.port || 25;
        const useTls = false; // TLS configuration can be enhanced later
        if (!targetServer) {
            throw new Error('No target server configured for forward mode');
        }
        // Create a socket connection to the target server
        const socket = new net.Socket();
        // Set timeout
        socket.setTimeout(this.options.socketTimeout);
        try {
            // Connect to the target server
            await new Promise((resolve, reject) => {
                // These listeners intentionally stay attached after the connect:
                // the 'error' listener keeps a later socket error from becoming
                // an uncaught exception (rejecting a settled promise is a no-op).
                socket.on('connect', () => {
                    logger.log('debug', `Connected to ${targetServer}:${targetPort}`);
                    resolve();
                });
                socket.on('timeout', () => {
                    reject(new Error(`Connection timeout to ${targetServer}:${targetPort}`));
                });
                socket.on('error', (err) => {
                    reject(new Error(`Connection error to ${targetServer}:${targetPort}: ${err.message}`));
                });
                // Connect to the server
                socket.connect({
                    host: targetServer,
                    port: targetPort
                });
            });
            // Consume the server greeting first. Without this every reply is
            // attributed to the following command (off by one).
            const greeting = await this.readSmtpResponse(socket, 'SMTP greeting timeout');
            if (greeting.charAt(0) !== '2') {
                throw new Error(`SMTP error: ${greeting}`);
            }
            const heloDomain = route?.action.options?.mtaOptions?.domain || 'localhost';
            // Send EHLO
            await this.smtpCommand(socket, `EHLO ${heloDomain}`);
            // Start TLS if required
            if (useTls) {
                await this.smtpCommand(socket, 'STARTTLS');
                // Upgrade to TLS
                const tlsSocket = await this.upgradeTls(socket, targetServer);
                // Send EHLO again after STARTTLS
                await this.smtpCommand(tlsSocket, `EHLO ${heloDomain}`);
                // Use tlsSocket for remaining commands
                return this.completeSMTPExchange(tlsSocket, email, route);
            }
            // Complete the SMTP exchange. Returned without await on purpose:
            // completeSMTPExchange logs and cleans up its own failures.
            return this.completeSMTPExchange(socket, email, route);
        }
        catch (error) {
            logger.log('error', `Failed to forward email: ${error.message}`);
            // Close the connection
            socket.destroy();
            throw error;
        }
    }
    /**
     * Complete the SMTP exchange after connection and initial setup
     * @param socket Network socket
     * @param email Email to send
     * @param route Route whose forward settings (host, port, auth) are used
     * @returns Summary of the forward (target and recipient count)
     * @throws Error when any command up to and including DATA is rejected
     */
    async completeSMTPExchange(socket, email, route) {
        try {
            const auth = route?.action?.forward?.auth;
            // Authenticate if credentials provided
            if (auth?.user && auth?.pass) {
                // Send AUTH LOGIN
                await this.smtpCommand(socket, 'AUTH LOGIN');
                // Send username (base64)
                const username = Buffer.from(auth.user).toString('base64');
                await this.smtpCommand(socket, username);
                // Send password (base64)
                const password = Buffer.from(auth.pass).toString('base64');
                await this.smtpCommand(socket, password);
            }
            // Send MAIL FROM
            await this.smtpCommand(socket, `MAIL FROM:<${email.from}>`);
            // Send RCPT TO for each recipient
            for (const recipient of email.getAllRecipients()) {
                await this.smtpCommand(socket, `RCPT TO:<${recipient}>`);
            }
            // Send DATA
            await this.smtpCommand(socket, 'DATA');
            // Send email content (simplified)
            const emailContent = await this.getFormattedEmail(email);
            await this.smtpData(socket, emailContent);
            // The message is accepted at this point. QUIT is best-effort: a
            // failure here must not report the delivery as failed, or the
            // message would be sent again on retry.
            try {
                await this.smtpCommand(socket, 'QUIT');
            }
            catch (quitError) {
                logger.log('debug', `SMTP QUIT failed after successful delivery: ${quitError.message}`);
            }
            // Close the connection
            socket.end();
            logger.log('info', `Email forwarded successfully to ${route?.action?.forward?.host}:${route?.action?.forward?.port || 25}`);
            return {
                targetServer: route?.action?.forward?.host,
                targetPort: route?.action?.forward?.port || 25,
                recipients: email.getAllRecipients().length
            };
        }
        catch (error) {
            logger.log('error', `Failed to forward email: ${error.message}`);
            // Close the connection
            socket.destroy();
            throw error;
        }
    }
    /**
     * Default handler for MTA mode delivery
     * @param item Queue item
     * @returns Summary (recipient count, subject, whether DKIM was requested)
     */
    async handleMtaDelivery(item) {
        logger.log('info', `MTA delivery for item ${item.id}`);
        const email = item.processingResult;
        const mtaOptions = item.route?.action.options?.mtaOptions;
        try {
            // Apply DKIM signing if configured in the route
            if (mtaOptions?.dkimSign) {
                await this.applyDkimSigning(email, mtaOptions);
            }
            // In a full implementation, this would use the MTA service
            // For now, we'll simulate a successful delivery
            logger.log('info', `Email processed by MTA: ${email.subject} to ${email.getAllRecipients().join(', ')}`);
            // Note: The MTA implementation would handle actual local delivery
            // Simulate successful delivery
            return {
                recipients: email.getAllRecipients().length,
                subject: email.subject,
                dkimSigned: !!mtaOptions?.dkimSign
            };
        }
        catch (error) {
            logger.log('error', `Failed to process email in MTA mode: ${error.message}`);
            throw error;
        }
    }
    /**
     * Default handler for process mode delivery: optional content scanning,
     * header transformations and DKIM signing.
     * @param item Queue item
     * @returns Summary (recipient count, subject, scanned/transformed/dkimSigned flags)
     * @throws Error when an attachment scanner with action 'reject' hits a blocked extension
     */
    async handleProcessDelivery(item) {
        logger.log('info', `Process delivery for item ${item.id}`);
        const email = item.processingResult;
        const route = item.route;
        try {
            // Apply content scanning if enabled
            if (route?.action.options?.contentScanning && route?.action.options?.scanners && route.action.options.scanners.length > 0) {
                logger.log('info', 'Performing content scanning');
                // Apply each scanner
                for (const scanner of route.action.options.scanners) {
                    switch (scanner.type) {
                        case 'spam':
                            logger.log('info', 'Scanning for spam content');
                            // Implement spam scanning
                            break;
                        case 'virus':
                            logger.log('info', 'Scanning for virus content');
                            // Implement virus scanning
                            break;
                        case 'attachment':
                            logger.log('info', 'Scanning attachments');
                            // Check for blocked extensions
                            if (scanner.blockedExtensions && scanner.blockedExtensions.length > 0) {
                                for (const attachment of email.attachments || []) {
                                    const ext = this.getFileExtension(attachment.filename);
                                    if (scanner.blockedExtensions.includes(ext)) {
                                        if (scanner.action === 'reject') {
                                            throw new Error(`Blocked attachment type: ${ext}`);
                                        }
                                        else { // tag
                                            email.addHeader('X-Attachment-Warning', `Potentially unsafe attachment: ${attachment.filename}`);
                                        }
                                    }
                                }
                            }
                            break;
                    }
                }
            }
            // Apply transformations if defined
            if (route?.action.options?.transformations && route?.action.options?.transformations.length > 0) {
                logger.log('info', 'Applying email transformations');
                for (const transform of route.action.options.transformations) {
                    switch (transform.type) {
                        case 'addHeader':
                            if (transform.header && transform.value) {
                                email.addHeader(transform.header, transform.value);
                            }
                            break;
                    }
                }
            }
            // Apply DKIM signing if configured (after all transformations)
            if (item.route?.action.options?.mtaOptions?.dkimSign || item.route?.action.process?.dkim) {
                await this.applyDkimSigning(email, item.route.action.options?.mtaOptions || {});
            }
            logger.log('info', `Email successfully processed in store-and-forward mode`);
            // Simulate successful delivery
            return {
                recipients: email.getAllRecipients().length,
                subject: email.subject,
                scanned: !!route?.action.options?.contentScanning,
                transformed: !!(route?.action.options?.transformations && route?.action.options?.transformations.length > 0),
                dkimSigned: !!(item.route?.action.options?.mtaOptions?.dkimSign || item.route?.action.process?.dkim)
            };
        }
        catch (error) {
            logger.log('error', `Failed to process email: ${error.message}`);
            throw error;
        }
    }
    /**
     * Get file extension from filename
     * @param filename Attachment file name
     * @returns Lower-cased extension including the leading dot, or '' when
     *          the name is missing or contains no dot
     */
    getFileExtension(filename) {
        if (typeof filename !== 'string') {
            return '';
        }
        const dotIndex = filename.lastIndexOf('.');
        // Without this guard substring(-1) would return the whole filename
        return dotIndex === -1 ? '' : filename.substring(dotIndex).toLowerCase();
    }
    /**
     * Apply DKIM signing to an email. Best-effort: signing failures are
     * logged and the email is left unsigned.
     * @param email Email to sign (a DKIM-Signature header is added on success)
     * @param mtaOptions MTA options; dkimOptions.domainName / keySelector are honoured
     */
    async applyDkimSigning(email, mtaOptions) {
        if (!this.emailServer) {
            logger.log('warn', 'Cannot apply DKIM signing without email server reference');
            return;
        }
        const domainName = mtaOptions.dkimOptions?.domainName || email.from.split('@')[1];
        const keySelector = mtaOptions.dkimOptions?.keySelector || 'default';
        try {
            // Ensure DKIM keys exist for the domain
            await this.emailServer.dkimCreator.handleDKIMKeysForDomain(domainName);
            // Convert Email to raw format for signing
            const rawEmail = email.toRFC822String();
            // Sign the email
            const dkimPrivateKey = (await this.emailServer.dkimCreator.readDKIMKeys(domainName)).privateKey;
            const signResult = await plugins.dkimSign(rawEmail, {
                signingDomain: domainName,
                selector: keySelector,
                privateKey: dkimPrivateKey,
                canonicalization: 'relaxed/relaxed',
                algorithm: 'rsa-sha256',
                signTime: new Date(),
                signatureData: [
                    {
                        signingDomain: domainName,
                        selector: keySelector,
                        privateKey: dkimPrivateKey,
                        algorithm: 'rsa-sha256',
                        canonicalization: 'relaxed/relaxed'
                    }
                ]
            });
            // Add the DKIM-Signature header to the email
            if (signResult.signatures) {
                email.addHeader('DKIM-Signature', signResult.signatures);
                logger.log('info', `Successfully added DKIM signature for ${domainName}`);
            }
        }
        catch (error) {
            logger.log('error', `Failed to apply DKIM signature: ${error.message}`);
            // Don't throw - allow email to be sent without DKIM if signing fails
        }
    }
    /**
     * Format email for SMTP transmission
     * @param email Email to format
     * @returns The message as a CRLF-delimited string
     */
    async getFormattedEmail(email) {
        // This is a simplified implementation
        // In a full implementation, this would use proper MIME formatting
        const text = email.text ?? '';
        let content = '';
        // Add headers
        content += `From: ${email.from}\r\n`;
        content += `To: ${email.to.join(', ')}\r\n`;
        content += `Subject: ${email.subject}\r\n`;
        // Add additional headers
        for (const [name, value] of Object.entries(email.headers || {})) {
            content += `${name}: ${value}\r\n`;
        }
        // Add content type for multipart
        if (email.attachments && email.attachments.length > 0) {
            const boundary = `----_=_NextPart_${Math.random().toString(36).slice(2)}`;
            content += `MIME-Version: 1.0\r\n`;
            content += `Content-Type: multipart/mixed; boundary="${boundary}"\r\n`;
            content += `\r\n`;
            // Add text part
            content += `--${boundary}\r\n`;
            content += `Content-Type: text/plain; charset="UTF-8"\r\n`;
            content += `\r\n`;
            content += `${text}\r\n`;
            // Add HTML part if present
            if (email.html) {
                content += `--${boundary}\r\n`;
                content += `Content-Type: text/html; charset="UTF-8"\r\n`;
                content += `\r\n`;
                content += `${email.html}\r\n`;
            }
            // Add attachments
            for (const attachment of email.attachments) {
                content += `--${boundary}\r\n`;
                content += `Content-Type: ${attachment.contentType || 'application/octet-stream'}; name="${attachment.filename}"\r\n`;
                content += `Content-Disposition: attachment; filename="${attachment.filename}"\r\n`;
                content += `Content-Transfer-Encoding: base64\r\n`;
                content += `\r\n`;
                // Add base64 encoded content
                const base64Content = attachment.content.toString('base64');
                // Split into lines of 76 characters
                for (let i = 0; i < base64Content.length; i += 76) {
                    content += base64Content.substring(i, i + 76) + '\r\n';
                }
            }
            // End boundary
            content += `--${boundary}--\r\n`;
        }
        else {
            // Simple email with just text
            content += `Content-Type: text/plain; charset="UTF-8"\r\n`;
            content += `\r\n`;
            content += `${text}\r\n`;
        }
        return content;
    }
    /**
     * Wait for one complete SMTP reply on the socket. Chunks are buffered
     * until the final line of a (possibly multi-line) reply has arrived.
     * @param socket Socket connection
     * @param timeoutMessage Error message used when the socket times out
     * @returns The trimmed reply text
     * @throws Error on socket error, timeout, or when the connection closes first
     */
    readSmtpResponse(socket, timeoutMessage) {
        return new Promise((resolve, reject) => {
            let received = '';
            const cleanup = () => {
                socket.removeListener('data', onData);
                socket.removeListener('error', onError);
                socket.removeListener('timeout', onTimeout);
                socket.removeListener('close', onClose);
            };
            const onData = (chunk) => {
                received += chunk.toString();
                if (!isCompleteSmtpReply(received)) {
                    return; // keep buffering until the final reply line arrives
                }
                cleanup();
                resolve(received.trim());
            };
            const onError = (err) => {
                cleanup();
                reject(err);
            };
            const onTimeout = () => {
                cleanup();
                reject(new Error(timeoutMessage));
            };
            const onClose = () => {
                cleanup();
                reject(new Error('SMTP connection closed before a complete response was received'));
            };
            socket.on('data', onData);
            socket.on('error', onError);
            socket.on('timeout', onTimeout);
            socket.on('close', onClose);
        });
    }
    /**
     * Send SMTP command and wait for response
     * @param socket Socket connection
     * @param command SMTP command to send
     * @returns The reply text when its status class is 2xx or 3xx
     * @throws Error for any other status class, socket error or timeout
     */
    async smtpCommand(socket, command) {
        // Listeners are attached synchronously by readSmtpResponse, i.e.
        // before the command is written.
        const pendingReply = this.readSmtpResponse(socket, 'SMTP command timeout');
        // Send command
        socket.write(command + '\r\n');
        const response = await pendingReply;
        // Check response code
        const statusClass = response.charAt(0);
        if (statusClass === '2' || statusClass === '3') {
            return response;
        }
        throw new Error(`SMTP error: ${response}`);
    }
    /**
     * Send the message content that follows the DATA command
     * @param socket Socket connection
     * @param data Email content to send
     * @returns The reply text when its status class is 2xx
     * @throws Error for any other status class, socket error or timeout
     */
    async smtpData(socket, data) {
        const pendingReply = this.readSmtpResponse(socket, 'SMTP data timeout');
        // Dot-stuffing: a line starting with "." gets an extra ".", otherwise
        // a lone "." line inside the body would terminate the message early.
        const stuffed = data.replace(/(^|\r\n)\./g, '$1..');
        // Send data and end with CRLF.CRLF (reusing a trailing CRLF if present)
        const terminator = stuffed.endsWith('\r\n') ? '.\r\n' : '\r\n.\r\n';
        socket.write(stuffed + terminator);
        const response = await pendingReply;
        // Check response code
        if (response.charAt(0) === '2') {
            return response;
        }
        throw new Error(`SMTP error: ${response}`);
    }
    /**
     * Upgrade socket to TLS
     * @param socket Socket connection
     * @param hostname Target hostname for TLS
     * @returns The connected TLS socket
     * @throws Error on TLS error or timeout
     */
    async upgradeTls(socket, hostname) {
        return new Promise((resolve, reject) => {
            const tlsOptions = {
                socket,
                servername: hostname,
                rejectUnauthorized: this.options.verifyCertificates,
                minVersion: this.options.tlsMinVersion
            };
            const tlsSocket = tls.connect(tlsOptions);
            tlsSocket.once('secureConnect', () => {
                resolve(tlsSocket);
            });
            tlsSocket.once('error', (err) => {
                reject(new Error(`TLS error: ${err.message}`));
            });
            tlsSocket.setTimeout(this.options.socketTimeout);
            tlsSocket.once('timeout', () => {
                reject(new Error('TLS connection timeout'));
            });
        });
    }
    /**
     * Update delivery time statistics (average over the last 1000 deliveries)
     */
    updateDeliveryTimeStats() {
        if (this.deliveryTimes.length === 0)
            return;
        // Keep only the last 1000 delivery times
        if (this.deliveryTimes.length > 1000) {
            this.deliveryTimes = this.deliveryTimes.slice(-1000);
        }
        // Calculate average
        const sum = this.deliveryTimes.reduce((acc, time) => acc + time, 0);
        this.stats.avgDeliveryTime = sum / this.deliveryTimes.length;
    }
    /**
     * Check if rate limit is exceeded.
     *
     * Uses a fixed one-minute window: each call counts once, and the limit is
     * exceeded when the count within the current window is greater than
     * options.globalRateLimit. (Extrapolating a per-minute rate from the time
     * elapsed in the window divides by ~0 right after a reset and throttles
     * immediately.) Note that processItems calls this once per batch.
     * @returns True if rate limited, false otherwise
     */
    checkRateLimit() {
        const now = Date.now();
        const elapsed = now - this.rateLimitLastCheck;
        // Reset counter if more than a minute has passed
        if (elapsed >= RATE_LIMIT_WINDOW_MS) {
            this.rateLimitLastCheck = now;
            this.rateLimitCounter = 0;
            this.throttled = false;
            this.stats.rateLimiting.currentRate = 0;
            return false;
        }
        // Check if we're already throttled
        if (this.throttled) {
            return true;
        }
        // Increment counter; the count in this window is the current rate
        this.rateLimitCounter++;
        this.stats.rateLimiting.currentRate = this.rateLimitCounter;
        // Check if rate limit is exceeded
        if (this.rateLimitCounter > this.options.globalRateLimit) {
            this.throttled = true;
            this.stats.rateLimiting.throttled++;
            // Schedule throttle reset for the end of the current window
            const resetDelay = Math.max(0, RATE_LIMIT_WINDOW_MS - elapsed);
            const resetTimer = setTimeout(() => {
                this.throttled = false;
                this.rateLimitLastCheck = Date.now();
                this.rateLimitCounter = 0;
                this.stats.rateLimiting.currentRate = 0;
            }, resetDelay);
            // Do not keep the process alive just for the throttle reset
            resetTimer.unref?.();
            return true;
        }
        return false;
    }
    /**
     * Update delivery options
     * @param options New options (merged over the current ones)
     */
    updateOptions(options) {
        this.options = {
            ...this.options,
            ...options
        };
        // Update rate limit statistics
        if (options?.globalRateLimit) {
            this.stats.rateLimiting.globalLimit = options.globalRateLimit;
        }
        logger.log('info', 'MultiModeDeliverySystem options updated');
    }
    /**
     * Get delivery statistics
     * @returns A shallow copy of the statistics object
     */
    getStats() {
        return { ...this.stats };
    }
}
//# sourceMappingURL=data:application/json;base64,