// Listing metadata from the code viewer: 1090 lines, 34 KiB, TypeScript
|  | import * as plugins from '../../plugins.ts'; | ||
|  | import { EventEmitter } from 'node:events'; | ||
|  | import * as net from 'node:net'; | ||
|  | import * as tls from 'node:tls'; | ||
|  | import { logger } from '../../logger.ts'; | ||
|  | import {  | ||
|  |   SecurityLogger,  | ||
|  |   SecurityLogLevel,  | ||
|  |   SecurityEventType  | ||
|  | } from '../../security/index.ts'; | ||
|  | import { UnifiedDeliveryQueue, type IQueueItem } from './classes.delivery.queue.ts'; | ||
|  | import type { Email } from '../core/classes.email.ts'; | ||
|  | import type { UnifiedEmailServer } from '../routing/classes.unified.email.server.ts'; | ||
|  | import type { SmtpClient } from './smtpclient/smtp-client.ts'; | ||
|  | 
 | ||
/**
 * Delivery status enumeration.
 *
 * String-valued enum; each member's runtime value is the lower-case form of
 * its name.
 */
export enum DeliveryStatus {
  PENDING = 'pending',
  DELIVERING = 'delivering',
  DELIVERED = 'delivered',
  DEFERRED = 'deferred',
  FAILED = 'failed'
}
|  | 
 | ||
/**
 * Delivery handler interface.
 *
 * A handler receives one queue item and resolves with a handler-specific
 * result object. A rejected promise is treated by the delivery system as a
 * failed delivery.
 */
export interface IDeliveryHandler {
  deliver(item: IQueueItem): Promise<any>;
}
|  | 
 | ||
/**
 * Delivery options.
 *
 * Every field is optional; the defaults noted below are the ones applied by
 * the MultiModeDeliverySystem constructor.
 */
export interface IMultiModeDeliveryOptions {
  // Connection options
  /** Default: 10. */
  connectionPoolSize?: number;
  /** Socket idle timeout in milliseconds. Default: 30000. */
  socketTimeout?: number;

  // Delivery behavior
  /** Maximum number of deliveries in flight at once. Default: 10. */
  concurrentDeliveries?: number;
  /** Default: 60000. */
  sendTimeout?: number;

  // TLS options
  /** Treated as true unless explicitly set to false. */
  verifyCertificates?: boolean;
  /** Default: 'TLSv1.2'. */
  tlsMinVersion?: string;

  // Mode-specific handlers
  /** Handler for items in 'forward' mode; defaults to the built-in forward handler. */
  forwardHandler?: IDeliveryHandler;
  /** Handler for items in 'mta' mode; defaults to the built-in MTA handler. */
  deliveryHandler?: IDeliveryHandler;
  /** Handler for items in 'process' mode; defaults to the built-in process handler. */
  processHandler?: IDeliveryHandler;

  // Rate limiting
  /** Default: 100. */
  globalRateLimit?: number;
  /** Default: empty object. */
  perPatternRateLimit?: Record<string, number>;

  // Bounce handling
  /** Treated as true unless explicitly set to false. */
  processBounces?: boolean;
  /** Receives failed deliveries when bounce processing is enabled. No default. */
  bounceHandler?: {
    processSmtpFailure: (recipient: string, smtpResponse: string, options: any) => Promise<any>;
  };

  // Event hooks
  /** Awaited before the mode handler is invoked. */
  onDeliveryStart?: (item: IQueueItem) => Promise<void>;
  /** Awaited after the item has been marked as delivered. */
  onDeliverySuccess?: (item: IQueueItem, result: any) => Promise<void>;
  /** Awaited after the item has been marked as failed; receives the error message. */
  onDeliveryFailed?: (item: IQueueItem, error: string) => Promise<void>;
}
|  | 
 | ||
/**
 * Delivery system statistics.
 */
export interface IDeliveryStats {
  /** Number of deliveries currently in flight. */
  activeDeliveries: number;
  /** Count of deliveries that completed successfully. */
  totalSuccessful: number;
  /** Count of deliveries that failed. */
  totalFailed: number;
  /** Average delivery duration (presumably milliseconds; confirm against updateDeliveryTimeStats). */
  avgDeliveryTime: number;
  /** Success and failure counters, one pair per processing mode. */
  byMode: {
    forward: {
      successful: number;
      failed: number;
    };
    mta: {
      successful: number;
      failed: number;
    };
    process: {
      successful: number;
      failed: number;
    };
  };
  /** Rate limiting figures; globalLimit mirrors the configured globalRateLimit. */
  rateLimiting: {
    currentRate: number;
    globalLimit: number;
    throttled: number;
  };
}
|  | 
 | ||
