This commit is contained in:
Philipp Kunz 2025-05-27 14:06:22 +00:00
parent af408d38c9
commit 073c8378c7
10 changed files with 2927 additions and 746 deletions

Binary file not shown.

View File

@ -6,11 +6,7 @@ import * as paths from './paths.js';
// Import the consolidated email config
import type { IEmailConfig, IDomainRule } from './mail/routing/classes.email.config.js';
import type { EmailProcessingMode } from './mail/delivery/interfaces.js';
import { DomainRouter } from './mail/routing/classes.domain.router.js';
import { UnifiedEmailServer } from './mail/routing/classes.unified.email.server.js';
import { UnifiedDeliveryQueue, type IQueueOptions } from './mail/delivery/classes.delivery.queue.js';
import { MultiModeDeliverySystem, type IMultiModeDeliveryOptions } from './mail/delivery/classes.delivery.system.js';
import { UnifiedRateLimiter, type IHierarchicalRateLimits } from './mail/delivery/classes.unified.rate.limiter.js';
import { logger } from './logger.js';
// Import the email configuration helpers directly from mail/delivery
import { configureEmailStorage, configureEmailServer } from './mail/delivery/index.js';
@ -87,13 +83,7 @@ export class DcRouter {
// Core services
public smartProxy?: plugins.smartproxy.SmartProxy;
public dnsServer?: plugins.smartdns.DnsServer;
// Unified email components
public domainRouter?: DomainRouter;
public unifiedEmailServer?: UnifiedEmailServer;
public deliveryQueue?: UnifiedDeliveryQueue;
public deliverySystem?: MultiModeDeliverySystem;
public rateLimiter?: UnifiedRateLimiter;
public emailServer?: UnifiedEmailServer;
// Environment access
@ -119,9 +109,9 @@ export class DcRouter {
await this.setupUnifiedEmailHandling();
// Apply custom email storage configuration if available
if (this.unifiedEmailServer && this.options.emailPortConfig?.receivedEmailsPath) {
if (this.emailServer && this.options.emailPortConfig?.receivedEmailsPath) {
logger.log('info', 'Applying custom email storage configuration');
configureEmailStorage(this.unifiedEmailServer, this.options);
configureEmailStorage(this.emailServer, this.options);
}
}
@ -402,8 +392,8 @@ export class DcRouter {
try {
// Stop all services in parallel for faster shutdown
await Promise.all([
// Stop unified email components if running
this.domainRouter ? this.stopUnifiedEmailComponents().catch(err => console.error('Error stopping unified email components:', err)) : Promise.resolve(),
// Stop unified email server if running
this.emailServer ? this.emailServer.stop().catch(err => console.error('Error stopping email server:', err)) : Promise.resolve(),
// Stop HTTP SmartProxy if running
this.smartProxy ? this.smartProxy.stop().catch(err => console.error('Error stopping SmartProxy:', err)) : Promise.resolve(),
@ -485,78 +475,35 @@ export class DcRouter {
}
try {
// Create domain router for pattern matching
this.domainRouter = new DomainRouter({
// Create unified email server with all components
this.emailServer = new UnifiedEmailServer(this, {
ports: emailConfig.ports.map(port => portMapping[port] || port + 10000),
hostname: 'localhost', // Listen on localhost for SmartProxy forwarding
domains: emailConfig.domains || [],
domainRules: emailConfig.domainRules,
defaultMode: emailConfig.defaultMode,
defaultServer: emailConfig.defaultServer,
defaultPort: emailConfig.defaultPort,
defaultTls: emailConfig.defaultTls
});
// Initialize the rate limiter
this.rateLimiter = new UnifiedRateLimiter({
global: {
maxMessagesPerMinute: 100,
maxRecipientsPerMessage: 100,
maxConnectionsPerIP: 20,
maxErrorsPerIP: 10,
maxAuthFailuresPerIP: 5
}
});
// Initialize the unified delivery queue
const queueOptions: IQueueOptions = {
storageType: emailConfig.queue?.storageType || 'memory',
persistentPath: emailConfig.queue?.persistentPath,
maxRetries: emailConfig.queue?.maxRetries,
baseRetryDelay: emailConfig.queue?.baseRetryDelay,
maxRetryDelay: emailConfig.queue?.maxRetryDelay
};
this.deliveryQueue = new UnifiedDeliveryQueue(queueOptions);
await this.deliveryQueue.initialize();
// Initialize the delivery system
const deliveryOptions: IMultiModeDeliveryOptions = {
globalRateLimit: 100, // Default to 100 emails per minute
concurrentDeliveries: 10
};
this.deliverySystem = new MultiModeDeliverySystem(this.deliveryQueue, deliveryOptions);
await this.deliverySystem.start();
// Initialize the unified email server with internal configuration
this.unifiedEmailServer = new UnifiedEmailServer({
ports: internalEmailConfig.ports,
hostname: internalEmailConfig.hostname,
defaultTls: emailConfig.defaultTls,
maxMessageSize: emailConfig.maxMessageSize,
auth: emailConfig.auth,
tls: emailConfig.tls,
domainRules: emailConfig.domainRules,
defaultMode: emailConfig.defaultMode,
defaultServer: emailConfig.defaultServer,
defaultPort: emailConfig.defaultPort,
defaultTls: emailConfig.defaultTls
dkim: emailConfig.dkim,
outbound: emailConfig.outbound,
rateLimits: emailConfig.rateLimits,
debug: emailConfig.debug
});
// Set up event listeners
this.unifiedEmailServer.on('error', (err) => {
this.emailServer.on('error', (err: Error) => {
logger.log('error', `UnifiedEmailServer error: ${err.message}`);
});
// Connect the unified email server with the delivery queue
this.unifiedEmailServer.on('emailProcessed', (email, mode, rule) => {
this.deliveryQueue!.enqueue(email, mode, rule).catch(err => {
logger.log('error', `Failed to enqueue email: ${err.message}`);
});
});
// Start the unified email server
await this.unifiedEmailServer.start();
await this.emailServer.start();
logger.log('info', `Unified email handling configured with ${emailConfig.domainRules.length} domain rules on internal ports`);
logger.log('info', `Email server listening on ports: ${internalEmailConfig.ports.join(', ')}`);
logger.log('info', `Unified email handling configured with ${emailConfig.domainRules.length} domain rules`);
logger.log('info', `Email server listening on internal ports: ${this.emailServer['options'].ports.join(', ')}`);
} catch (error) {
logger.log('error', `Error setting up unified email handling: ${error.message}`);
throw error;
@ -585,39 +532,13 @@ export class DcRouter {
*/
private async stopUnifiedEmailComponents(): Promise<void> {
try {
// Stop all components in the correct order
// 1. Stop the unified email server first
if (this.unifiedEmailServer) {
await this.unifiedEmailServer.stop();
// Stop the unified email server which contains all components
if (this.emailServer) {
await this.emailServer.stop();
logger.log('info', 'Unified email server stopped');
this.unifiedEmailServer = undefined;
this.emailServer = undefined;
}
// 2. Stop the delivery system
if (this.deliverySystem) {
await this.deliverySystem.stop();
logger.log('info', 'Delivery system stopped');
this.deliverySystem = undefined;
}
// 3. Stop the delivery queue
if (this.deliveryQueue) {
await this.deliveryQueue.shutdown();
logger.log('info', 'Delivery queue shut down');
this.deliveryQueue = undefined;
}
// 4. Stop the rate limiter
if (this.rateLimiter) {
this.rateLimiter.stop();
logger.log('info', 'Rate limiter stopped');
this.rateLimiter = undefined;
}
// 5. Clear the domain router
this.domainRouter = undefined;
logger.log('info', 'All unified email components stopped');
} catch (error) {
logger.log('error', `Error stopping unified email components: ${error.message}`);
@ -638,14 +559,9 @@ export class DcRouter {
// Update the configuration
this.options.emailConfig.domainRules = rules;
// Update the domain router if it exists
if (this.domainRouter) {
this.domainRouter.updateRules(rules);
}
// Update the unified email server if it exists
if (this.unifiedEmailServer) {
this.unifiedEmailServer.updateDomainRules(rules);
if (this.emailServer) {
this.emailServer.updateDomainRules(rules);
}
console.log(`Domain rules updated with ${rules.length} rules`);
@ -656,10 +572,7 @@ export class DcRouter {
*/
public getStats(): any {
const stats: any = {
unifiedEmailServer: this.unifiedEmailServer?.getStats(),
deliveryQueue: this.deliveryQueue?.getStats(),
deliverySystem: this.deliverySystem?.getStats(),
rateLimiter: this.rateLimiter?.getStats()
emailServer: this.emailServer?.getStats()
};
return stats;
@ -702,8 +615,8 @@ export class DcRouter {
// Use the dedicated helper to configure the email server
// Pass through the options specified by the implementation
if (this.unifiedEmailServer) {
configureEmailServer(this.unifiedEmailServer, {
if (this.emailServer) {
configureEmailServer(this.emailServer, {
ports: [config.internalPort], // Use whatever port the implementation specifies
hostname: config.host,
tls: config.secure ? {
@ -717,7 +630,7 @@ export class DcRouter {
}
// If email handling is already set up, restart it to apply changes
if (this.unifiedEmailServer) {
if (this.emailServer) {
logger.log('info', 'Restarting unified email handling to apply MTA configuration changes');
await this.stopUnifiedEmailComponents();
await this.setupUnifiedEmailHandling();

View File

@ -11,6 +11,8 @@ import {
import { UnifiedDeliveryQueue, type IQueueItem } from './classes.delivery.queue.js';
import type { Email } from '../core/classes.email.js';
import type { IDomainRule } from '../routing/classes.email.config.js';
import type { UnifiedEmailServer } from '../routing/classes.unified.email.server.js';
import type { SmtpClient } from './smtpclient/smtp-client.js';
/**
* Delivery status enumeration
@ -109,16 +111,19 @@ export class MultiModeDeliverySystem extends EventEmitter {
private throttled: boolean = false;
private rateLimitLastCheck: number = Date.now();
private rateLimitCounter: number = 0;
private emailServer?: UnifiedEmailServer;
/**
* Create a new multi-mode delivery system
* @param queue Unified delivery queue
* @param options Delivery options
* @param emailServer Optional reference to unified email server for SmtpClient access
*/
constructor(queue: UnifiedDeliveryQueue, options: IMultiModeDeliveryOptions) {
constructor(queue: UnifiedDeliveryQueue, options: IMultiModeDeliveryOptions, emailServer?: UnifiedEmailServer) {
super();
this.queue = queue;
this.emailServer = emailServer;
// Set default options
this.options = {
@ -442,6 +447,56 @@ export class MultiModeDeliverySystem extends EventEmitter {
logger.log('info', `Forwarding email to ${targetServer}:${targetPort}, TLS: ${useTls}`);
try {
// Get SMTP client from email server if available
if (!this.emailServer) {
// Fall back to raw socket implementation if no email server
logger.log('warn', 'No email server available, falling back to raw socket implementation');
return this.handleForwardDeliveryLegacy(item);
}
// Get or create SMTP client for the target server
const smtpClient = this.emailServer.getSmtpClient(targetServer, targetPort);
// Send the email using SmtpClient
const result = await smtpClient.sendMail(email);
if (result.success) {
logger.log('info', `Email forwarded successfully to ${targetServer}:${targetPort}`);
return {
targetServer: targetServer,
targetPort: targetPort,
recipients: result.acceptedRecipients.length,
messageId: result.messageId,
rejectedRecipients: result.rejectedRecipients
};
} else {
throw new Error(result.error?.message || 'Failed to forward email');
}
} catch (error: any) {
logger.log('error', `Failed to forward email: ${error.message}`);
throw error;
}
}
/**
* Legacy forward delivery using raw sockets (fallback)
* @param item Queue item
*/
private async handleForwardDeliveryLegacy(item: IQueueItem): Promise<any> {
const email = item.processingResult as Email;
const rule = item.rule;
// Get target server information
const targetServer = rule.target?.server;
const targetPort = rule.target?.port || 25;
const useTls = rule.target?.useTls ?? false;
if (!targetServer) {
throw new Error('No target server configured for forward mode');
}
// Create a socket connection to the target server
const socket = new net.Socket();

View File

@ -3,6 +3,8 @@ import * as paths from '../../paths.js';
import { Email } from '../core/classes.email.js';
import { EmailSignJob } from './classes.emailsignjob.js';
import type { UnifiedEmailServer } from '../routing/classes.unified.email.server.js';
import type { SmtpClient } from './smtpclient/smtp-client.js';
import type { ISmtpSendResult } from './smtpclient/interfaces.js';
// Configuration options for email sending
export interface IEmailSendOptions {
@ -37,7 +39,6 @@ export interface DeliveryInfo {
export class EmailSendJob {
emailServerRef: UnifiedEmailServer;
private email: Email;
private socket: plugins.net.Socket | plugins.tls.TLSSocket = null;
private mxServers: string[] = [];
private currentMxIndex = 0;
private options: IEmailSendOptions;
@ -50,9 +51,9 @@ export class EmailSendJob {
// Set default options
this.options = {
maxRetries: options.maxRetries || 3,
retryDelay: options.retryDelay || 300000, // 5 minutes
connectionTimeout: options.connectionTimeout || 30000, // 30 seconds
tlsOptions: options.tlsOptions || { rejectUnauthorized: true },
retryDelay: options.retryDelay || 30000, // 30 seconds
connectionTimeout: options.connectionTimeout || 60000, // 60 seconds
tlsOptions: options.tlsOptions || {},
debugMode: options.debugMode || false
};
@ -65,7 +66,7 @@ export class EmailSendJob {
}
/**
* Send the email with retry logic
* Send the email to its recipients
*/
async send(): Promise<DeliveryStatus> {
try {
@ -167,62 +168,47 @@ export class EmailSendJob {
await this.saveSuccess();
return DeliveryStatus.DELIVERED;
} catch (error) {
this.log(`Error with MX ${currentMx}: ${error.message}`);
// Clean up socket if it exists
if (this.socket) {
this.socket.destroy();
this.socket = null;
}
// Try the next MX server
this.log(`Failed to deliver to ${currentMx}: ${error.message}`);
this.currentMxIndex++;
// If this is a permanent failure, don't try other MX servers
if (this.isPermanentFailure(error)) {
throw error;
// If this MX server failed, try the next one
if (this.currentMxIndex >= this.mxServers.length) {
throw error; // No more MX servers to try
}
}
}
// If we've tried all MX servers without success, throw an error
throw new Error('All MX servers failed');
} catch (error) {
this.deliveryInfo.error = error;
// Check if this is a permanent failure
if (this.isPermanentFailure(error)) {
this.log(`Permanent failure: ${error.message}`);
this.log('Permanent failure detected, not retrying');
this.deliveryInfo.status = DeliveryStatus.FAILED;
this.deliveryInfo.error = error;
// Save failed email for analysis
// Record permanent failure for bounce management
this.recordDeliveryEvent('bounced', true);
await this.saveFailed();
return DeliveryStatus.FAILED;
}
// This is a temporary failure, we can retry
this.log(`Temporary failure: ${error.message}`);
// If this is the last attempt, mark as failed
if (this.deliveryInfo.attempts >= this.options.maxRetries) {
this.deliveryInfo.status = DeliveryStatus.FAILED;
this.deliveryInfo.error = error;
// This is a temporary failure
if (this.deliveryInfo.attempts < this.options.maxRetries) {
this.log(`Temporary failure, will retry in ${this.options.retryDelay}ms`);
this.deliveryInfo.status = DeliveryStatus.DEFERRED;
this.deliveryInfo.nextAttempt = new Date(Date.now() + this.options.retryDelay);
// Save failed email for analysis
await this.saveFailed();
return DeliveryStatus.FAILED;
// Record temporary failure for monitoring
this.recordDeliveryEvent('deferred');
// Reset MX server index for next retry
this.currentMxIndex = 0;
// Wait before retrying
await this.delay(this.options.retryDelay);
}
// Schedule the next retry
const nextRetryTime = new Date(Date.now() + this.options.retryDelay);
this.deliveryInfo.status = DeliveryStatus.DEFERRED;
this.deliveryInfo.nextAttempt = nextRetryTime;
this.log(`Will retry at ${nextRetryTime.toISOString()}`);
// Wait before retrying
await this.delay(this.options.retryDelay);
// Reset MX server index for the next attempt
this.currentMxIndex = 0;
}
}
@ -233,38 +219,12 @@ export class EmailSendJob {
}
/**
* Connect to a specific MX server and send the email
* Connect to a specific MX server and send the email using SmtpClient
*/
private async connectAndSend(mxServer: string): Promise<void> {
return new Promise((resolve, reject) => {
let commandTimeout: NodeJS.Timeout;
// Function to clear timeouts and remove listeners
const cleanup = () => {
clearTimeout(commandTimeout);
if (this.socket) {
this.socket.removeAllListeners();
}
};
// Function to set a timeout for each command
const setCommandTimeout = () => {
clearTimeout(commandTimeout);
commandTimeout = setTimeout(() => {
this.log('Connection timed out');
cleanup();
if (this.socket) {
this.socket.destroy();
this.socket = null;
}
reject(new Error('Connection timed out'));
}, this.options.connectionTimeout);
};
// Connect to the MX server
this.log(`Connecting to ${mxServer}:25`);
setCommandTimeout();
this.log(`Connecting to ${mxServer}:25`);
try {
// Check if IP warmup is enabled and get an IP to use
let localAddress: string | undefined = undefined;
try {
@ -287,329 +247,119 @@ export class EmailSendJob {
this.log(`Error selecting IP address: ${error.message}`);
}
// Connect with specified local address if available
this.socket = plugins.net.connect({
port: 25,
host: mxServer,
localAddress
});
// Get SMTP client from UnifiedEmailServer
const smtpClient = this.emailServerRef.getSmtpClient(mxServer, 25);
this.socket.on('error', (err) => {
this.log(`Socket error: ${err.message}`);
cleanup();
reject(err);
});
// Set up the command sequence
this.socket.once('data', async (data) => {
try {
const greeting = data.toString();
this.log(`Server greeting: ${greeting.trim()}`);
if (!greeting.startsWith('220')) {
throw new Error(`Unexpected server greeting: ${greeting}`);
}
// EHLO command
const fromDomain = this.email.getFromDomain();
await this.sendCommand(`EHLO ${fromDomain}\r\n`, '250');
// Try STARTTLS if available
try {
await this.sendCommand('STARTTLS\r\n', '220');
this.upgradeToTLS(mxServer, fromDomain);
// The TLS handshake and subsequent commands will continue in the upgradeToTLS method
// resolve will be called from there if successful
} catch (error) {
this.log(`STARTTLS failed or not supported: ${error.message}`);
this.log('Continuing with unencrypted connection');
// Continue with unencrypted connection
await this.sendEmailCommands();
cleanup();
resolve();
}
} catch (error) {
cleanup();
reject(error);
}
});
});
}
/**
* Upgrade the connection to TLS
*/
private upgradeToTLS(mxServer: string, fromDomain: string): void {
this.log('Starting TLS handshake');
const tlsOptions = {
...this.options.tlsOptions,
socket: this.socket,
servername: mxServer
};
// Create TLS socket
this.socket = plugins.tls.connect(tlsOptions);
// Handle TLS connection
this.socket.once('secureConnect', async () => {
// Sign the email with DKIM if available
let signedEmail = this.email;
try {
this.log('TLS connection established');
// Send EHLO again over TLS
await this.sendCommand(`EHLO ${fromDomain}\r\n`, '250');
// Send the email
await this.sendEmailCommands();
this.socket.destroy();
this.socket = null;
const fromDomain = this.email.getFromDomain();
if (fromDomain && this.emailServerRef.hasDkimKey(fromDomain)) {
// Convert email to RFC822 format for signing
const emailMessage = this.email.toRFC822String();
// Create sign job with proper options
const emailSignJob = new EmailSignJob(this.emailServerRef, {
domain: fromDomain,
selector: 'default', // Using default selector
headers: {}, // Headers will be extracted from emailMessage
body: emailMessage
});
// Get the DKIM signature header
const signatureHeader = await emailSignJob.getSignatureHeader(emailMessage);
// Add the signature to the email
if (signatureHeader) {
// For now, we'll use the email as-is since SmtpClient will handle DKIM
this.log(`Email ready for DKIM signing for domain: ${fromDomain}`);
}
}
} catch (error) {
this.log(`Error in TLS session: ${error.message}`);
this.socket.destroy();
this.socket = null;
}
});
this.socket.on('error', (err) => {
this.log(`TLS error: ${err.message}`);
this.socket.destroy();
this.socket = null;
});
}
/**
* Send SMTP commands to deliver the email
*/
private async sendEmailCommands(): Promise<void> {
// MAIL FROM command
await this.sendCommand(`MAIL FROM:<${this.email.from}>\r\n`, '250');
// RCPT TO command for each recipient
for (const recipient of this.email.getAllRecipients()) {
await this.sendCommand(`RCPT TO:<${recipient}>\r\n`, '250');
}
// DATA command
await this.sendCommand('DATA\r\n', '354');
// Create the email message with DKIM signature
const message = await this.createEmailMessage();
// Send the message content
await this.sendCommand(message);
await this.sendCommand('\r\n.\r\n', '250');
// QUIT command
await this.sendCommand('QUIT\r\n', '221');
}
/**
* Create the full email message with headers and DKIM signature
*/
private async createEmailMessage(): Promise<string> {
this.log('Preparing email message');
const messageId = `<${plugins.uuid.v4()}@${this.email.getFromDomain()}>`;
const boundary = '----=_NextPart_' + plugins.uuid.v4();
// Prepare headers
const headers = {
'Message-ID': messageId,
'From': this.email.from,
'To': this.email.to.join(', '),
'Subject': this.email.subject,
'Content-Type': `multipart/mixed; boundary="${boundary}"`,
'Date': new Date().toUTCString()
};
// Add CC header if present
if (this.email.cc && this.email.cc.length > 0) {
headers['Cc'] = this.email.cc.join(', ');
}
// Add custom headers
for (const [key, value] of Object.entries(this.email.headers || {})) {
headers[key] = value;
}
// Add priority header if not normal
if (this.email.priority && this.email.priority !== 'normal') {
const priorityValue = this.email.priority === 'high' ? '1' : '5';
headers['X-Priority'] = priorityValue;
}
// Create body
let body = '';
// Text part
body += `--${boundary}\r\nContent-Type: text/plain; charset=utf-8\r\n\r\n${this.email.text}\r\n`;
// HTML part if present
if (this.email.html) {
body += `--${boundary}\r\nContent-Type: text/html; charset=utf-8\r\n\r\n${this.email.html}\r\n`;
}
// Attachments
for (const attachment of this.email.attachments) {
body += `--${boundary}\r\nContent-Type: ${attachment.contentType}; name="${attachment.filename}"\r\n`;
body += 'Content-Transfer-Encoding: base64\r\n';
body += `Content-Disposition: attachment; filename="${attachment.filename}"\r\n`;
// Add Content-ID for inline attachments if present
if (attachment.contentId) {
body += `Content-ID: <${attachment.contentId}>\r\n`;
this.log(`Failed to prepare DKIM: ${error.message}`);
}
body += '\r\n';
body += attachment.content.toString('base64') + '\r\n';
// Send the email using SmtpClient
const result: ISmtpSendResult = await smtpClient.sendMail(signedEmail);
if (result.success) {
this.log(`Email sent successfully: ${result.response}`);
// Record the send for reputation monitoring
this.recordDeliveryEvent('delivered');
} else {
throw new Error(result.error?.message || 'Failed to send email');
}
} catch (error) {
this.log(`Failed to send email via ${mxServer}: ${error.message}`);
throw error;
}
// End of message
body += `--${boundary}--\r\n`;
// Create DKIM signature
const dkimSigner = new EmailSignJob(this.emailServerRef, {
domain: this.email.getFromDomain(),
selector: 'mta',
headers: headers,
body: body,
});
// Build the message with headers
let headerString = '';
for (const [key, value] of Object.entries(headers)) {
headerString += `${key}: ${value}\r\n`;
}
let message = headerString + '\r\n' + body;
// Add DKIM signature header
let signatureHeader = await dkimSigner.getSignatureHeader(message);
message = `${signatureHeader}${message}`;
return message;
}
/**
* Record an event for sender reputation monitoring
* @param eventType Type of event
* @param isHardBounce Whether the event is a hard bounce (for bounce events)
* Record delivery event for monitoring
*/
private recordDeliveryEvent(
eventType: 'sent' | 'delivered' | 'bounce' | 'complaint',
eventType: 'delivered' | 'bounced' | 'deferred',
isHardBounce: boolean = false
): void {
try {
// Get domain from sender
const domain = this.email.getFromDomain();
if (!domain) {
return;
}
// Determine receiving domain for complaint tracking
let receivingDomain = null;
if (eventType === 'complaint' && this.email.to.length > 0) {
const recipient = this.email.to[0];
const parts = recipient.split('@');
if (parts.length === 2) {
receivingDomain = parts[1];
if (domain) {
if (eventType === 'delivered') {
this.emailServerRef.recordDelivery(domain);
} else if (eventType === 'bounced') {
// Get the receiving domain for bounce recording
let receivingDomain = null;
const primaryRecipient = this.email.getPrimaryRecipient();
if (primaryRecipient) {
receivingDomain = primaryRecipient.split('@')[1];
}
if (receivingDomain) {
this.emailServerRef.recordBounce(
domain,
receivingDomain,
isHardBounce ? 'hard' : 'soft',
this.deliveryInfo.error?.message || 'Unknown error'
);
}
}
}
// Record the event using UnifiedEmailServer
this.emailServerRef.recordReputationEvent(domain, {
type: eventType,
count: 1,
hardBounce: isHardBounce,
receivingDomain
});
} catch (error) {
this.log(`Error recording delivery event: ${error.message}`);
this.log(`Failed to record delivery event: ${error.message}`);
}
}
/**
* Send a command to the SMTP server and wait for the expected response
*/
private sendCommand(command: string, expectedResponseCode?: string): Promise<string> {
return new Promise((resolve, reject) => {
if (!this.socket) {
return reject(new Error('Socket not connected'));
}
// Debug log for commands (except DATA which can be large)
if (this.options.debugMode && !command.startsWith('--')) {
const logCommand = command.length > 100
? command.substring(0, 97) + '...'
: command;
this.log(`Sending: ${logCommand.replace(/\r\n/g, '<CRLF>')}`);
}
this.socket.write(command, (error) => {
if (error) {
this.log(`Write error: ${error.message}`);
return reject(error);
}
// If no response is expected, resolve immediately
if (!expectedResponseCode) {
return resolve('');
}
// Set a timeout for the response
const responseTimeout = setTimeout(() => {
this.log('Response timeout');
reject(new Error('Response timeout'));
}, this.options.connectionTimeout);
// Wait for the response
this.socket.once('data', (data) => {
clearTimeout(responseTimeout);
const response = data.toString();
if (this.options.debugMode) {
this.log(`Received: ${response.trim()}`);
}
if (response.startsWith(expectedResponseCode)) {
resolve(response);
} else {
const error = new Error(`Unexpected server response: ${response.trim()}`);
this.log(error.message);
reject(error);
}
});
});
});
}
/**
* Determine if an error represents a permanent failure
* Check if an error represents a permanent failure
*/
private isPermanentFailure(error: Error): boolean {
if (!error || !error.message) return false;
const message = error.message.toLowerCase();
// Check for permanent SMTP error codes (5xx)
if (message.match(/^5\d\d/)) return true;
// Check for specific permanent failure messages
const permanentFailurePatterns = [
'no such user',
'user unknown',
'domain not found',
'invalid domain',
'rejected',
'denied',
'prohibited',
'authentication required',
'authentication failed',
'unauthorized'
'User unknown',
'No such user',
'Mailbox not found',
'Invalid recipient',
'Account disabled',
'Account suspended',
'Domain not found',
'No such domain',
'Invalid domain',
'Relay access denied',
'Access denied',
'Blacklisted',
'Blocked',
'550', // Permanent failure SMTP code
'551',
'552',
'553',
'554'
];
return permanentFailurePatterns.some(pattern => message.includes(pattern));
const errorMessage = error.message.toLowerCase();
return permanentFailurePatterns.some(pattern =>
errorMessage.includes(pattern.toLowerCase())
);
}
/**
@ -621,14 +371,14 @@ export class EmailSendJob {
if (err) {
reject(err);
} else {
resolve(addresses);
resolve(addresses || []);
}
});
});
}
/**
* Add a log entry
* Log a message with timestamp
*/
private log(message: string): void {
const timestamp = new Date().toISOString();
@ -636,54 +386,60 @@ export class EmailSendJob {
this.deliveryInfo.logs.push(logEntry);
if (this.options.debugMode) {
console.log(`EmailSendJob: ${logEntry}`);
console.log(`[EmailSendJob] ${logEntry}`);
}
}
/**
* Save a successful email for record keeping
* Save successful email to storage
*/
private async saveSuccess(): Promise<void> {
try {
plugins.smartfile.fs.ensureDirSync(paths.sentEmailsDir);
const emailContent = await this.createEmailMessage();
const fileName = `${Date.now()}_success_${this.email.getPrimaryRecipient()}.eml`;
plugins.smartfile.memory.toFsSync(emailContent, plugins.path.join(paths.sentEmailsDir, fileName));
// Use the existing email storage path
const emailContent = this.email.toRFC822String();
const fileName = `${Date.now()}_${this.email.from}_to_${this.email.to[0]}_success.eml`;
const filePath = plugins.path.join(paths.sentEmailsDir, fileName);
// Save delivery info
const infoFileName = `${Date.now()}_success_${this.email.getPrimaryRecipient()}.json`;
plugins.smartfile.memory.toFsSync(
JSON.stringify(this.deliveryInfo, null, 2),
plugins.path.join(paths.sentEmailsDir, infoFileName)
);
await plugins.smartfile.fs.ensureDir(paths.sentEmailsDir);
await plugins.smartfile.memory.toFs(emailContent, filePath);
// Also save delivery info
const infoFileName = `${Date.now()}_${this.email.from}_to_${this.email.to[0]}_info.json`;
const infoPath = plugins.path.join(paths.sentEmailsDir, infoFileName);
await plugins.smartfile.memory.toFs(JSON.stringify(this.deliveryInfo, null, 2), infoPath);
this.log(`Email saved to ${fileName}`);
} catch (error) {
console.error('Error saving successful email:', error);
this.log(`Failed to save email: ${error.message}`);
}
}
/**
* Save a failed email for potential retry
* Save failed email to storage
*/
private async saveFailed(): Promise<void> {
try {
plugins.smartfile.fs.ensureDirSync(paths.failedEmailsDir);
const emailContent = await this.createEmailMessage();
const fileName = `${Date.now()}_failed_${this.email.getPrimaryRecipient()}.eml`;
plugins.smartfile.memory.toFsSync(emailContent, plugins.path.join(paths.failedEmailsDir, fileName));
// Use the existing email storage path
const emailContent = this.email.toRFC822String();
const fileName = `${Date.now()}_${this.email.from}_to_${this.email.to[0]}_failed.eml`;
const filePath = plugins.path.join(paths.failedEmailsDir, fileName);
// Save delivery info
const infoFileName = `${Date.now()}_failed_${this.email.getPrimaryRecipient()}.json`;
plugins.smartfile.memory.toFsSync(
JSON.stringify(this.deliveryInfo, null, 2),
plugins.path.join(paths.failedEmailsDir, infoFileName)
);
await plugins.smartfile.fs.ensureDir(paths.failedEmailsDir);
await plugins.smartfile.memory.toFs(emailContent, filePath);
// Also save delivery info with error details
const infoFileName = `${Date.now()}_${this.email.from}_to_${this.email.to[0]}_error.json`;
const infoPath = plugins.path.join(paths.failedEmailsDir, infoFileName);
await plugins.smartfile.memory.toFs(JSON.stringify(this.deliveryInfo, null, 2), infoPath);
this.log(`Failed email saved to ${fileName}`);
} catch (error) {
console.error('Error saving failed email:', error);
this.log(`Failed to save failed email: ${error.message}`);
}
}
/**
* Simple delay function
* Delay for specified milliseconds
*/
private delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));

View File

@ -0,0 +1,691 @@
import * as plugins from '../../plugins.js';
import * as paths from '../../paths.js';
import { Email } from '../core/classes.email.js';
import { EmailSignJob } from './classes.emailsignjob.js';
import type { UnifiedEmailServer } from '../routing/classes.unified.email.server.js';
// Configuration options for email sending
export interface IEmailSendOptions {
maxRetries?: number;
retryDelay?: number; // in milliseconds
connectionTimeout?: number; // in milliseconds
tlsOptions?: plugins.tls.ConnectionOptions;
debugMode?: boolean;
}
// Email delivery status
export enum DeliveryStatus {
PENDING = 'pending',
SENDING = 'sending',
DELIVERED = 'delivered',
FAILED = 'failed',
DEFERRED = 'deferred' // Temporary failure, will retry
}
// Detailed information about delivery attempts
export interface DeliveryInfo {
status: DeliveryStatus;
attempts: number;
error?: Error;
lastAttempt?: Date;
nextAttempt?: Date;
mxServer?: string;
deliveryTime?: Date;
logs: string[];
}
export class EmailSendJob {
emailServerRef: UnifiedEmailServer;
private email: Email;
private socket: plugins.net.Socket | plugins.tls.TLSSocket = null;
private mxServers: string[] = [];
private currentMxIndex = 0;
private options: IEmailSendOptions;
public deliveryInfo: DeliveryInfo;
constructor(emailServerRef: UnifiedEmailServer, emailArg: Email, options: IEmailSendOptions = {}) {
this.email = emailArg;
this.emailServerRef = emailServerRef;
// Set default options
this.options = {
maxRetries: options.maxRetries || 3,
retryDelay: options.retryDelay || 300000, // 5 minutes
connectionTimeout: options.connectionTimeout || 30000, // 30 seconds
tlsOptions: options.tlsOptions || { rejectUnauthorized: true },
debugMode: options.debugMode || false
};
// Initialize delivery info
this.deliveryInfo = {
status: DeliveryStatus.PENDING,
attempts: 0,
logs: []
};
}
/**
* Send the email with retry logic
*/
async send(): Promise<DeliveryStatus> {
try {
// Check if the email is valid before attempting to send
this.validateEmail();
// Resolve MX records for the recipient domain
await this.resolveMxRecords();
// Try to send the email
return await this.attemptDelivery();
} catch (error) {
this.log(`Critical error in send process: ${error.message}`);
this.deliveryInfo.status = DeliveryStatus.FAILED;
this.deliveryInfo.error = error;
// Save failed email for potential future retry or analysis
await this.saveFailed();
return DeliveryStatus.FAILED;
}
}
/**
* Validate the email before sending
*/
private validateEmail(): void {
if (!this.email.to || this.email.to.length === 0) {
throw new Error('No recipients specified');
}
if (!this.email.from) {
throw new Error('No sender specified');
}
const fromDomain = this.email.getFromDomain();
if (!fromDomain) {
throw new Error('Invalid sender domain');
}
}
/**
* Resolve MX records for the recipient domain
*/
private async resolveMxRecords(): Promise<void> {
const domain = this.email.getPrimaryRecipient()?.split('@')[1];
if (!domain) {
throw new Error('Invalid recipient domain');
}
this.log(`Resolving MX records for domain: ${domain}`);
try {
const addresses = await this.resolveMx(domain);
// Sort by priority (lowest number = highest priority)
addresses.sort((a, b) => a.priority - b.priority);
this.mxServers = addresses.map(mx => mx.exchange);
this.log(`Found ${this.mxServers.length} MX servers: ${this.mxServers.join(', ')}`);
if (this.mxServers.length === 0) {
throw new Error(`No MX records found for domain: ${domain}`);
}
} catch (error) {
this.log(`Failed to resolve MX records: ${error.message}`);
throw new Error(`MX lookup failed for ${domain}: ${error.message}`);
}
}
/**
* Attempt to deliver the email with retries
*/
private async attemptDelivery(): Promise<DeliveryStatus> {
while (this.deliveryInfo.attempts < this.options.maxRetries) {
this.deliveryInfo.attempts++;
this.deliveryInfo.lastAttempt = new Date();
this.deliveryInfo.status = DeliveryStatus.SENDING;
try {
this.log(`Delivery attempt ${this.deliveryInfo.attempts} of ${this.options.maxRetries}`);
// Try each MX server in order of priority
while (this.currentMxIndex < this.mxServers.length) {
const currentMx = this.mxServers[this.currentMxIndex];
this.deliveryInfo.mxServer = currentMx;
try {
this.log(`Attempting delivery to MX server: ${currentMx}`);
await this.connectAndSend(currentMx);
// If we get here, email was sent successfully
this.deliveryInfo.status = DeliveryStatus.DELIVERED;
this.deliveryInfo.deliveryTime = new Date();
this.log(`Email delivered successfully to ${currentMx}`);
// Record delivery for sender reputation monitoring
this.recordDeliveryEvent('delivered');
// Save successful email record
await this.saveSuccess();
return DeliveryStatus.DELIVERED;
} catch (error) {
this.log(`Error with MX ${currentMx}: ${error.message}`);
// Clean up socket if it exists
if (this.socket) {
this.socket.destroy();
this.socket = null;
}
// Try the next MX server
this.currentMxIndex++;
// If this is a permanent failure, don't try other MX servers
if (this.isPermanentFailure(error)) {
throw error;
}
}
}
// If we've tried all MX servers without success, throw an error
throw new Error('All MX servers failed');
} catch (error) {
// Check if this is a permanent failure
if (this.isPermanentFailure(error)) {
this.log(`Permanent failure: ${error.message}`);
this.deliveryInfo.status = DeliveryStatus.FAILED;
this.deliveryInfo.error = error;
// Save failed email for analysis
await this.saveFailed();
return DeliveryStatus.FAILED;
}
// This is a temporary failure, we can retry
this.log(`Temporary failure: ${error.message}`);
// If this is the last attempt, mark as failed
if (this.deliveryInfo.attempts >= this.options.maxRetries) {
this.deliveryInfo.status = DeliveryStatus.FAILED;
this.deliveryInfo.error = error;
// Save failed email for analysis
await this.saveFailed();
return DeliveryStatus.FAILED;
}
// Schedule the next retry
const nextRetryTime = new Date(Date.now() + this.options.retryDelay);
this.deliveryInfo.status = DeliveryStatus.DEFERRED;
this.deliveryInfo.nextAttempt = nextRetryTime;
this.log(`Will retry at ${nextRetryTime.toISOString()}`);
// Wait before retrying
await this.delay(this.options.retryDelay);
// Reset MX server index for the next attempt
this.currentMxIndex = 0;
}
}
// If we get here, all retries failed
this.deliveryInfo.status = DeliveryStatus.FAILED;
await this.saveFailed();
return DeliveryStatus.FAILED;
}
/**
* Connect to a specific MX server and send the email
*/
private async connectAndSend(mxServer: string): Promise<void> {
return new Promise((resolve, reject) => {
let commandTimeout: NodeJS.Timeout;
// Function to clear timeouts and remove listeners
const cleanup = () => {
clearTimeout(commandTimeout);
if (this.socket) {
this.socket.removeAllListeners();
}
};
// Function to set a timeout for each command
const setCommandTimeout = () => {
clearTimeout(commandTimeout);
commandTimeout = setTimeout(() => {
this.log('Connection timed out');
cleanup();
if (this.socket) {
this.socket.destroy();
this.socket = null;
}
reject(new Error('Connection timed out'));
}, this.options.connectionTimeout);
};
// Connect to the MX server
this.log(`Connecting to ${mxServer}:25`);
setCommandTimeout();
// Check if IP warmup is enabled and get an IP to use
let localAddress: string | undefined = undefined;
try {
const fromDomain = this.email.getFromDomain();
const bestIP = this.emailServerRef.getBestIPForSending({
from: this.email.from,
to: this.email.getAllRecipients(),
domain: fromDomain,
isTransactional: this.email.priority === 'high'
});
if (bestIP) {
this.log(`Using warmed-up IP ${bestIP} for sending`);
localAddress = bestIP;
// Record the send for warm-up tracking
this.emailServerRef.recordIPSend(bestIP);
}
} catch (error) {
this.log(`Error selecting IP address: ${error.message}`);
}
// Connect with specified local address if available
this.socket = plugins.net.connect({
port: 25,
host: mxServer,
localAddress
});
this.socket.on('error', (err) => {
this.log(`Socket error: ${err.message}`);
cleanup();
reject(err);
});
// Set up the command sequence
this.socket.once('data', async (data) => {
try {
const greeting = data.toString();
this.log(`Server greeting: ${greeting.trim()}`);
if (!greeting.startsWith('220')) {
throw new Error(`Unexpected server greeting: ${greeting}`);
}
// EHLO command
const fromDomain = this.email.getFromDomain();
await this.sendCommand(`EHLO ${fromDomain}\r\n`, '250');
// Try STARTTLS if available
try {
await this.sendCommand('STARTTLS\r\n', '220');
this.upgradeToTLS(mxServer, fromDomain);
// The TLS handshake and subsequent commands will continue in the upgradeToTLS method
// resolve will be called from there if successful
} catch (error) {
this.log(`STARTTLS failed or not supported: ${error.message}`);
this.log('Continuing with unencrypted connection');
// Continue with unencrypted connection
await this.sendEmailCommands();
cleanup();
resolve();
}
} catch (error) {
cleanup();
reject(error);
}
});
});
}
/**
* Upgrade the connection to TLS
*/
private upgradeToTLS(mxServer: string, fromDomain: string): void {
this.log('Starting TLS handshake');
const tlsOptions = {
...this.options.tlsOptions,
socket: this.socket,
servername: mxServer
};
// Create TLS socket
this.socket = plugins.tls.connect(tlsOptions);
// Handle TLS connection
this.socket.once('secureConnect', async () => {
try {
this.log('TLS connection established');
// Send EHLO again over TLS
await this.sendCommand(`EHLO ${fromDomain}\r\n`, '250');
// Send the email
await this.sendEmailCommands();
this.socket.destroy();
this.socket = null;
} catch (error) {
this.log(`Error in TLS session: ${error.message}`);
this.socket.destroy();
this.socket = null;
}
});
this.socket.on('error', (err) => {
this.log(`TLS error: ${err.message}`);
this.socket.destroy();
this.socket = null;
});
}
/**
* Send SMTP commands to deliver the email
*/
private async sendEmailCommands(): Promise<void> {
// MAIL FROM command
await this.sendCommand(`MAIL FROM:<${this.email.from}>\r\n`, '250');
// RCPT TO command for each recipient
for (const recipient of this.email.getAllRecipients()) {
await this.sendCommand(`RCPT TO:<${recipient}>\r\n`, '250');
}
// DATA command
await this.sendCommand('DATA\r\n', '354');
// Create the email message with DKIM signature
const message = await this.createEmailMessage();
// Send the message content
await this.sendCommand(message);
await this.sendCommand('\r\n.\r\n', '250');
// QUIT command
await this.sendCommand('QUIT\r\n', '221');
}
/**
* Create the full email message with headers and DKIM signature
*/
private async createEmailMessage(): Promise<string> {
this.log('Preparing email message');
const messageId = `<${plugins.uuid.v4()}@${this.email.getFromDomain()}>`;
const boundary = '----=_NextPart_' + plugins.uuid.v4();
// Prepare headers
const headers = {
'Message-ID': messageId,
'From': this.email.from,
'To': this.email.to.join(', '),
'Subject': this.email.subject,
'Content-Type': `multipart/mixed; boundary="${boundary}"`,
'Date': new Date().toUTCString()
};
// Add CC header if present
if (this.email.cc && this.email.cc.length > 0) {
headers['Cc'] = this.email.cc.join(', ');
}
// Add custom headers
for (const [key, value] of Object.entries(this.email.headers || {})) {
headers[key] = value;
}
// Add priority header if not normal
if (this.email.priority && this.email.priority !== 'normal') {
const priorityValue = this.email.priority === 'high' ? '1' : '5';
headers['X-Priority'] = priorityValue;
}
// Create body
let body = '';
// Text part
body += `--${boundary}\r\nContent-Type: text/plain; charset=utf-8\r\n\r\n${this.email.text}\r\n`;
// HTML part if present
if (this.email.html) {
body += `--${boundary}\r\nContent-Type: text/html; charset=utf-8\r\n\r\n${this.email.html}\r\n`;
}
// Attachments
for (const attachment of this.email.attachments) {
body += `--${boundary}\r\nContent-Type: ${attachment.contentType}; name="${attachment.filename}"\r\n`;
body += 'Content-Transfer-Encoding: base64\r\n';
body += `Content-Disposition: attachment; filename="${attachment.filename}"\r\n`;
// Add Content-ID for inline attachments if present
if (attachment.contentId) {
body += `Content-ID: <${attachment.contentId}>\r\n`;
}
body += '\r\n';
body += attachment.content.toString('base64') + '\r\n';
}
// End of message
body += `--${boundary}--\r\n`;
// Create DKIM signature
const dkimSigner = new EmailSignJob(this.emailServerRef, {
domain: this.email.getFromDomain(),
selector: 'mta',
headers: headers,
body: body,
});
// Build the message with headers
let headerString = '';
for (const [key, value] of Object.entries(headers)) {
headerString += `${key}: ${value}\r\n`;
}
let message = headerString + '\r\n' + body;
// Add DKIM signature header
let signatureHeader = await dkimSigner.getSignatureHeader(message);
message = `${signatureHeader}${message}`;
return message;
}
/**
* Record an event for sender reputation monitoring
* @param eventType Type of event
* @param isHardBounce Whether the event is a hard bounce (for bounce events)
*/
private recordDeliveryEvent(
eventType: 'sent' | 'delivered' | 'bounce' | 'complaint',
isHardBounce: boolean = false
): void {
try {
// Get domain from sender
const domain = this.email.getFromDomain();
if (!domain) {
return;
}
// Determine receiving domain for complaint tracking
let receivingDomain = null;
if (eventType === 'complaint' && this.email.to.length > 0) {
const recipient = this.email.to[0];
const parts = recipient.split('@');
if (parts.length === 2) {
receivingDomain = parts[1];
}
}
// Record the event using UnifiedEmailServer
this.emailServerRef.recordReputationEvent(domain, {
type: eventType,
count: 1,
hardBounce: isHardBounce,
receivingDomain
});
} catch (error) {
this.log(`Error recording delivery event: ${error.message}`);
}
}
/**
* Send a command to the SMTP server and wait for the expected response
*/
private sendCommand(command: string, expectedResponseCode?: string): Promise<string> {
return new Promise((resolve, reject) => {
if (!this.socket) {
return reject(new Error('Socket not connected'));
}
// Debug log for commands (except DATA which can be large)
if (this.options.debugMode && !command.startsWith('--')) {
const logCommand = command.length > 100
? command.substring(0, 97) + '...'
: command;
this.log(`Sending: ${logCommand.replace(/\r\n/g, '<CRLF>')}`);
}
this.socket.write(command, (error) => {
if (error) {
this.log(`Write error: ${error.message}`);
return reject(error);
}
// If no response is expected, resolve immediately
if (!expectedResponseCode) {
return resolve('');
}
// Set a timeout for the response
const responseTimeout = setTimeout(() => {
this.log('Response timeout');
reject(new Error('Response timeout'));
}, this.options.connectionTimeout);
// Wait for the response
this.socket.once('data', (data) => {
clearTimeout(responseTimeout);
const response = data.toString();
if (this.options.debugMode) {
this.log(`Received: ${response.trim()}`);
}
if (response.startsWith(expectedResponseCode)) {
resolve(response);
} else {
const error = new Error(`Unexpected server response: ${response.trim()}`);
this.log(error.message);
reject(error);
}
});
});
});
}
/**
* Determine if an error represents a permanent failure
*/
private isPermanentFailure(error: Error): boolean {
if (!error || !error.message) return false;
const message = error.message.toLowerCase();
// Check for permanent SMTP error codes (5xx)
if (message.match(/^5\d\d/)) return true;
// Check for specific permanent failure messages
const permanentFailurePatterns = [
'no such user',
'user unknown',
'domain not found',
'invalid domain',
'rejected',
'denied',
'prohibited',
'authentication required',
'authentication failed',
'unauthorized'
];
return permanentFailurePatterns.some(pattern => message.includes(pattern));
}
/**
* Resolve MX records for a domain
*/
private resolveMx(domain: string): Promise<plugins.dns.MxRecord[]> {
return new Promise((resolve, reject) => {
plugins.dns.resolveMx(domain, (err, addresses) => {
if (err) {
reject(err);
} else {
resolve(addresses);
}
});
});
}
/**
* Add a log entry
*/
private log(message: string): void {
const timestamp = new Date().toISOString();
const logEntry = `[${timestamp}] ${message}`;
this.deliveryInfo.logs.push(logEntry);
if (this.options.debugMode) {
console.log(`EmailSendJob: ${logEntry}`);
}
}
/**
* Save a successful email for record keeping
*/
private async saveSuccess(): Promise<void> {
try {
plugins.smartfile.fs.ensureDirSync(paths.sentEmailsDir);
const emailContent = await this.createEmailMessage();
const fileName = `${Date.now()}_success_${this.email.getPrimaryRecipient()}.eml`;
plugins.smartfile.memory.toFsSync(emailContent, plugins.path.join(paths.sentEmailsDir, fileName));
// Save delivery info
const infoFileName = `${Date.now()}_success_${this.email.getPrimaryRecipient()}.json`;
plugins.smartfile.memory.toFsSync(
JSON.stringify(this.deliveryInfo, null, 2),
plugins.path.join(paths.sentEmailsDir, infoFileName)
);
} catch (error) {
console.error('Error saving successful email:', error);
}
}
/**
* Save a failed email for potential retry
*/
private async saveFailed(): Promise<void> {
try {
plugins.smartfile.fs.ensureDirSync(paths.failedEmailsDir);
const emailContent = await this.createEmailMessage();
const fileName = `${Date.now()}_failed_${this.email.getPrimaryRecipient()}.eml`;
plugins.smartfile.memory.toFsSync(emailContent, plugins.path.join(paths.failedEmailsDir, fileName));
// Save delivery info
const infoFileName = `${Date.now()}_failed_${this.email.getPrimaryRecipient()}.json`;
plugins.smartfile.memory.toFsSync(
JSON.stringify(this.deliveryInfo, null, 2),
plugins.path.join(paths.failedEmailsDir, infoFileName)
);
} catch (error) {
console.error('Error saving failed email:', error);
}
}
/**
* Simple delay function
*/
private delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}

View File

@ -829,7 +829,16 @@ export class CommandHandler implements ICommandHandler {
this.sendResponse(socket, '334');
// Wait for credentials
credentials = await this.waitForAuthResponse(socket);
credentials = await new Promise<string>((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error('Auth response timeout'));
}, 30000);
socket.once('data', (data: Buffer) => {
clearTimeout(timeout);
resolve(data.toString().trim());
});
});
}
// Decode PLAIN credentials (base64 encoded: authzid\0authcid\0password)
@ -847,8 +856,7 @@ export class CommandHandler implements ICommandHandler {
// Authenticate using security handler
const authenticated = await this.smtpServer.getSecurityHandler().authenticate({
username,
password,
mechanism: 'PLAIN'
password
});
if (authenticated) {
@ -929,8 +937,7 @@ export class CommandHandler implements ICommandHandler {
// Authenticate using security handler
const authenticated = await this.smtpServer.getSecurityHandler().authenticate({
username,
password,
mechanism: 'LOGIN'
password
});
if (authenticated) {

View File

@ -12,7 +12,9 @@ export interface IEmailConfig {
// Email server settings
ports: number[];
hostname: string;
domains?: string[]; // Domains to handle email for
maxMessageSize?: number;
debug?: boolean;
// TLS configuration for email server
tls?: {
@ -47,6 +49,25 @@ export interface IEmailConfig {
maxRetryDelay?: number;
};
// Outbound email settings
outbound?: {
maxConnections?: number;
connectionTimeout?: number;
socketTimeout?: number;
retryAttempts?: number;
defaultFrom?: string;
};
// DKIM settings
dkim?: {
enabled: boolean;
selector?: string;
keySize?: number;
};
// Rate limiting configuration
rateLimits?: any; // Using any to avoid circular dependency
// Advanced MTA settings
mtaGlobalOptions?: IMtaOptions;
}

View File

@ -28,8 +28,11 @@ import * as stream from 'node:stream';
import { createSmtpServer } from '../delivery/smtpserver/index.js';
import { MultiModeDeliverySystem, type IMultiModeDeliveryOptions } from '../delivery/classes.delivery.system.js';
import { UnifiedDeliveryQueue, type IQueueOptions } from '../delivery/classes.delivery.queue.js';
import { UnifiedRateLimiter, type IHierarchicalRateLimits } from '../delivery/classes.unified.rate.limiter.js';
import { SmtpState } from '../delivery/interfaces.js';
import type { EmailProcessingMode, ISmtpSession as IBaseSmtpSession } from '../delivery/interfaces.js';
import { smtpClientMod } from '../delivery/index.js';
import type { DcRouter } from '../../classes.dcrouter.js';
/**
* Extended SMTP session interface with domain rule information
@ -48,7 +51,9 @@ export interface IUnifiedEmailServerOptions {
// Base server options
ports: number[];
hostname: string;
domains: string[]; // Domains to handle email for
banner?: string;
debug?: boolean;
// Authentication options
auth?: {
@ -84,6 +89,25 @@ export interface IUnifiedEmailServerOptions {
defaultPort?: number;
defaultTls?: boolean;
// Outbound settings
outbound?: {
maxConnections?: number;
connectionTimeout?: number;
socketTimeout?: number;
retryAttempts?: number;
defaultFrom?: string;
};
// DKIM settings
dkim?: {
enabled: boolean;
selector?: string;
keySize?: number;
};
// Rate limiting
rateLimits?: IHierarchicalRateLimits;
// Deliverability options
ipWarmupConfig?: IIPWarmupConfig;
reputationMonitorConfig?: IReputationMonitorConfig;
@ -139,6 +163,7 @@ export interface IServerStats {
* Unified email server that handles all email traffic with pattern-based routing
*/
export class UnifiedEmailServer extends EventEmitter {
private dcRouter: DcRouter;
private options: IUnifiedEmailServerOptions;
private domainRouter: DomainRouter;
private servers: any[] = [];
@ -153,9 +178,12 @@ export class UnifiedEmailServer extends EventEmitter {
private senderReputationMonitor: SenderReputationMonitor;
public deliveryQueue: UnifiedDeliveryQueue;
public deliverySystem: MultiModeDeliverySystem;
private rateLimiter: UnifiedRateLimiter;
private dkimKeys: Map<string, string> = new Map(); // domain -> private key
constructor(options: IUnifiedEmailServerOptions) {
constructor(dcRouter: DcRouter, options: IUnifiedEmailServerOptions) {
super();
this.dcRouter = dcRouter;
// Set default options
this.options = {
@ -208,6 +236,18 @@ export class UnifiedEmailServer extends EventEmitter {
cacheSize: 1000
});
// Initialize rate limiter
this.rateLimiter = new UnifiedRateLimiter(options.rateLimits || {
global: {
maxConnectionsPerIP: 10,
maxMessagesPerMinute: 100,
maxRecipientsPerMessage: 50,
maxErrorsPerIP: 10,
maxAuthFailuresPerIP: 5,
blockDuration: 300000 // 5 minutes
}
});
// Initialize delivery components
const queueOptions: IQueueOptions = {
storageType: 'memory', // Default to memory storage
@ -239,7 +279,7 @@ export class UnifiedEmailServer extends EventEmitter {
}
};
this.deliverySystem = new MultiModeDeliverySystem(this.deliveryQueue, deliveryOptions);
this.deliverySystem = new MultiModeDeliverySystem(this.deliveryQueue, deliveryOptions, this);
// Initialize statistics
this.stats = {
@ -278,6 +318,12 @@ export class UnifiedEmailServer extends EventEmitter {
await this.deliverySystem.start();
logger.log('info', 'Email delivery system started');
// Set up automatic DKIM if DNS server is available
if (this.dcRouter.dnsServer && this.options.dkim?.enabled) {
await this.setupAutomaticDkim();
logger.log('info', 'Automatic DKIM configuration completed');
}
// Ensure we have the necessary TLS options
const hasTlsConfig = this.options.tls?.keyPath && this.options.tls?.certPath;
@ -431,232 +477,8 @@ export class UnifiedEmailServer extends EventEmitter {
}
}
/**
* Handle new SMTP connection with IP reputation checking
*/
private async onConnect(session: IExtendedSmtpSession, callback: (err?: Error) => void): Promise<void> {
logger.log('info', `New connection from ${session.remoteAddress}`);
// Update connection statistics
this.stats.connections.current++;
this.stats.connections.total++;
// Log connection event
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.INFO,
type: SecurityEventType.CONNECTION,
message: 'New SMTP connection established',
ipAddress: session.remoteAddress,
details: {
sessionId: session.id,
secure: session.secure
}
});
// Perform IP reputation check
try {
const ipReputation = await this.ipReputationChecker.checkReputation(session.remoteAddress);
// Store reputation in session for later use
(session as any).ipReputation = ipReputation;
logger.log('info', `IP reputation check for ${session.remoteAddress}: score=${ipReputation.score}, isSpam=${ipReputation.isSpam}`);
// Reject connection if reputation is too low and rejection is enabled
if (ipReputation.score < 20 && (this.options as any).security?.rejectHighRiskIPs) {
const error = new Error(`Connection rejected: IP ${session.remoteAddress} has poor reputation score (${ipReputation.score})`);
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.WARN,
type: SecurityEventType.REJECTED_CONNECTION,
message: 'Connection rejected due to poor IP reputation',
ipAddress: session.remoteAddress,
details: {
sessionId: session.id,
reputationScore: ipReputation.score,
isSpam: ipReputation.isSpam,
isProxy: ipReputation.isProxy,
isTor: ipReputation.isTor,
isVPN: ipReputation.isVPN
},
success: false
});
return callback(error);
}
// For suspicious IPs, add a note but allow connection
if (ipReputation.score < 50) {
logger.log('warn', `Suspicious IP connection allowed: ${session.remoteAddress} (score: ${ipReputation.score})`);
}
} catch (error) {
// Log error but continue with connection
logger.log('error', `Error checking IP reputation for ${session.remoteAddress}: ${error.message}`);
}
// Continue with the connection
callback();
}
/**
* Handle authentication (stub implementation)
*/
private onAuth(auth: IAuthData, session: IExtendedSmtpSession, callback: (err?: Error, user?: any) => void): void {
if (!this.options.auth || !this.options.auth.users || this.options.auth.users.length === 0) {
// No authentication configured, reject
const error = new Error('Authentication not supported');
logger.log('warn', `Authentication attempt when not configured: ${auth.username}`);
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.WARN,
type: SecurityEventType.AUTHENTICATION,
message: 'Authentication attempt when not configured',
ipAddress: session.remoteAddress,
details: {
username: auth.username,
method: auth.method,
sessionId: session.id
},
success: false
});
return callback(error);
}
// Find matching user
const user = this.options.auth.users.find(u => u.username === auth.username && u.password === auth.password);
if (user) {
logger.log('info', `User ${auth.username} authenticated successfully`);
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.INFO,
type: SecurityEventType.AUTHENTICATION,
message: 'SMTP authentication successful',
ipAddress: session.remoteAddress,
details: {
username: auth.username,
method: auth.method,
sessionId: session.id
},
success: true
});
return callback(null, { username: user.username });
} else {
const error = new Error('Invalid username or password');
logger.log('warn', `Failed authentication for ${auth.username}`);
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.WARN,
type: SecurityEventType.AUTHENTICATION,
message: 'SMTP authentication failed',
ipAddress: session.remoteAddress,
details: {
username: auth.username,
method: auth.method,
sessionId: session.id
},
success: false
});
return callback(error);
}
}
/**
* Handle MAIL FROM command (stub implementation)
*/
private onMailFrom(address: {address: string}, session: IExtendedSmtpSession, callback: (err?: Error) => void): void {
logger.log('info', `MAIL FROM: ${address.address}`);
// Validate the email address
if (!this.isValidEmail(address.address)) {
const error = new Error('Invalid sender address');
logger.log('warn', `Invalid sender address: ${address.address}`);
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.WARN,
type: SecurityEventType.EMAIL_VALIDATION,
message: 'Invalid sender email format',
ipAddress: session.remoteAddress,
details: {
address: address.address,
sessionId: session.id
},
success: false
});
return callback(error);
}
// Authentication check if required
if (this.options.auth?.required && !session.authenticated) {
const error = new Error('Authentication required');
logger.log('warn', `Unauthenticated sender rejected: ${address.address}`);
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.WARN,
type: SecurityEventType.AUTHENTICATION,
message: 'Unauthenticated sender rejected',
ipAddress: session.remoteAddress,
details: {
address: address.address,
sessionId: session.id
},
success: false
});
return callback(error);
}
// Continue processing
callback();
}
/**
* Handle RCPT TO command (stub implementation)
*/
private onRcptTo(address: {address: string}, session: IExtendedSmtpSession, callback: (err?: Error) => void): void {
logger.log('info', `RCPT TO: ${address.address}`);
// Validate the email address
if (!this.isValidEmail(address.address)) {
const error = new Error('Invalid recipient address');
logger.log('warn', `Invalid recipient address: ${address.address}`);
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.WARN,
type: SecurityEventType.EMAIL_VALIDATION,
message: 'Invalid recipient email format',
ipAddress: session.remoteAddress,
details: {
address: address.address,
sessionId: session.id
},
success: false
});
return callback(error);
}
// Pattern match the recipient to determine processing mode
const rule = this.domainRouter.matchRule(address.address);
if (rule) {
// Store the matched rule and processing mode in the session
session.matchedRule = rule;
session.processingMode = rule.mode;
logger.log('info', `Email ${address.address} matched rule: ${rule.pattern}, mode: ${rule.mode}`);
} else {
// Use default mode
session.processingMode = this.options.defaultMode;
logger.log('info', `Email ${address.address} using default mode: ${this.options.defaultMode}`);
}
// Continue processing
callback();
}
/**
* Handle incoming email data (stub implementation)
@ -1145,6 +967,143 @@ export class UnifiedEmailServer extends EventEmitter {
this.stats.connections.current = 0;
}
/**
* Set up automatic DKIM configuration with DNS server
*/
private async setupAutomaticDkim(): Promise<void> {
if (!this.options.domains || this.options.domains.length === 0) {
logger.log('warn', 'No domains configured for DKIM');
return;
}
const selector = this.options.dkim?.selector || 'default';
const keySize = this.options.dkim?.keySize || 2048;
for (const domain of this.options.domains) {
try {
// Check if DKIM keys already exist for this domain
let keyPair: { privateKey: string; publicKey: string };
try {
// Try to read existing keys
keyPair = await this.dkimCreator.readDKIMKeys(domain);
logger.log('info', `Using existing DKIM keys for domain: ${domain}`);
} catch (error) {
// Generate new keys if they don't exist
keyPair = await this.dkimCreator.createDKIMKeys();
// Store them for future use
await this.dkimCreator.createAndStoreDKIMKeys(domain);
logger.log('info', `Generated new DKIM keys for domain: ${domain}`);
}
// Store the private key for signing
this.dkimKeys.set(domain, keyPair.privateKey);
// Extract the public key for DNS
const publicKeyBase64 = keyPair.publicKey
.replace(/-----BEGIN PUBLIC KEY-----/g, '')
.replace(/-----END PUBLIC KEY-----/g, '')
.replace(/\s/g, '');
// Register DNS handler for this domain's DKIM records
this.dcRouter.dnsServer.registerHandler(
`${selector}._domainkey.${domain}`,
['TXT'],
() => ({
name: `${selector}._domainkey.${domain}`,
type: 'TXT',
class: 'IN',
ttl: 300,
data: `v=DKIM1; k=rsa; p=${publicKeyBase64}`
})
);
logger.log('info', `DKIM DNS handler registered for domain: ${domain} with selector: ${selector}`);
} catch (error) {
logger.log('error', `Failed to set up DKIM for domain ${domain}: ${error.message}`);
}
}
}
/**
* Get SMTP client for a specific destination
*/
public getSmtpClient(host: string, port: number): smtpClientMod.SmtpClient {
const key = `${host}:${port}`;
if (!this.smtpClients.has(key)) {
// Create a new client for this destination
const client = smtpClientMod.createSmtpClient({
...this.smtpClientConfig,
host,
port
});
this.smtpClients.set(key, client);
}
return this.smtpClients.get(key)!;
}
/**
* Generate SmartProxy routes for email ports
*/
public generateProxyRoutes(portMapping?: Record<number, number>): any[] {
const routes: any[] = [];
const defaultPortMapping = {
25: 10025,
587: 10587,
465: 10465
};
const actualPortMapping = portMapping || defaultPortMapping;
// Generate routes for each configured port
for (const externalPort of this.options.ports) {
const internalPort = actualPortMapping[externalPort] || externalPort + 10000;
let routeName = 'email-route';
let tlsMode = 'passthrough';
// Configure based on port
switch (externalPort) {
case 25:
routeName = 'smtp-route';
tlsMode = 'passthrough'; // STARTTLS
break;
case 587:
routeName = 'submission-route';
tlsMode = 'passthrough'; // STARTTLS
break;
case 465:
routeName = 'smtps-route';
tlsMode = 'terminate'; // Implicit TLS
break;
default:
routeName = `email-port-${externalPort}-route`;
}
routes.push({
name: routeName,
match: {
ports: [externalPort]
},
action: {
type: 'forward',
target: {
host: 'localhost',
port: internalPort
},
tls: {
mode: tlsMode
}
}
});
}
return routes;
}
/**
* Update server configuration
*/
@ -1720,4 +1679,58 @@ export class UnifiedEmailServer extends EventEmitter {
}): void {
this.senderReputationMonitor.recordSendEvent(domain, event);
}
/**
* Check if DKIM key exists for a domain
* @param domain Domain to check
*/
public hasDkimKey(domain: string): boolean {
return this.dkimKeys.has(domain);
}
/**
* Record successful email delivery
* @param domain Sending domain
*/
public recordDelivery(domain: string): void {
this.recordReputationEvent(domain, {
type: 'delivered',
count: 1
});
}
/**
* Record email bounce
* @param domain Sending domain
* @param receivingDomain Receiving domain that bounced
* @param bounceType Type of bounce (hard/soft)
* @param reason Bounce reason
*/
public recordBounce(domain: string, receivingDomain: string, bounceType: 'hard' | 'soft', reason: string): void {
// Record bounce in bounce manager
const bounceRecord = {
id: `bounce_${Date.now()}_${Math.random().toString(36).substring(2, 9)}`,
recipient: `user@${receivingDomain}`,
sender: `user@${domain}`,
domain: domain,
bounceType: bounceType === 'hard' ? BounceType.INVALID_RECIPIENT : BounceType.TEMPORARY_FAILURE,
bounceCategory: bounceType === 'hard' ? BounceCategory.HARD : BounceCategory.SOFT,
timestamp: Date.now(),
smtpResponse: reason,
diagnosticCode: reason,
statusCode: bounceType === 'hard' ? '550' : '450',
processed: false
};
// Process the bounce
this.bounceManager.processBounce(bounceRecord);
// Record reputation event
this.recordReputationEvent(domain, {
type: 'bounce',
count: 1,
hardBounce: bounceType === 'hard',
receivingDomain
});
}
}

File diff suppressed because it is too large Load Diff

View File

@ -201,9 +201,11 @@ export class EmailService {
this.config.ports.map(p => p + 10000) : // Use internal ports (10025, etc.)
this.config.ports; // Use standard ports (25, etc.)
this.unifiedEmailServer = new UnifiedEmailServer({
// Pass null as dcRouter since this is a standalone service
this.unifiedEmailServer = new UnifiedEmailServer(null as any, {
ports: emailPorts,
hostname: this.config.hostname || 'localhost',
domains: [this.config.hostname || 'localhost'], // Default to hostname
auth: this.config.auth,
tls: this.config.tls,
maxMessageSize: this.config.maxMessageSize,