import * as plugins from '../../plugins.js'; import * as paths from '../../paths.js'; import { EventEmitter } from 'events'; import { logger } from '../../logger.js'; import { SecurityLogger, SecurityLogLevel, SecurityEventType } from '../../security/index.js'; import { DKIMCreator } from '../security/classes.dkimcreator.js'; import { IPReputationChecker } from '../../security/classes.ipreputationchecker.js'; import { RustSecurityBridge } from '../../security/classes.rustsecuritybridge.js'; import type { IEmailReceivedEvent, IAuthRequestEvent, IEmailData } from '../../security/classes.rustsecuritybridge.js'; import { EmailRouter } from './classes.email.router.js'; import type { IEmailRoute, IEmailAction, IEmailContext, IEmailDomainConfig } from './interfaces.js'; import { Email } from '../core/classes.email.js'; import { DomainRegistry } from './classes.domain.registry.js'; import { DnsManager } from './classes.dns.manager.js'; import { BounceManager, BounceType, BounceCategory } from '../core/classes.bouncemanager.js'; import type { ISmtpSendResult, IOutboundEmail } from '../../security/classes.rustsecuritybridge.js'; import { MultiModeDeliverySystem, type IMultiModeDeliveryOptions } from '../delivery/classes.delivery.system.js'; import { UnifiedDeliveryQueue, type IQueueOptions } from '../delivery/classes.delivery.queue.js'; import { UnifiedRateLimiter, type IHierarchicalRateLimits } from '../delivery/classes.unified.rate.limiter.js'; import { SmtpState } from '../delivery/interfaces.js'; import type { EmailProcessingMode, ISmtpSession as IBaseSmtpSession } from '../delivery/interfaces.js'; /** External DcRouter interface shape used by UnifiedEmailServer */ interface DcRouter { storageManager: any; dnsServer?: any; options?: any; } /** * Extended SMTP session interface with route information */ export interface IExtendedSmtpSession extends ISmtpSession { /** * Matched route for this session */ matchedRoute?: IEmailRoute; } /** * Options for the unified email server */ export interface IUnifiedEmailServerOptions { // Base server options ports: number[]; hostname: string; domains: IEmailDomainConfig[]; // Domain configurations banner?: string; debug?: boolean; useSocketHandler?: boolean; // Use socket-handler mode instead of port listening // Authentication options auth?: { required?: boolean; methods?: ('PLAIN' | 'LOGIN' | 'OAUTH2')[]; users?: Array<{username: string, password: string}>; }; // TLS options tls?: { certPath?: string; keyPath?: string; caPath?: string; minVersion?: string; ciphers?: string; }; // Limits maxMessageSize?: number; maxClients?: number; maxConnections?: number; // Connection options connectionTimeout?: number; socketTimeout?: number; // Email routing rules routes: IEmailRoute[]; // Global defaults for all domains defaults?: { dnsMode?: 'forward' | 'internal-dns' | 'external-dns'; dkim?: IEmailDomainConfig['dkim']; rateLimits?: IEmailDomainConfig['rateLimits']; }; // Outbound settings outbound?: { maxConnections?: number; connectionTimeout?: number; socketTimeout?: number; retryAttempts?: number; defaultFrom?: string; }; // Rate limiting (global limits, can be overridden per domain) rateLimits?: IHierarchicalRateLimits; } /** * Extended SMTP session interface for UnifiedEmailServer */ export interface ISmtpSession extends IBaseSmtpSession { /** * User information if authenticated */ user?: { username: string; [key: string]: any; }; /** * Matched route for this session */ matchedRoute?: IEmailRoute; } /** * Authentication data for SMTP */ import type { ISmtpAuth } from '../delivery/interfaces.js'; export type IAuthData = ISmtpAuth; /** * Server statistics */ export interface IServerStats { startTime: Date; connections: { current: number; total: number; }; messages: { processed: number; delivered: number; failed: number; }; processingTime: { avg: number; max: number; min: number; }; } /** * Unified email server that handles all email traffic with pattern-based routing */ export class UnifiedEmailServer extends EventEmitter { private dcRouter: DcRouter; private options: IUnifiedEmailServerOptions; private emailRouter: EmailRouter; public domainRegistry: DomainRegistry; private servers: any[] = []; private stats: IServerStats; // Add components needed for sending and securing emails public dkimCreator: DKIMCreator; private rustBridge: RustSecurityBridge; private ipReputationChecker: IPReputationChecker; private bounceManager: BounceManager; public deliveryQueue: UnifiedDeliveryQueue; public deliverySystem: MultiModeDeliverySystem; private rateLimiter: UnifiedRateLimiter; // TODO: Implement rate limiting in SMTP server handlers private dkimKeys: Map = new Map(); // domain -> private key constructor(dcRouter: DcRouter, options: IUnifiedEmailServerOptions) { super(); this.dcRouter = dcRouter; // Set default options this.options = { ...options, banner: options.banner || `${options.hostname} ESMTP UnifiedEmailServer`, maxMessageSize: options.maxMessageSize || 10 * 1024 * 1024, // 10MB maxClients: options.maxClients || 100, maxConnections: options.maxConnections || 1000, connectionTimeout: options.connectionTimeout || 60000, // 1 minute socketTimeout: options.socketTimeout || 60000 // 1 minute }; // Initialize Rust security bridge (singleton) this.rustBridge = RustSecurityBridge.getInstance(); // Initialize DKIM creator with storage manager this.dkimCreator = new DKIMCreator(paths.keysDir, dcRouter.storageManager); // Initialize IP reputation checker with storage manager this.ipReputationChecker = IPReputationChecker.getInstance({ enableLocalCache: true, enableDNSBL: true, enableIPInfo: true }, dcRouter.storageManager); // Initialize bounce manager with storage manager this.bounceManager = new BounceManager({ maxCacheSize: 10000, cacheTTL: 30 * 24 * 60 * 60 * 1000, // 30 days storageManager: dcRouter.storageManager }); // Initialize domain registry this.domainRegistry = new DomainRegistry(options.domains, options.defaults); // Initialize email router with routes and storage manager this.emailRouter = new EmailRouter(options.routes || [], { storageManager: dcRouter.storageManager, persistChanges: true }); // Initialize rate limiter this.rateLimiter = new UnifiedRateLimiter(options.rateLimits || { global: { maxConnectionsPerIP: 10, maxMessagesPerMinute: 100, maxRecipientsPerMessage: 50, maxErrorsPerIP: 10, maxAuthFailuresPerIP: 5, blockDuration: 300000 // 5 minutes } }); // Initialize delivery components const queueOptions: IQueueOptions = { storageType: 'memory', // Default to memory storage maxRetries: 3, baseRetryDelay: 300000, // 5 minutes maxRetryDelay: 3600000 // 1 hour }; this.deliveryQueue = new UnifiedDeliveryQueue(queueOptions); const deliveryOptions: IMultiModeDeliveryOptions = { globalRateLimit: 100, // Default to 100 emails per minute concurrentDeliveries: 10, processBounces: true, bounceHandler: { processSmtpFailure: this.processSmtpFailure.bind(this) }, onDeliverySuccess: async (_item, _result) => { // Delivery success recorded via delivery system } }; this.deliverySystem = new MultiModeDeliverySystem(this.deliveryQueue, deliveryOptions, this); // Initialize statistics this.stats = { startTime: new Date(), connections: { current: 0, total: 0 }, messages: { processed: 0, delivered: 0, failed: 0 }, processingTime: { avg: 0, max: 0, min: 0 } }; // We'll create the SMTP servers during the start() method } /** * Send an outbound email via the Rust SMTP client. * Uses connection pooling in the Rust binary for efficiency. */ public async sendOutboundEmail(host: string, port: number, email: Email, options?: { auth?: { user: string; pass: string }; dkimDomain?: string; dkimSelector?: string; }): Promise { // Build DKIM config if domain has keys let dkim: { domain: string; selector: string; privateKey: string } | undefined; if (options?.dkimDomain) { try { const { privateKey } = await this.dkimCreator.readDKIMKeys(options.dkimDomain); dkim = { domain: options.dkimDomain, selector: options.dkimSelector || 'default', privateKey }; } catch (err) { logger.log('warn', `Failed to read DKIM keys for ${options.dkimDomain}: ${(err as Error).message}`); } } // Serialize the Email to the outbound format const outboundEmail: IOutboundEmail = { from: email.from, to: email.to, cc: email.cc || [], bcc: email.bcc || [], subject: email.subject || '', text: email.text || '', html: email.html || undefined, headers: email.headers as Record || {}, }; return this.rustBridge.sendOutboundEmail({ host, port, secure: port === 465, domain: this.options.hostname, auth: options?.auth, email: outboundEmail, dkim, connectionTimeoutSecs: Math.floor((this.options.outbound?.connectionTimeout || 30000) / 1000), socketTimeoutSecs: Math.floor((this.options.outbound?.socketTimeout || 120000) / 1000), poolKey: `${host}:${port}`, maxPoolConnections: this.options.outbound?.maxConnections || 10, }); } /** * Start the unified email server */ public async start(): Promise { logger.log('info', `Starting UnifiedEmailServer on ports: ${(this.options.ports as number[]).join(', ')}`); try { // Initialize the delivery queue await this.deliveryQueue.initialize(); logger.log('info', 'Email delivery queue initialized'); // Start the delivery system await this.deliverySystem.start(); logger.log('info', 'Email delivery system started'); // Start Rust security bridge — required for all security operations const bridgeOk = await this.rustBridge.start(); if (!bridgeOk) { throw new Error('Rust security bridge failed to start. The mailer-bin binary is required. Run "pnpm build" to compile it.'); } logger.log('info', 'Rust security bridge started — Rust is the primary security backend'); // Listen for bridge state changes to propagate resilience events this.rustBridge.on('stateChange', ({ oldState, newState }: { oldState: string; newState: string }) => { if (newState === 'failed') this.emit('bridgeFailed'); else if (newState === 'restarting') this.emit('bridgeRestarting'); else if (newState === 'running' && oldState === 'restarting') this.emit('bridgeRecovered'); }); // Set up DKIM for all domains await this.setupDkimForDomains(); logger.log('info', 'DKIM configuration completed for all domains'); // Create DNS manager and ensure all DNS records are created const dnsManager = new DnsManager(this.dcRouter); await dnsManager.ensureDnsRecords(this.domainRegistry.getAllConfigs(), this.dkimCreator); logger.log('info', 'DNS records ensured for all configured domains'); // Apply per-domain rate limits this.applyDomainRateLimits(); logger.log('info', 'Per-domain rate limits configured'); // Check and rotate DKIM keys if needed await this.checkAndRotateDkimKeys(); logger.log('info', 'DKIM key rotation check completed'); // Ensure we have the necessary TLS options const hasTlsConfig = this.options.tls?.keyPath && this.options.tls?.certPath; // Prepare the certificate and key if available let tlsCertPem: string | undefined; let tlsKeyPem: string | undefined; if (hasTlsConfig) { try { tlsKeyPem = plugins.fs.readFileSync(this.options.tls.keyPath!, 'utf8'); tlsCertPem = plugins.fs.readFileSync(this.options.tls.certPath!, 'utf8'); logger.log('info', 'TLS certificates loaded successfully'); } catch (error) { logger.log('warn', `Failed to load TLS certificates: ${error.message}`); } } // --- Start Rust SMTP server --- // Register event handlers for email reception and auth this.rustBridge.onEmailReceived(async (data) => { try { await this.handleRustEmailReceived(data); } catch (err) { logger.log('error', `Error handling email from Rust SMTP: ${(err as Error).message}`); // Send rejection back to Rust (may fail if bridge is restarting) try { await this.rustBridge.sendEmailProcessingResult({ correlationId: data.correlationId, accepted: false, smtpCode: 451, smtpMessage: 'Internal processing error', }); } catch (sendErr) { logger.log('warn', `Could not send rejection back to Rust: ${(sendErr as Error).message}`); } } }); this.rustBridge.onAuthRequest(async (data) => { try { await this.handleRustAuthRequest(data); } catch (err) { logger.log('error', `Error handling auth from Rust SMTP: ${(err as Error).message}`); try { await this.rustBridge.sendAuthResult({ correlationId: data.correlationId, success: false, message: 'Internal auth error', }); } catch (sendErr) { logger.log('warn', `Could not send auth rejection back to Rust: ${(sendErr as Error).message}`); } } }); // Determine which ports need STARTTLS and which need implicit TLS const smtpPorts = (this.options.ports as number[]).filter(p => p !== 465); const securePort = (this.options.ports as number[]).find(p => p === 465); const started = await this.rustBridge.startSmtpServer({ hostname: this.options.hostname, ports: smtpPorts, securePort: securePort, tlsCertPem, tlsKeyPem, maxMessageSize: this.options.maxMessageSize || 10 * 1024 * 1024, maxConnections: this.options.maxConnections || this.options.maxClients || 100, maxRecipients: 100, connectionTimeoutSecs: this.options.connectionTimeout ? Math.floor(this.options.connectionTimeout / 1000) : 30, dataTimeoutSecs: 60, authEnabled: !!this.options.auth?.required || !!(this.options.auth?.users?.length), maxAuthFailures: 3, socketTimeoutSecs: this.options.socketTimeout ? Math.floor(this.options.socketTimeout / 1000) : 300, processingTimeoutSecs: 30, rateLimits: this.options.rateLimits ? { maxConnectionsPerIp: this.options.rateLimits.global?.maxConnectionsPerIP || 50, maxMessagesPerSender: this.options.rateLimits.global?.maxMessagesPerMinute || 100, maxAuthFailuresPerIp: this.options.rateLimits.global?.maxAuthFailuresPerIP || 5, windowSecs: 60, } : undefined, }); if (!started) { throw new Error('Failed to start Rust SMTP server'); } logger.log('info', `Rust SMTP server listening on ports: ${smtpPorts.join(', ')}${securePort ? ` + ${securePort} (TLS)` : ''}`); logger.log('info', 'UnifiedEmailServer started successfully'); this.emit('started'); } catch (error) { logger.log('error', `Failed to start UnifiedEmailServer: ${error.message}`); throw error; } } /** * Stop the unified email server */ public async stop(): Promise { logger.log('info', 'Stopping UnifiedEmailServer'); try { // Stop the Rust SMTP server first try { await this.rustBridge.stopSmtpServer(); logger.log('info', 'Rust SMTP server stopped'); } catch (err) { logger.log('warn', `Error stopping Rust SMTP server: ${(err as Error).message}`); } // Clear the servers array - servers will be garbage collected this.servers = []; // Remove bridge state change listener and stop bridge this.rustBridge.removeAllListeners('stateChange'); await this.rustBridge.stop(); // Stop the delivery system if (this.deliverySystem) { await this.deliverySystem.stop(); logger.log('info', 'Email delivery system stopped'); } // Shut down the delivery queue if (this.deliveryQueue) { await this.deliveryQueue.shutdown(); logger.log('info', 'Email delivery queue shut down'); } // Close all Rust SMTP client connection pools try { await this.rustBridge.closeSmtpPool(); } catch { // Bridge may already be stopped } logger.log('info', 'UnifiedEmailServer stopped successfully'); this.emit('stopped'); } catch (error) { logger.log('error', `Error stopping UnifiedEmailServer: ${error.message}`); throw error; } } // ----------------------------------------------------------------------- // Rust SMTP server event handlers // ----------------------------------------------------------------------- /** * Handle an emailReceived event from the Rust SMTP server. * Decodes the email data, processes it through the routing system, * and sends back the result via the correlation-ID callback. */ private async handleRustEmailReceived(data: IEmailReceivedEvent): Promise { const { correlationId, mailFrom, rcptTo, remoteAddr, clientHostname, secure, authenticatedUser } = data; logger.log('info', `Rust SMTP received email from=${mailFrom} to=${rcptTo.join(',')} remote=${remoteAddr}`); try { // Decode the email data let rawMessageBuffer: Buffer; if (data.data.type === 'inline' && data.data.base64) { rawMessageBuffer = Buffer.from(data.data.base64, 'base64'); } else if (data.data.type === 'file' && data.data.path) { rawMessageBuffer = plugins.fs.readFileSync(data.data.path); // Clean up temp file try { plugins.fs.unlinkSync(data.data.path); } catch { // Ignore cleanup errors } } else { throw new Error('Invalid email data transport'); } // Build a session-like object for processEmailByMode const session: IExtendedSmtpSession = { id: data.sessionId || 'rust-' + Math.random().toString(36).substring(2), state: SmtpState.FINISHED, mailFrom: mailFrom, rcptTo: rcptTo, emailData: rawMessageBuffer.toString('utf8'), useTLS: secure, connectionEnded: false, remoteAddress: remoteAddr, clientHostname: clientHostname || '', secure: secure, authenticated: !!authenticatedUser, envelope: { mailFrom: { address: mailFrom, args: {} }, rcptTo: rcptTo.map(addr => ({ address: addr, args: {} })), }, }; if (authenticatedUser) { session.user = { username: authenticatedUser }; } // Attach pre-computed security results from Rust in-process pipeline if (data.securityResults) { (session as any)._precomputedSecurityResults = data.securityResults; } // Process the email through the routing system await this.processEmailByMode(rawMessageBuffer, session); // Send acceptance back to Rust await this.rustBridge.sendEmailProcessingResult({ correlationId, accepted: true, smtpCode: 250, smtpMessage: '2.0.0 Message accepted for delivery', }); } catch (err) { logger.log('error', `Failed to process email from Rust SMTP: ${(err as Error).message}`); await this.rustBridge.sendEmailProcessingResult({ correlationId, accepted: false, smtpCode: 550, smtpMessage: `5.0.0 Processing failed: ${(err as Error).message}`, }); } } /** * Handle an authRequest event from the Rust SMTP server. * Validates credentials and sends back the result. */ private async handleRustAuthRequest(data: IAuthRequestEvent): Promise { const { correlationId, username, password, remoteAddr } = data; logger.log('info', `Rust SMTP auth request for user=${username} from=${remoteAddr}`); // Check against configured users const users = this.options.auth?.users || []; const matched = users.find( u => u.username === username && u.password === password ); if (matched) { await this.rustBridge.sendAuthResult({ correlationId, success: true, }); } else { logger.log('warn', `Auth failed for user=${username} from=${remoteAddr}`); await this.rustBridge.sendAuthResult({ correlationId, success: false, message: 'Invalid credentials', }); } } /** * Verify inbound email security (DKIM/SPF/DMARC) using pre-computed Rust results * or falling back to IPC call if no pre-computed results are available. */ private async verifyInboundSecurity(email: Email, session: IExtendedSmtpSession): Promise { try { // Check for pre-computed results from Rust in-process security pipeline const precomputed = (session as any)._precomputedSecurityResults; let result: any; if (precomputed) { logger.log('info', 'Using pre-computed security results from Rust in-process pipeline'); result = precomputed; } else { // Fallback: IPC round-trip to Rust (for backward compat) const rawMessage = session.emailData || email.toRFC822String(); result = await this.rustBridge.verifyEmail({ rawMessage, ip: session.remoteAddress, heloDomain: session.clientHostname || '', hostname: this.options.hostname, mailFrom: session.envelope?.mailFrom?.address || session.mailFrom || '', }); } // Apply DKIM result headers if (result.dkim && result.dkim.length > 0) { const dkimSummary = result.dkim .map((d: any) => `${d.status}${d.domain ? ` (${d.domain})` : ''}`) .join(', '); email.addHeader('X-DKIM-Result', dkimSummary); } // Apply SPF result header if (result.spf) { email.addHeader('Received-SPF', `${result.spf.result} (domain: ${result.spf.domain}, ip: ${result.spf.ip})`); // Mark as spam on SPF hard fail if (result.spf.result === 'fail') { email.mightBeSpam = true; logger.log('warn', `SPF fail for ${session.remoteAddress} — marking as potential spam`); } } // Apply DMARC result header and policy if (result.dmarc) { email.addHeader('X-DMARC-Result', `${result.dmarc.action} (policy=${result.dmarc.policy}, dkim=${result.dmarc.dkim_result}, spf=${result.dmarc.spf_result})`); if (result.dmarc.action === 'reject') { email.mightBeSpam = true; logger.log('warn', `DMARC reject for domain ${result.dmarc.domain} — marking as spam`); } else if (result.dmarc.action === 'quarantine') { email.mightBeSpam = true; logger.log('info', `DMARC quarantine for domain ${result.dmarc.domain} — marking as potential spam`); } } // Apply content scan results (from pre-computed pipeline) if (result.contentScan) { const scan = result.contentScan; if (scan.threatScore > 0) { email.addHeader('X-Spam-Score', String(scan.threatScore)); if (scan.threatType) { email.addHeader('X-Spam-Type', scan.threatType); } if (scan.threatScore >= 50) { email.mightBeSpam = true; logger.log('warn', `Content scan threat score ${scan.threatScore} (${scan.threatType}) — marking as potential spam`); } } } // Apply IP reputation results (from pre-computed pipeline) if (result.ipReputation) { const rep = result.ipReputation; email.addHeader('X-IP-Reputation-Score', String(rep.score)); if (rep.is_spam) { email.mightBeSpam = true; logger.log('warn', `IP ${rep.ip} flagged by reputation check (score=${rep.score}) — marking as potential spam`); } } logger.log('info', `Inbound security verified for email from ${session.remoteAddress}: DKIM=${result.dkim?.[0]?.status ?? 'none'}, SPF=${result.spf?.result ?? 'none'}, DMARC=${result.dmarc?.action ?? 'none'}`); } catch (err) { logger.log('warn', `Inbound security verification failed: ${(err as Error).message} — accepting email`); } } /** * Process email based on routing rules */ public async processEmailByMode(emailData: Email | Buffer, session: IExtendedSmtpSession): Promise { // Convert Buffer to Email if needed let email: Email; if (Buffer.isBuffer(emailData)) { // Parse the email data buffer into an Email object try { const parsed = await plugins.mailparser.simpleParser(emailData); email = new Email({ from: parsed.from?.value[0]?.address || session.envelope.mailFrom.address, to: session.envelope.rcptTo[0]?.address || '', subject: parsed.subject || '', text: parsed.text || '', html: parsed.html || undefined, attachments: parsed.attachments?.map(att => ({ filename: att.filename || '', content: att.content, contentType: att.contentType })) || [] }); } catch (error) { logger.log('error', `Error parsing email data: ${error.message}`); throw new Error(`Error parsing email data: ${error.message}`); } } else { email = emailData; } // Run inbound security verification (DKIM/SPF/DMARC) via Rust bridge if (session.remoteAddress && session.remoteAddress !== '127.0.0.1') { await this.verifyInboundSecurity(email, session); } // First check if this is a bounce notification email // Look for common bounce notification subject patterns const subject = email.subject || ''; const isBounceLike = /mail delivery|delivery (failed|status|notification)|failure notice|returned mail|undeliverable|delivery problem/i.test(subject); if (isBounceLike) { logger.log('info', `Email subject matches bounce notification pattern: "${subject}"`); // Try to process as a bounce const isBounce = await this.processBounceNotification(email); if (isBounce) { logger.log('info', 'Successfully processed as bounce notification, skipping regular processing'); return email; } logger.log('info', 'Not a valid bounce notification, continuing with regular processing'); } // Find matching route const context: IEmailContext = { email, session }; const route = await this.emailRouter.evaluateRoutes(context); if (!route) { // No matching route - reject throw new Error('No matching route for email'); } // Store matched route in session session.matchedRoute = route; // Execute action based on route await this.executeAction(route.action, email, context); // Return the processed email return email; } /** * Execute action based on route configuration */ private async executeAction(action: IEmailAction, email: Email, context: IEmailContext): Promise { switch (action.type) { case 'forward': await this.handleForwardAction(action, email, context); break; case 'process': await this.handleProcessAction(action, email, context); break; case 'deliver': await this.handleDeliverAction(action, email, context); break; case 'reject': await this.handleRejectAction(action, email, context); break; default: throw new Error(`Unknown action type: ${(action as any).type}`); } } /** * Handle forward action */ private async handleForwardAction(_action: IEmailAction, email: Email, context: IEmailContext): Promise { if (!_action.forward) { throw new Error('Forward action requires forward configuration'); } const { host, port = 25, auth, addHeaders } = _action.forward; logger.log('info', `Forwarding email to ${host}:${port}`); // Add forwarding headers if (addHeaders) { for (const [key, value] of Object.entries(addHeaders)) { email.headers[key] = value; } } // Add standard forwarding headers email.headers['X-Forwarded-For'] = context.session.remoteAddress || 'unknown'; email.headers['X-Forwarded-To'] = email.to.join(', '); email.headers['X-Forwarded-Date'] = new Date().toISOString(); try { // Send email via Rust SMTP client await this.sendOutboundEmail(host, port, email, { auth: auth as { user: string; pass: string } | undefined, }); logger.log('info', `Successfully forwarded email to ${host}:${port}`); SecurityLogger.getInstance().logEvent({ level: SecurityLogLevel.INFO, type: SecurityEventType.EMAIL_FORWARDING, message: 'Email forwarded successfully', ipAddress: context.session.remoteAddress, details: { sessionId: context.session.id, routeName: context.session.matchedRoute?.name, targetHost: host, targetPort: port, recipients: email.to }, success: true }); } catch (error) { logger.log('error', `Failed to forward email: ${error.message}`); SecurityLogger.getInstance().logEvent({ level: SecurityLogLevel.ERROR, type: SecurityEventType.EMAIL_FORWARDING, message: 'Email forwarding failed', ipAddress: context.session.remoteAddress, details: { sessionId: context.session.id, routeName: context.session.matchedRoute?.name, targetHost: host, targetPort: port, error: error.message }, success: false }); // Handle as bounce for (const recipient of email.getAllRecipients()) { await this.bounceManager.processSmtpFailure(recipient, error.message, { sender: email.from, originalEmailId: email.headers['Message-ID'] as string }); } throw error; } } /** * Handle process action */ private async handleProcessAction(action: IEmailAction, email: Email, context: IEmailContext): Promise { logger.log('info', `Processing email with action options`); // Apply scanning if requested if (action.process?.scan) { // Use existing content scanner // Note: ContentScanner integration would go here logger.log('info', 'Content scanning requested'); } // Note: DKIM signing will be applied at delivery time to ensure signature validity // Queue for delivery const queue = action.process?.queue || 'normal'; await this.deliveryQueue.enqueue(email, 'process', context.session.matchedRoute!); logger.log('info', `Email queued for delivery in ${queue} queue`); } /** * Handle deliver action */ private async handleDeliverAction(_action: IEmailAction, email: Email, context: IEmailContext): Promise { logger.log('info', `Delivering email locally`); // Queue for local delivery await this.deliveryQueue.enqueue(email, 'mta', context.session.matchedRoute!); logger.log('info', 'Email queued for local delivery'); } /** * Handle reject action */ private async handleRejectAction(action: IEmailAction, email: Email, context: IEmailContext): Promise { const code = action.reject?.code || 550; const message = action.reject?.message || 'Message rejected'; logger.log('info', `Rejecting email with code ${code}: ${message}`); SecurityLogger.getInstance().logEvent({ level: SecurityLogLevel.WARN, type: SecurityEventType.EMAIL_PROCESSING, message: 'Email rejected by routing rule', ipAddress: context.session.remoteAddress, details: { sessionId: context.session.id, routeName: context.session.matchedRoute?.name, rejectCode: code, rejectMessage: message, from: email.from, to: email.to }, success: false }); // Throw error with SMTP code and message const error = new Error(message); (error as any).responseCode = code; throw error; } /** * Set up DKIM configuration for all domains */ private async setupDkimForDomains(): Promise { const domainConfigs = this.domainRegistry.getAllConfigs(); if (domainConfigs.length === 0) { logger.log('warn', 'No domains configured for DKIM'); return; } for (const domainConfig of domainConfigs) { const domain = domainConfig.domain; const selector = domainConfig.dkim?.selector || 'default'; try { // Check if DKIM keys already exist for this domain let keyPair: { privateKey: string; publicKey: string }; try { // Try to read existing keys keyPair = await this.dkimCreator.readDKIMKeys(domain); logger.log('info', `Using existing DKIM keys for domain: ${domain}`); } catch (error) { // Generate new keys if they don't exist keyPair = await this.dkimCreator.createDKIMKeys(); // Store them for future use await this.dkimCreator.createAndStoreDKIMKeys(domain); logger.log('info', `Generated new DKIM keys for domain: ${domain}`); } // Store the private key for signing this.dkimKeys.set(domain, keyPair.privateKey); // DNS record creation is now handled by DnsManager logger.log('info', `DKIM keys loaded for domain: ${domain} with selector: ${selector}`); } catch (error) { logger.log('error', `Failed to set up DKIM for domain ${domain}: ${error.message}`); } } } /** * Apply per-domain rate limits from domain configurations */ private applyDomainRateLimits(): void { const domainConfigs = this.domainRegistry.getAllConfigs(); for (const domainConfig of domainConfigs) { if (domainConfig.rateLimits) { const domain = domainConfig.domain; const rateLimitConfig: any = {}; // Convert domain-specific rate limits to the format expected by UnifiedRateLimiter if (domainConfig.rateLimits.outbound) { if (domainConfig.rateLimits.outbound.messagesPerMinute) { rateLimitConfig.maxMessagesPerMinute = domainConfig.rateLimits.outbound.messagesPerMinute; } // Note: messagesPerHour and messagesPerDay would need additional implementation in rate limiter } if (domainConfig.rateLimits.inbound) { if (domainConfig.rateLimits.inbound.messagesPerMinute) { rateLimitConfig.maxMessagesPerMinute = domainConfig.rateLimits.inbound.messagesPerMinute; } if (domainConfig.rateLimits.inbound.connectionsPerIp) { rateLimitConfig.maxConnectionsPerIP = domainConfig.rateLimits.inbound.connectionsPerIp; } if (domainConfig.rateLimits.inbound.recipientsPerMessage) { rateLimitConfig.maxRecipientsPerMessage = domainConfig.rateLimits.inbound.recipientsPerMessage; } } // Apply the rate limits if we have any if (Object.keys(rateLimitConfig).length > 0) { this.rateLimiter.applyDomainLimits(domain, rateLimitConfig); logger.log('info', `Applied rate limits for domain ${domain}:`, rateLimitConfig); } } } } /** * Check and rotate DKIM keys if needed */ private async checkAndRotateDkimKeys(): Promise { const domainConfigs = this.domainRegistry.getAllConfigs(); for (const domainConfig of domainConfigs) { const domain = domainConfig.domain; const selector = domainConfig.dkim?.selector || 'default'; const rotateKeys = domainConfig.dkim?.rotateKeys || false; const rotationInterval = domainConfig.dkim?.rotationInterval || 90; const keySize = domainConfig.dkim?.keySize || 2048; if (!rotateKeys) { logger.log('debug', `DKIM key rotation disabled for ${domain}`); continue; } try { // Check if keys need rotation const needsRotation = await this.dkimCreator.needsRotation(domain, selector, rotationInterval); if (needsRotation) { logger.log('info', `DKIM keys need rotation for ${domain} (selector: ${selector})`); // Rotate the keys const newSelector = await this.dkimCreator.rotateDkimKeys(domain, selector, keySize); // Update the domain config with new selector domainConfig.dkim = { ...domainConfig.dkim, selector: newSelector }; // Re-register DNS handler for new selector if internal-dns mode if (domainConfig.dnsMode === 'internal-dns' && this.dcRouter.dnsServer) { // Get new public key const keyPair = await this.dkimCreator.readDKIMKeysForSelector(domain, newSelector); const publicKeyBase64 = keyPair.publicKey .replace(/-----BEGIN PUBLIC KEY-----/g, '') .replace(/-----END PUBLIC KEY-----/g, '') .replace(/\s/g, ''); const ttl = domainConfig.dns?.internal?.ttl || 3600; // Register new selector this.dcRouter.dnsServer.registerHandler( `${newSelector}._domainkey.${domain}`, ['TXT'], () => ({ name: `${newSelector}._domainkey.${domain}`, type: 'TXT', class: 'IN', ttl: ttl, data: `v=DKIM1; k=rsa; p=${publicKeyBase64}` }) ); logger.log('info', `DKIM DNS handler registered for new selector: ${newSelector}._domainkey.${domain}`); // Store the updated public key in storage await this.dcRouter.storageManager.set( `/email/dkim/${domain}/public.key`, keyPair.publicKey ); } // Clean up old keys after grace period (async, don't wait) this.dkimCreator.cleanupOldKeys(domain, 30).catch(error => { logger.log('warn', `Failed to cleanup old DKIM keys for ${domain}: ${error.message}`); }); } else { logger.log('debug', `DKIM keys for ${domain} are up to date`); } } catch (error) { logger.log('error', `Failed to check/rotate DKIM keys for ${domain}: ${error.message}`); } } } /** * Generate SmartProxy routes for email ports */ public generateProxyRoutes(portMapping?: Record): any[] { const routes: any[] = []; const defaultPortMapping = { 25: 10025, 587: 10587, 465: 10465 }; const actualPortMapping = portMapping || defaultPortMapping; // Generate routes for each configured port for (const externalPort of this.options.ports) { const internalPort = actualPortMapping[externalPort] || externalPort + 10000; let routeName = 'email-route'; let tlsMode = 'passthrough'; // Configure based on port switch (externalPort) { case 25: routeName = 'smtp-route'; tlsMode = 'passthrough'; // STARTTLS break; case 587: routeName = 'submission-route'; tlsMode = 'passthrough'; // STARTTLS break; case 465: routeName = 'smtps-route'; tlsMode = 'terminate'; // Implicit TLS break; default: routeName = `email-port-${externalPort}-route`; } routes.push({ name: routeName, match: { ports: [externalPort] }, action: { type: 'forward', target: { host: 'localhost', port: internalPort }, tls: { mode: tlsMode } } }); } return routes; } /** * Update server configuration */ public updateOptions(options: Partial): void { // Stop the server if changing ports const portsChanged = options.ports && (!this.options.ports || JSON.stringify(options.ports) !== JSON.stringify(this.options.ports)); if (portsChanged) { this.stop().then(() => { this.options = { ...this.options, ...options }; this.start(); }); } else { // Update options without restart this.options = { ...this.options, ...options }; // Update domain registry if domains changed if (options.domains) { this.domainRegistry = new DomainRegistry(options.domains, options.defaults || this.options.defaults); } // Update email router if routes changed if (options.routes) { this.emailRouter.updateRoutes(options.routes); } } } /** * Update email routes */ public updateEmailRoutes(routes: IEmailRoute[]): void { this.options.routes = routes; this.emailRouter.updateRoutes(routes); } /** * Get server statistics */ public getStats(): IServerStats { return { ...this.stats }; } /** * Get domain registry */ public getDomainRegistry(): DomainRegistry { return this.domainRegistry; } /** * Update email routes dynamically */ public updateRoutes(routes: IEmailRoute[]): void { this.emailRouter.setRoutes(routes); logger.log('info', `Updated email routes with ${routes.length} routes`); } /** * Send an email through the delivery system * @param email The email to send * @param mode The processing mode to use * @param rule Optional rule to apply * @param options Optional sending options * @returns The ID of the queued email */ public async sendEmail( email: Email, mode: EmailProcessingMode = 'mta', route?: IEmailRoute, options?: { skipSuppressionCheck?: boolean; ipAddress?: string; isTransactional?: boolean; } ): Promise { logger.log('info', `Sending email: ${email.subject} to ${email.to.join(', ')}`); try { // Validate the email if (!email.from) { throw new Error('Email must have a sender address'); } if (!email.to || email.to.length === 0) { throw new Error('Email must have at least one recipient'); } // Check if any recipients are on the suppression list (unless explicitly skipped) if (!options?.skipSuppressionCheck) { const suppressedRecipients = email.to.filter(recipient => this.isEmailSuppressed(recipient)); if (suppressedRecipients.length > 0) { // Filter out suppressed recipients const originalCount = email.to.length; const suppressed = suppressedRecipients.map(recipient => { const info = this.getSuppressionInfo(recipient); return { email: recipient, reason: info?.reason || 'Unknown', until: info?.expiresAt ? new Date(info.expiresAt).toISOString() : 'permanent' }; }); logger.log('warn', `Filtering out ${suppressedRecipients.length} suppressed recipient(s)`, { suppressed }); // If all recipients are suppressed, throw an error if (suppressedRecipients.length === originalCount) { throw new Error('All recipients are on the suppression list'); } // Filter the recipients list to only include non-suppressed addresses email.to = email.to.filter(recipient => !this.isEmailSuppressed(recipient)); } } // Check if the sender domain has DKIM keys and sign the email if needed if (mode === 'mta' && route?.action.options?.mtaOptions?.dkimSign) { const domain = email.from.split('@')[1]; await this.handleDkimSigning(email, domain, route.action.options.mtaOptions.dkimOptions?.keySelector || 'mta'); } // Generate a unique ID for this email const id = plugins.uuid.v4(); // Queue the email for delivery await this.deliveryQueue.enqueue(email, mode, route); logger.log('info', `Email queued with ID: ${id}`); return id; } catch (error) { logger.log('error', `Failed to send email: ${error.message}`); throw error; } } /** * Handle DKIM signing for an email * @param email The email to sign * @param domain The domain to sign with * @param selector The DKIM selector */ private async handleDkimSigning(email: Email, domain: string, selector: string): Promise { try { // Ensure we have DKIM keys for this domain await this.dkimCreator.handleDKIMKeysForDomain(domain); // Get the private key const { privateKey } = await this.dkimCreator.readDKIMKeys(domain); // Convert Email to raw format for signing const rawEmail = email.toRFC822String(); // Sign the email via Rust bridge const signResult = await this.rustBridge.signDkim({ rawMessage: rawEmail, domain, selector, privateKey, }); if (signResult.header) { email.addHeader('DKIM-Signature', signResult.header); logger.log('info', `Successfully added DKIM signature for ${domain}`); } } catch (error) { logger.log('error', `Failed to sign email with DKIM: ${error.message}`); // Continue without DKIM rather than failing the send } } /** * Process a bounce notification email * @param bounceEmail The email containing bounce notification information * @returns Processed bounce record or null if not a bounce */ public async processBounceNotification(bounceEmail: Email): Promise { logger.log('info', 'Processing potential bounce notification email'); try { // Process as a bounce notification (no conversion needed anymore) const bounceRecord = await this.bounceManager.processBounceEmail(bounceEmail); if (bounceRecord) { logger.log('info', `Successfully processed bounce notification for ${bounceRecord.recipient}`, { bounceType: bounceRecord.bounceType, bounceCategory: bounceRecord.bounceCategory }); // Notify any registered listeners about the bounce this.emit('bounceProcessed', bounceRecord); // Log security event SecurityLogger.getInstance().logEvent({ level: SecurityLogLevel.INFO, type: SecurityEventType.EMAIL_VALIDATION, message: `Bounce notification processed for recipient`, domain: bounceRecord.domain, details: { recipient: bounceRecord.recipient, bounceType: bounceRecord.bounceType, bounceCategory: bounceRecord.bounceCategory }, success: true }); return true; } else { logger.log('info', 'Email not recognized as a bounce notification'); return false; } } catch (error) { logger.log('error', `Error processing bounce notification: ${error.message}`); SecurityLogger.getInstance().logEvent({ level: SecurityLogLevel.ERROR, type: SecurityEventType.EMAIL_VALIDATION, message: 'Failed to process bounce notification', details: { error: error.message, subject: bounceEmail.subject }, success: false }); return false; } } /** * Process an SMTP failure as a bounce * @param recipient Recipient email that failed * @param smtpResponse SMTP error response * @param options Additional options for bounce processing * @returns Processed bounce record */ public async processSmtpFailure( recipient: string, smtpResponse: string, options: { sender?: string; originalEmailId?: string; statusCode?: string; headers?: Record; } = {} ): Promise { logger.log('info', `Processing SMTP failure for ${recipient}: ${smtpResponse}`); try { // Process the SMTP failure through the bounce manager const bounceRecord = await this.bounceManager.processSmtpFailure( recipient, smtpResponse, options ); logger.log('info', `Successfully processed SMTP failure for ${recipient} as ${bounceRecord.bounceCategory} bounce`, { bounceType: bounceRecord.bounceType }); // Notify any registered listeners about the bounce this.emit('bounceProcessed', bounceRecord); // Log security event SecurityLogger.getInstance().logEvent({ level: SecurityLogLevel.INFO, type: SecurityEventType.EMAIL_VALIDATION, message: `SMTP failure processed for recipient`, domain: bounceRecord.domain, details: { recipient: bounceRecord.recipient, bounceType: bounceRecord.bounceType, bounceCategory: bounceRecord.bounceCategory, smtpResponse }, success: true }); return true; } catch (error) { logger.log('error', `Error processing SMTP failure: ${error.message}`); SecurityLogger.getInstance().logEvent({ level: SecurityLogLevel.ERROR, type: SecurityEventType.EMAIL_VALIDATION, message: 'Failed to process SMTP failure', details: { recipient, smtpResponse, error: error.message }, success: false }); return false; } } /** * Check if an email address is suppressed (has bounced previously) * @param email Email address to check * @returns Whether the email is suppressed */ public isEmailSuppressed(email: string): boolean { return this.bounceManager.isEmailSuppressed(email); } /** * Get suppression information for an email * @param email Email address to check * @returns Suppression information or null if not suppressed */ public getSuppressionInfo(email: string): { reason: string; timestamp: number; expiresAt?: number; } | null { return this.bounceManager.getSuppressionInfo(email); } /** * Get bounce history information for an email * @param email Email address to check * @returns Bounce history or null if no bounces */ public getBounceHistory(email: string): { lastBounce: number; count: number; type: BounceType; category: BounceCategory; } | null { return this.bounceManager.getBounceInfo(email); } /** * Get all suppressed email addresses * @returns Array of suppressed email addresses */ public getSuppressionList(): string[] { return this.bounceManager.getSuppressionList(); } /** * Get all hard bounced email addresses * @returns Array of hard bounced email addresses */ public getHardBouncedAddresses(): string[] { return this.bounceManager.getHardBouncedAddresses(); } /** * Add an email to the suppression list * @param email Email address to suppress * @param reason Reason for suppression * @param expiresAt Optional expiration time (undefined for permanent) */ public addToSuppressionList(email: string, reason: string, expiresAt?: number): void { this.bounceManager.addToSuppressionList(email, reason, expiresAt); logger.log('info', `Added ${email} to suppression list: ${reason}`); } /** * Remove an email from the suppression list * @param email Email address to remove from suppression */ public removeFromSuppressionList(email: string): void { this.bounceManager.removeFromSuppressionList(email); logger.log('info', `Removed ${email} from suppression list`); } /** * Record email bounce * @param domain Sending domain * @param receivingDomain Receiving domain that bounced * @param bounceType Type of bounce (hard/soft) * @param reason Bounce reason */ public recordBounce(domain: string, receivingDomain: string, bounceType: 'hard' | 'soft', reason: string): void { const bounceRecord = { id: `bounce_${Date.now()}_${Math.random().toString(36).substring(2, 9)}`, recipient: `user@${receivingDomain}`, sender: `user@${domain}`, domain: domain, bounceType: bounceType === 'hard' ? BounceType.INVALID_RECIPIENT : BounceType.TEMPORARY_FAILURE, bounceCategory: bounceType === 'hard' ? BounceCategory.HARD : BounceCategory.SOFT, timestamp: Date.now(), smtpResponse: reason, diagnosticCode: reason, statusCode: bounceType === 'hard' ? '550' : '450', processed: false }; this.bounceManager.processBounce(bounceRecord); } /** * Get the rate limiter instance * @returns The unified rate limiter */ public getRateLimiter(): UnifiedRateLimiter { return this.rateLimiter; } }