import * as plugins from '../../plugins.js';
import * as paths from '../../paths.js';
import { EventEmitter } from 'events';
import { logger } from '../../logger.js';
import { SecurityLogger, SecurityLogLevel, SecurityEventType } from '../../security/index.js';
import { DomainRouter } from './classes.domain.router.js';
import type { IEmailConfig, EmailProcessingMode, IDomainRule } from './classes.email.config.js';
import { Email } from '../core/classes.email.js';
import * as net from 'node:net';
import * as tls from 'node:tls';
import * as stream from 'node:stream';
import { SMTPServer as MtaSmtpServer } from '../delivery/classes.smtpserver.js';

/** Number of most recent processing-time samples kept for the statistics. */
const MAX_PROCESSING_TIME_SAMPLES = 1000;

/** Basic `local@domain.tld` shape check; hoisted so it is compiled only once. */
const EMAIL_ADDRESS_PATTERN = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;

/**
 * Extract a printable message from a caught value.
 * Caught values are not guaranteed to be `Error` instances, so they are narrowed here
 * instead of reading `.message` directly.
 */
function describeError(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * Return the caught value unchanged when it already is an `Error`, otherwise wrap its
 * string form in a new `Error`.
 */
function toError(err: unknown): Error {
  return err instanceof Error ? err : new Error(String(err));
}

/**
 * Options for the unified email server
 */
export interface IUnifiedEmailServerOptions {
  // Base server options
  ports: number[];
  hostname: string;
  banner?: string;

  // Authentication options
  auth?: {
    required?: boolean;
    methods?: ('PLAIN' | 'LOGIN' | 'OAUTH2')[];
    users?: Array<{username: string, password: string}>;
  };

  // TLS options
  tls?: {
    certPath?: string;
    keyPath?: string;
    caPath?: string;
    minVersion?: string;
    ciphers?: string;
  };

  // Limits
  maxMessageSize?: number;
  maxClients?: number;
  maxConnections?: number;

  // Connection options
  connectionTimeout?: number;
  socketTimeout?: number;

  // Domain rules
  domainRules: IDomainRule[];

  // Default handling for unmatched domains
  defaultMode: EmailProcessingMode;
  defaultServer?: string;
  defaultPort?: number;
  defaultTls?: boolean;
}

/**
 * Interface describing SMTP session data
 */
export interface ISmtpSession {
  id: string;
  remoteAddress: string;
  clientHostname: string;
  secure: boolean;
  authenticated: boolean;
  user?: {
    username: string;
    [key: string]: any;
  };
  envelope: {
    mailFrom: {
      address: string;
      args: any;
    };
    rcptTo: Array<{
      address: string;
      args: any;
    }>;
  };
  processingMode?: EmailProcessingMode;
  matchedRule?: IDomainRule;
}

/**
 * Authentication data for SMTP
 */
export interface IAuthData {
  method: string;
  username: string;
  password: string;
}

/**
 * Server statistics
 */
export interface IServerStats {
  startTime: Date;
  connections: {
    current: number;
    total: number;
  };
  messages: {
    processed: number;
    delivered: number;
    failed: number;
  };
  processingTime: {
    avg: number;
    max: number;
    min: number;
  };
}

/**
 * Unified email server that handles all email traffic with pattern-based routing.
 *
 * Events emitted by this class: `started`, `stopped`, `error`, `close` and
 * `emailProcessed` (with the processed email, the processing mode and the matched rule).
 */
export class UnifiedEmailServer extends EventEmitter {
  private options: IUnifiedEmailServerOptions;
  private domainRouter: DomainRouter;
  private servers: MtaSmtpServer[] = [];
  private stats: IServerStats;
  private processingTimes: number[] = [];

  /**
   * @param options Server configuration. Unset limits, timeouts and the banner are
   *   filled with defaults; the domain rules and default routing are handed to a
   *   {@link DomainRouter}.
   */
  constructor(options: IUnifiedEmailServerOptions) {
    super();

    // Set default options
    this.options = {
      ...options,
      banner: options.banner || `${options.hostname} ESMTP UnifiedEmailServer`,
      maxMessageSize: options.maxMessageSize || 10 * 1024 * 1024, // 10MB
      maxClients: options.maxClients || 100,
      maxConnections: options.maxConnections || 1000,
      connectionTimeout: options.connectionTimeout || 60000, // 1 minute
      socketTimeout: options.socketTimeout || 60000 // 1 minute
    };

    // Initialize domain router for pattern matching
    this.domainRouter = new DomainRouter({
      domainRules: options.domainRules,
      defaultMode: options.defaultMode,
      defaultServer: options.defaultServer,
      defaultPort: options.defaultPort,
      defaultTls: options.defaultTls,
      enableCache: true,
      cacheSize: 1000
    });

    // Initialize statistics
    this.stats = {
      startTime: new Date(),
      connections: {
        current: 0,
        total: 0
      },
      messages: {
        processed: 0,
        delivered: 0,
        failed: 0
      },
      processingTime: {
        avg: 0,
        max: 0,
        min: 0
      }
    };

    // The SMTP servers themselves are created in start()
  }

  /**
   * Start the unified email server: one SMTP server is created and started per
   * configured port, then `started` is emitted.
   *
   * @throws Rethrows any error raised while creating or starting a server. An
   *   `EADDRINUSE` error is replaced by an `Error` naming the port.
   */
  public async start(): Promise<void> {
    logger.log('info', `Starting UnifiedEmailServer on ports: ${this.options.ports.join(', ')}`);

    try {
      // Prepare the certificate and key if available
      const { key, cert } = this.loadTlsCredentials();

      // Create a SMTP server for each port
      for (const port of this.options.ports) {
        const serverOptions = {
          port,
          hostname: this.options.hostname,
          key,
          cert
        };

        // Create and start the SMTP server
        const smtpServer = new MtaSmtpServer(this.createMtaReference() as any, serverOptions);
        this.servers.push(smtpServer);

        try {
          // Awaiting is a no-op when start() is synchronous and surfaces a rejection
          // when it returns a promise — TODO confirm which one MtaSmtpServer does.
          await smtpServer.start();
          logger.log('info', `UnifiedEmailServer listening on port ${port}`);

          // Set up event handlers
          (smtpServer as any).server.on('error', (err: Error) => {
            logger.log('error', `SMTP server error on port ${port}: ${err.message}`);
            this.emit('error', err);
          });
        } catch (err) {
          if ((err as NodeJS.ErrnoException | undefined)?.code === 'EADDRINUSE') {
            logger.log('error', `Port ${port} is already in use`);
            throw new Error(`Port ${port} is already in use`);
          }
          logger.log('error', `Error starting server on port ${port}: ${describeError(err)}`);
          throw err;
        }
      }

      logger.log('info', 'UnifiedEmailServer started successfully');
      this.emit('started');
    } catch (error) {
      logger.log('error', `Failed to start UnifiedEmailServer: ${describeError(error)}`);
      throw error;
    }
  }

  /**
   * Read the TLS key and certificate from the configured paths.
   *
   * @returns Both `key` and `cert` when both paths are configured and readable,
   *   otherwise an empty object. A read failure is logged as a warning and is not
   *   thrown, so the caller continues without TLS material.
   */
  private loadTlsCredentials(): { key?: string; cert?: string } {
    const keyPath = this.options.tls?.keyPath;
    const certPath = this.options.tls?.certPath;
    if (!keyPath || !certPath) {
      return {};
    }

    try {
      const key = plugins.fs.readFileSync(keyPath, 'utf8');
      const cert = plugins.fs.readFileSync(certPath, 'utf8');
      logger.log('info', 'TLS certificates loaded successfully');
      return { key, cert };
    } catch (error) {
      logger.log('warn', `Failed to load TLS certificates: ${describeError(error)}`);
      // Never hand out a key without its certificate (or vice versa).
      return {};
    }
  }

  /**
   * Build the object handed to {@link MtaSmtpServer} in place of a real MTA service.
   * The DKIM/SPF/DMARC verifiers are placeholders that always report success;
   * `processIncomingEmail` routes the email through {@link processEmailByMode}.
   */
  private createMtaReference() {
    return {
      config: {
        smtp: {
          hostname: this.options.hostname
        },
        security: {
          checkIPReputation: false,
          verifyDkim: true,
          verifySpf: true,
          verifyDmarc: true
        }
      },
      // These will be implemented in the real integration:
      dkimVerifier: {
        verify: async () => ({ isValid: true, domain: '' })
      },
      spfVerifier: {
        verifyAndApply: async () => true
      },
      dmarcVerifier: {
        verify: async () => ({}),
        applyPolicy: () => true
      },
      processIncomingEmail: async (email: Email) => {
        // Routing is decided by the first recipient only (Email.to is an array).
        const to = email.to[0];
        const rule = this.domainRouter.matchRule(to);
        const mode = rule?.mode || this.options.defaultMode;

        // No real SMTP session is available here, so a synthetic one is built.
        await this.processEmailByMode(email, {
          id: 'session-' + Math.random().toString(36).substring(2),
          remoteAddress: '127.0.0.1',
          clientHostname: '',
          secure: false,
          authenticated: false,
          envelope: {
            mailFrom: { address: email.from, args: {} },
            rcptTo: email.to.map(recipient => ({ address: recipient, args: {} }))
          },
          processingMode: mode,
          matchedRule: rule
        }, mode);

        return true;
      }
    };
  }

  /**
   * Stop the unified email server: every SMTP server is stopped, the server list is
   * cleared and `stopped` is emitted.
   *
   * @throws Rethrows any error raised while stopping a server.
   */
  public async stop(): Promise<void> {
    logger.log('info', 'Stopping UnifiedEmailServer');

    try {
      // Stop all SMTP servers
      for (const server of this.servers) {
        // Awaited so a promise-returning stop() cannot fail unobserved; harmless if
        // stop() is synchronous.
        await server.stop();
      }

      // Clear the servers array
      this.servers = [];

      logger.log('info', 'UnifiedEmailServer stopped successfully');
      this.emit('stopped');
    } catch (error) {
      logger.log('error', `Error stopping UnifiedEmailServer: ${describeError(error)}`);
      throw error;
    }
  }

  /**
   * Handle new SMTP connection (stub implementation).
   * Updates the connection counters, records a security event and accepts the
   * connection.
   */
  private onConnect(session: ISmtpSession, callback: (err?: Error) => void): void {
    logger.log('info', `New connection from ${session.remoteAddress}`);

    // Update connection statistics
    this.stats.connections.current++;
    this.stats.connections.total++;

    // Log connection event
    SecurityLogger.getInstance().logEvent({
      level: SecurityLogLevel.INFO,
      type: SecurityEventType.CONNECTION,
      message: 'New SMTP connection established',
      ipAddress: session.remoteAddress,
      details: {
        sessionId: session.id,
        secure: session.secure
      }
    });

    // Optional IP reputation check would go here

    // Continue with the connection
    callback();
  }

  /**
   * Handle authentication (stub implementation).
   * Checks the credentials against the configured `auth.users` list and reports the
   * outcome through `callback`; every outcome is also written to the security log.
   */
  private onAuth(auth: IAuthData, session: ISmtpSession, callback: (err?: Error, user?: any) => void): void {
    const users = this.options.auth?.users;

    if (!users || users.length === 0) {
      // No authentication configured, reject
      const error = new Error('Authentication not supported');
      logger.log('warn', `Authentication attempt when not configured: ${auth.username}`);

      SecurityLogger.getInstance().logEvent({
        level: SecurityLogLevel.WARN,
        type: SecurityEventType.AUTHENTICATION,
        message: 'Authentication attempt when not configured',
        ipAddress: session.remoteAddress,
        details: {
          username: auth.username,
          method: auth.method,
          sessionId: session.id
        },
        success: false
      });

      return callback(error);
    }

    // Find matching user
    // NOTE(review): plain-text, non-constant-time comparison — acceptable for a stub,
    // should be revisited before production use.
    const user = users.find(u => u.username === auth.username && u.password === auth.password);

    if (user) {
      logger.log('info', `User ${auth.username} authenticated successfully`);

      SecurityLogger.getInstance().logEvent({
        level: SecurityLogLevel.INFO,
        type: SecurityEventType.AUTHENTICATION,
        message: 'SMTP authentication successful',
        ipAddress: session.remoteAddress,
        details: {
          username: auth.username,
          method: auth.method,
          sessionId: session.id
        },
        success: true
      });

      return callback(null, { username: user.username });
    }

    const error = new Error('Invalid username or password');
    logger.log('warn', `Failed authentication for ${auth.username}`);

    SecurityLogger.getInstance().logEvent({
      level: SecurityLogLevel.WARN,
      type: SecurityEventType.AUTHENTICATION,
      message: 'SMTP authentication failed',
      ipAddress: session.remoteAddress,
      details: {
        username: auth.username,
        method: auth.method,
        sessionId: session.id
      },
      success: false
    });

    return callback(error);
  }

  /**
   * Handle MAIL FROM command (stub implementation).
   * Rejects malformed sender addresses and, when `auth.required` is set,
   * unauthenticated sessions.
   */
  private onMailFrom(address: {address: string}, session: ISmtpSession, callback: (err?: Error) => void): void {
    logger.log('info', `MAIL FROM: ${address.address}`);

    // Validate the email address
    if (!this.isValidEmail(address.address)) {
      const error = new Error('Invalid sender address');
      logger.log('warn', `Invalid sender address: ${address.address}`);

      SecurityLogger.getInstance().logEvent({
        level: SecurityLogLevel.WARN,
        type: SecurityEventType.EMAIL_VALIDATION,
        message: 'Invalid sender email format',
        ipAddress: session.remoteAddress,
        details: {
          address: address.address,
          sessionId: session.id
        },
        success: false
      });

      return callback(error);
    }

    // Authentication check if required
    if (this.options.auth?.required && !session.authenticated) {
      const error = new Error('Authentication required');
      logger.log('warn', `Unauthenticated sender rejected: ${address.address}`);

      SecurityLogger.getInstance().logEvent({
        level: SecurityLogLevel.WARN,
        type: SecurityEventType.AUTHENTICATION,
        message: 'Unauthenticated sender rejected',
        ipAddress: session.remoteAddress,
        details: {
          address: address.address,
          sessionId: session.id
        },
        success: false
      });

      return callback(error);
    }

    // Continue processing
    callback();
  }

  /**
   * Handle RCPT TO command (stub implementation).
   * Rejects malformed recipient addresses; otherwise stores the matched rule and the
   * resulting processing mode (or the default mode) on the session.
   */
  private onRcptTo(address: {address: string}, session: ISmtpSession, callback: (err?: Error) => void): void {
    logger.log('info', `RCPT TO: ${address.address}`);

    // Validate the email address
    if (!this.isValidEmail(address.address)) {
      const error = new Error('Invalid recipient address');
      logger.log('warn', `Invalid recipient address: ${address.address}`);

      SecurityLogger.getInstance().logEvent({
        level: SecurityLogLevel.WARN,
        type: SecurityEventType.EMAIL_VALIDATION,
        message: 'Invalid recipient email format',
        ipAddress: session.remoteAddress,
        details: {
          address: address.address,
          sessionId: session.id
        },
        success: false
      });

      return callback(error);
    }

    // Pattern match the recipient to determine processing mode
    const rule = this.domainRouter.matchRule(address.address);

    if (rule) {
      // Store the matched rule and processing mode in the session
      session.matchedRule = rule;
      session.processingMode = rule.mode;

      logger.log('info', `Email ${address.address} matched rule: ${rule.pattern}, mode: ${rule.mode}`);
    } else {
      // Use default mode
      session.processingMode = this.options.defaultMode;

      logger.log('info', `Email ${address.address} using default mode: ${this.options.defaultMode}`);
    }

    // Continue processing
    callback();
  }

  /**
   * Handle incoming email data (stub implementation).
   * Buffers the whole message, processes it according to the session's mode, updates
   * the statistics and emits `emailProcessed` on success. Failures are counted,
   * logged and passed to `callback`.
   */
  private onData(stream: stream.Readable, session: ISmtpSession, callback: (err?: Error) => void): void {
    logger.log('info', `Processing email data for session ${session.id}`);

    const startTime = Date.now();
    const chunks: Buffer[] = [];

    stream.on('data', (chunk: Buffer) => {
      chunks.push(chunk);
    });

    stream.on('end', async () => {
      try {
        const data = Buffer.concat(chunks);
        const mode = session.processingMode || this.options.defaultMode;

        // Determine processing mode based on matched rule
        const processedEmail = await this.processEmailByMode(data, session, mode);

        // Update statistics
        this.stats.messages.processed++;
        this.stats.messages.delivered++;

        // Calculate processing time
        const processingTime = Date.now() - startTime;
        this.processingTimes.push(processingTime);
        this.updateProcessingTimeStats();

        // Emit event for delivery queue
        this.emit('emailProcessed', processedEmail, mode, session.matchedRule);

        logger.log('info', `Email processed successfully in ${processingTime}ms, mode: ${mode}`);
        callback();
      } catch (error) {
        const message = describeError(error);
        logger.log('error', `Error processing email: ${message}`);

        // Update statistics
        this.stats.messages.processed++;
        this.stats.messages.failed++;

        // Calculate processing time for failed attempts too
        const processingTime = Date.now() - startTime;
        this.processingTimes.push(processingTime);
        this.updateProcessingTimeStats();

        SecurityLogger.getInstance().logEvent({
          level: SecurityLogLevel.ERROR,
          type: SecurityEventType.EMAIL_PROCESSING,
          message: 'Email processing failed',
          ipAddress: session.remoteAddress,
          details: {
            error: message,
            sessionId: session.id,
            mode: session.processingMode,
            processingTime
          },
          success: false
        });

        callback(toError(error));
      }
    });

    stream.on('error', (err) => {
      logger.log('error', `Stream error: ${err.message}`);

      SecurityLogger.getInstance().logEvent({
        level: SecurityLogLevel.ERROR,
        type: SecurityEventType.EMAIL_PROCESSING,
        message: 'Email stream error',
        ipAddress: session.remoteAddress,
        details: {
          error: err.message,
          sessionId: session.id
        },
        success: false
      });

      callback(err);
    });
  }

  /**
   * Update processing time statistics (average, maximum, minimum) from the most
   * recent samples; older samples are discarded.
   */
  private updateProcessingTimeStats(): void {
    if (this.processingTimes.length === 0) return;

    // Keep only the most recent samples
    if (this.processingTimes.length > MAX_PROCESSING_TIME_SAMPLES) {
      this.processingTimes = this.processingTimes.slice(-MAX_PROCESSING_TIME_SAMPLES);
    }

    // Calculate stats
    const sum = this.processingTimes.reduce((acc, time) => acc + time, 0);
    const avg = sum / this.processingTimes.length;
    const max = Math.max(...this.processingTimes);
    const min = Math.min(...this.processingTimes);

    this.stats.processingTime = { avg, max, min };
  }

  /**
   * Process email based on the determined mode.
   *
   * @param emailData Either an already constructed `Email` or the raw message bytes,
   *   which are parsed with `mailparser` first.
   * @param session Session the message belongs to; supplies envelope fallbacks and
   *   the matched rule.
   * @param mode Processing mode to dispatch on.
   * @returns The (possibly modified) `Email` object.
   * @throws When the raw data cannot be parsed, when the mode is unknown, or when the
   *   mode handler fails.
   */
  private async processEmailByMode(emailData: Email | Buffer, session: ISmtpSession, mode: EmailProcessingMode): Promise<Email> {
    // Convert Buffer to Email if needed
    let email: Email;
    if (Buffer.isBuffer(emailData)) {
      // Parse the email data buffer into an Email object
      try {
        const parsed = await plugins.mailparser.simpleParser(emailData);
        email = new Email({
          from: parsed.from?.value[0]?.address || session.envelope.mailFrom.address,
          // NOTE(review): only the first envelope recipient is carried over; further
          // RCPT TO addresses are dropped here — confirm whether Email accepts a list.
          to: session.envelope.rcptTo[0]?.address || '',
          subject: parsed.subject || '',
          text: parsed.text || '',
          html: parsed.html || undefined,
          attachments: parsed.attachments?.map(att => ({
            filename: att.filename || '',
            content: att.content,
            contentType: att.contentType
          })) || []
        });
      } catch (error) {
        const message = describeError(error);
        logger.log('error', `Error parsing email data: ${message}`);
        throw new Error(`Error parsing email data: ${message}`);
      }
    } else {
      email = emailData;
    }

    // Process based on mode
    switch (mode) {
      case 'forward':
        await this.handleForwardMode(email, session);
        break;

      case 'mta':
        await this.handleMtaMode(email, session);
        break;

      case 'process':
        await this.handleProcessMode(email, session);
        break;

      default:
        throw new Error(`Unknown processing mode: ${mode}`);
    }

    // Return the processed email
    return email;
  }

  /**
   * Handle email in forward mode (SMTP proxy).
   *
   * The target comes from the matched rule, falling back to the server defaults
   * (port 25, no TLS).
   *
   * NOTE(review): this is still a stub. It opens a socket to the target and closes it
   * again without sending any SMTP commands, and it reports success without waiting
   * for the connection. `useTls` is only logged.
   *
   * @throws When no target server is configured.
   */
  private async handleForwardMode(email: Email, session: ISmtpSession): Promise<void> {
    logger.log('info', `Handling email in forward mode for session ${session.id}`);

    // Get target server information
    const rule = session.matchedRule;
    const targetServer = rule?.target?.server || this.options.defaultServer;
    const targetPort = rule?.target?.port || this.options.defaultPort || 25;
    const useTls = rule?.target?.useTls ?? this.options.defaultTls ?? false;

    if (!targetServer) {
      throw new Error('No target server configured for forward mode');
    }

    logger.log('info', `Forwarding email to ${targetServer}:${targetPort}, TLS: ${useTls}`);

    try {
      // Create a simple SMTP client connection to the target server
      const client = new net.Socket();

      await new Promise<void>((resolve, reject) => {
        // Connect to the target server
        client.connect({
          host: targetServer,
          port: targetPort
        });

        client.on('data', (data) => {
          const response = data.toString().trim();
          logger.log('debug', `SMTP response: ${response}`);

          // Handle SMTP response codes
          if (response.startsWith('2')) {
            // Success response
            resolve();
          } else if (response.startsWith('5')) {
            // Permanent error
            reject(new Error(`SMTP error: ${response}`));
          }
        });

        // Keeps late socket errors from being unhandled; once the promise has
        // settled below, the reject() call is a no-op and the error is only logged.
        client.on('error', (err) => {
          logger.log('error', `SMTP client error: ${err.message}`);
          reject(err);
        });

        // SMTP client commands would go here in a full implementation
        // For now, just finish the connection
        client.end();
        resolve();
      });

      logger.log('info', `Email forwarded successfully to ${targetServer}:${targetPort}`);

      SecurityLogger.getInstance().logEvent({
        level: SecurityLogLevel.INFO,
        type: SecurityEventType.EMAIL_FORWARDING,
        message: 'Email forwarded',
        ipAddress: session.remoteAddress,
        details: {
          sessionId: session.id,
          targetServer,
          targetPort,
          useTls,
          ruleName: rule?.pattern || 'default',
          subject: email.subject
        },
        success: true
      });
    } catch (error) {
      const message = describeError(error);
      logger.log('error', `Failed to forward email: ${message}`);

      SecurityLogger.getInstance().logEvent({
        level: SecurityLogLevel.ERROR,
        type: SecurityEventType.EMAIL_FORWARDING,
        message: 'Email forwarding failed',
        ipAddress: session.remoteAddress,
        details: {
          sessionId: session.id,
          targetServer,
          targetPort,
          useTls,
          ruleName: rule?.pattern || 'default',
          error: message
        },
        success: false
      });

      throw error;
    }
  }

  /**
   * Handle email in MTA mode (programmatic processing).
   * Currently only logs what would be done (including DKIM signing when the matched
   * rule requests it) and records a security event.
   */
  private async handleMtaMode(email: Email, session: ISmtpSession): Promise<void> {
    logger.log('info', `Handling email in MTA mode for session ${session.id}`);

    try {
      // Apply MTA rule options if provided
      const mtaOptions = session.matchedRule?.mtaOptions;
      if (mtaOptions?.dkimSign && mtaOptions.dkimOptions) {
        // Sign the email with DKIM
        logger.log('info', `Signing email with DKIM for domain ${mtaOptions.dkimOptions.domainName}`);
        // In a full implementation, this would use the DKIM signing library
      }

      // Get email content for logging/processing
      const subject = email.subject;
      const recipients = email.getAllRecipients().join(', ');

      logger.log('info', `Email processed by MTA: ${subject} to ${recipients}`);

      SecurityLogger.getInstance().logEvent({
        level: SecurityLogLevel.INFO,
        type: SecurityEventType.EMAIL_PROCESSING,
        message: 'Email processed by MTA',
        ipAddress: session.remoteAddress,
        details: {
          sessionId: session.id,
          ruleName: session.matchedRule?.pattern || 'default',
          subject,
          recipients
        },
        success: true
      });
    } catch (error) {
      const message = describeError(error);
      logger.log('error', `Failed to process email in MTA mode: ${message}`);

      SecurityLogger.getInstance().logEvent({
        level: SecurityLogLevel.ERROR,
        type: SecurityEventType.EMAIL_PROCESSING,
        message: 'MTA processing failed',
        ipAddress: session.remoteAddress,
        details: {
          sessionId: session.id,
          ruleName: session.matchedRule?.pattern || 'default',
          error: message
        },
        success: false
      });

      throw error;
    }
  }

  /**
   * Handle email in process mode (store-and-forward with scanning).
   * Runs the rule's scanners (only the attachment-extension check is implemented) and
   * applies the rule's `addHeader` transformations.
   *
   * @throws When a blocked attachment is found and the scanner action is `reject`.
   */
  private async handleProcessMode(email: Email, session: ISmtpSession): Promise<void> {
    logger.log('info', `Handling email in process mode for session ${session.id}`);

    try {
      const rule = session.matchedRule;

      // Apply content scanning if enabled
      if (rule?.contentScanning && rule.scanners && rule.scanners.length > 0) {
        logger.log('info', 'Performing content scanning');

        // Apply each scanner
        for (const scanner of rule.scanners) {
          switch (scanner.type) {
            case 'spam':
              logger.log('info', 'Scanning for spam content');
              // Implement spam scanning
              break;

            case 'virus':
              logger.log('info', 'Scanning for virus content');
              // Implement virus scanning
              break;

            case 'attachment':
              logger.log('info', 'Scanning attachments');

              // Check for blocked extensions
              if (scanner.blockedExtensions && scanner.blockedExtensions.length > 0) {
                for (const attachment of email.attachments) {
                  const ext = this.getFileExtension(attachment.filename);
                  if (scanner.blockedExtensions.includes(ext)) {
                    if (scanner.action === 'reject') {
                      throw new Error(`Blocked attachment type: ${ext}`);
                    } else { // tag
                      email.addHeader('X-Attachment-Warning', `Potentially unsafe attachment: ${attachment.filename}`);
                    }
                  }
                }
              }
              break;
          }
        }
      }

      // Apply transformations if defined
      if (rule?.transformations && rule.transformations.length > 0) {
        logger.log('info', 'Applying email transformations');

        for (const transform of rule.transformations) {
          switch (transform.type) {
            case 'addHeader':
              if (transform.header && transform.value) {
                email.addHeader(transform.header, transform.value);
              }
              break;
          }
        }
      }

      logger.log('info', `Email successfully processed in store-and-forward mode`);

      SecurityLogger.getInstance().logEvent({
        level: SecurityLogLevel.INFO,
        type: SecurityEventType.EMAIL_PROCESSING,
        message: 'Email processed and queued',
        ipAddress: session.remoteAddress,
        details: {
          sessionId: session.id,
          ruleName: rule?.pattern || 'default',
          contentScanning: rule?.contentScanning || false,
          subject: email.subject
        },
        success: true
      });
    } catch (error) {
      const message = describeError(error);
      logger.log('error', `Failed to process email: ${message}`);

      SecurityLogger.getInstance().logEvent({
        level: SecurityLogLevel.ERROR,
        type: SecurityEventType.EMAIL_PROCESSING,
        message: 'Email processing failed',
        ipAddress: session.remoteAddress,
        details: {
          sessionId: session.id,
          ruleName: session.matchedRule?.pattern || 'default',
          error: message
        },
        success: false
      });

      throw error;
    }
  }

  /**
   * Get file extension from filename.
   *
   * @returns The lower-cased text from the last `.` onwards (dot included), or an
   *   empty string when the filename contains no dot.
   */
  private getFileExtension(filename: string): string {
    const dotIndex = filename.lastIndexOf('.');
    // Without this guard substring(-1) would return the entire filename.
    if (dotIndex < 0) {
      return '';
    }
    return filename.substring(dotIndex).toLowerCase();
  }

  /**
   * Handle server errors: log and re-emit as `error`.
   */
  private onError(err: Error): void {
    logger.log('error', `Server error: ${err.message}`);
    this.emit('error', err);
  }

  /**
   * Handle server close: log, emit `close` and reset the current-connection counter.
   */
  private onClose(): void {
    logger.log('info', 'Server closed');
    this.emit('close');

    // Update statistics
    this.stats.connections.current = 0;
  }

  /**
   * Update server configuration.
   *
   * When the port list changes the server is stopped, the options are applied and the
   * server is started again in the background; a failure of that restart is logged
   * and, if an `error` listener is registered, emitted as `error`. Otherwise the
   * options are applied immediately without a restart.
   */
  public updateOptions(options: Partial<IUnifiedEmailServerOptions>): void {
    // Stop the server if changing ports
    const portsChanged = options.ports &&
      (!this.options.ports ||
       JSON.stringify(options.ports) !== JSON.stringify(this.options.ports));

    if (!portsChanged) {
      // Update options without restart
      this.applyOptions(options);
      return;
    }

    void this.stop()
      .then(() => {
        this.applyOptions(options);
        return this.start();
      })
      .catch((err: unknown) => {
        logger.log('error', `Failed to restart UnifiedEmailServer after options update: ${describeError(err)}`);
        // Emitting 'error' without a listener throws, which would turn this handled
        // failure back into an unhandled rejection.
        if (this.listenerCount('error') > 0) {
          this.emit('error', toError(err));
        }
      });
  }

  /**
   * Merge the given options into the current ones and push changed domain rules to
   * the router.
   */
  private applyOptions(options: Partial<IUnifiedEmailServerOptions>): void {
    this.options = { ...this.options, ...options };

    // Update domain router if rules changed
    if (options.domainRules) {
      this.domainRouter.updateRules(options.domainRules);
    }
  }

  /**
   * Update domain rules in both the stored options and the router.
   */
  public updateDomainRules(rules: IDomainRule[]): void {
    this.options.domainRules = rules;
    this.domainRouter.updateRules(rules);
  }

  /**
   * Get server statistics.
   *
   * @returns A snapshot; the nested objects and the start date are copies, so
   *   changing them does not affect the server's own counters.
   */
  public getStats(): IServerStats {
    return {
      startTime: new Date(this.stats.startTime.getTime()),
      connections: { ...this.stats.connections },
      messages: { ...this.stats.messages },
      processingTime: { ...this.stats.processingTime }
    };
  }

  /**
   * Validate email address format.
   * Basic validation only — a more comprehensive validation could be used.
   */
  private isValidEmail(email: string): boolean {
    return EMAIL_ADDRESS_PATTERN.test(email);
  }
}