1087 lines
		
	
	
		
			34 KiB
		
	
	
	
		
			TypeScript
		
	
	
	
	
	
			
		
		
	
	
			1087 lines
		
	
	
		
			34 KiB
		
	
	
	
		
			TypeScript
		
	
	
	
	
	
| import * as plugins from '../../plugins.ts';
 | |
| import { logger } from '../../logger.ts';
 | |
| import { 
 | |
|   SecurityLogger, 
 | |
|   SecurityLogLevel, 
 | |
|   SecurityEventType 
 | |
| } from '../../security/index.ts';
 | |
| import { UnifiedDeliveryQueue, type IQueueItem } from './classes.delivery.queue.ts';
 | |
| import type { Email } from '../core/classes.email.ts';
 | |
| import type { UnifiedEmailServer } from '../routing/classes.unified.email.server.ts';
 | |
| import type { SmtpClient } from './smtpclient/smtp-client.ts';
 | |
| 
 | |
/**
 * String-valued states used to label where an email is in the delivery
 * lifecycle. The values are lower-case so they can be stored or logged as-is.
 */
export enum DeliveryStatus {
  PENDING = 'pending',
  DELIVERING = 'delivering',
  DELIVERED = 'delivered',
  DEFERRED = 'deferred',
  FAILED = 'failed',
}
 | |
| 
 | |
/**
 * Contract for a pluggable per-mode delivery backend.
 *
 * The delivery system awaits `deliver` for each queue item; a resolved value
 * is passed on to the success hook/event, a rejection is treated as a failed
 * delivery.
 */
export interface IDeliveryHandler {
  /**
   * Attempt to deliver one queue item.
   * @param item Queue item to deliver
   * @returns Handler-specific result describing the delivery
   */
  deliver(item: IQueueItem): Promise<any>;
}
 | |
| 
 | |
/**
 * Options accepted by MultiModeDeliverySystem.
 *
 * Every field is optional; the constructor substitutes the defaults noted
 * below for values that are missing.
 */
export interface IMultiModeDeliveryOptions {
  // ---- Connection ----------------------------------------------------------

  /** Connection pool size. Defaults to 10. */
  connectionPoolSize?: number;
  /** Socket timeout in milliseconds. Defaults to 30000 (30 seconds). */
  socketTimeout?: number;

  // ---- Delivery behavior ---------------------------------------------------

  /** Maximum number of deliveries in flight at once. Defaults to 10. */
  concurrentDeliveries?: number;
  /** Send timeout in milliseconds. Defaults to 60000 (1 minute). */
  sendTimeout?: number;

  // ---- TLS -----------------------------------------------------------------

  /** Whether to verify TLS certificates. Treated as true unless explicitly `false`. */
  verifyCertificates?: boolean;
  /** Minimum TLS version. Defaults to 'TLSv1.2'. */
  tlsMinVersion?: string;

  // ---- Mode-specific handlers ----------------------------------------------

  /** Handler for items in 'forward' mode. Defaults to the built-in forward handler. */
  forwardHandler?: IDeliveryHandler;
  /** Handler for items in 'mta' mode. Defaults to the built-in MTA handler. */
  deliveryHandler?: IDeliveryHandler;
  /** Handler for items in 'process' mode. Defaults to the built-in process handler. */
  processHandler?: IDeliveryHandler;

  // ---- Rate limiting -------------------------------------------------------

  /** Global rate limit. Defaults to 100 (emails per minute). */
  globalRateLimit?: number;
  /** Rate limits keyed by pattern. Defaults to an empty record. */
  perPatternRateLimit?: Record<string, number>;

  // ---- Bounce handling -----------------------------------------------------

  /** Whether failed deliveries are reported as bounces. Treated as true unless explicitly `false`. */
  processBounces?: boolean;
  /** Receiver for failed deliveries; only consulted when `processBounces` is enabled. */
  bounceHandler?: {
    processSmtpFailure: (
      recipient: string,
      smtpResponse: string,
      options: any
    ) => Promise<any>;
  };

  // ---- Event hooks ---------------------------------------------------------

  /** Awaited before a delivery attempt starts. */
  onDeliveryStart?: (item: IQueueItem) => Promise<void>;
  /** Awaited after an item has been delivered, with the handler's result. */
  onDeliverySuccess?: (item: IQueueItem, result: any) => Promise<void>;
  /** Awaited after a delivery attempt failed, with the error message. */
  onDeliveryFailed?: (item: IQueueItem, error: string) => Promise<void>;
}
 | |
| 
 | |
/**
 * Counters describing the delivery system's activity.
 */
export interface IDeliveryStats {
  /** Number of deliveries currently in flight. */
  activeDeliveries: number;
  /** Count of deliveries that completed successfully. */
  totalSuccessful: number;
  /** Count of deliveries that failed. */
  totalFailed: number;
  /** Average delivery time (presumably milliseconds — confirm against the stats updater). */
  avgDeliveryTime: number;
  /** Success/failure counters broken down by processing mode. */
  byMode: {
    forward: { successful: number; failed: number };
    mta: { successful: number; failed: number };
    process: { successful: number; failed: number };
  };
  /** Rate limiting figures. */
  rateLimiting: { currentRate: number; globalLimit: number; throttled: number };
}
 | |
| 
 | |
| /**
 | |
|  * Handles delivery for all email processing modes
 | |
|  */
 | |
