Files
smartmta/ts/mail/routing/classes.unified.email.server.ts

1143 lines
38 KiB
TypeScript
Raw Permalink Normal View History

import * as plugins from '../../plugins.js';
import * as paths from '../../paths.js';
import { EventEmitter } from 'events';
import { logger } from '../../logger.js';
import {
SecurityLogger,
SecurityLogLevel,
SecurityEventType
} from '../../security/index.js';
import { DKIMCreator } from '../security/classes.dkimcreator.js';
import { RustSecurityBridge } from '../../security/classes.rustsecuritybridge.js';
import type { IEmailReceivedEvent, IAuthRequestEvent, IEmailData } from '../../security/classes.rustsecuritybridge.js';
import { EmailRouter } from './classes.email.router.js';
import type { IEmailRoute, IEmailAction, IEmailContext, IEmailDomainConfig } from './interfaces.js';
import { Email } from '../core/classes.email.js';
import { DomainRegistry } from './classes.domain.registry.js';
import { DnsManager } from './classes.dns.manager.js';
import { BounceManager, BounceType, BounceCategory } from '../core/classes.bouncemanager.js';
import type { ISmtpSendResult, IOutboundEmail } from '../../security/classes.rustsecuritybridge.js';
import { MultiModeDeliverySystem, type IMultiModeDeliveryOptions } from '../delivery/classes.delivery.system.js';
import { UnifiedDeliveryQueue, type IQueueOptions } from '../delivery/classes.delivery.queue.js';
import { UnifiedRateLimiter, type IHierarchicalRateLimits } from '../delivery/classes.unified.rate.limiter.js';
import { SmtpState } from '../delivery/interfaces.js';
import type { EmailProcessingMode, ISmtpSession as IBaseSmtpSession } from '../delivery/interfaces.js';
import { EmailActionExecutor } from './classes.email.action.executor.js';
import { DkimManager } from './classes.dkim.manager.js';
/** External DcRouter interface shape used by UnifiedEmailServer */
interface DcRouter {
storageManager: any;
dnsServer?: any;
options?: any;
}
2025-10-24 08:09:29 +00:00
/**
* Extended SMTP session interface with route information
*/
export interface IExtendedSmtpSession extends ISmtpSession {
/**
* Matched route for this session
*/
matchedRoute?: IEmailRoute;
}
/**
* Options for the unified email server
*/
export interface IUnifiedEmailServerOptions {
// Base server options
ports: number[];
hostname: string;
domains: IEmailDomainConfig[]; // Domain configurations
banner?: string;
debug?: boolean;
useSocketHandler?: boolean; // Use socket-handler mode instead of port listening
2025-10-24 08:09:29 +00:00
// Authentication options
auth?: {
required?: boolean;
methods?: ('PLAIN' | 'LOGIN' | 'OAUTH2')[];
users?: Array<{username: string, password: string}>;
};
2025-10-24 08:09:29 +00:00
// TLS options
tls?: {
certPath?: string;
keyPath?: string;
caPath?: string;
minVersion?: string;
ciphers?: string;
};
2025-10-24 08:09:29 +00:00
// Limits
maxMessageSize?: number;
maxClients?: number;
maxConnections?: number;
2025-10-24 08:09:29 +00:00
// Connection options
connectionTimeout?: number;
socketTimeout?: number;
2025-10-24 08:09:29 +00:00
// Email routing rules
routes: IEmailRoute[];
2025-10-24 08:09:29 +00:00
// Global defaults for all domains
defaults?: {
dnsMode?: 'forward' | 'internal-dns' | 'external-dns';
dkim?: IEmailDomainConfig['dkim'];
rateLimits?: IEmailDomainConfig['rateLimits'];
};
2025-10-24 08:09:29 +00:00
// Outbound settings
outbound?: {
maxConnections?: number;
connectionTimeout?: number;
socketTimeout?: number;
retryAttempts?: number;
defaultFrom?: string;
};
2025-10-24 08:09:29 +00:00
// Rate limiting (global limits, can be overridden per domain)
rateLimits?: IHierarchicalRateLimits;
}
/**
* Extended SMTP session interface for UnifiedEmailServer
*/
export interface ISmtpSession extends IBaseSmtpSession {
/**
* User information if authenticated
*/
user?: {
username: string;
[key: string]: any;
};
2025-10-24 08:09:29 +00:00
/**
* Matched route for this session
*/
matchedRoute?: IEmailRoute;
}
/**
* Authentication data for SMTP
*/
import type { ISmtpAuth } from '../delivery/interfaces.js';
2025-10-24 08:09:29 +00:00
export type IAuthData = ISmtpAuth;
/**
* Server statistics
*/
export interface IServerStats {
startTime: Date;
connections: {
current: number;
total: number;
};
messages: {
processed: number;
delivered: number;
failed: number;
};
processingTime: {
avg: number;
max: number;
min: number;
};
}
/**
* Unified email server that handles all email traffic with pattern-based routing
*/
export class UnifiedEmailServer extends EventEmitter {
2025-10-24 08:09:29 +00:00
private dcRouter: DcRouter;
private options: IUnifiedEmailServerOptions;
private emailRouter: EmailRouter;
public domainRegistry: DomainRegistry;
private servers: any[] = [];
private stats: IServerStats;
2025-10-24 08:09:29 +00:00
// Add components needed for sending and securing emails
public dkimCreator: DKIMCreator;
private rustBridge: RustSecurityBridge;
2025-10-24 08:09:29 +00:00
private bounceManager: BounceManager;
public deliveryQueue: UnifiedDeliveryQueue;
public deliverySystem: MultiModeDeliverySystem;
private rateLimiter: UnifiedRateLimiter; // TODO: Implement rate limiting in SMTP server handlers
// Extracted subsystems
private actionExecutor: EmailActionExecutor;
private dkimManager: DkimManager;
2025-10-24 08:09:29 +00:00
constructor(dcRouter: DcRouter, options: IUnifiedEmailServerOptions) {
super();
this.dcRouter = dcRouter;
2025-10-24 08:09:29 +00:00
// Set default options
this.options = {
...options,
banner: options.banner || `${options.hostname} ESMTP UnifiedEmailServer`,
maxMessageSize: options.maxMessageSize || 10 * 1024 * 1024, // 10MB
maxClients: options.maxClients || 100,
maxConnections: options.maxConnections || 1000,
connectionTimeout: options.connectionTimeout || 60000, // 1 minute
socketTimeout: options.socketTimeout || 60000 // 1 minute
};
// Initialize Rust security bridge (singleton)
this.rustBridge = RustSecurityBridge.getInstance();
2025-10-24 08:09:29 +00:00
// Initialize DKIM creator with storage manager
this.dkimCreator = new DKIMCreator(paths.keysDir, dcRouter.storageManager);
2025-10-24 08:09:29 +00:00
// Initialize bounce manager with storage manager
this.bounceManager = new BounceManager({
maxCacheSize: 10000,
cacheTTL: 30 * 24 * 60 * 60 * 1000, // 30 days
storageManager: dcRouter.storageManager
});
2025-10-24 08:09:29 +00:00
// Initialize domain registry
this.domainRegistry = new DomainRegistry(options.domains, options.defaults);
2025-10-24 08:09:29 +00:00
// Initialize email router with routes and storage manager
this.emailRouter = new EmailRouter(options.routes || [], {
storageManager: dcRouter.storageManager,
persistChanges: true
});
2025-10-24 08:09:29 +00:00
// Initialize rate limiter
this.rateLimiter = new UnifiedRateLimiter(options.rateLimits || {
global: {
maxConnectionsPerIP: 10,
maxMessagesPerMinute: 100,
maxRecipientsPerMessage: 50,
maxErrorsPerIP: 10,
maxAuthFailuresPerIP: 5,
blockDuration: 300000 // 5 minutes
}
});
2025-10-24 08:09:29 +00:00
// Initialize delivery components
const queueOptions: IQueueOptions = {
storageType: 'memory', // Default to memory storage
maxRetries: 3,
baseRetryDelay: 300000, // 5 minutes
maxRetryDelay: 3600000 // 1 hour
};
2025-10-24 08:09:29 +00:00
this.deliveryQueue = new UnifiedDeliveryQueue(queueOptions);
2025-10-24 08:09:29 +00:00
const deliveryOptions: IMultiModeDeliveryOptions = {
globalRateLimit: 100, // Default to 100 emails per minute
concurrentDeliveries: 10,
processBounces: true,
bounceHandler: {
processSmtpFailure: this.processSmtpFailure.bind(this)
},
onDeliverySuccess: async (_item, _result) => {
// Delivery success recorded via delivery system
2025-10-24 08:09:29 +00:00
}
};
2025-10-24 08:09:29 +00:00
this.deliverySystem = new MultiModeDeliverySystem(this.deliveryQueue, deliveryOptions, this);
// Initialize action executor
this.actionExecutor = new EmailActionExecutor({
sendOutboundEmail: this.sendOutboundEmail.bind(this),
bounceManager: this.bounceManager,
deliveryQueue: this.deliveryQueue,
});
// Initialize DKIM manager
this.dkimManager = new DkimManager(this.dkimCreator, this.domainRegistry, dcRouter, this.rustBridge);
2025-10-24 08:09:29 +00:00
// Initialize statistics
this.stats = {
startTime: new Date(),
connections: {
current: 0,
total: 0
},
messages: {
processed: 0,
delivered: 0,
failed: 0
},
processingTime: {
avg: 0,
max: 0,
min: 0
}
};
2025-10-24 08:09:29 +00:00
// We'll create the SMTP servers during the start() method
}
2025-10-24 08:09:29 +00:00
/**
* Send an outbound email via the Rust SMTP client.
* Uses connection pooling in the Rust binary for efficiency.
2025-10-24 08:09:29 +00:00
*/
public async sendOutboundEmail(host: string, port: number, email: Email, options?: {
auth?: { user: string; pass: string };
dkimDomain?: string;
dkimSelector?: string;
}): Promise<ISmtpSendResult> {
// Build DKIM config if domain has keys
let dkim: { domain: string; selector: string; privateKey: string } | undefined;
if (options?.dkimDomain) {
try {
const { privateKey } = await this.dkimCreator.readDKIMKeys(options.dkimDomain);
dkim = { domain: options.dkimDomain, selector: options.dkimSelector || 'default', privateKey };
} catch (err) {
logger.log('warn', `Failed to read DKIM keys for ${options.dkimDomain}: ${(err as Error).message}`);
}
2025-10-24 08:09:29 +00:00
}
// Serialize the Email to the outbound format
const outboundEmail: IOutboundEmail = {
from: email.from,
to: email.to,
cc: email.cc || [],
bcc: email.bcc || [],
subject: email.subject || '',
text: email.text || '',
html: email.html || undefined,
headers: email.headers as Record<string, string> || {},
};
return this.rustBridge.sendOutboundEmail({
host,
port,
secure: port === 465,
domain: this.options.hostname,
auth: options?.auth,
email: outboundEmail,
dkim,
connectionTimeoutSecs: Math.floor((this.options.outbound?.connectionTimeout || 30000) / 1000),
socketTimeoutSecs: Math.floor((this.options.outbound?.socketTimeout || 120000) / 1000),
poolKey: `${host}:${port}`,
maxPoolConnections: this.options.outbound?.maxConnections || 10,
});
2025-10-24 08:09:29 +00:00
}
2025-10-24 08:09:29 +00:00
/**
* Start the unified email server
*/
public async start(): Promise<void> {
logger.log('info', `Starting UnifiedEmailServer on ports: ${(this.options.ports as number[]).join(', ')}`);
2025-10-24 08:09:29 +00:00
try {
await this.startDeliveryPipeline();
await this.startRustBridge();
await this.initializeDkimAndDns();
this.registerBridgeEventHandlers();
await this.startSmtpServer();
logger.log('info', 'UnifiedEmailServer started successfully');
this.emit('started');
} catch (error) {
logger.log('error', `Failed to start UnifiedEmailServer: ${error.message}`);
throw error;
}
}
private async startDeliveryPipeline(): Promise<void> {
await this.deliveryQueue.initialize();
logger.log('info', 'Email delivery queue initialized');
await this.deliverySystem.start();
logger.log('info', 'Email delivery system started');
}
private async startRustBridge(): Promise<void> {
const bridgeOk = await this.rustBridge.start();
if (!bridgeOk) {
throw new Error('Rust security bridge failed to start. The mailer-bin binary is required. Run "pnpm build" to compile it.');
}
logger.log('info', 'Rust security bridge started — Rust is the primary security backend');
this.rustBridge.on('stateChange', ({ oldState, newState }: { oldState: string; newState: string }) => {
if (newState === 'failed') this.emit('bridgeFailed');
else if (newState === 'restarting') this.emit('bridgeRestarting');
else if (newState === 'running' && oldState === 'restarting') this.emit('bridgeRecovered');
});
}
private async initializeDkimAndDns(): Promise<void> {
await this.dkimManager.setupDkimForDomains();
logger.log('info', 'DKIM configuration completed for all domains');
const dnsManager = new DnsManager(this.dcRouter);
await dnsManager.ensureDnsRecords(this.domainRegistry.getAllConfigs(), this.dkimCreator);
logger.log('info', 'DNS records ensured for all configured domains');
this.applyDomainRateLimits();
logger.log('info', 'Per-domain rate limits configured');
await this.dkimManager.checkAndRotateDkimKeys();
logger.log('info', 'DKIM key rotation check completed');
}
private registerBridgeEventHandlers(): void {
this.rustBridge.onEmailReceived(async (data) => {
try {
await this.handleRustEmailReceived(data);
} catch (err) {
logger.log('error', `Error handling email from Rust SMTP: ${(err as Error).message}`);
try {
await this.rustBridge.sendEmailProcessingResult({
correlationId: data.correlationId,
accepted: false,
smtpCode: 451,
smtpMessage: 'Internal processing error',
});
} catch (sendErr) {
logger.log('warn', `Could not send rejection back to Rust: ${(sendErr as Error).message}`);
}
}
});
this.rustBridge.onAuthRequest(async (data) => {
try {
await this.handleRustAuthRequest(data);
} catch (err) {
logger.log('error', `Error handling auth from Rust SMTP: ${(err as Error).message}`);
try {
await this.rustBridge.sendAuthResult({
correlationId: data.correlationId,
success: false,
message: 'Internal auth error',
});
} catch (sendErr) {
logger.log('warn', `Could not send auth rejection back to Rust: ${(sendErr as Error).message}`);
}
}
});
}
private async startSmtpServer(): Promise<void> {
const hasTlsConfig = this.options.tls?.keyPath && this.options.tls?.certPath;
let tlsCertPem: string | undefined;
let tlsKeyPem: string | undefined;
if (hasTlsConfig) {
try {
tlsKeyPem = plugins.fs.readFileSync(this.options.tls.keyPath!, 'utf8');
tlsCertPem = plugins.fs.readFileSync(this.options.tls.certPath!, 'utf8');
logger.log('info', 'TLS certificates loaded successfully');
} catch (error) {
logger.log('warn', `Failed to load TLS certificates: ${error.message}`);
2025-10-24 08:09:29 +00:00
}
}
const smtpPorts = (this.options.ports as number[]).filter(p => p !== 465);
const securePort = (this.options.ports as number[]).find(p => p === 465);
const started = await this.rustBridge.startSmtpServer({
hostname: this.options.hostname,
ports: smtpPorts,
securePort: securePort,
tlsCertPem,
tlsKeyPem,
maxMessageSize: this.options.maxMessageSize || 10 * 1024 * 1024,
maxConnections: this.options.maxConnections || this.options.maxClients || 100,
maxRecipients: 100,
connectionTimeoutSecs: this.options.connectionTimeout ? Math.floor(this.options.connectionTimeout / 1000) : 30,
dataTimeoutSecs: 60,
authEnabled: !!this.options.auth?.required || !!(this.options.auth?.users?.length),
maxAuthFailures: 3,
socketTimeoutSecs: this.options.socketTimeout ? Math.floor(this.options.socketTimeout / 1000) : 300,
processingTimeoutSecs: 30,
rateLimits: this.options.rateLimits ? {
maxConnectionsPerIp: this.options.rateLimits.global?.maxConnectionsPerIP || 50,
maxMessagesPerSender: this.options.rateLimits.global?.maxMessagesPerMinute || 100,
maxAuthFailuresPerIp: this.options.rateLimits.global?.maxAuthFailuresPerIP || 5,
windowSecs: 60,
} : undefined,
});
if (!started) {
throw new Error('Failed to start Rust SMTP server');
2025-10-24 08:09:29 +00:00
}
logger.log('info', `Rust SMTP server listening on ports: ${smtpPorts.join(', ')}${securePort ? ` + ${securePort} (TLS)` : ''}`);
2025-10-24 08:09:29 +00:00
}
2025-10-24 08:09:29 +00:00
/**
* Stop the unified email server
*/
public async stop(): Promise<void> {
logger.log('info', 'Stopping UnifiedEmailServer');
2025-10-24 08:09:29 +00:00
try {
// Stop the Rust SMTP server first
try {
await this.rustBridge.stopSmtpServer();
logger.log('info', 'Rust SMTP server stopped');
} catch (err) {
logger.log('warn', `Error stopping Rust SMTP server: ${(err as Error).message}`);
}
2025-10-24 08:09:29 +00:00
// Clear the servers array - servers will be garbage collected
this.servers = [];
// Remove bridge state change listener and stop bridge
this.rustBridge.removeAllListeners('stateChange');
await this.rustBridge.stop();
2025-10-24 08:09:29 +00:00
// Stop the delivery system
if (this.deliverySystem) {
await this.deliverySystem.stop();
logger.log('info', 'Email delivery system stopped');
}
2025-10-24 08:09:29 +00:00
// Shut down the delivery queue
if (this.deliveryQueue) {
await this.deliveryQueue.shutdown();
logger.log('info', 'Email delivery queue shut down');
}
// Close all Rust SMTP client connection pools
try {
await this.rustBridge.closeSmtpPool();
} catch {
// Bridge may already be stopped
2025-10-24 08:09:29 +00:00
}
2025-10-24 08:09:29 +00:00
logger.log('info', 'UnifiedEmailServer stopped successfully');
this.emit('stopped');
} catch (error) {
logger.log('error', `Error stopping UnifiedEmailServer: ${error.message}`);
throw error;
}
}
// -----------------------------------------------------------------------
// Rust SMTP server event handlers
// -----------------------------------------------------------------------
/**
* Handle an emailReceived event from the Rust SMTP server.
*/
private async handleRustEmailReceived(data: IEmailReceivedEvent): Promise<void> {
const { correlationId, mailFrom, rcptTo, remoteAddr, clientHostname, secure, authenticatedUser } = data;
logger.log('info', `Rust SMTP received email from=${mailFrom} to=${rcptTo.join(',')} remote=${remoteAddr}`);
try {
// Decode the email data
let rawMessageBuffer: Buffer;
if (data.data.type === 'inline' && data.data.base64) {
rawMessageBuffer = Buffer.from(data.data.base64, 'base64');
} else if (data.data.type === 'file' && data.data.path) {
rawMessageBuffer = plugins.fs.readFileSync(data.data.path);
// Clean up temp file
try {
plugins.fs.unlinkSync(data.data.path);
} catch {
// Ignore cleanup errors
}
} else {
throw new Error('Invalid email data transport');
}
// Build a session-like object for processEmailByMode
const session: IExtendedSmtpSession = {
id: data.sessionId || 'rust-' + Math.random().toString(36).substring(2),
state: SmtpState.FINISHED,
mailFrom: mailFrom,
rcptTo: rcptTo,
emailData: rawMessageBuffer.toString('utf8'),
useTLS: secure,
connectionEnded: false,
remoteAddress: remoteAddr,
clientHostname: clientHostname || '',
secure: secure,
authenticated: !!authenticatedUser,
envelope: {
mailFrom: { address: mailFrom, args: {} },
rcptTo: rcptTo.map(addr => ({ address: addr, args: {} })),
},
};
if (authenticatedUser) {
session.user = { username: authenticatedUser };
}
// Attach pre-computed security results from Rust in-process pipeline
if (data.securityResults) {
(session as any)._precomputedSecurityResults = data.securityResults;
}
// Process the email through the routing system
await this.processEmailByMode(rawMessageBuffer, session);
// Send acceptance back to Rust
await this.rustBridge.sendEmailProcessingResult({
correlationId,
accepted: true,
smtpCode: 250,
smtpMessage: '2.0.0 Message accepted for delivery',
});
} catch (err) {
logger.log('error', `Failed to process email from Rust SMTP: ${(err as Error).message}`);
await this.rustBridge.sendEmailProcessingResult({
correlationId,
accepted: false,
smtpCode: 550,
smtpMessage: `5.0.0 Processing failed: ${(err as Error).message}`,
});
}
}
/**
* Handle an authRequest event from the Rust SMTP server.
*/
private async handleRustAuthRequest(data: IAuthRequestEvent): Promise<void> {
const { correlationId, username, password, remoteAddr } = data;
logger.log('info', `Rust SMTP auth request for user=${username} from=${remoteAddr}`);
// Check against configured users
const users = this.options.auth?.users || [];
const matched = users.find(
u => u.username === username && u.password === password
);
if (matched) {
await this.rustBridge.sendAuthResult({
correlationId,
success: true,
});
} else {
logger.log('warn', `Auth failed for user=${username} from=${remoteAddr}`);
await this.rustBridge.sendAuthResult({
correlationId,
success: false,
message: 'Invalid credentials',
});
}
}
/**
* Verify inbound email security (DKIM/SPF/DMARC) using pre-computed Rust results
* or falling back to IPC call if no pre-computed results are available.
*/
private async verifyInboundSecurity(email: Email, session: IExtendedSmtpSession): Promise<void> {
try {
// Check for pre-computed results from Rust in-process security pipeline
const precomputed = (session as any)._precomputedSecurityResults;
let result: any;
if (precomputed) {
logger.log('info', 'Using pre-computed security results from Rust in-process pipeline');
result = precomputed;
} else {
// Fallback: IPC round-trip to Rust (for backward compat)
const rawMessage = session.emailData || email.toRFC822String();
result = await this.rustBridge.verifyEmail({
rawMessage,
ip: session.remoteAddress,
heloDomain: session.clientHostname || '',
hostname: this.options.hostname,
mailFrom: session.envelope?.mailFrom?.address || session.mailFrom || '',
});
}
// Apply DKIM result headers
if (result.dkim && result.dkim.length > 0) {
const dkimSummary = result.dkim
.map((d: any) => `${d.status}${d.domain ? ` (${d.domain})` : ''}`)
.join(', ');
email.addHeader('X-DKIM-Result', dkimSummary);
}
// Apply SPF result header
if (result.spf) {
email.addHeader('Received-SPF', `${result.spf.result} (domain: ${result.spf.domain}, ip: ${result.spf.ip})`);
// Mark as spam on SPF hard fail
if (result.spf.result === 'fail') {
email.mightBeSpam = true;
logger.log('warn', `SPF fail for ${session.remoteAddress} — marking as potential spam`);
}
}
// Apply DMARC result header and policy
if (result.dmarc) {
email.addHeader('X-DMARC-Result', `${result.dmarc.action} (policy=${result.dmarc.policy}, dkim=${result.dmarc.dkim_result}, spf=${result.dmarc.spf_result})`);
if (result.dmarc.action === 'reject') {
email.mightBeSpam = true;
logger.log('warn', `DMARC reject for domain ${result.dmarc.domain} — marking as spam`);
} else if (result.dmarc.action === 'quarantine') {
email.mightBeSpam = true;
logger.log('info', `DMARC quarantine for domain ${result.dmarc.domain} — marking as potential spam`);
}
}
// Apply content scan results (from pre-computed pipeline)
if (result.contentScan) {
const scan = result.contentScan;
if (scan.threatScore > 0) {
email.addHeader('X-Spam-Score', String(scan.threatScore));
if (scan.threatType) {
email.addHeader('X-Spam-Type', scan.threatType);
}
if (scan.threatScore >= 50) {
email.mightBeSpam = true;
logger.log('warn', `Content scan threat score ${scan.threatScore} (${scan.threatType}) — marking as potential spam`);
}
}
}
// Apply IP reputation results (from pre-computed pipeline)
if (result.ipReputation) {
const rep = result.ipReputation;
email.addHeader('X-IP-Reputation-Score', String(rep.score));
if (rep.is_spam) {
email.mightBeSpam = true;
logger.log('warn', `IP ${rep.ip} flagged by reputation check (score=${rep.score}) — marking as potential spam`);
}
}
logger.log('info', `Inbound security verified for email from ${session.remoteAddress}: DKIM=${result.dkim?.[0]?.status ?? 'none'}, SPF=${result.spf?.result ?? 'none'}, DMARC=${result.dmarc?.action ?? 'none'}`);
} catch (err) {
logger.log('warn', `Inbound security verification failed: ${(err as Error).message} — accepting email`);
}
}
2025-10-24 08:09:29 +00:00
/**
* Process email based on routing rules
*/
public async processEmailByMode(emailData: Email | Buffer, session: IExtendedSmtpSession): Promise<Email> {
// Convert Buffer to Email if needed
let email: Email;
if (Buffer.isBuffer(emailData)) {
// Parse the email data buffer into an Email object
try {
const parsed = await plugins.mailparser.simpleParser(emailData);
email = new Email({
from: parsed.from?.value[0]?.address || session.envelope.mailFrom.address,
to: session.envelope.rcptTo[0]?.address || '',
subject: parsed.subject || '',
text: parsed.text || '',
html: parsed.html || undefined,
attachments: parsed.attachments?.map(att => ({
filename: att.filename || '',
content: att.content,
contentType: att.contentType
})) || []
});
} catch (error) {
logger.log('error', `Error parsing email data: ${error.message}`);
throw new Error(`Error parsing email data: ${error.message}`);
}
} else {
email = emailData;
}
// Run inbound security verification (DKIM/SPF/DMARC) via Rust bridge
if (session.remoteAddress && session.remoteAddress !== '127.0.0.1') {
await this.verifyInboundSecurity(email, session);
}
2025-10-24 08:09:29 +00:00
// First check if this is a bounce notification email
const subject = email.subject || '';
const isBounceLike = /mail delivery|delivery (failed|status|notification)|failure notice|returned mail|undeliverable|delivery problem/i.test(subject);
2025-10-24 08:09:29 +00:00
if (isBounceLike) {
logger.log('info', `Email subject matches bounce notification pattern: "${subject}"`);
2025-10-24 08:09:29 +00:00
const isBounce = await this.processBounceNotification(email);
2025-10-24 08:09:29 +00:00
if (isBounce) {
logger.log('info', 'Successfully processed as bounce notification, skipping regular processing');
return email;
}
2025-10-24 08:09:29 +00:00
logger.log('info', 'Not a valid bounce notification, continuing with regular processing');
}
// Find matching route
const context: IEmailContext = { email, session };
const route = await this.emailRouter.evaluateRoutes(context);
2025-10-24 08:09:29 +00:00
if (!route) {
throw new Error('No matching route for email');
}
2025-10-24 08:09:29 +00:00
// Store matched route in session
session.matchedRoute = route;
2025-10-24 08:09:29 +00:00
// Execute action based on route
await this.actionExecutor.executeAction(route.action, email, context);
2025-10-24 08:09:29 +00:00
// Return the processed email
return email;
}
2025-10-24 08:09:29 +00:00
/**
* Apply per-domain rate limits from domain configurations
*/
private applyDomainRateLimits(): void {
const domainConfigs = this.domainRegistry.getAllConfigs();
2025-10-24 08:09:29 +00:00
for (const domainConfig of domainConfigs) {
if (domainConfig.rateLimits) {
const domain = domainConfig.domain;
const rateLimitConfig: any = {};
2025-10-24 08:09:29 +00:00
if (domainConfig.rateLimits.outbound) {
if (domainConfig.rateLimits.outbound.messagesPerMinute) {
rateLimitConfig.maxMessagesPerMinute = domainConfig.rateLimits.outbound.messagesPerMinute;
}
}
2025-10-24 08:09:29 +00:00
if (domainConfig.rateLimits.inbound) {
if (domainConfig.rateLimits.inbound.messagesPerMinute) {
rateLimitConfig.maxMessagesPerMinute = domainConfig.rateLimits.inbound.messagesPerMinute;
}
if (domainConfig.rateLimits.inbound.connectionsPerIp) {
rateLimitConfig.maxConnectionsPerIP = domainConfig.rateLimits.inbound.connectionsPerIp;
}
if (domainConfig.rateLimits.inbound.recipientsPerMessage) {
rateLimitConfig.maxRecipientsPerMessage = domainConfig.rateLimits.inbound.recipientsPerMessage;
}
}
2025-10-24 08:09:29 +00:00
if (Object.keys(rateLimitConfig).length > 0) {
this.rateLimiter.applyDomainLimits(domain, rateLimitConfig);
logger.log('info', `Applied rate limits for domain ${domain}:`, rateLimitConfig);
}
}
}
}
2025-10-24 08:09:29 +00:00
/**
* Generate SmartProxy routes for email ports
*/
public generateProxyRoutes(portMapping?: Record<number, number>): any[] {
const routes: any[] = [];
const defaultPortMapping = {
25: 10025,
587: 10587,
465: 10465
};
2025-10-24 08:09:29 +00:00
const actualPortMapping = portMapping || defaultPortMapping;
2025-10-24 08:09:29 +00:00
for (const externalPort of this.options.ports) {
const internalPort = actualPortMapping[externalPort] || externalPort + 10000;
2025-10-24 08:09:29 +00:00
let routeName = 'email-route';
let tlsMode = 'passthrough';
2025-10-24 08:09:29 +00:00
switch (externalPort) {
case 25:
routeName = 'smtp-route';
tlsMode = 'passthrough';
2025-10-24 08:09:29 +00:00
break;
case 587:
routeName = 'submission-route';
tlsMode = 'passthrough';
2025-10-24 08:09:29 +00:00
break;
case 465:
routeName = 'smtps-route';
tlsMode = 'terminate';
2025-10-24 08:09:29 +00:00
break;
default:
routeName = `email-port-${externalPort}-route`;
}
2025-10-24 08:09:29 +00:00
routes.push({
name: routeName,
match: {
ports: [externalPort]
},
action: {
type: 'forward',
target: {
host: 'localhost',
port: internalPort
},
tls: {
mode: tlsMode
}
}
});
}
2025-10-24 08:09:29 +00:00
return routes;
}
2025-10-24 08:09:29 +00:00
/**
* Update server configuration
*/
public updateOptions(options: Partial<IUnifiedEmailServerOptions>): void {
const portsChanged = options.ports &&
(!this.options.ports ||
2025-10-24 08:09:29 +00:00
JSON.stringify(options.ports) !== JSON.stringify(this.options.ports));
2025-10-24 08:09:29 +00:00
if (portsChanged) {
this.stop().then(() => {
this.options = { ...this.options, ...options };
this.start();
});
} else {
this.options = { ...this.options, ...options };
2025-10-24 08:09:29 +00:00
if (options.domains) {
this.domainRegistry = new DomainRegistry(options.domains, options.defaults || this.options.defaults);
}
2025-10-24 08:09:29 +00:00
if (options.routes) {
this.emailRouter.updateRoutes(options.routes);
}
}
}
2025-10-24 08:09:29 +00:00
/**
* Update email routes
*/
public updateEmailRoutes(routes: IEmailRoute[]): void {
this.options.routes = routes;
this.emailRouter.updateRoutes(routes);
}
2025-10-24 08:09:29 +00:00
/**
* Get server statistics
*/
public getStats(): IServerStats {
return { ...this.stats };
}
2025-10-24 08:09:29 +00:00
/**
* Get domain registry
*/
public getDomainRegistry(): DomainRegistry {
return this.domainRegistry;
}
2025-10-24 08:09:29 +00:00
/**
* Send an email through the delivery system
*/
public async sendEmail(
email: Email,
mode: EmailProcessingMode = 'mta',
route?: IEmailRoute,
2025-10-24 08:09:29 +00:00
options?: {
skipSuppressionCheck?: boolean;
ipAddress?: string;
isTransactional?: boolean;
}
): Promise<string> {
logger.log('info', `Sending email: ${email.subject} to ${email.to.join(', ')}`);
2025-10-24 08:09:29 +00:00
try {
if (!email.from) {
throw new Error('Email must have a sender address');
}
2025-10-24 08:09:29 +00:00
if (!email.to || email.to.length === 0) {
throw new Error('Email must have at least one recipient');
}
// Check if any recipients are on the suppression list
2025-10-24 08:09:29 +00:00
if (!options?.skipSuppressionCheck) {
const suppressedRecipients = email.to.filter(recipient => this.isEmailSuppressed(recipient));
2025-10-24 08:09:29 +00:00
if (suppressedRecipients.length > 0) {
const originalCount = email.to.length;
const suppressed = suppressedRecipients.map(recipient => {
const info = this.getSuppressionInfo(recipient);
return {
email: recipient,
reason: info?.reason || 'Unknown',
until: info?.expiresAt ? new Date(info.expiresAt).toISOString() : 'permanent'
};
});
2025-10-24 08:09:29 +00:00
logger.log('warn', `Filtering out ${suppressedRecipients.length} suppressed recipient(s)`, { suppressed });
2025-10-24 08:09:29 +00:00
if (suppressedRecipients.length === originalCount) {
throw new Error('All recipients are on the suppression list');
}
2025-10-24 08:09:29 +00:00
email.to = email.to.filter(recipient => !this.isEmailSuppressed(recipient));
}
}
// Sign with DKIM if configured
2025-10-24 08:09:29 +00:00
if (mode === 'mta' && route?.action.options?.mtaOptions?.dkimSign) {
const domain = email.from.split('@')[1];
await this.dkimManager.handleDkimSigning(email, domain, route.action.options.mtaOptions.dkimOptions?.keySelector || 'mta');
2025-10-24 08:09:29 +00:00
}
2025-10-24 08:09:29 +00:00
const id = plugins.uuid.v4();
await this.deliveryQueue.enqueue(email, mode, route);
2025-10-24 08:09:29 +00:00
logger.log('info', `Email queued with ID: ${id}`);
return id;
} catch (error) {
logger.log('error', `Failed to send email: ${error.message}`);
throw error;
}
}
// -----------------------------------------------------------------------
// Bounce/suppression methods
// -----------------------------------------------------------------------
2025-10-24 08:09:29 +00:00
public async processBounceNotification(bounceEmail: Email): Promise<boolean> {
logger.log('info', 'Processing potential bounce notification email');
2025-10-24 08:09:29 +00:00
try {
const bounceRecord = await this.bounceManager.processBounceEmail(bounceEmail);
2025-10-24 08:09:29 +00:00
if (bounceRecord) {
logger.log('info', `Successfully processed bounce notification for ${bounceRecord.recipient}`, {
bounceType: bounceRecord.bounceType,
bounceCategory: bounceRecord.bounceCategory
});
2025-10-24 08:09:29 +00:00
this.emit('bounceProcessed', bounceRecord);
2025-10-24 08:09:29 +00:00
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.INFO,
type: SecurityEventType.EMAIL_VALIDATION,
message: `Bounce notification processed for recipient`,
domain: bounceRecord.domain,
details: {
recipient: bounceRecord.recipient,
bounceType: bounceRecord.bounceType,
bounceCategory: bounceRecord.bounceCategory
},
success: true
});
2025-10-24 08:09:29 +00:00
return true;
} else {
logger.log('info', 'Email not recognized as a bounce notification');
return false;
}
} catch (error) {
logger.log('error', `Error processing bounce notification: ${error.message}`);
2025-10-24 08:09:29 +00:00
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.ERROR,
type: SecurityEventType.EMAIL_VALIDATION,
message: 'Failed to process bounce notification',
details: { error: error.message, subject: bounceEmail.subject },
2025-10-24 08:09:29 +00:00
success: false
});
2025-10-24 08:09:29 +00:00
return false;
}
}
2025-10-24 08:09:29 +00:00
public async processSmtpFailure(
recipient: string,
smtpResponse: string,
options: { sender?: string; originalEmailId?: string; statusCode?: string; headers?: Record<string, string> } = {}
2025-10-24 08:09:29 +00:00
): Promise<boolean> {
logger.log('info', `Processing SMTP failure for ${recipient}: ${smtpResponse}`);
2025-10-24 08:09:29 +00:00
try {
const bounceRecord = await this.bounceManager.processSmtpFailure(recipient, smtpResponse, options);
2025-10-24 08:09:29 +00:00
logger.log('info', `Successfully processed SMTP failure for ${recipient} as ${bounceRecord.bounceCategory} bounce`, {
bounceType: bounceRecord.bounceType
});
2025-10-24 08:09:29 +00:00
this.emit('bounceProcessed', bounceRecord);
2025-10-24 08:09:29 +00:00
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.INFO,
type: SecurityEventType.EMAIL_VALIDATION,
message: `SMTP failure processed for recipient`,
domain: bounceRecord.domain,
details: {
recipient: bounceRecord.recipient,
bounceType: bounceRecord.bounceType,
bounceCategory: bounceRecord.bounceCategory,
smtpResponse
},
success: true
});
2025-10-24 08:09:29 +00:00
return true;
} catch (error) {
logger.log('error', `Error processing SMTP failure: ${error.message}`);
2025-10-24 08:09:29 +00:00
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.ERROR,
2025-10-24 08:09:29 +00:00
type: SecurityEventType.EMAIL_VALIDATION,
message: 'Failed to process SMTP failure',
details: { recipient, smtpResponse, error: error.message },
2025-10-24 08:09:29 +00:00
success: false
});
2025-10-24 08:09:29 +00:00
return false;
}
}
2025-10-24 08:09:29 +00:00
public isEmailSuppressed(email: string): boolean {
return this.bounceManager.isEmailSuppressed(email);
}
public getSuppressionInfo(email: string): { reason: string; timestamp: number; expiresAt?: number } | null {
2025-10-24 08:09:29 +00:00
return this.bounceManager.getSuppressionInfo(email);
}
public getBounceHistory(email: string): { lastBounce: number; count: number; type: BounceType; category: BounceCategory } | null {
2025-10-24 08:09:29 +00:00
return this.bounceManager.getBounceInfo(email);
}
2025-10-24 08:09:29 +00:00
public getSuppressionList(): string[] {
return this.bounceManager.getSuppressionList();
}
2025-10-24 08:09:29 +00:00
public getHardBouncedAddresses(): string[] {
return this.bounceManager.getHardBouncedAddresses();
}
2025-10-24 08:09:29 +00:00
public addToSuppressionList(email: string, reason: string, expiresAt?: number): void {
this.bounceManager.addToSuppressionList(email, reason, expiresAt);
logger.log('info', `Added ${email} to suppression list: ${reason}`);
}
2025-10-24 08:09:29 +00:00
public removeFromSuppressionList(email: string): void {
this.bounceManager.removeFromSuppressionList(email);
logger.log('info', `Removed ${email} from suppression list`);
}
2025-10-24 08:09:29 +00:00
public recordBounce(domain: string, receivingDomain: string, bounceType: 'hard' | 'soft', reason: string): void {
const bounceRecord = {
id: `bounce_${Date.now()}_${Math.random().toString(36).substring(2, 9)}`,
recipient: `user@${receivingDomain}`,
sender: `user@${domain}`,
domain: domain,
bounceType: bounceType === 'hard' ? BounceType.INVALID_RECIPIENT : BounceType.TEMPORARY_FAILURE,
bounceCategory: bounceType === 'hard' ? BounceCategory.HARD : BounceCategory.SOFT,
timestamp: Date.now(),
smtpResponse: reason,
diagnosticCode: reason,
statusCode: bounceType === 'hard' ? '550' : '450',
processed: false
};
2025-10-24 08:09:29 +00:00
this.bounceManager.processBounce(bounceRecord);
}
2025-10-24 08:09:29 +00:00
/**
* Get the rate limiter instance
*/
public getRateLimiter(): UnifiedRateLimiter {
return this.rateLimiter;
}
}