|  | /** | ||
|  |  * Handles delivery for all email processing modes | ||
|  |  */ | ||
|  | export class MultiModeDeliverySystem extends EventEmitter { | ||
// Queue that supplies ready items and records their outcome.
private queue: UnifiedDeliveryQueue;
// Fully-defaulted options. bounceHandler is the one option without a default
// implementation: the constructor stores null when none is supplied, so the
// type says so instead of pretending it is always an object.
private options: Required<Omit<IMultiModeDeliveryOptions, 'bounceHandler'>> & {
  bounceHandler: NonNullable<IMultiModeDeliveryOptions['bounceHandler']> | null;
};
private stats: IDeliveryStats;
// Durations (ms) of completed deliveries.
private deliveryTimes: number[] = [];
// Ids of the items currently being delivered.
private activeDeliveries: Set<string> = new Set();
private running: boolean = false;
private throttled: boolean = false;
private rateLimitLastCheck: number = Date.now();
private rateLimitCounter: number = 0;
// Optional server reference, used for SmtpClient access and DKIM keys.
private emailServer?: UnifiedEmailServer;
|  |    | ||
/**
 * Create a new multi-mode delivery system
 * @param queue Unified delivery queue
 * @param options Delivery options; missing values are replaced by defaults
 * @param emailServer Optional reference to unified email server for SmtpClient access
 */
constructor(queue: UnifiedDeliveryQueue, options: IMultiModeDeliveryOptions, emailServer?: UnifiedEmailServer) {
  super();

  this.queue = queue;
  this.emailServer = emailServer;

  // Set default options.
  // Note: numeric options use `||`, so an explicit 0 also falls back to the default.
  this.options = {
    connectionPoolSize: options.connectionPoolSize || 10,
    socketTimeout: options.socketTimeout || 30000, // 30 seconds
    concurrentDeliveries: options.concurrentDeliveries || 10,
    sendTimeout: options.sendTimeout || 60000, // 1 minute
    verifyCertificates: options.verifyCertificates !== false, // Default to true
    tlsMinVersion: options.tlsMinVersion || 'TLSv1.2',
    forwardHandler: options.forwardHandler || {
      deliver: this.handleForwardDelivery.bind(this)
    },
    deliveryHandler: options.deliveryHandler || {
      deliver: this.handleMtaDelivery.bind(this)
    },
    processHandler: options.processHandler || {
      deliver: this.handleProcessDelivery.bind(this)
    },
    globalRateLimit: options.globalRateLimit || 100, // 100 emails per minute
    perPatternRateLimit: options.perPatternRateLimit || {},
    processBounces: options.processBounces !== false, // Default to true
    bounceHandler: options.bounceHandler || null,
    onDeliveryStart: options.onDeliveryStart || (async () => {}),
    onDeliverySuccess: options.onDeliverySuccess || (async () => {}),
    onDeliveryFailed: options.onDeliveryFailed || (async () => {})
  };

  // Initialize statistics
  this.stats = {
    activeDeliveries: 0,
    totalSuccessful: 0,
    totalFailed: 0,
    avgDeliveryTime: 0,
    byMode: {
      forward: { successful: 0, failed: 0 },
      mta: { successful: 0, failed: 0 },
      process: { successful: 0, failed: 0 }
    },
    rateLimiting: {
      currentRate: 0,
      globalLimit: this.options.globalRateLimit,
      throttled: 0
    }
  };

  // Set up event listeners.
  // processItems is async; EventEmitter ignores the returned promise, so a
  // rejection must be caught here or it surfaces as an unhandled rejection.
  this.queue.on('itemsReady', (items: IQueueItem[]) => {
    this.processItems(items).catch((err: unknown) => {
      const message = err instanceof Error ? err.message : String(err);
      logger.log('error', `Failed to process ready items: ${message}`);
    });
  });
}
|  |    | ||
/**
 * Start the delivery system.
 *
 * Sets the running flag and emits 'started'. Calling it while already
 * running only logs a warning.
 */
public async start(): Promise<void> {
  logger.log('info', 'Starting MultiModeDeliverySystem');

  if (this.running) {
    logger.log('warn', 'MultiModeDeliverySystem is already running');
    return;
  }

  this.running = true;

  // Emit started event
  this.emit('started');
  logger.log('info', 'MultiModeDeliverySystem started successfully');
}
|  |    | ||
/**
 * Stop the delivery system.
 *
 * Clears the running flag so no new items are picked up, then waits for the
 * in-flight deliveries to finish. The wait polls once per second and gives
 * up after 30 seconds; in that case a warning is logged with the number of
 * deliveries still active. 'stopped' is emitted in both cases.
 */
public async stop(): Promise<void> {
  logger.log('info', 'Stopping MultiModeDeliverySystem');

  if (!this.running) {
    logger.log('warn', 'MultiModeDeliverySystem is already stopped');
    return;
  }

  this.running = false;

  // Wait for active deliveries to complete
  if (this.activeDeliveries.size > 0) {
    logger.log('info', `Waiting for ${this.activeDeliveries.size} active deliveries to complete`);

    // Resolves true when all deliveries drained, false when the wait was cut short.
    const drained = await new Promise<boolean>(resolve => {
      // Force resolve after 30 seconds
      const forceTimeout = setTimeout(() => {
        clearInterval(checkInterval);
        resolve(false);
      }, 30000);

      const checkInterval = setInterval(() => {
        if (this.activeDeliveries.size === 0) {
          clearInterval(checkInterval);
          clearTimeout(forceTimeout);
          resolve(true);
        }
      }, 1000);
    });

    if (!drained) {
      logger.log('warn', `Stopping with ${this.activeDeliveries.size} deliveries still active after 30 seconds`);
    }
  }

  // Emit stopped event
  this.emit('stopped');
  logger.log('info', 'MultiModeDeliverySystem stopped successfully');
}
|  |    | ||
/**
 * Process ready items from the queue.
 *
 * Starts as many deliveries as free concurrency slots allow. Does nothing
 * when the system is stopped, already at the concurrency limit, or rate
 * limited.
 *
 * @param items Queue items ready for processing
 */
private async processItems(items: IQueueItem[]): Promise<void> {
  if (!this.running) {
    return;
  }

  // Check if we're already at max concurrent deliveries
  if (this.activeDeliveries.size >= this.options.concurrentDeliveries) {
    logger.log('debug', `Already at max concurrent deliveries (${this.activeDeliveries.size})`);
    return;
  }

  // Check rate limiting
  if (this.checkRateLimit()) {
    logger.log('debug', 'Rate limit exceeded, throttling deliveries');
    return;
  }

  // Calculate how many more deliveries we can start
  const availableSlots = this.options.concurrentDeliveries - this.activeDeliveries.size;

  // Reserve the slots synchronously, before the first await. Otherwise a
  // second 'itemsReady' event arriving while markProcessing is pending could
  // start the same item again or exceed the concurrency limit.
  const itemsToProcess: IQueueItem[] = [];
  for (const item of items) {
    if (itemsToProcess.length >= availableSlots) {
      break;
    }
    if (this.activeDeliveries.has(item.id)) {
      continue; // already being delivered (or duplicated within this batch)
    }
    this.activeDeliveries.add(item.id);
    itemsToProcess.push(item);
  }
  this.stats.activeDeliveries = this.activeDeliveries.size;

  if (itemsToProcess.length === 0) {
    return;
  }

  logger.log('info', `Processing ${itemsToProcess.length} items for delivery`);

  // Process each item
  for (const item of itemsToProcess) {
    try {
      // Mark as processing
      await this.queue.markProcessing(item.id);
    } catch (err: unknown) {
      // Release the reserved slot and carry on with the rest of the batch.
      this.activeDeliveries.delete(item.id);
      this.stats.activeDeliveries = this.activeDeliveries.size;
      const message = err instanceof Error ? err.message : String(err);
      logger.log('error', `Failed to mark item ${item.id} as processing: ${message}`);
      continue;
    }

    // Deliver asynchronously; deliverItem releases the slot when it finishes.
    this.deliverItem(item).catch(err => {
      logger.log('error', `Unhandled error in delivery: ${err.message}`);
    });
  }

  // Update statistics
  this.emit('statsUpdated', this.stats);
}
|  |    | ||
/**
 * Deliver an item from the queue.
 *
 * Runs the start hook, dispatches to the handler for the item's processing
 * mode, and records the outcome in the queue, the statistics, the hooks, the
 * emitted events and the security log. The item is always removed from the
 * active set at the end.
 *
 * An error raised after the item has been marked as delivered (for example
 * by the success hook or an event listener) is logged but does NOT turn the
 * delivery into a failure.
 *
 * @param item Queue item to deliver
 */
private async deliverItem(item: IQueueItem): Promise<void> {
  const startTime = Date.now();

  // Per-mode counters; undefined when the item carries an unknown mode.
  const countersByMode = this.stats.byMode as Record<string, { successful: number; failed: number } | undefined>;
  const modeCounters = Object.prototype.hasOwnProperty.call(countersByMode, item.processingMode)
    ? countersByMode[item.processingMode]
    : undefined;

  // Set once the queue has accepted the item as delivered.
  let delivered = false;

  try {
    // Call delivery start hook
    await this.options.onDeliveryStart(item);

    // Emit delivery start event
    this.emit('deliveryStart', item);
    logger.log('info', `Starting delivery of item ${item.id}, mode: ${item.processingMode}`);

    // Choose the appropriate handler based on mode
    let result: any;

    switch (item.processingMode) {
      case 'forward':
        result = await this.options.forwardHandler.deliver(item);
        break;

      case 'mta':
        result = await this.options.deliveryHandler.deliver(item);
        break;

      case 'process':
        result = await this.options.processHandler.deliver(item);
        break;

      default:
        throw new Error(`Unknown processing mode: ${item.processingMode}`);
    }

    // Mark as delivered
    await this.queue.markDelivered(item.id);
    delivered = true;

    // Update statistics
    this.stats.totalSuccessful++;
    if (modeCounters) {
      modeCounters.successful++;
    }

    // Calculate delivery time
    const deliveryTime = Date.now() - startTime;
    this.deliveryTimes.push(deliveryTime);
    this.updateDeliveryTimeStats();

    // Call delivery success hook
    await this.options.onDeliverySuccess(item, result);

    // Emit delivery success event
    this.emit('deliverySuccess', item, result);
    logger.log('info', `Item ${item.id} delivered successfully in ${deliveryTime}ms`);

    SecurityLogger.getInstance().logEvent({
      level: SecurityLogLevel.INFO,
      type: SecurityEventType.EMAIL_DELIVERY,
      message: 'Email delivery successful',
      details: {
        itemId: item.id,
        mode: item.processingMode,
        routeName: item.route?.name || 'unknown',
        deliveryTime
      },
      success: true
    });
  } catch (error: unknown) {
    const errorMessage = error instanceof Error ? error.message : String(error);

    if (delivered) {
      // The mail is out; only the follow-up bookkeeping failed. Marking the
      // item as failed here would misreport it and could trigger a bounce.
      logger.log('error', `Post-delivery processing failed for item ${item.id}: ${errorMessage}`);
      return;
    }

    // Calculate delivery attempt time even for failures
    const deliveryTime = Date.now() - startTime;

    // Mark as failed; keep going if the queue update itself fails so that
    // statistics, hooks and events still reflect the failed attempt.
    try {
      await this.queue.markFailed(item.id, errorMessage);
    } catch (markError: unknown) {
      const markMessage = markError instanceof Error ? markError.message : String(markError);
      logger.log('error', `Failed to mark item ${item.id} as failed: ${markMessage}`);
    }

    // Update statistics
    this.stats.totalFailed++;
    if (modeCounters) {
      modeCounters.failed++;
    }

    // Call delivery failed hook; a throwing hook must not skip bounce handling.
    try {
      await this.options.onDeliveryFailed(item, errorMessage);
    } catch (hookError: unknown) {
      const hookMessage = hookError instanceof Error ? hookError.message : String(hookError);
      logger.log('error', `onDeliveryFailed hook failed for item ${item.id}: ${hookMessage}`);
    }

    // Process as bounce if enabled and we have a bounce handler
    if (this.options.processBounces && this.options.bounceHandler) {
      try {
        const email = item.processingResult as Email;

        // Extract recipient and error message
        // For multiple recipients, we'd need more sophisticated parsing
        const recipient = email.to.length > 0 ? email.to[0] : '';

        if (recipient) {
          logger.log('info', `Processing delivery failure as bounce for recipient ${recipient}`);

          // Process SMTP failure through bounce handler
          await this.options.bounceHandler.processSmtpFailure(
            recipient,
            errorMessage,
            {
              sender: email.from,
              originalEmailId: item.id,
              headers: email.headers
            }
          );

          logger.log('info', `Bounce record created for failed delivery to ${recipient}`);
        }
      } catch (bounceError: unknown) {
        const bounceMessage = bounceError instanceof Error ? bounceError.message : String(bounceError);
        logger.log('error', `Failed to process bounce: ${bounceMessage}`);
      }
    }

    // Emit delivery failed event; a throwing listener must not skip the logs below.
    try {
      this.emit('deliveryFailed', item, error);
    } catch (listenerError: unknown) {
      const listenerMessage = listenerError instanceof Error ? listenerError.message : String(listenerError);
      logger.log('error', `deliveryFailed listener failed for item ${item.id}: ${listenerMessage}`);
    }
    logger.log('error', `Item ${item.id} delivery failed: ${errorMessage}`);

    SecurityLogger.getInstance().logEvent({
      level: SecurityLogLevel.ERROR,
      type: SecurityEventType.EMAIL_DELIVERY,
      message: 'Email delivery failed',
      details: {
        itemId: item.id,
        mode: item.processingMode,
        routeName: item.route?.name || 'unknown',
        error: errorMessage,
        deliveryTime
      },
      success: false
    });
  } finally {
    // Remove from active deliveries
    this.activeDeliveries.delete(item.id);
    this.stats.activeDeliveries = this.activeDeliveries.size;

    // Update statistics
    this.emit('statsUpdated', this.stats);
  }
}
|  |    | ||
/**
 * Default handler for forward mode delivery.
 *
 * Sends the email to the host/port configured on the route (port defaults to
 * 25) through the SmtpClient of the email server. Without an email server
 * reference it falls back to the raw-socket implementation.
 *
 * @param item Queue item
 * @returns Summary of the forward (target, recipient count, message id, rejected recipients)
 * @throws Error when no target host is configured or the send fails
 */
private async handleForwardDelivery(item: IQueueItem): Promise<any> {
  logger.log('info', `Forward delivery for item ${item.id}`);

  const email = item.processingResult as Email;
  const route = item.route;

  // Get target server information
  const targetServer = route?.action.forward?.host;
  const targetPort = route?.action.forward?.port || 25;
  const useTls = false; // TLS configuration can be enhanced later

  if (!targetServer) {
    throw new Error('No target server configured for forward mode');
  }

  logger.log('info', `Forwarding email to ${targetServer}:${targetPort}, TLS: ${useTls}`);

  if (!this.emailServer) {
    // Fall back to raw socket implementation if no email server.
    // The legacy path logs and cleans up after its own failures.
    logger.log('warn', 'No email server available, falling back to raw socket implementation');
    return this.handleForwardDeliveryLegacy(item);
  }

  try {
    // Get SMTP client from UnifiedEmailServer
    const smtpClient = this.emailServer.getSmtpClient(targetServer, targetPort);

    // Apply DKIM signing if configured in the route
    if (route?.action.options?.mtaOptions?.dkimSign) {
      await this.applyDkimSigning(email, route.action.options.mtaOptions);
    }

    // Send the email using SmtpClient
    const result = await smtpClient.sendMail(email);

    if (!result.success) {
      throw new Error(result.error?.message || 'Failed to forward email');
    }

    logger.log('info', `Email forwarded successfully to ${targetServer}:${targetPort}`);

    return {
      targetServer: targetServer,
      targetPort: targetPort,
      recipients: result.acceptedRecipients.length,
      messageId: result.messageId,
      rejectedRecipients: result.rejectedRecipients
    };
  } catch (error: any) {
    logger.log('error', `Failed to forward email: ${error.message}`);
    throw error;
  }
}
|  |    | ||
/**
 * Legacy forward delivery using raw sockets (fallback).
 *
 * Opens a TCP connection to the route's forward host/port (port defaults to
 * 25), sends EHLO and hands the socket to completeSMTPExchange. The socket
 * is destroyed when it stays idle for longer than the configured
 * socketTimeout, both while connecting and during the SMTP dialogue.
 *
 * @param item Queue item
 * @throws Error when no target host is configured, or on connection/SMTP failure
 */
private async handleForwardDeliveryLegacy(item: IQueueItem): Promise<any> {
  const email = item.processingResult as Email;
  const route = item.route;

  // Get target server information
  const targetServer = route?.action.forward?.host;
  const targetPort = route?.action.forward?.port || 25;
  const useTls = false; // TLS configuration can be enhanced later

  if (!targetServer) {
    throw new Error('No target server configured for forward mode');
  }

  const heloDomain = route?.action.options?.mtaOptions?.domain || 'localhost';

  // Create a socket connection to the target server
  const socket = new net.Socket();

  // Set timeout
  socket.setTimeout(this.options.socketTimeout);

  try {
    // Connect to the target server
    await new Promise<void>((resolve, reject) => {
      // Handle connection events
      socket.on('connect', () => {
        logger.log('debug', `Connected to ${targetServer}:${targetPort}`);
        resolve();
      });

      // A 'timeout' event only signals idleness; node leaves the socket open.
      // Destroy it so a stalled peer cannot hang the exchange after connect.
      // NOTE(review): assumes smtpCommand/smtpData observe socket 'error' or
      // 'close' to fail a pending command; confirm.
      socket.on('timeout', () => {
        const timeoutError = new Error(`Connection timeout to ${targetServer}:${targetPort}`);
        reject(timeoutError); // no-op once the connection has been established
        socket.destroy(timeoutError);
      });

      // Stays attached for the socket's lifetime so a late 'error' event is
      // never unhandled; reject is a no-op once the promise has settled.
      socket.on('error', (err) => {
        reject(new Error(`Connection error to ${targetServer}:${targetPort}: ${err.message}`));
      });

      // Connect to the server
      socket.connect({
        host: targetServer,
        port: targetPort
      });
    });

    // Send EHLO
    await this.smtpCommand(socket, `EHLO ${heloDomain}`);

    // Start TLS if required
    if (useTls) {
      await this.smtpCommand(socket, 'STARTTLS');

      // Upgrade to TLS
      const tlsSocket = await this.upgradeTls(socket, targetServer);

      // Send EHLO again after STARTTLS
      await this.smtpCommand(tlsSocket, `EHLO ${heloDomain}`);

      // Use tlsSocket for remaining commands
      return this.completeSMTPExchange(tlsSocket, email, route);
    }

    // Complete the SMTP exchange
    return this.completeSMTPExchange(socket, email, route);
  } catch (error: any) {
    logger.log('error', `Failed to forward email: ${error.message}`);

    // Close the connection
    socket.destroy();

    throw error;
  }
}
|  |    | ||
/**
 * Complete the SMTP exchange after connection and initial setup.
 *
 * Optionally authenticates with AUTH LOGIN, then sends MAIL FROM, one
 * RCPT TO per recipient, DATA with the formatted message, and QUIT. The
 * socket is ended on success and destroyed on failure.
 *
 * @param socket Network socket (plain or TLS)
 * @param email Email to send
 * @param route Route whose `action.forward` settings (host, port, auth) are used
 * @returns Target host/port and the number of recipients
 * @throws Error when an envelope address contains a line break, when there
 *   are no recipients, or when any SMTP step fails
 */
private async completeSMTPExchange(socket: net.Socket | tls.TLSSocket, email: Email, route: any): Promise<any> {
  try {
    const forward = route?.action?.forward;
    const recipients = email.getAllRecipients();

    // Envelope addresses are interpolated into protocol lines. A CR or LF
    // inside one would let it smuggle extra SMTP commands, so refuse early.
    const assertSingleLine = (value: unknown, label: string): void => {
      if (/[\r\n]/.test(String(value))) {
        throw new Error(`Invalid ${label}: contains line break characters`);
      }
    };
    assertSingleLine(email.from, 'sender address');
    for (const recipient of recipients) {
      assertSingleLine(recipient, 'recipient address');
    }

    if (recipients.length === 0) {
      throw new Error('No recipients specified for forward delivery');
    }

    // Authenticate if credentials provided
    // NOTE(review): when the caller has not upgraded to TLS these credentials
    // travel base64-encoded but unencrypted.
    if (forward?.auth?.user && forward?.auth?.pass) {
      // Send AUTH LOGIN
      await this.smtpCommand(socket, 'AUTH LOGIN');

      // Send username (base64)
      const username = Buffer.from(forward.auth.user).toString('base64');
      await this.smtpCommand(socket, username);

      // Send password (base64)
      const password = Buffer.from(forward.auth.pass).toString('base64');
      await this.smtpCommand(socket, password);
    }

    // Send MAIL FROM
    await this.smtpCommand(socket, `MAIL FROM:<${email.from}>`);

    // Send RCPT TO for each recipient
    for (const recipient of recipients) {
      await this.smtpCommand(socket, `RCPT TO:<${recipient}>`);
    }

    // Send DATA
    await this.smtpCommand(socket, 'DATA');

    // Send email content (simplified)
    const emailContent = await this.getFormattedEmail(email);
    await this.smtpData(socket, emailContent);

    // Send QUIT
    await this.smtpCommand(socket, 'QUIT');

    // Close the connection
    socket.end();

    logger.log('info', `Email forwarded successfully to ${forward?.host}:${forward?.port || 25}`);

    return {
      targetServer: forward?.host,
      targetPort: forward?.port || 25,
      recipients: recipients.length
    };
  } catch (error: any) {
    logger.log('error', `Failed to forward email: ${error.message}`);

    // Close the connection
    socket.destroy();

    throw error;
  }
}
|  |    | ||
/**
 * Default handler for MTA mode delivery.
 *
 * Applies DKIM signing when the route asks for it and then reports success.
 * No message is actually transmitted here: the delivery is simulated.
 *
 * @param item Queue item
 * @returns Recipient count, subject and whether DKIM signing was requested
 */
private async handleMtaDelivery(item: IQueueItem): Promise<any> {
  logger.log('info', `MTA delivery for item ${item.id}`);

  const email = item.processingResult as Email;
  const mtaOptions = item.route?.action.options?.mtaOptions;

  try {
    // Apply DKIM signing if configured in the route
    if (mtaOptions?.dkimSign) {
      await this.applyDkimSigning(email, mtaOptions);
    }

    // In a full implementation, this would use the MTA service
    // For now, we'll simulate a successful delivery
    const recipients = email.getAllRecipients();

    logger.log('info', `Email processed by MTA: ${email.subject} to ${recipients.join(', ')}`);

    // Note: The MTA implementation would handle actual local delivery

    // Simulate successful delivery
    return {
      recipients: recipients.length,
      subject: email.subject,
      dkimSigned: !!mtaOptions?.dkimSign
    };
  } catch (error: any) {
    logger.log('error', `Failed to process email in MTA mode: ${error.message}`);
    throw error;
  }
}
|  |    | ||
/**
 * Default handler for process mode delivery.
 *
 * Runs the route's content scanners (only the attachment scanner has an
 * implementation; spam and virus scanning are placeholders), applies the
 * configured transformations, and DKIM-signs the result when requested. The
 * final delivery itself is simulated.
 *
 * @param item Queue item
 * @returns Recipient count, subject and flags for scanning/transformation/signing
 * @throws Error when an attachment with a blocked extension is found and the
 *   scanner action is 'reject'
 */
private async handleProcessDelivery(item: IQueueItem): Promise<any> {
  logger.log('info', `Process delivery for item ${item.id}`);

  const email = item.processingResult as Email;
  const route = item.route;

  try {
    // Apply content scanning if enabled
    if (route?.action.options?.contentScanning && route?.action.options?.scanners && route.action.options.scanners.length > 0) {
      logger.log('info', 'Performing content scanning');

      // Apply each scanner
      for (const scanner of route.action.options.scanners) {
        switch (scanner.type) {
          case 'spam':
            logger.log('info', 'Scanning for spam content');
            // Implement spam scanning
            break;

          case 'virus':
            logger.log('info', 'Scanning for virus content');
            // Implement virus scanning
            break;

          case 'attachment':
            logger.log('info', 'Scanning attachments');

            // Check for blocked extensions
            if (scanner.blockedExtensions && scanner.blockedExtensions.length > 0) {
              // Normalise the configured list to lower case with a leading dot
              // so "EXE", "exe" and ".exe" all block the same files.
              const blocked = new Set<string>(
                scanner.blockedExtensions.map((configured: string) => {
                  const normalised = String(configured).trim().toLowerCase();
                  return normalised.startsWith('.') ? normalised : `.${normalised}`;
                })
              );

              // Emails without attachments, and attachments without a
              // filename, must not crash the scan.
              for (const attachment of email.attachments ?? []) {
                const filename = attachment.filename ?? '';
                const ext = this.getFileExtension(filename);
                if (!ext || !blocked.has(ext)) {
                  continue;
                }

                if (scanner.action === 'reject') {
                  throw new Error(`Blocked attachment type: ${ext}`);
                } else { // tag
                  // The filename comes from the message itself; strip line
                  // breaks so it cannot inject additional headers.
                  const safeName = filename.replace(/[\r\n]+/g, ' ');
                  email.addHeader('X-Attachment-Warning', `Potentially unsafe attachment: ${safeName}`);
                }
              }
            }
            break;
        }
      }
    }

    // Apply transformations if defined
    if (route?.action.options?.transformations && route?.action.options?.transformations.length > 0) {
      logger.log('info', 'Applying email transformations');

      for (const transform of route.action.options.transformations) {
        switch (transform.type) {
          case 'addHeader':
            if (transform.header && transform.value) {
              email.addHeader(transform.header, transform.value);
            }
            break;
        }
      }
    }

    // Apply DKIM signing if configured (after all transformations)
    if (item.route?.action.options?.mtaOptions?.dkimSign || item.route?.action.process?.dkim) {
      await this.applyDkimSigning(email, item.route.action.options?.mtaOptions || {});
    }

    logger.log('info', `Email successfully processed in store-and-forward mode`);

    // Simulate successful delivery
    return {
      recipients: email.getAllRecipients().length,
      subject: email.subject,
      scanned: !!route?.action.options?.contentScanning,
      transformed: !!(route?.action.options?.transformations && route?.action.options?.transformations.length > 0),
      dkimSigned: !!(item.route?.action.options?.mtaOptions?.dkimSign || item.route?.action.process?.dkim)
    };
  } catch (error: any) {
    logger.log('error', `Failed to process email: ${error.message}`);
    throw error;
  }
}
|  |    | ||
/**
 * Get file extension from filename.
 *
 * @param filename File name, e.g. "Report.PDF"
 * @returns The lower-cased extension including the leading dot (".pdf"), or
 *   an empty string when the name contains no dot. Surrounding whitespace in
 *   the name is ignored.
 */
private getFileExtension(filename: string): string {
  const name = filename.trim();
  const dotIndex = name.lastIndexOf('.');

  // Without this guard substring(-1) is treated as substring(0) and the whole
  // filename would be returned as its own "extension".
  if (dotIndex === -1) {
    return '';
  }

  return name.substring(dotIndex).toLowerCase();
}
|  |    | ||
/**
 * Apply DKIM signing to an email.
 *
 * Signing is best-effort: every failure is logged and swallowed so the email
 * can still be sent unsigned.
 *
 * @param email Email to sign; on success a `DKIM-Signature` header is added to it
 * @param mtaOptions MTA options; `dkimOptions.domainName` and
 *                   `dkimOptions.keySelector` are used when present, otherwise
 *                   the sender's domain and the selector `default`
 */
private async applyDkimSigning(email: Email, mtaOptions: any): Promise<void> {
  if (!this.emailServer) {
    logger.log('warn', 'Cannot apply DKIM signing without email server reference');
    return;
  }

  // Derive the fallback domain from the text after the last '@', dropping a
  // trailing '>' so that "Name <user@example.com>" yields "example.com".
  const sender = typeof email.from === 'string' ? email.from : '';
  const atIndex = sender.lastIndexOf('@');
  const senderDomain = atIndex === -1 ? '' : sender.slice(atIndex + 1).replace(/[>\s]+$/, '');

  const domainName: string = mtaOptions?.dkimOptions?.domainName || senderDomain;
  const keySelector: string = mtaOptions?.dkimOptions?.keySelector || 'default';

  if (!domainName) {
    logger.log('warn', 'Cannot apply DKIM signing: unable to determine signing domain');
    return;
  }

  try {
    // Ensure DKIM keys exist for the domain
    await this.emailServer.dkimCreator.handleDKIMKeysForDomain(domainName);

    // Convert Email to raw format for signing
    const rawEmail = email.toRFC822String();

    const { privateKey } = await this.emailServer.dkimCreator.readDKIMKeys(domainName);

    // Sign the email
    const signResult = await plugins.dkimSign(rawEmail, {
      canonicalization: 'relaxed/relaxed',
      algorithm: 'rsa-sha256',
      signTime: new Date(),
      signatureData: [
        {
          signingDomain: domainName,
          selector: keySelector,
          privateKey,
          algorithm: 'rsa-sha256',
          canonicalization: 'relaxed/relaxed'
        }
      ]
    });

    // NOTE(review): confirm whether `signResult.signatures` is a bare header
    // value or a complete "DKIM-Signature: ..." header line; in the latter
    // case addHeader would repeat the field name.
    if (signResult.signatures) {
      email.addHeader('DKIM-Signature', signResult.signatures);
      logger.log('info', `Successfully added DKIM signature for ${domainName}`);
    }
  } catch (error: unknown) {
    const reason = error instanceof Error ? error.message : String(error);
    logger.log('error', `Failed to apply DKIM signature: ${reason}`);
    // Don't throw - allow email to be sent without DKIM if signing fails
  }
}
|  |    | ||
/**
 * Serialise an email into a raw message string for SMTP transmission.
 *
 * This is a simplified formatter rather than a full MIME implementation:
 * - `From`, `To`, `Subject` and every entry of `email.headers` are written first.
 * - Without attachments the message is a single `text/plain` body.
 * - With attachments the message is `multipart/mixed` with a random boundary,
 *   containing the text part, the HTML part (only when `email.html` is set)
 *   and one base64 part per attachment, wrapped at 76 characters per line.
 *
 * @param email Email to format
 * @returns The formatted message, using CRLF line endings
 */
private async getFormattedEmail(email: Email): Promise<string> {
  const CRLF = '\r\n';
  const chunks: string[] = [];
  const emit = (line: string): void => {
    chunks.push(line + CRLF);
  };

  // Fixed headers, then any additional headers carried by the email
  emit(`From: ${email.from}`);
  emit(`To: ${email.to.join(', ')}`);
  emit(`Subject: ${email.subject}`);
  Object.entries(email.headers || {}).forEach(([name, value]) => {
    emit(`${name}: ${value}`);
  });

  const attachments = email.attachments;

  // Simple email with just text
  if (!attachments || !(attachments.length > 0)) {
    emit(`Content-Type: text/plain; charset="UTF-8"`);
    emit('');
    emit(`${email.text}`);
    return chunks.join('');
  }

  // Multipart email: every part is introduced by the same delimiter line
  const boundary = `----_=_NextPart_${Math.random().toString(36).slice(2)}`;
  const delimiter = `--${boundary}`;

  emit(`MIME-Version: 1.0`);
  emit(`Content-Type: multipart/mixed; boundary="${boundary}"`);
  emit('');

  // Text part
  emit(delimiter);
  emit(`Content-Type: text/plain; charset="UTF-8"`);
  emit('');
  emit(`${email.text}`);

  // HTML part, if present
  if (email.html) {
    emit(delimiter);
    emit(`Content-Type: text/html; charset="UTF-8"`);
    emit('');
    emit(`${email.html}`);
  }

  // Attachment parts
  const LINE_WIDTH = 76;
  for (const attachment of attachments) {
    emit(delimiter);
    emit(`Content-Type: ${attachment.contentType || 'application/octet-stream'}; name="${attachment.filename}"`);
    emit(`Content-Disposition: attachment; filename="${attachment.filename}"`);
    emit(`Content-Transfer-Encoding: base64`);
    emit('');

    // Base64 payload, wrapped into lines of at most 76 characters
    const encoded = attachment.content.toString('base64');
    let offset = 0;
    while (offset < encoded.length) {
      emit(encoded.slice(offset, offset + LINE_WIDTH));
      offset += LINE_WIDTH;
    }
  }

  // Closing delimiter
  emit(`${delimiter}--`);

  return chunks.join('');
}
|  |    | ||
/**
 * Send a single SMTP command and wait for the server's complete reply.
 *
 * Incoming data is buffered until the reply is complete, i.e. the buffer ends
 * with a newline and its last line is not a continuation line (`NNN-...`).
 * This keeps a multi-line reply that arrives in several chunks from being
 * resolved early and leaking into the next command.
 *
 * @param socket Socket connection
 * @param command SMTP command to send (CRLF is appended here)
 * @returns The trimmed reply text when the reply code starts with 2 or 3
 * @throws Error `SMTP error: <reply>` for any other reply, the socket's own
 *         error, `SMTP command timeout` on a socket timeout, or an error when
 *         the connection closes before a reply arrives
 */
private async smtpCommand(socket: net.Socket, command: string): Promise<string> {
  return new Promise<string>((resolve, reject) => {
    let buffered = '';

    const cleanup = (): void => {
      socket.removeListener('data', onData);
      socket.removeListener('error', onError);
      socket.removeListener('timeout', onTimeout);
      socket.removeListener('close', onClose);
    };

    const onData = (data: Buffer): void => {
      buffered += data.toString();

      // Wait for the end of the current line
      if (!buffered.endsWith('\n')) {
        return;
      }

      const response = buffered.trim();
      const finalLine = response.slice(response.lastIndexOf('\n') + 1);

      // "NNN-text" announces that more reply lines follow
      if (/^\d{3}-/.test(finalLine)) {
        return;
      }

      cleanup();

      // Check response code
      const codeClass = finalLine.charAt(0);
      if (codeClass === '2' || codeClass === '3') {
        resolve(response);
      } else {
        reject(new Error(`SMTP error: ${response}`));
      }
    };

    const onError = (err: Error): void => {
      cleanup();
      reject(err);
    };

    const onTimeout = (): void => {
      cleanup();
      reject(new Error('SMTP command timeout'));
    };

    // Without this the promise would stay pending forever when the peer
    // closes the connection without an 'error' event.
    const onClose = (): void => {
      cleanup();
      reject(new Error('SMTP connection closed before a reply was received'));
    };

    // Set up listeners
    socket.on('data', onData);
    socket.once('error', onError);
    socket.once('timeout', onTimeout);
    socket.once('close', onClose);

    // Send command
    socket.write(command + '\r\n');
  });
}
|  |    | ||
/**
 * Send the message content of an SMTP DATA transaction and wait for the reply.
 *
 * Lines of `data` that start with `.` are dot-stuffed (the dot is doubled)
 * before sending, so message content cannot be mistaken for the
 * end-of-data marker. The content is then terminated with `CRLF.CRLF`.
 * NOTE(review): callers must pass un-stuffed content; confirm no caller
 * performs its own dot-stuffing.
 *
 * The reply is buffered until it is complete, i.e. the buffer ends with a
 * newline and its last line is not a continuation line (`NNN-...`).
 *
 * @param socket Socket connection
 * @param data Email content to send
 * @returns The trimmed reply text when the reply code starts with 2
 * @throws Error `SMTP error: <reply>` for any other reply, the socket's own
 *         error, `SMTP data timeout` on a socket timeout, or an error when
 *         the connection closes before a reply arrives
 */
private async smtpData(socket: net.Socket, data: string): Promise<string> {
  return new Promise<string>((resolve, reject) => {
    let buffered = '';

    const cleanup = (): void => {
      socket.removeListener('data', onData);
      socket.removeListener('error', onError);
      socket.removeListener('timeout', onTimeout);
      socket.removeListener('close', onClose);
    };

    const onData = (responseData: Buffer): void => {
      buffered += responseData.toString();

      // Wait for the end of the current line
      if (!buffered.endsWith('\n')) {
        return;
      }

      const response = buffered.trim();
      const finalLine = response.slice(response.lastIndexOf('\n') + 1);

      // "NNN-text" announces that more reply lines follow
      if (/^\d{3}-/.test(finalLine)) {
        return;
      }

      cleanup();

      // Check response code
      if (finalLine.charAt(0) === '2') {
        resolve(response);
      } else {
        reject(new Error(`SMTP error: ${response}`));
      }
    };

    const onError = (err: Error): void => {
      cleanup();
      reject(err);
    };

    const onTimeout = (): void => {
      cleanup();
      reject(new Error('SMTP data timeout'));
    };

    // Without this the promise would stay pending forever when the peer
    // closes the connection without an 'error' event.
    const onClose = (): void => {
      cleanup();
      reject(new Error('SMTP connection closed before a reply was received'));
    };

    // Set up listeners
    socket.on('data', onData);
    socket.once('error', onError);
    socket.once('timeout', onTimeout);
    socket.once('close', onClose);

    // Double every dot that starts a line (at the very start or after CRLF)
    const stuffed = data.replace(/(^|\r\n)\./g, '$1..');

    // Send data and end with CRLF.CRLF
    socket.write(stuffed + '\r\n.\r\n');
  });
}
|  |    | ||
/**
 * Upgrade an existing socket to TLS.
 *
 * Wraps `socket` with `tls.connect`, using `hostname` as the SNI server name
 * and the `verifyCertificates` / `tlsMinVersion` options for validation.
 * When the handshake fails or times out, the TLS socket is destroyed so the
 * connection is not left open.
 *
 * @param socket Socket connection
 * @param hostname Target hostname for TLS
 * @returns The TLS socket once the `secureConnect` event has fired
 * @throws Error `TLS error: <message>` on a socket error, or
 *         `TLS connection timeout` when the socket times out first
 */
private async upgradeTls(socket: net.Socket, hostname: string): Promise<tls.TLSSocket> {
  return new Promise<tls.TLSSocket>((resolve, reject) => {
    const tlsOptions: tls.ConnectionOptions = {
      socket,
      servername: hostname,
      rejectUnauthorized: this.options.verifyCertificates,
      minVersion: this.options.tlsMinVersion as tls.SecureVersion
    };

    const tlsSocket = tls.connect(tlsOptions);

    const onSecureConnect = (): void => {
      // The handshake deadline no longer applies; the socket's timeout value
      // itself is left in place for later use of the connection.
      tlsSocket.removeListener('timeout', onTimeout);
      resolve(tlsSocket);
    };

    const onError = (err: Error): void => {
      tlsSocket.removeListener('secureConnect', onSecureConnect);
      tlsSocket.removeListener('timeout', onTimeout);
      tlsSocket.destroy();
      reject(new Error(`TLS error: ${err.message}`));
    };

    const onTimeout = (): void => {
      tlsSocket.removeListener('secureConnect', onSecureConnect);
      // A 'timeout' event does not close the socket by itself
      tlsSocket.destroy();
      reject(new Error('TLS connection timeout'));
    };

    tlsSocket.once('secureConnect', onSecureConnect);
    // Deliberately left attached after success, as before, so a first later
    // error still has a listener.
    tlsSocket.once('error', onError);

    const timeoutMs = this.options.socketTimeout;
    if (typeof timeoutMs === 'number') {
      tlsSocket.setTimeout(timeoutMs);
    }

    tlsSocket.once('timeout', onTimeout);
  });
}
|  |    | ||
/**
 * Recompute the average delivery time from the recorded samples.
 *
 * Does nothing when no samples exist. Otherwise the sample list is capped to
 * its most recent 1000 entries and `stats.avgDeliveryTime` is set to the mean
 * of what remains.
 */
private updateDeliveryTimeStats(): void {
  const MAX_SAMPLES = 1000;
  let samples = this.deliveryTimes;

  if (samples.length === 0) {
    return;
  }

  // Keep only the newest samples
  if (samples.length > MAX_SAMPLES) {
    samples = samples.slice(-MAX_SAMPLES);
    this.deliveryTimes = samples;
  }

  // Mean of the retained samples
  let total = 0;
  samples.forEach((duration) => {
    total = total + duration;
  });
  this.stats.avgDeliveryTime = total / samples.length;
}
|  |    | ||
/**
 * Check if the global rate limit is exceeded, counting this call as one email.
 *
 * Uses a fixed one-minute window: at most `options.globalRateLimit` calls are
 * admitted per window and every further call is throttled until the window
 * ends. The count is compared directly against the limit instead of being
 * extrapolated from the elapsed time, so two calls arriving close together at
 * the start of a window do not trigger throttling (or a division by zero).
 *
 * `stats.rateLimiting.currentRate` holds the number of emails counted in the
 * current window; `stats.rateLimiting.throttled` is incremented each time
 * throttling starts.
 *
 * @returns True if rate limited, false otherwise
 */
private checkRateLimit(): boolean {
  const windowMs = 60000;
  const now = Date.now();
  let elapsed = now - this.rateLimitLastCheck;

  // Start a new window once the previous one has expired. A negative value
  // means the wall clock moved backwards; treat that as expired as well.
  if (elapsed >= windowMs || elapsed < 0) {
    this.rateLimitLastCheck = now;
    this.rateLimitCounter = 0;
    this.throttled = false;
    this.stats.rateLimiting.currentRate = 0;
    elapsed = 0;
  }

  // Check if we're already throttled
  if (this.throttled) {
    return true;
  }

  // Count this email against the current window
  this.rateLimitCounter++;
  this.stats.rateLimiting.currentRate = this.rateLimitCounter;

  // Check if rate limit is exceeded
  if (this.rateLimitCounter > this.options.globalRateLimit) {
    this.throttled = true;
    this.stats.rateLimiting.throttled++;

    // Schedule throttle reset for the end of this window, so the state
    // clears even when no further call arrives.
    const windowStart = this.rateLimitLastCheck;
    setTimeout(() => {
      // A later call may already have opened a newer window; leave it alone
      if (this.rateLimitLastCheck !== windowStart) {
        return;
      }
      this.throttled = false;
      this.rateLimitLastCheck = Date.now();
      this.rateLimitCounter = 0;
      this.stats.rateLimiting.currentRate = 0;
    }, windowMs - elapsed);

    return true;
  }

  return false;
}
|  |    | ||
/**
 * Update delivery options.
 *
 * The given options are merged over the current ones. When a numeric
 * `globalRateLimit` is supplied (including `0`), the limit reported in the
 * rate-limiting statistics is updated to match.
 *
 * @param options New options
 */
public updateOptions(options: Partial<IMultiModeDeliveryOptions>): void {
  this.options = {
    ...this.options,
    ...options
  };

  // Update rate limit statistics. A type check rather than a truthiness
  // check, so that an explicit limit of 0 is not silently ignored.
  if (typeof options.globalRateLimit === 'number') {
    this.stats.rateLimiting.globalLimit = options.globalRateLimit;
  }

  logger.log('info', 'MultiModeDeliverySystem options updated');
}
|  |    | ||
/**
 * Get delivery statistics.
 *
 * @returns A copy of the current statistics. The nested `rateLimiting`
 *          object is copied as well, so the result does not alias the live
 *          rate-limiting counters.
 */
public getStats(): IDeliveryStats {
  return {
    ...this.stats,
    rateLimiting: { ...this.stats.rateLimiting }
  };
}
|  | } |