| export class MultiModeDeliverySystem extends plugins.EventEmitter {
 | |
|   private queue: UnifiedDeliveryQueue;
 | |
|   private options: Required<IMultiModeDeliveryOptions>;
 | |
|   private stats: IDeliveryStats;
 | |
|   private deliveryTimes: number[] = [];
 | |
|   private activeDeliveries: Set<string> = new Set();
 | |
|   private running: boolean = false;
 | |
|   private throttled: boolean = false;
 | |
|   private rateLimitLastCheck: number = Date.now();
 | |
|   private rateLimitCounter: number = 0;
 | |
|   private emailServer?: UnifiedEmailServer;
 | |
|   
 | |
|   /**
 | |
|    * Create a new multi-mode delivery system
 | |
|    * @param queue Unified delivery queue
 | |
|    * @param options Delivery options
 | |
|    * @param emailServer Optional reference to unified email server for SmtpClient access
 | |
|    */
 | |
|   constructor(queue: UnifiedDeliveryQueue, options: IMultiModeDeliveryOptions, emailServer?: UnifiedEmailServer) {
 | |
|     super();
 | |
|     
 | |
|     this.queue = queue;
 | |
|     this.emailServer = emailServer;
 | |
|     
 | |
|     // Set default options
 | |
|     this.options = {
 | |
|       connectionPoolSize: options.connectionPoolSize || 10,
 | |
|       socketTimeout: options.socketTimeout || 30000, // 30 seconds
 | |
|       concurrentDeliveries: options.concurrentDeliveries || 10,
 | |
|       sendTimeout: options.sendTimeout || 60000, // 1 minute
 | |
|       verifyCertificates: options.verifyCertificates !== false, // Default to true
 | |
|       tlsMinVersion: options.tlsMinVersion || 'TLSv1.2',
 | |
|       forwardHandler: options.forwardHandler || {
 | |
|         deliver: this.handleForwardDelivery.bind(this)
 | |
|       },
 | |
|       deliveryHandler: options.deliveryHandler || {
 | |
|         deliver: this.handleMtaDelivery.bind(this)
 | |
|       },
 | |
|       processHandler: options.processHandler || {
 | |
|         deliver: this.handleProcessDelivery.bind(this)
 | |
|       },
 | |
|       globalRateLimit: options.globalRateLimit || 100, // 100 emails per minute
 | |
|       perPatternRateLimit: options.perPatternRateLimit || {},
 | |
|       processBounces: options.processBounces !== false, // Default to true
 | |
|       bounceHandler: options.bounceHandler || null,
 | |
|       onDeliveryStart: options.onDeliveryStart || (async () => {}),
 | |
|       onDeliverySuccess: options.onDeliverySuccess || (async () => {}),
 | |
|       onDeliveryFailed: options.onDeliveryFailed || (async () => {})
 | |
|     };
 | |
|     
 | |
|     // Initialize statistics
 | |
|     this.stats = {
 | |
|       activeDeliveries: 0,
 | |
|       totalSuccessful: 0,
 | |
|       totalFailed: 0,
 | |
|       avgDeliveryTime: 0,
 | |
|       byMode: {
 | |
|         forward: {
 | |
|           successful: 0,
 | |
|           failed: 0
 | |
|         },
 | |
|         mta: {
 | |
|           successful: 0,
 | |
|           failed: 0
 | |
|         },
 | |
|         process: {
 | |
|           successful: 0,
 | |
|           failed: 0
 | |
|         }
 | |
|       },
 | |
|       rateLimiting: {
 | |
|         currentRate: 0,
 | |
|         globalLimit: this.options.globalRateLimit,
 | |
|         throttled: 0
 | |
|       }
 | |
|     };
 | |
|     
 | |
|     // Set up event listeners
 | |
|     this.queue.on('itemsReady', this.processItems.bind(this));
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Start the delivery system
 | |
|    */
 | |
|   public async start(): Promise<void> {
 | |
|     logger.log('info', 'Starting MultiModeDeliverySystem');
 | |
|     
 | |
|     if (this.running) {
 | |
|       logger.log('warn', 'MultiModeDeliverySystem is already running');
 | |
|       return;
 | |
|     }
 | |
|     
 | |
|     this.running = true;
 | |
|     
 | |
|     // Emit started event
 | |
|     this.emit('started');
 | |
|     logger.log('info', 'MultiModeDeliverySystem started successfully');
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Stop the delivery system
 | |
|    */
 | |
|   public async stop(): Promise<void> {
 | |
|     logger.log('info', 'Stopping MultiModeDeliverySystem');
 | |
|     
 | |
|     if (!this.running) {
 | |
|       logger.log('warn', 'MultiModeDeliverySystem is already stopped');
 | |
|       return;
 | |
|     }
 | |
|     
 | |
|     this.running = false;
 | |
|     
 | |
|     // Wait for active deliveries to complete
 | |
|     if (this.activeDeliveries.size > 0) {
 | |
|       logger.log('info', `Waiting for ${this.activeDeliveries.size} active deliveries to complete`);
 | |
|       
 | |
|       // Wait for a maximum of 30 seconds
 | |
|       await new Promise<void>(resolve => {
 | |
|         const checkInterval = setInterval(() => {
 | |
|           if (this.activeDeliveries.size === 0) {
 | |
|             clearInterval(checkInterval);
 | |
|             clearTimeout(forceTimeout);
 | |
|             resolve();
 | |
|           }
 | |
|         }, 1000);
 | |
|         
 | |
|         // Force resolve after 30 seconds
 | |
|         const forceTimeout = setTimeout(() => {
 | |
|           clearInterval(checkInterval);
 | |
|           resolve();
 | |
|         }, 30000);
 | |
|       });
 | |
|     }
 | |
|     
 | |
|     // Emit stopped event
 | |
|     this.emit('stopped');
 | |
|     logger.log('info', 'MultiModeDeliverySystem stopped successfully');
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Process ready items from the queue
 | |
|    * @param items Queue items ready for processing
 | |
|    */
 | |
|   private async processItems(items: IQueueItem[]): Promise<void> {
 | |
|     if (!this.running) {
 | |
|       return;
 | |
|     }
 | |
|     
 | |
|     // Check if we're already at max concurrent deliveries
 | |
|     if (this.activeDeliveries.size >= this.options.concurrentDeliveries) {
 | |
|       logger.log('debug', `Already at max concurrent deliveries (${this.activeDeliveries.size})`);
 | |
|       return;
 | |
|     }
 | |
|     
 | |
|     // Check rate limiting
 | |
|     if (this.checkRateLimit()) {
 | |
|       logger.log('debug', 'Rate limit exceeded, throttling deliveries');
 | |
|       return;
 | |
|     }
 | |
|     
 | |
|     // Calculate how many more deliveries we can start
 | |
|     const availableSlots = this.options.concurrentDeliveries - this.activeDeliveries.size;
 | |
|     const itemsToProcess = items.slice(0, availableSlots);
 | |
|     
 | |
|     if (itemsToProcess.length === 0) {
 | |
|       return;
 | |
|     }
 | |
|     
 | |
|     logger.log('info', `Processing ${itemsToProcess.length} items for delivery`);
 | |
|     
 | |
|     // Process each item
 | |
|     for (const item of itemsToProcess) {
 | |
|       // Mark as processing
 | |
|       await this.queue.markProcessing(item.id);
 | |
|       
 | |
|       // Add to active deliveries
 | |
|       this.activeDeliveries.add(item.id);
 | |
|       this.stats.activeDeliveries = this.activeDeliveries.size;
 | |
|       
 | |
|       // Deliver asynchronously
 | |
|       this.deliverItem(item).catch(err => {
 | |
|         logger.log('error', `Unhandled error in delivery: ${err.message}`);
 | |
|       });
 | |
|     }
 | |
|     
 | |
|     // Update statistics
 | |
|     this.emit('statsUpdated', this.stats);
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Deliver an item from the queue
 | |
|    * @param item Queue item to deliver
 | |
|    */
 | |
|   private async deliverItem(item: IQueueItem): Promise<void> {
 | |
|     const startTime = Date.now();
 | |
|     
 | |
|     try {
 | |
|       // Call delivery start hook
 | |
|       await this.options.onDeliveryStart(item);
 | |
|       
 | |
|       // Emit delivery start event
 | |
|       this.emit('deliveryStart', item);
 | |
|       logger.log('info', `Starting delivery of item ${item.id}, mode: ${item.processingMode}`);
 | |
|       
 | |
|       // Choose the appropriate handler based on mode
 | |
|       let result: any;
 | |
|       
 | |
|       switch (item.processingMode) {
 | |
|         case 'forward':
 | |
|           result = await this.options.forwardHandler.deliver(item);
 | |
|           break;
 | |
|           
 | |
|         case 'mta':
 | |
|           result = await this.options.deliveryHandler.deliver(item);
 | |
|           break;
 | |
|           
 | |
|         case 'process':
 | |
|           result = await this.options.processHandler.deliver(item);
 | |
|           break;
 | |
|           
 | |
|         default:
 | |
|           throw new Error(`Unknown processing mode: ${item.processingMode}`);
 | |
|       }
 | |
|       
 | |
|       // Mark as delivered
 | |
|       await this.queue.markDelivered(item.id);
 | |
|       
 | |
|       // Update statistics
 | |
|       this.stats.totalSuccessful++;
 | |
|       this.stats.byMode[item.processingMode].successful++;
 | |
|       
 | |
|       // Calculate delivery time
 | |
|       const deliveryTime = Date.now() - startTime;
 | |
|       this.deliveryTimes.push(deliveryTime);
 | |
|       this.updateDeliveryTimeStats();
 | |
|       
 | |
|       // Call delivery success hook
 | |
|       await this.options.onDeliverySuccess(item, result);
 | |
|       
 | |
|       // Emit delivery success event
 | |
|       this.emit('deliverySuccess', item, result);
 | |
|       logger.log('info', `Item ${item.id} delivered successfully in ${deliveryTime}ms`);
 | |
|       
 | |
|       SecurityLogger.getInstance().logEvent({
 | |
|         level: SecurityLogLevel.INFO,
 | |
|         type: SecurityEventType.EMAIL_DELIVERY,
 | |
|         message: 'Email delivery successful',
 | |
|         details: {
 | |
|           itemId: item.id,
 | |
|           mode: item.processingMode,
 | |
|           routeName: item.route?.name || 'unknown',
 | |
|           deliveryTime
 | |
|         },
 | |
|         success: true
 | |
|       });
 | |
|     } catch (error: any) {
 | |
|       // Calculate delivery attempt time even for failures
 | |
|       const deliveryTime = Date.now() - startTime;
 | |
|       
 | |
|       // Mark as failed
 | |
|       await this.queue.markFailed(item.id, error.message);
 | |
|       
 | |
|       // Update statistics
 | |
|       this.stats.totalFailed++;
 | |
|       this.stats.byMode[item.processingMode].failed++;
 | |
|       
 | |
|       // Call delivery failed hook
 | |
|       await this.options.onDeliveryFailed(item, error.message);
 | |
|       
 | |
|       // Process as bounce if enabled and we have a bounce handler
 | |
|       if (this.options.processBounces && this.options.bounceHandler) {
 | |
|         try {
 | |
|           const email = item.processingResult as Email;
 | |
|           
 | |
|           // Extract recipient and error message
 | |
|           // For multiple recipients, we'd need more sophisticated parsing
 | |
|           const recipient = email.to.length > 0 ? email.to[0] : '';
 | |
|           
 | |
|           if (recipient) {
 | |
|             logger.log('info', `Processing delivery failure as bounce for recipient ${recipient}`);
 | |
|             
 | |
|             // Process SMTP failure through bounce handler
 | |
|             await this.options.bounceHandler.processSmtpFailure(
 | |
|               recipient,
 | |
|               error.message,
 | |
|               {
 | |
|                 sender: email.from,
 | |
|                 originalEmailId: item.id,
 | |
|                 headers: email.headers
 | |
|               }
 | |
|             );
 | |
|             
 | |
|             logger.log('info', `Bounce record created for failed delivery to ${recipient}`);
 | |
|           }
 | |
|         } catch (bounceError) {
 | |
|           logger.log('error', `Failed to process bounce: ${bounceError.message}`);
 | |
|         }
 | |
|       }
 | |
|       
 | |
|       // Emit delivery failed event
 | |
|       this.emit('deliveryFailed', item, error);
 | |
|       logger.log('error', `Item ${item.id} delivery failed: ${error.message}`);
 | |
|       
 | |
|       SecurityLogger.getInstance().logEvent({
 | |
|         level: SecurityLogLevel.ERROR,
 | |
|         type: SecurityEventType.EMAIL_DELIVERY,
 | |
|         message: 'Email delivery failed',
 | |
|         details: {
 | |
|           itemId: item.id,
 | |
|           mode: item.processingMode,
 | |
|           routeName: item.route?.name || 'unknown',
 | |
|           error: error.message,
 | |
|           deliveryTime
 | |
|         },
 | |
|         success: false
 | |
|       });
 | |
|     } finally {
 | |
|       // Remove from active deliveries
 | |
|       this.activeDeliveries.delete(item.id);
 | |
|       this.stats.activeDeliveries = this.activeDeliveries.size;
 | |
|       
 | |
|       // Update statistics
 | |
|       this.emit('statsUpdated', this.stats);
 | |
|     }
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Default handler for forward mode delivery
 | |
|    * @param item Queue item
 | |
|    */
 | |
|   private async handleForwardDelivery(item: IQueueItem): Promise<any> {
 | |
|     logger.log('info', `Forward delivery for item ${item.id}`);
 | |
|     
 | |
|     const email = item.processingResult as Email;
 | |
|     const route = item.route;
 | |
|     
 | |
|     // Get target server information
 | |
|     const targetServer = route?.action.forward?.host;
 | |
|     const targetPort = route?.action.forward?.port || 25;
 | |
|     const useTls = false; // TLS configuration can be enhanced later
 | |
|     
 | |
|     if (!targetServer) {
 | |
|       throw new Error('No target server configured for forward mode');
 | |
|     }
 | |
|     
 | |
|     logger.log('info', `Forwarding email to ${targetServer}:${targetPort}, TLS: ${useTls}`);
 | |
|     
 | |
|     try {
 | |
|       // Get SMTP client from email server if available
 | |
|       if (!this.emailServer) {
 | |
|         // Fall back to raw socket implementation if no email server
 | |
|         logger.log('warn', 'No email server available, falling back to raw socket implementation');
 | |
|         return this.handleForwardDeliveryLegacy(item);
 | |
|       }
 | |
|       
 | |
|       // Get SMTP client from UnifiedEmailServer
 | |
|       const smtpClient = this.emailServer.getSmtpClient(targetServer, targetPort);
 | |
|       
 | |
|       // Apply DKIM signing if configured in the route
 | |
|       if (item.route?.action.options?.mtaOptions?.dkimSign) {
 | |
|         await this.applyDkimSigning(email, item.route.action.options.mtaOptions);
 | |
|       }
 | |
|       
 | |
|       // Send the email using SmtpClient
 | |
|       const result = await smtpClient.sendMail(email);
 | |
|       
 | |
|       if (result.success) {
 | |
|         logger.log('info', `Email forwarded successfully to ${targetServer}:${targetPort}`);
 | |
|         
 | |
|         return {
 | |
|           targetServer: targetServer,
 | |
|           targetPort: targetPort,
 | |
|           recipients: result.acceptedRecipients.length,
 | |
|           messageId: result.messageId,
 | |
|           rejectedRecipients: result.rejectedRecipients
 | |
|         };
 | |
|       } else {
 | |
|         throw new Error(result.error?.message || 'Failed to forward email');
 | |
|       }
 | |
|     } catch (error: any) {
 | |
|       logger.log('error', `Failed to forward email: ${error.message}`);
 | |
|       throw error;
 | |
|     }
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Legacy forward delivery using raw sockets (fallback)
 | |
|    * @param item Queue item
 | |
|    */
 | |
|   private async handleForwardDeliveryLegacy(item: IQueueItem): Promise<any> {
 | |
|     const email = item.processingResult as Email;
 | |
|     const route = item.route;
 | |
|     
 | |
|     // Get target server information
 | |
|     const targetServer = route?.action.forward?.host;
 | |
|     const targetPort = route?.action.forward?.port || 25;
 | |
|     const useTls = false; // TLS configuration can be enhanced later
 | |
|     
 | |
|     if (!targetServer) {
 | |
|       throw new Error('No target server configured for forward mode');
 | |
|     }
 | |
|     
 | |
|     // Create a socket connection to the target server
 | |
|     const socket = new net.Socket();
 | |
|     
 | |
|     // Set timeout
 | |
|     socket.setTimeout(this.options.socketTimeout);
 | |
|     
 | |
|     try {
 | |
|       // Connect to the target server
 | |
|       await new Promise<void>((resolve, reject) => {
 | |
|         // Handle connection events
 | |
|         socket.on('connect', () => {
 | |
|           logger.log('debug', `Connected to ${targetServer}:${targetPort}`);
 | |
|           resolve();
 | |
|         });
 | |
|         
 | |
|         socket.on('timeout', () => {
 | |
|           reject(new Error(`Connection timeout to ${targetServer}:${targetPort}`));
 | |
|         });
 | |
|         
 | |
|         socket.on('error', (err) => {
 | |
|           reject(new Error(`Connection error to ${targetServer}:${targetPort}: ${err.message}`));
 | |
|         });
 | |
|         
 | |
|         // Connect to the server
 | |
|         socket.connect({
 | |
|           host: targetServer,
 | |
|           port: targetPort
 | |
|         });
 | |
|       });
 | |
|       
 | |
|       // Send EHLO
 | |
|       await this.smtpCommand(socket, `EHLO ${route?.action.options?.mtaOptions?.domain || 'localhost'}`);
 | |
|       
 | |
|       // Start TLS if required
 | |
|       if (useTls) {
 | |
|         await this.smtpCommand(socket, 'STARTTLS');
 | |
|         
 | |
|         // Upgrade to TLS
 | |
|         const tlsSocket = await this.upgradeTls(socket, targetServer);
 | |
|         
 | |
|         // Send EHLO again after STARTTLS
 | |
|         await this.smtpCommand(tlsSocket, `EHLO ${route?.action.options?.mtaOptions?.domain || 'localhost'}`);
 | |
|         
 | |
|         // Use tlsSocket for remaining commands
 | |
|         return this.completeSMTPExchange(tlsSocket, email, route);
 | |
|       }
 | |
|       
 | |
|       // Complete the SMTP exchange
 | |
|       return this.completeSMTPExchange(socket, email, route);
 | |
|     } catch (error: any) {
 | |
|       logger.log('error', `Failed to forward email: ${error.message}`);
 | |
|       
 | |
|       // Close the connection
 | |
|       socket.destroy();
 | |
|       
 | |
|       throw error;
 | |
|     }
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Complete the SMTP exchange after connection and initial setup
 | |
|    * @param socket Network socket
 | |
|    * @param email Email to send
 | |
|    * @param rule Domain rule
 | |
|    */
 | |
|   private async completeSMTPExchange(socket: net.Socket | tls.TLSSocket, email: Email, route: any): Promise<any> {
 | |
|     try {
 | |
|       // Authenticate if credentials provided
 | |
|       if (route?.action?.forward?.auth?.user && route?.action?.forward?.auth?.pass) {
 | |
|         // Send AUTH LOGIN
 | |
|         await this.smtpCommand(socket, 'AUTH LOGIN');
 | |
|         
 | |
|         // Send username (base64)
 | |
|         const username = Buffer.from(route.action.forward.auth.user).toString('base64');
 | |
|         await this.smtpCommand(socket, username);
 | |
|         
 | |
|         // Send password (base64)
 | |
|         const password = Buffer.from(route.action.forward.auth.pass).toString('base64');
 | |
|         await this.smtpCommand(socket, password);
 | |
|       }
 | |
|       
 | |
|       // Send MAIL FROM
 | |
|       await this.smtpCommand(socket, `MAIL FROM:<${email.from}>`);
 | |
|       
 | |
|       // Send RCPT TO for each recipient
 | |
|       for (const recipient of email.getAllRecipients()) {
 | |
|         await this.smtpCommand(socket, `RCPT TO:<${recipient}>`);
 | |
|       }
 | |
|       
 | |
|       // Send DATA
 | |
|       await this.smtpCommand(socket, 'DATA');
 | |
|       
 | |
|       // Send email content (simplified)
 | |
|       const emailContent = await this.getFormattedEmail(email);
 | |
|       await this.smtpData(socket, emailContent);
 | |
|       
 | |
|       // Send QUIT
 | |
|       await this.smtpCommand(socket, 'QUIT');
 | |
|       
 | |
|       // Close the connection
 | |
|       socket.end();
 | |
|       
 | |
|       logger.log('info', `Email forwarded successfully to ${route?.action?.forward?.host}:${route?.action?.forward?.port || 25}`);
 | |
|       
 | |
|       return {
 | |
|         targetServer: route?.action?.forward?.host,
 | |
|         targetPort: route?.action?.forward?.port || 25,
 | |
|         recipients: email.getAllRecipients().length
 | |
|       };
 | |
|     } catch (error: any) {
 | |
|       logger.log('error', `Failed to forward email: ${error.message}`);
 | |
|       
 | |
|       // Close the connection
 | |
|       socket.destroy();
 | |
|       
 | |
|       throw error;
 | |
|     }
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Default handler for MTA mode delivery
 | |
|    * @param item Queue item
 | |
|    */
 | |
|   private async handleMtaDelivery(item: IQueueItem): Promise<any> {
 | |
|     logger.log('info', `MTA delivery for item ${item.id}`);
 | |
|     
 | |
|     const email = item.processingResult as Email;
 | |
|     const route = item.route;
 | |
|     
 | |
|     try {
 | |
|       // Apply DKIM signing if configured in the route
 | |
|       if (item.route?.action.options?.mtaOptions?.dkimSign) {
 | |
|         await this.applyDkimSigning(email, item.route.action.options.mtaOptions);
 | |
|       }
 | |
|       
 | |
|       // In a full implementation, this would use the MTA service
 | |
|       // For now, we'll simulate a successful delivery
 | |
|       
 | |
|       logger.log('info', `Email processed by MTA: ${email.subject} to ${email.getAllRecipients().join(', ')}`);
 | |
|       
 | |
|       // Note: The MTA implementation would handle actual local delivery
 | |
|       
 | |
|       // Simulate successful delivery
 | |
|       return {
 | |
|         recipients: email.getAllRecipients().length,
 | |
|         subject: email.subject,
 | |
|         dkimSigned: !!item.route?.action.options?.mtaOptions?.dkimSign
 | |
|       };
 | |
|     } catch (error: any) {
 | |
|       logger.log('error', `Failed to process email in MTA mode: ${error.message}`);
 | |
|       throw error;
 | |
|     }
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Default handler for process mode delivery
 | |
|    * @param item Queue item
 | |
|    */
 | |
|   private async handleProcessDelivery(item: IQueueItem): Promise<any> {
 | |
|     logger.log('info', `Process delivery for item ${item.id}`);
 | |
|     
 | |
|     const email = item.processingResult as Email;
 | |
|     const route = item.route;
 | |
|     
 | |
|     try {
 | |
|       // Apply content scanning if enabled
 | |
|       if (route?.action.options?.contentScanning && route?.action.options?.scanners && route.action.options.scanners.length > 0) {
 | |
|         logger.log('info', 'Performing content scanning');
 | |
|         
 | |
|         // Apply each scanner
 | |
|         for (const scanner of route.action.options.scanners) {
 | |
|           switch (scanner.type) {
 | |
|             case 'spam':
 | |
|               logger.log('info', 'Scanning for spam content');
 | |
|               // Implement spam scanning
 | |
|               break;
 | |
|               
 | |
|             case 'virus':
 | |
|               logger.log('info', 'Scanning for virus content');
 | |
|               // Implement virus scanning
 | |
|               break;
 | |
|               
 | |
|             case 'attachment':
 | |
|               logger.log('info', 'Scanning attachments');
 | |
|               
 | |
|               // Check for blocked extensions
 | |
|               if (scanner.blockedExtensions && scanner.blockedExtensions.length > 0) {
 | |
|                 for (const attachment of email.attachments) {
 | |
|                   const ext = this.getFileExtension(attachment.filename);
 | |
|                   if (scanner.blockedExtensions.includes(ext)) {
 | |
|                     if (scanner.action === 'reject') {
 | |
|                       throw new Error(`Blocked attachment type: ${ext}`);
 | |
|                     } else { // tag
 | |
|                       email.addHeader('X-Attachment-Warning', `Potentially unsafe attachment: ${attachment.filename}`);
 | |
|                     }
 | |
|                   }
 | |
|                 }
 | |
|               }
 | |
|               break;
 | |
|           }
 | |
|         }
 | |
|       }
 | |
|       
 | |
|       // Apply transformations if defined
 | |
|       if (route?.action.options?.transformations && route?.action.options?.transformations.length > 0) {
 | |
|         logger.log('info', 'Applying email transformations');
 | |
|         
 | |
|         for (const transform of route.action.options.transformations) {
 | |
|           switch (transform.type) {
 | |
|             case 'addHeader':
 | |
|               if (transform.header && transform.value) {
 | |
|                 email.addHeader(transform.header, transform.value);
 | |
|               }
 | |
|               break;
 | |
|           }
 | |
|         }
 | |
|       }
 | |
|       
 | |
|       // Apply DKIM signing if configured (after all transformations)
 | |
|       if (item.route?.action.options?.mtaOptions?.dkimSign || item.route?.action.process?.dkim) {
 | |
|         await this.applyDkimSigning(email, item.route.action.options?.mtaOptions || {});
 | |
|       }
 | |
|       
 | |
|       logger.log('info', `Email successfully processed in store-and-forward mode`);
 | |
|       
 | |
|       // Simulate successful delivery
 | |
|       return {
 | |
|         recipients: email.getAllRecipients().length,
 | |
|         subject: email.subject,
 | |
|         scanned: !!route?.action.options?.contentScanning,
 | |
|         transformed: !!(route?.action.options?.transformations && route?.action.options?.transformations.length > 0),
 | |
|         dkimSigned: !!(item.route?.action.options?.mtaOptions?.dkimSign || item.route?.action.process?.dkim)
 | |
|       };
 | |
|     } catch (error: any) {
 | |
|       logger.log('error', `Failed to process email: ${error.message}`);
 | |
|       throw error;
 | |
|     }
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Get file extension from filename
 | |
|    */
 | |
|   private getFileExtension(filename: string): string {
 | |
|     return filename.substring(filename.lastIndexOf('.')).toLowerCase();
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Apply DKIM signing to an email
 | |
|    */
 | |
|   private async applyDkimSigning(email: Email, mtaOptions: any): Promise<void> {
 | |
|     if (!this.emailServer) {
 | |
|       logger.log('warn', 'Cannot apply DKIM signing without email server reference');
 | |
|       return;
 | |
|     }
 | |
|     
 | |
|     const domainName = mtaOptions.dkimOptions?.domainName || email.from.split('@')[1];
 | |
|     const keySelector = mtaOptions.dkimOptions?.keySelector || 'default';
 | |
|     
 | |
|     try {
 | |
|       // Ensure DKIM keys exist for the domain
 | |
|       await this.emailServer.dkimCreator.handleDKIMKeysForDomain(domainName);
 | |
|       
 | |
|       // Convert Email to raw format for signing
 | |
|       const rawEmail = email.toRFC822String();
 | |
|       
 | |
|       // Sign the email
 | |
|       const signResult = await plugins.dkimSign(rawEmail, {
 | |
|         canonicalization: 'relaxed/relaxed',
 | |
|         algorithm: 'rsa-sha256',
 | |
|         signTime: new Date(),
 | |
|         signatureData: [
 | |
|           {
 | |
|             signingDomain: domainName,
 | |
|             selector: keySelector,
 | |
|             privateKey: (await this.emailServer.dkimCreator.readDKIMKeys(domainName)).privateKey,
 | |
|             algorithm: 'rsa-sha256',
 | |
|             canonicalization: 'relaxed/relaxed'
 | |
|           }
 | |
|         ]
 | |
|       });
 | |
|       
 | |
|       // Add the DKIM-Signature header to the email
 | |
|       if (signResult.signatures) {
 | |
|         email.addHeader('DKIM-Signature', signResult.signatures);
 | |
|         logger.log('info', `Successfully added DKIM signature for ${domainName}`);
 | |
|       }
 | |
|     } catch (error) {
 | |
|       logger.log('error', `Failed to apply DKIM signature: ${error.message}`);
 | |
|       // Don't throw - allow email to be sent without DKIM if signing fails
 | |
|     }
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Format email for SMTP transmission
 | |
|    * @param email Email to format
 | |
|    */
 | |
|   private async getFormattedEmail(email: Email): Promise<string> {
 | |
|     // This is a simplified implementation
 | |
|     // In a full implementation, this would use proper MIME formatting
 | |
|     
 | |
|     let content = '';
 | |
|     
 | |
|     // Add headers
 | |
|     content += `From: ${email.from}\r\n`;
 | |
|     content += `To: ${email.to.join(', ')}\r\n`;
 | |
|     content += `Subject: ${email.subject}\r\n`;
 | |
|     
 | |
|     // Add additional headers
 | |
|     for (const [name, value] of Object.entries(email.headers || {})) {
 | |
|       content += `${name}: ${value}\r\n`;
 | |
|     }
 | |
|     
 | |
|     // Add content type for multipart
 | |
|     if (email.attachments && email.attachments.length > 0) {
 | |
|       const boundary = `----_=_NextPart_${Math.random().toString(36).substr(2)}`;
 | |
|       content += `MIME-Version: 1.0\r\n`;
 | |
|       content += `Content-Type: multipart/mixed; boundary="${boundary}"\r\n`;
 | |
|       content += `\r\n`;
 | |
|       
 | |
|       // Add text part
 | |
|       content += `--${boundary}\r\n`;
 | |
|       content += `Content-Type: text/plain; charset="UTF-8"\r\n`;
 | |
|       content += `\r\n`;
 | |
|       content += `${email.text}\r\n`;
 | |
|       
 | |
|       // Add HTML part if present
 | |
|       if (email.html) {
 | |
|         content += `--${boundary}\r\n`;
 | |
|         content += `Content-Type: text/html; charset="UTF-8"\r\n`;
 | |
|         content += `\r\n`;
 | |
|         content += `${email.html}\r\n`;
 | |
|       }
 | |
|       
 | |
|       // Add attachments
 | |
|       for (const attachment of email.attachments) {
 | |
|         content += `--${boundary}\r\n`;
 | |
|         content += `Content-Type: ${attachment.contentType || 'application/octet-stream'}; name="${attachment.filename}"\r\n`;
 | |
|         content += `Content-Disposition: attachment; filename="${attachment.filename}"\r\n`;
 | |
|         content += `Content-Transfer-Encoding: base64\r\n`;
 | |
|         content += `\r\n`;
 | |
|         
 | |
|         // Add base64 encoded content
 | |
|         const base64Content = attachment.content.toString('base64');
 | |
|         
 | |
|         // Split into lines of 76 characters
 | |
|         for (let i = 0; i < base64Content.length; i += 76) {
 | |
|           content += base64Content.substring(i, i + 76) + '\r\n';
 | |
|         }
 | |
|       }
 | |
|       
 | |
|       // End boundary
 | |
|       content += `--${boundary}--\r\n`;
 | |
|     } else {
 | |
|       // Simple email with just text
 | |
|       content += `Content-Type: text/plain; charset="UTF-8"\r\n`;
 | |
|       content += `\r\n`;
 | |
|       content += `${email.text}\r\n`;
 | |
|     }
 | |
|     
 | |
|     return content;
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Send SMTP command and wait for response
 | |
|    * @param socket Socket connection
 | |
|    * @param command SMTP command to send
 | |
|    */
 | |
|   private async smtpCommand(socket: net.Socket, command: string): Promise<string> {
 | |
|     return new Promise<string>((resolve, reject) => {
 | |
|       const onData = (data: Buffer) => {
 | |
|         const response = data.toString().trim();
 | |
|         
 | |
|         // Clean up listeners
 | |
|         socket.removeListener('data', onData);
 | |
|         socket.removeListener('error', onError);
 | |
|         socket.removeListener('timeout', onTimeout);
 | |
|         
 | |
|         // Check response code
 | |
|         if (response.charAt(0) === '2' || response.charAt(0) === '3') {
 | |
|           resolve(response);
 | |
|         } else {
 | |
|           reject(new Error(`SMTP error: ${response}`));
 | |
|         }
 | |
|       };
 | |
|       
 | |
|       const onError = (err: Error) => {
 | |
|         // Clean up listeners
 | |
|         socket.removeListener('data', onData);
 | |
|         socket.removeListener('error', onError);
 | |
|         socket.removeListener('timeout', onTimeout);
 | |
|         
 | |
|         reject(err);
 | |
|       };
 | |
|       
 | |
|       const onTimeout = () => {
 | |
|         // Clean up listeners
 | |
|         socket.removeListener('data', onData);
 | |
|         socket.removeListener('error', onError);
 | |
|         socket.removeListener('timeout', onTimeout);
 | |
|         
 | |
|         reject(new Error('SMTP command timeout'));
 | |
|       };
 | |
|       
 | |
|       // Set up listeners
 | |
|       socket.once('data', onData);
 | |
|       socket.once('error', onError);
 | |
|       socket.once('timeout', onTimeout);
 | |
|       
 | |
|       // Send command
 | |
|       socket.write(command + '\r\n');
 | |
|     });
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Send SMTP DATA command with content
 | |
|    * @param socket Socket connection
 | |
|    * @param data Email content to send
 | |
|    */
 | |
|   private async smtpData(socket: net.Socket, data: string): Promise<string> {
 | |
|     return new Promise<string>((resolve, reject) => {
 | |
|       const onData = (responseData: Buffer) => {
 | |
|         const response = responseData.toString().trim();
 | |
|         
 | |
|         // Clean up listeners
 | |
|         socket.removeListener('data', onData);
 | |
|         socket.removeListener('error', onError);
 | |
|         socket.removeListener('timeout', onTimeout);
 | |
|         
 | |
|         // Check response code
 | |
|         if (response.charAt(0) === '2') {
 | |
|           resolve(response);
 | |
|         } else {
 | |
|           reject(new Error(`SMTP error: ${response}`));
 | |
|         }
 | |
|       };
 | |
|       
 | |
|       const onError = (err: Error) => {
 | |
|         // Clean up listeners
 | |
|         socket.removeListener('data', onData);
 | |
|         socket.removeListener('error', onError);
 | |
|         socket.removeListener('timeout', onTimeout);
 | |
|         
 | |
|         reject(err);
 | |
|       };
 | |
|       
 | |
|       const onTimeout = () => {
 | |
|         // Clean up listeners
 | |
|         socket.removeListener('data', onData);
 | |
|         socket.removeListener('error', onError);
 | |
|         socket.removeListener('timeout', onTimeout);
 | |
|         
 | |
|         reject(new Error('SMTP data timeout'));
 | |
|       };
 | |
|       
 | |
|       // Set up listeners
 | |
|       socket.once('data', onData);
 | |
|       socket.once('error', onError);
 | |
|       socket.once('timeout', onTimeout);
 | |
|       
 | |
|       // Send data and end with CRLF.CRLF
 | |
|       socket.write(data + '\r\n.\r\n');
 | |
|     });
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Upgrade socket to TLS
 | |
|    * @param socket Socket connection
 | |
|    * @param hostname Target hostname for TLS
 | |
|    */
 | |
|   private async upgradeTls(socket: net.Socket, hostname: string): Promise<tls.TLSSocket> {
 | |
|     return new Promise<tls.TLSSocket>((resolve, reject) => {
 | |
|       const tlsOptions: tls.ConnectionOptions = {
 | |
|         socket,
 | |
|         servername: hostname,
 | |
|         rejectUnauthorized: this.options.verifyCertificates,
 | |
|         minVersion: this.options.tlsMinVersion as tls.SecureVersion
 | |
|       };
 | |
|       
 | |
|       const tlsSocket = tls.connect(tlsOptions);
 | |
|       
 | |
|       tlsSocket.once('secureConnect', () => {
 | |
|         resolve(tlsSocket);
 | |
|       });
 | |
|       
 | |
|       tlsSocket.once('error', (err) => {
 | |
|         reject(new Error(`TLS error: ${err.message}`));
 | |
|       });
 | |
|       
 | |
|       tlsSocket.setTimeout(this.options.socketTimeout);
 | |
|       
 | |
|       tlsSocket.once('timeout', () => {
 | |
|         reject(new Error('TLS connection timeout'));
 | |
|       });
 | |
|     });
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Update delivery time statistics
 | |
|    */
 | |
|   private updateDeliveryTimeStats(): void {
 | |
|     if (this.deliveryTimes.length === 0) return;
 | |
|     
 | |
|     // Keep only the last 1000 delivery times
 | |
|     if (this.deliveryTimes.length > 1000) {
 | |
|       this.deliveryTimes = this.deliveryTimes.slice(-1000);
 | |
|     }
 | |
|     
 | |
|     // Calculate average
 | |
|     const sum = this.deliveryTimes.reduce((acc, time) => acc + time, 0);
 | |
|     this.stats.avgDeliveryTime = sum / this.deliveryTimes.length;
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Check if rate limit is exceeded
 | |
|    * @returns True if rate limited, false otherwise
 | |
|    */
 | |
|   private checkRateLimit(): boolean {
 | |
|     const now = Date.now();
 | |
|     const elapsed = now - this.rateLimitLastCheck;
 | |
|     
 | |
|     // Reset counter if more than a minute has passed
 | |
|     if (elapsed >= 60000) {
 | |
|       this.rateLimitLastCheck = now;
 | |
|       this.rateLimitCounter = 0;
 | |
|       this.throttled = false;
 | |
|       this.stats.rateLimiting.currentRate = 0;
 | |
|       return false;
 | |
|     }
 | |
|     
 | |
|     // Check if we're already throttled
 | |
|     if (this.throttled) {
 | |
|       return true;
 | |
|     }
 | |
|     
 | |
|     // Increment counter
 | |
|     this.rateLimitCounter++;
 | |
|     
 | |
|     // Calculate current rate (emails per minute)
 | |
|     const rate = (this.rateLimitCounter / elapsed) * 60000;
 | |
|     this.stats.rateLimiting.currentRate = rate;
 | |
|     
 | |
|     // Check if rate limit is exceeded
 | |
|     if (rate > this.options.globalRateLimit) {
 | |
|       this.throttled = true;
 | |
|       this.stats.rateLimiting.throttled++;
 | |
|       
 | |
|       // Schedule throttle reset
 | |
|       const resetDelay = 60000 - elapsed;
 | |
|       setTimeout(() => {
 | |
|         this.throttled = false;
 | |
|         this.rateLimitLastCheck = Date.now();
 | |
|         this.rateLimitCounter = 0;
 | |
|         this.stats.rateLimiting.currentRate = 0;
 | |
|       }, resetDelay);
 | |
|       
 | |
|       return true;
 | |
|     }
 | |
|     
 | |
|     return false;
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Update delivery options
 | |
|    * @param options New options
 | |
|    */
 | |
|   public updateOptions(options: Partial<IMultiModeDeliveryOptions>): void {
 | |
|     this.options = {
 | |
|       ...this.options,
 | |
|       ...options
 | |
|     };
 | |
|     
 | |
|     // Update rate limit statistics
 | |
|     if (options.globalRateLimit) {
 | |
|       this.stats.rateLimiting.globalLimit = options.globalRateLimit;
 | |
|     }
 | |
|     
 | |
|     logger.log('info', 'MultiModeDeliverySystem options updated');
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Get delivery statistics
 | |
|    */
 | |
|   public getStats(): IDeliveryStats {
 | |
|     return { ...this.stats };
 | |
|   }
 | |
| } |