feat: implement comprehensive route-based email routing system
Replace legacy domain-rule based routing with flexible route-based system that supports: - Multi-criteria matching (recipients, senders, IPs, authentication) - Four action types (forward, process, deliver, reject) - Moved DKIM signing to delivery phase for signature validity - Connection pooling for efficient email forwarding - Pattern caching for improved performance This provides more granular control over email routing with priority-based matching and comprehensive test coverage.
This commit is contained in:
@ -3,7 +3,8 @@ import { EventEmitter } from 'node:events';
|
||||
import * as fs from 'node:fs';
|
||||
import * as path from 'node:path';
|
||||
import { logger } from '../../logger.js';
|
||||
import { type EmailProcessingMode, type IDomainRule } from '../routing/classes.email.config.js';
|
||||
import { type EmailProcessingMode } from '../routing/classes.email.config.js';
|
||||
import type { IEmailRoute } from '../routing/interfaces.js';
|
||||
|
||||
/**
|
||||
* Queue item status
|
||||
@ -17,7 +18,7 @@ export interface IQueueItem {
|
||||
id: string;
|
||||
processingMode: EmailProcessingMode;
|
||||
processingResult: any;
|
||||
rule: IDomainRule;
|
||||
route: IEmailRoute;
|
||||
status: QueueItemStatus;
|
||||
attempts: number;
|
||||
nextAttempt: Date;
|
||||
@ -218,9 +219,9 @@ export class UnifiedDeliveryQueue extends EventEmitter {
|
||||
* Add an item to the queue
|
||||
* @param processingResult Processing result to queue
|
||||
* @param mode Processing mode
|
||||
* @param rule Domain rule
|
||||
* @param route Email route
|
||||
*/
|
||||
public async enqueue(processingResult: any, mode: EmailProcessingMode, rule: IDomainRule): Promise<string> {
|
||||
public async enqueue(processingResult: any, mode: EmailProcessingMode, route: IEmailRoute): Promise<string> {
|
||||
// Check if queue is full
|
||||
if (this.queue.size >= this.options.maxQueueSize) {
|
||||
throw new Error('Queue is full');
|
||||
@ -234,7 +235,7 @@ export class UnifiedDeliveryQueue extends EventEmitter {
|
||||
id,
|
||||
processingMode: mode,
|
||||
processingResult,
|
||||
rule,
|
||||
route,
|
||||
status: 'pending',
|
||||
attempts: 0,
|
||||
nextAttempt: new Date(),
|
||||
|
@ -350,7 +350,7 @@ export class MultiModeDeliverySystem extends EventEmitter {
|
||||
details: {
|
||||
itemId: item.id,
|
||||
mode: item.processingMode,
|
||||
pattern: item.rule.pattern,
|
||||
routeName: item.route?.name || 'unknown',
|
||||
deliveryTime
|
||||
},
|
||||
success: true
|
||||
@ -410,7 +410,7 @@ export class MultiModeDeliverySystem extends EventEmitter {
|
||||
details: {
|
||||
itemId: item.id,
|
||||
mode: item.processingMode,
|
||||
pattern: item.rule.pattern,
|
||||
routeName: item.route?.name || 'unknown',
|
||||
error: error.message,
|
||||
deliveryTime
|
||||
},
|
||||
@ -434,12 +434,12 @@ export class MultiModeDeliverySystem extends EventEmitter {
|
||||
logger.log('info', `Forward delivery for item ${item.id}`);
|
||||
|
||||
const email = item.processingResult as Email;
|
||||
const rule = item.rule;
|
||||
const route = item.route;
|
||||
|
||||
// Get target server information
|
||||
const targetServer = rule.target?.server;
|
||||
const targetPort = rule.target?.port || 25;
|
||||
const useTls = rule.target?.useTls ?? false;
|
||||
const targetServer = route?.action.forward?.host;
|
||||
const targetPort = route?.action.forward?.port || 25;
|
||||
const useTls = false; // TLS configuration can be enhanced later
|
||||
|
||||
if (!targetServer) {
|
||||
throw new Error('No target server configured for forward mode');
|
||||
@ -458,6 +458,11 @@ export class MultiModeDeliverySystem extends EventEmitter {
|
||||
// Get SMTP client from UnifiedEmailServer
|
||||
const smtpClient = this.emailServer.getSmtpClient(targetServer, targetPort);
|
||||
|
||||
// Apply DKIM signing if configured in the route
|
||||
if (item.route?.action.options?.mtaOptions?.dkimSign) {
|
||||
await this.applyDkimSigning(email, item.route.action.options.mtaOptions);
|
||||
}
|
||||
|
||||
// Send the email using SmtpClient
|
||||
const result = await smtpClient.sendMail(email);
|
||||
|
||||
@ -486,12 +491,12 @@ export class MultiModeDeliverySystem extends EventEmitter {
|
||||
*/
|
||||
private async handleForwardDeliveryLegacy(item: IQueueItem): Promise<any> {
|
||||
const email = item.processingResult as Email;
|
||||
const rule = item.rule;
|
||||
const route = item.route;
|
||||
|
||||
// Get target server information
|
||||
const targetServer = rule.target?.server;
|
||||
const targetPort = rule.target?.port || 25;
|
||||
const useTls = rule.target?.useTls ?? false;
|
||||
const targetServer = route?.action.forward?.host;
|
||||
const targetPort = route?.action.forward?.port || 25;
|
||||
const useTls = false; // TLS configuration can be enhanced later
|
||||
|
||||
if (!targetServer) {
|
||||
throw new Error('No target server configured for forward mode');
|
||||
@ -528,7 +533,7 @@ export class MultiModeDeliverySystem extends EventEmitter {
|
||||
});
|
||||
|
||||
// Send EHLO
|
||||
await this.smtpCommand(socket, `EHLO ${rule.mtaOptions?.domain || 'localhost'}`);
|
||||
await this.smtpCommand(socket, `EHLO ${route?.action.options?.mtaOptions?.domain || 'localhost'}`);
|
||||
|
||||
// Start TLS if required
|
||||
if (useTls) {
|
||||
@ -538,14 +543,14 @@ export class MultiModeDeliverySystem extends EventEmitter {
|
||||
const tlsSocket = await this.upgradeTls(socket, targetServer);
|
||||
|
||||
// Send EHLO again after STARTTLS
|
||||
await this.smtpCommand(tlsSocket, `EHLO ${rule.mtaOptions?.domain || 'localhost'}`);
|
||||
await this.smtpCommand(tlsSocket, `EHLO ${route?.action.options?.mtaOptions?.domain || 'localhost'}`);
|
||||
|
||||
// Use tlsSocket for remaining commands
|
||||
return this.completeSMTPExchange(tlsSocket, email, rule);
|
||||
return this.completeSMTPExchange(tlsSocket, email, route);
|
||||
}
|
||||
|
||||
// Complete the SMTP exchange
|
||||
return this.completeSMTPExchange(socket, email, rule);
|
||||
return this.completeSMTPExchange(socket, email, route);
|
||||
} catch (error: any) {
|
||||
logger.log('error', `Failed to forward email: ${error.message}`);
|
||||
|
||||
@ -562,19 +567,19 @@ export class MultiModeDeliverySystem extends EventEmitter {
|
||||
* @param email Email to send
|
||||
* @param rule Domain rule
|
||||
*/
|
||||
private async completeSMTPExchange(socket: net.Socket | tls.TLSSocket, email: Email, rule: IDomainRule): Promise<any> {
|
||||
private async completeSMTPExchange(socket: net.Socket | tls.TLSSocket, email: Email, route: any): Promise<any> {
|
||||
try {
|
||||
// Authenticate if credentials provided
|
||||
if (rule.target?.authentication?.user && rule.target?.authentication?.pass) {
|
||||
if (route?.action?.forward?.auth?.user && route?.action?.forward?.auth?.pass) {
|
||||
// Send AUTH LOGIN
|
||||
await this.smtpCommand(socket, 'AUTH LOGIN');
|
||||
|
||||
// Send username (base64)
|
||||
const username = Buffer.from(rule.target.authentication.user).toString('base64');
|
||||
const username = Buffer.from(route.action.forward.auth.user).toString('base64');
|
||||
await this.smtpCommand(socket, username);
|
||||
|
||||
// Send password (base64)
|
||||
const password = Buffer.from(rule.target.authentication.pass).toString('base64');
|
||||
const password = Buffer.from(route.action.forward.auth.pass).toString('base64');
|
||||
await this.smtpCommand(socket, password);
|
||||
}
|
||||
|
||||
@ -599,11 +604,11 @@ export class MultiModeDeliverySystem extends EventEmitter {
|
||||
// Close the connection
|
||||
socket.end();
|
||||
|
||||
logger.log('info', `Email forwarded successfully to ${rule.target?.server}:${rule.target?.port || 25}`);
|
||||
logger.log('info', `Email forwarded successfully to ${route?.action?.forward?.host}:${route?.action?.forward?.port || 25}`);
|
||||
|
||||
return {
|
||||
targetServer: rule.target?.server,
|
||||
targetPort: rule.target?.port || 25,
|
||||
targetServer: route?.action?.forward?.host,
|
||||
targetPort: route?.action?.forward?.port || 25,
|
||||
recipients: email.getAllRecipients().length
|
||||
};
|
||||
} catch (error: any) {
|
||||
@ -624,32 +629,26 @@ export class MultiModeDeliverySystem extends EventEmitter {
|
||||
logger.log('info', `MTA delivery for item ${item.id}`);
|
||||
|
||||
const email = item.processingResult as Email;
|
||||
const rule = item.rule;
|
||||
const route = item.route;
|
||||
|
||||
try {
|
||||
// Apply DKIM signing if configured in the route
|
||||
if (item.route?.action.options?.mtaOptions?.dkimSign) {
|
||||
await this.applyDkimSigning(email, item.route.action.options.mtaOptions);
|
||||
}
|
||||
|
||||
// In a full implementation, this would use the MTA service
|
||||
// For now, we'll simulate a successful delivery
|
||||
|
||||
logger.log('info', `Email processed by MTA: ${email.subject} to ${email.getAllRecipients().join(', ')}`);
|
||||
|
||||
// Apply MTA rule options if provided
|
||||
if (rule.mtaOptions) {
|
||||
const options = rule.mtaOptions;
|
||||
|
||||
// Apply DKIM signing if enabled
|
||||
if (options.dkimSign && options.dkimOptions) {
|
||||
// Sign the email with DKIM
|
||||
logger.log('info', `Signing email with DKIM for domain ${options.dkimOptions.domainName}`);
|
||||
|
||||
// In a full implementation, this would use the DKIM signing library
|
||||
}
|
||||
}
|
||||
// Note: The MTA implementation would handle actual local delivery
|
||||
|
||||
// Simulate successful delivery
|
||||
return {
|
||||
recipients: email.getAllRecipients().length,
|
||||
subject: email.subject,
|
||||
dkimSigned: !!rule.mtaOptions?.dkimSign
|
||||
dkimSigned: !!item.route?.action.options?.mtaOptions?.dkimSign
|
||||
};
|
||||
} catch (error: any) {
|
||||
logger.log('error', `Failed to process email in MTA mode: ${error.message}`);
|
||||
@ -665,15 +664,15 @@ export class MultiModeDeliverySystem extends EventEmitter {
|
||||
logger.log('info', `Process delivery for item ${item.id}`);
|
||||
|
||||
const email = item.processingResult as Email;
|
||||
const rule = item.rule;
|
||||
const route = item.route;
|
||||
|
||||
try {
|
||||
// Apply content scanning if enabled
|
||||
if (rule.contentScanning && rule.scanners && rule.scanners.length > 0) {
|
||||
if (route?.action.options?.contentScanning && route?.action.options?.scanners && route.action.options.scanners.length > 0) {
|
||||
logger.log('info', 'Performing content scanning');
|
||||
|
||||
// Apply each scanner
|
||||
for (const scanner of rule.scanners) {
|
||||
for (const scanner of route.action.options.scanners) {
|
||||
switch (scanner.type) {
|
||||
case 'spam':
|
||||
logger.log('info', 'Scanning for spam content');
|
||||
@ -707,10 +706,10 @@ export class MultiModeDeliverySystem extends EventEmitter {
|
||||
}
|
||||
|
||||
// Apply transformations if defined
|
||||
if (rule.transformations && rule.transformations.length > 0) {
|
||||
if (route?.action.options?.transformations && route?.action.options?.transformations.length > 0) {
|
||||
logger.log('info', 'Applying email transformations');
|
||||
|
||||
for (const transform of rule.transformations) {
|
||||
for (const transform of route.action.options.transformations) {
|
||||
switch (transform.type) {
|
||||
case 'addHeader':
|
||||
if (transform.header && transform.value) {
|
||||
@ -721,14 +720,20 @@ export class MultiModeDeliverySystem extends EventEmitter {
|
||||
}
|
||||
}
|
||||
|
||||
// Apply DKIM signing if configured (after all transformations)
|
||||
if (item.route?.action.options?.mtaOptions?.dkimSign || item.route?.action.process?.dkim) {
|
||||
await this.applyDkimSigning(email, item.route.action.options?.mtaOptions || {});
|
||||
}
|
||||
|
||||
logger.log('info', `Email successfully processed in store-and-forward mode`);
|
||||
|
||||
// Simulate successful delivery
|
||||
return {
|
||||
recipients: email.getAllRecipients().length,
|
||||
subject: email.subject,
|
||||
scanned: !!rule.contentScanning,
|
||||
transformed: !!(rule.transformations && rule.transformations.length > 0)
|
||||
scanned: !!route?.action.options?.contentScanning,
|
||||
transformed: !!(route?.action.options?.transformations && route?.action.options?.transformations.length > 0),
|
||||
dkimSigned: !!(item.route?.action.options?.mtaOptions?.dkimSign || item.route?.action.process?.dkim)
|
||||
};
|
||||
} catch (error: any) {
|
||||
logger.log('error', `Failed to process email: ${error.message}`);
|
||||
@ -743,6 +748,52 @@ export class MultiModeDeliverySystem extends EventEmitter {
|
||||
return filename.substring(filename.lastIndexOf('.')).toLowerCase();
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply DKIM signing to an email
|
||||
*/
|
||||
private async applyDkimSigning(email: Email, mtaOptions: any): Promise<void> {
|
||||
if (!this.emailServer) {
|
||||
logger.log('warn', 'Cannot apply DKIM signing without email server reference');
|
||||
return;
|
||||
}
|
||||
|
||||
const domainName = mtaOptions.dkimOptions?.domainName || email.from.split('@')[1];
|
||||
const keySelector = mtaOptions.dkimOptions?.keySelector || 'default';
|
||||
|
||||
try {
|
||||
// Ensure DKIM keys exist for the domain
|
||||
await this.emailServer.dkimCreator.handleDKIMKeysForDomain(domainName);
|
||||
|
||||
// Convert Email to raw format for signing
|
||||
const rawEmail = email.toRFC822String();
|
||||
|
||||
// Sign the email
|
||||
const signResult = await plugins.dkimSign(rawEmail, {
|
||||
canonicalization: 'relaxed/relaxed',
|
||||
algorithm: 'rsa-sha256',
|
||||
signTime: new Date(),
|
||||
signatureData: [
|
||||
{
|
||||
signingDomain: domainName,
|
||||
selector: keySelector,
|
||||
privateKey: (await this.emailServer.dkimCreator.readDKIMKeys(domainName)).privateKey,
|
||||
algorithm: 'rsa-sha256',
|
||||
canonicalization: 'relaxed/relaxed'
|
||||
}
|
||||
]
|
||||
});
|
||||
|
||||
// Add the DKIM-Signature header to the email
|
||||
if (signResult.signatures) {
|
||||
email.addHeader('DKIM-Signature', signResult.signatures);
|
||||
logger.log('info', `Successfully added DKIM signature for ${domainName}`);
|
||||
}
|
||||
} catch (error) {
|
||||
logger.log('error', `Failed to apply DKIM signature: ${error.message}`);
|
||||
// Don't throw - allow email to be sent without DKIM if signing fails
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Format email for SMTP transmission
|
||||
* @param email Email to format
|
||||
|
@ -323,7 +323,7 @@ export class DataHandler implements IDataHandler {
|
||||
// Process the email via the UnifiedEmailServer
|
||||
// Pass the email object, session data, and specify the mode (mta, forward, or process)
|
||||
// This connects SMTP reception to the overall email system
|
||||
const processResult = await this.smtpServer.getEmailServer().processEmailByMode(email, session as any, 'mta');
|
||||
const processResult = await this.smtpServer.getEmailServer().processEmailByMode(email, session as any);
|
||||
|
||||
SmtpLogger.info(`Email processed through UnifiedEmailServer: ${email.getMessageId()}`, {
|
||||
sessionId: session.id,
|
||||
@ -373,7 +373,7 @@ export class DataHandler implements IDataHandler {
|
||||
|
||||
// Process the email via the UnifiedEmailServer in forward mode
|
||||
try {
|
||||
const processResult = await this.smtpServer.getEmailServer().processEmailByMode(email, session as any, 'forward');
|
||||
const processResult = await this.smtpServer.getEmailServer().processEmailByMode(email, session as any);
|
||||
|
||||
SmtpLogger.info(`Email forwarded through UnifiedEmailServer: ${email.getMessageId()}`, {
|
||||
sessionId: session.id,
|
||||
@ -412,7 +412,7 @@ export class DataHandler implements IDataHandler {
|
||||
|
||||
// Process the email via the UnifiedEmailServer in process mode
|
||||
try {
|
||||
const processResult = await this.smtpServer.getEmailServer().processEmailByMode(email, session as any, 'process');
|
||||
const processResult = await this.smtpServer.getEmailServer().processEmailByMode(email, session as any);
|
||||
|
||||
SmtpLogger.info(`Email processed directly through UnifiedEmailServer: ${email.getMessageId()}`, {
|
||||
sessionId: session.id,
|
||||
|
Reference in New Issue
Block a user