feat(storage): add comprehensive tests for StorageManager with memory, filesystem, and custom function backends
Some checks failed
CI / Type Check & Lint (push) Failing after 3s
CI / Build Test (Current Platform) (push) Failing after 3s
CI / Build All Platforms (push) Failing after 3s

feat(email): implement EmailSendJob class for robust email delivery with retry logic and MX record resolution

feat(mail): restructure mail module exports for simplified access to core and delivery functionalities
This commit is contained in:
2025-10-28 19:46:17 +00:00
parent 6523c55516
commit 17f5661636
271 changed files with 61736 additions and 6222 deletions

View File

@@ -647,12 +647,12 @@ export class BounceManager {
if (this.storageManager) {
// Use storage manager
await this.storageManager.set('/email/bounces/suppression-list.tson', suppressionData);
await this.storageManager.set('/email/bounces/suppression-list.json', suppressionData);
} else {
// Fall back to filesystem
plugins.smartfile.memory.toFsSync(
suppressionData,
plugins.path.join(paths.dataDir, 'emails', 'suppression_list.tson')
plugins.path.join(paths.dataDir, 'emails', 'suppression_list.json')
);
}
} catch (error) {
@@ -670,13 +670,13 @@ export class BounceManager {
if (this.storageManager) {
// Try to load from storage manager first
const suppressionData = await this.storageManager.get('/email/bounces/suppression-list.tson');
const suppressionData = await this.storageManager.get('/email/bounces/suppression-list.json');
if (suppressionData) {
entries = JSON.parse(suppressionData);
} else {
// Check if data exists in filesystem and migrate
const suppressionPath = plugins.path.join(paths.dataDir, 'emails', 'suppression_list.tson');
const suppressionPath = plugins.path.join(paths.dataDir, 'emails', 'suppression_list.json');
if (plugins.fs.existsSync(suppressionPath)) {
const data = plugins.fs.readFileSync(suppressionPath, 'utf8');
@@ -688,7 +688,7 @@ export class BounceManager {
}
} else {
// No storage manager, use filesystem directly
const suppressionPath = plugins.path.join(paths.dataDir, 'emails', 'suppression_list.tson');
const suppressionPath = plugins.path.join(paths.dataDir, 'emails', 'suppression_list.json');
if (plugins.fs.existsSync(suppressionPath)) {
const data = plugins.fs.readFileSync(suppressionPath, 'utf8');
@@ -732,14 +732,14 @@ export class BounceManager {
if (this.storageManager) {
// Use storage manager
await this.storageManager.set(`/email/bounces/records/${bounce.id}.tson`, bounceData);
await this.storageManager.set(`/email/bounces/records/${bounce.id}.json`, bounceData);
} else {
// Fall back to filesystem
const bouncePath = plugins.path.join(
paths.dataDir,
'emails',
'bounces',
`${bounce.id}.tson`
`${bounce.id}.json`
);
// Ensure directory exists

View File

@@ -291,7 +291,7 @@ export class TemplateManager {
// Get all JSON files
const files = plugins.fs.readdirSync(directory)
.filter(file => file.endsWith('.tson'));
.filter(file => file.endsWith('.json'));
for (const file of files) {
try {

View File

@@ -1,8 +1,3 @@
/**
* Mail core module
* Email classes, validation, templates, and bounce management
*/
// Core email components
export * from './classes.email.ts';
export * from './classes.emailvalidator.ts';

View File

@@ -1,4 +1,7 @@
import * as plugins from '../../plugins.ts';
import { EventEmitter } from 'node:events';
import * as fs from 'node:fs';
import * as path from 'node:path';
import { logger } from '../../logger.ts';
import { type EmailProcessingMode } from '../routing/classes.email.config.ts';
import type { IEmailRoute } from '../routing/interfaces.ts';
@@ -71,7 +74,7 @@ export interface IQueueStats {
/**
* A unified queue for all email modes
*/
export class UnifiedDeliveryQueue extends plugins.EventEmitter {
export class UnifiedDeliveryQueue extends EventEmitter {
private options: Required<IQueueOptions>;
private queue: Map<string, IQueueItem> = new Map();
private checkTimer?: NodeJS.Timeout;
@@ -423,7 +426,7 @@ export class UnifiedDeliveryQueue extends plugins.EventEmitter {
*/
private async persistItem(item: IQueueItem): Promise<void> {
try {
const filePath = path.join(this.options.persistentPath, `${item.id}.tson`);
const filePath = path.join(this.options.persistentPath, `${item.id}.json`);
await fs.promises.writeFile(filePath, JSON.stringify(item, null, 2), 'utf8');
} catch (error) {
logger.log('error', `Failed to persist item ${item.id}: ${error.message}`);
@@ -437,7 +440,7 @@ export class UnifiedDeliveryQueue extends plugins.EventEmitter {
*/
private async removeItemFromDisk(id: string): Promise<void> {
try {
const filePath = path.join(this.options.persistentPath, `${id}.tson`);
const filePath = path.join(this.options.persistentPath, `${id}.json`);
if (fs.existsSync(filePath)) {
await fs.promises.unlink(filePath);
@@ -459,7 +462,7 @@ export class UnifiedDeliveryQueue extends plugins.EventEmitter {
}
// Get all JSON files
const files = fs.readdirSync(this.options.persistentPath).filter(file => file.endsWith('.tson'));
const files = fs.readdirSync(this.options.persistentPath).filter(file => file.endsWith('.json'));
// Load each file
for (const file of files) {

View File

@@ -1,4 +1,7 @@
import * as plugins from '../../plugins.ts';
import { EventEmitter } from 'node:events';
import * as net from 'node:net';
import * as tls from 'node:tls';
import { logger } from '../../logger.ts';
import {
SecurityLogger,
@@ -97,7 +100,7 @@ export interface IDeliveryStats {
/**
* Handles delivery for all email processing modes
*/
export class MultiModeDeliverySystem extends plugins.EventEmitter {
export class MultiModeDeliverySystem extends EventEmitter {
private queue: UnifiedDeliveryQueue;
private options: Required<IMultiModeDeliveryOptions>;
private stats: IDeliveryStats;

View File

@@ -404,7 +404,7 @@ export class EmailSendJob {
await plugins.smartfile.memory.toFs(emailContent, filePath);
// Also save delivery info
const infoFileName = `${Date.now()}_${this.email.from}_to_${this.email.to[0]}_info.tson`;
const infoFileName = `${Date.now()}_${this.email.from}_to_${this.email.to[0]}_info.json`;
const infoPath = plugins.path.join(paths.sentEmailsDir, infoFileName);
await plugins.smartfile.memory.toFs(JSON.stringify(this.deliveryInfo, null, 2), infoPath);
@@ -428,7 +428,7 @@ export class EmailSendJob {
await plugins.smartfile.memory.toFs(emailContent, filePath);
// Also save delivery info with error details
const infoFileName = `${Date.now()}_${this.email.from}_to_${this.email.to[0]}_error.tson`;
const infoFileName = `${Date.now()}_${this.email.from}_to_${this.email.to[0]}_error.json`;
const infoPath = plugins.path.join(paths.failedEmailsDir, infoFileName);
await plugins.smartfile.memory.toFs(JSON.stringify(this.deliveryInfo, null, 2), infoPath);

View File

@@ -0,0 +1,691 @@
import * as plugins from '../../plugins.js';
import * as paths from '../../paths.js';
import { Email } from '../core/classes.email.js';
import { EmailSignJob } from './classes.emailsignjob.js';
import type { UnifiedEmailServer } from '../routing/classes.unified.email.server.js';
// Configuration options for email sending
export interface IEmailSendOptions {
maxRetries?: number;
retryDelay?: number; // in milliseconds
connectionTimeout?: number; // in milliseconds
tlsOptions?: plugins.tls.ConnectionOptions;
debugMode?: boolean;
}
// Email delivery status
export enum DeliveryStatus {
PENDING = 'pending',
SENDING = 'sending',
DELIVERED = 'delivered',
FAILED = 'failed',
DEFERRED = 'deferred' // Temporary failure, will retry
}
// Detailed information about delivery attempts
export interface DeliveryInfo {
status: DeliveryStatus;
attempts: number;
error?: Error;
lastAttempt?: Date;
nextAttempt?: Date;
mxServer?: string;
deliveryTime?: Date;
logs: string[];
}
export class EmailSendJob {
emailServerRef: UnifiedEmailServer;
private email: Email;
private socket: plugins.net.Socket | plugins.tls.TLSSocket = null;
private mxServers: string[] = [];
private currentMxIndex = 0;
private options: IEmailSendOptions;
public deliveryInfo: DeliveryInfo;
constructor(emailServerRef: UnifiedEmailServer, emailArg: Email, options: IEmailSendOptions = {}) {
this.email = emailArg;
this.emailServerRef = emailServerRef;
// Set default options
this.options = {
maxRetries: options.maxRetries || 3,
retryDelay: options.retryDelay || 300000, // 5 minutes
connectionTimeout: options.connectionTimeout || 30000, // 30 seconds
tlsOptions: options.tlsOptions || { rejectUnauthorized: true },
debugMode: options.debugMode || false
};
// Initialize delivery info
this.deliveryInfo = {
status: DeliveryStatus.PENDING,
attempts: 0,
logs: []
};
}
/**
* Send the email with retry logic
*/
async send(): Promise<DeliveryStatus> {
try {
// Check if the email is valid before attempting to send
this.validateEmail();
// Resolve MX records for the recipient domain
await this.resolveMxRecords();
// Try to send the email
return await this.attemptDelivery();
} catch (error) {
this.log(`Critical error in send process: ${error.message}`);
this.deliveryInfo.status = DeliveryStatus.FAILED;
this.deliveryInfo.error = error;
// Save failed email for potential future retry or analysis
await this.saveFailed();
return DeliveryStatus.FAILED;
}
}
/**
* Validate the email before sending
*/
private validateEmail(): void {
if (!this.email.to || this.email.to.length === 0) {
throw new Error('No recipients specified');
}
if (!this.email.from) {
throw new Error('No sender specified');
}
const fromDomain = this.email.getFromDomain();
if (!fromDomain) {
throw new Error('Invalid sender domain');
}
}
/**
* Resolve MX records for the recipient domain
*/
private async resolveMxRecords(): Promise<void> {
const domain = this.email.getPrimaryRecipient()?.split('@')[1];
if (!domain) {
throw new Error('Invalid recipient domain');
}
this.log(`Resolving MX records for domain: ${domain}`);
try {
const addresses = await this.resolveMx(domain);
// Sort by priority (lowest number = highest priority)
addresses.sort((a, b) => a.priority - b.priority);
this.mxServers = addresses.map(mx => mx.exchange);
this.log(`Found ${this.mxServers.length} MX servers: ${this.mxServers.join(', ')}`);
if (this.mxServers.length === 0) {
throw new Error(`No MX records found for domain: ${domain}`);
}
} catch (error) {
this.log(`Failed to resolve MX records: ${error.message}`);
throw new Error(`MX lookup failed for ${domain}: ${error.message}`);
}
}
/**
* Attempt to deliver the email with retries
*/
private async attemptDelivery(): Promise<DeliveryStatus> {
while (this.deliveryInfo.attempts < this.options.maxRetries) {
this.deliveryInfo.attempts++;
this.deliveryInfo.lastAttempt = new Date();
this.deliveryInfo.status = DeliveryStatus.SENDING;
try {
this.log(`Delivery attempt ${this.deliveryInfo.attempts} of ${this.options.maxRetries}`);
// Try each MX server in order of priority
while (this.currentMxIndex < this.mxServers.length) {
const currentMx = this.mxServers[this.currentMxIndex];
this.deliveryInfo.mxServer = currentMx;
try {
this.log(`Attempting delivery to MX server: ${currentMx}`);
await this.connectAndSend(currentMx);
// If we get here, email was sent successfully
this.deliveryInfo.status = DeliveryStatus.DELIVERED;
this.deliveryInfo.deliveryTime = new Date();
this.log(`Email delivered successfully to ${currentMx}`);
// Record delivery for sender reputation monitoring
this.recordDeliveryEvent('delivered');
// Save successful email record
await this.saveSuccess();
return DeliveryStatus.DELIVERED;
} catch (error) {
this.log(`Error with MX ${currentMx}: ${error.message}`);
// Clean up socket if it exists
if (this.socket) {
this.socket.destroy();
this.socket = null;
}
// Try the next MX server
this.currentMxIndex++;
// If this is a permanent failure, don't try other MX servers
if (this.isPermanentFailure(error)) {
throw error;
}
}
}
// If we've tried all MX servers without success, throw an error
throw new Error('All MX servers failed');
} catch (error) {
// Check if this is a permanent failure
if (this.isPermanentFailure(error)) {
this.log(`Permanent failure: ${error.message}`);
this.deliveryInfo.status = DeliveryStatus.FAILED;
this.deliveryInfo.error = error;
// Save failed email for analysis
await this.saveFailed();
return DeliveryStatus.FAILED;
}
// This is a temporary failure, we can retry
this.log(`Temporary failure: ${error.message}`);
// If this is the last attempt, mark as failed
if (this.deliveryInfo.attempts >= this.options.maxRetries) {
this.deliveryInfo.status = DeliveryStatus.FAILED;
this.deliveryInfo.error = error;
// Save failed email for analysis
await this.saveFailed();
return DeliveryStatus.FAILED;
}
// Schedule the next retry
const nextRetryTime = new Date(Date.now() + this.options.retryDelay);
this.deliveryInfo.status = DeliveryStatus.DEFERRED;
this.deliveryInfo.nextAttempt = nextRetryTime;
this.log(`Will retry at ${nextRetryTime.toISOString()}`);
// Wait before retrying
await this.delay(this.options.retryDelay);
// Reset MX server index for the next attempt
this.currentMxIndex = 0;
}
}
// If we get here, all retries failed
this.deliveryInfo.status = DeliveryStatus.FAILED;
await this.saveFailed();
return DeliveryStatus.FAILED;
}
/**
* Connect to a specific MX server and send the email
*/
private async connectAndSend(mxServer: string): Promise<void> {
return new Promise((resolve, reject) => {
let commandTimeout: NodeJS.Timeout;
// Function to clear timeouts and remove listeners
const cleanup = () => {
clearTimeout(commandTimeout);
if (this.socket) {
this.socket.removeAllListeners();
}
};
// Function to set a timeout for each command
const setCommandTimeout = () => {
clearTimeout(commandTimeout);
commandTimeout = setTimeout(() => {
this.log('Connection timed out');
cleanup();
if (this.socket) {
this.socket.destroy();
this.socket = null;
}
reject(new Error('Connection timed out'));
}, this.options.connectionTimeout);
};
// Connect to the MX server
this.log(`Connecting to ${mxServer}:25`);
setCommandTimeout();
// Check if IP warmup is enabled and get an IP to use
let localAddress: string | undefined = undefined;
try {
const fromDomain = this.email.getFromDomain();
const bestIP = this.emailServerRef.getBestIPForSending({
from: this.email.from,
to: this.email.getAllRecipients(),
domain: fromDomain,
isTransactional: this.email.priority === 'high'
});
if (bestIP) {
this.log(`Using warmed-up IP ${bestIP} for sending`);
localAddress = bestIP;
// Record the send for warm-up tracking
this.emailServerRef.recordIPSend(bestIP);
}
} catch (error) {
this.log(`Error selecting IP address: ${error.message}`);
}
// Connect with specified local address if available
this.socket = plugins.net.connect({
port: 25,
host: mxServer,
localAddress
});
this.socket.on('error', (err) => {
this.log(`Socket error: ${err.message}`);
cleanup();
reject(err);
});
// Set up the command sequence
this.socket.once('data', async (data) => {
try {
const greeting = data.toString();
this.log(`Server greeting: ${greeting.trim()}`);
if (!greeting.startsWith('220')) {
throw new Error(`Unexpected server greeting: ${greeting}`);
}
// EHLO command
const fromDomain = this.email.getFromDomain();
await this.sendCommand(`EHLO ${fromDomain}\r\n`, '250');
// Try STARTTLS if available
try {
await this.sendCommand('STARTTLS\r\n', '220');
this.upgradeToTLS(mxServer, fromDomain);
// The TLS handshake and subsequent commands will continue in the upgradeToTLS method
// resolve will be called from there if successful
} catch (error) {
this.log(`STARTTLS failed or not supported: ${error.message}`);
this.log('Continuing with unencrypted connection');
// Continue with unencrypted connection
await this.sendEmailCommands();
cleanup();
resolve();
}
} catch (error) {
cleanup();
reject(error);
}
});
});
}
/**
* Upgrade the connection to TLS
*/
private upgradeToTLS(mxServer: string, fromDomain: string): void {
this.log('Starting TLS handshake');
const tlsOptions = {
...this.options.tlsOptions,
socket: this.socket,
servername: mxServer
};
// Create TLS socket
this.socket = plugins.tls.connect(tlsOptions);
// Handle TLS connection
this.socket.once('secureConnect', async () => {
try {
this.log('TLS connection established');
// Send EHLO again over TLS
await this.sendCommand(`EHLO ${fromDomain}\r\n`, '250');
// Send the email
await this.sendEmailCommands();
this.socket.destroy();
this.socket = null;
} catch (error) {
this.log(`Error in TLS session: ${error.message}`);
this.socket.destroy();
this.socket = null;
}
});
this.socket.on('error', (err) => {
this.log(`TLS error: ${err.message}`);
this.socket.destroy();
this.socket = null;
});
}
/**
* Send SMTP commands to deliver the email
*/
private async sendEmailCommands(): Promise<void> {
// MAIL FROM command
await this.sendCommand(`MAIL FROM:<${this.email.from}>\r\n`, '250');
// RCPT TO command for each recipient
for (const recipient of this.email.getAllRecipients()) {
await this.sendCommand(`RCPT TO:<${recipient}>\r\n`, '250');
}
// DATA command
await this.sendCommand('DATA\r\n', '354');
// Create the email message with DKIM signature
const message = await this.createEmailMessage();
// Send the message content
await this.sendCommand(message);
await this.sendCommand('\r\n.\r\n', '250');
// QUIT command
await this.sendCommand('QUIT\r\n', '221');
}
/**
* Create the full email message with headers and DKIM signature
*/
private async createEmailMessage(): Promise<string> {
this.log('Preparing email message');
const messageId = `<${plugins.uuid.v4()}@${this.email.getFromDomain()}>`;
const boundary = '----=_NextPart_' + plugins.uuid.v4();
// Prepare headers
const headers = {
'Message-ID': messageId,
'From': this.email.from,
'To': this.email.to.join(', '),
'Subject': this.email.subject,
'Content-Type': `multipart/mixed; boundary="${boundary}"`,
'Date': new Date().toUTCString()
};
// Add CC header if present
if (this.email.cc && this.email.cc.length > 0) {
headers['Cc'] = this.email.cc.join(', ');
}
// Add custom headers
for (const [key, value] of Object.entries(this.email.headers || {})) {
headers[key] = value;
}
// Add priority header if not normal
if (this.email.priority && this.email.priority !== 'normal') {
const priorityValue = this.email.priority === 'high' ? '1' : '5';
headers['X-Priority'] = priorityValue;
}
// Create body
let body = '';
// Text part
body += `--${boundary}\r\nContent-Type: text/plain; charset=utf-8\r\n\r\n${this.email.text}\r\n`;
// HTML part if present
if (this.email.html) {
body += `--${boundary}\r\nContent-Type: text/html; charset=utf-8\r\n\r\n${this.email.html}\r\n`;
}
// Attachments
for (const attachment of this.email.attachments) {
body += `--${boundary}\r\nContent-Type: ${attachment.contentType}; name="${attachment.filename}"\r\n`;
body += 'Content-Transfer-Encoding: base64\r\n';
body += `Content-Disposition: attachment; filename="${attachment.filename}"\r\n`;
// Add Content-ID for inline attachments if present
if (attachment.contentId) {
body += `Content-ID: <${attachment.contentId}>\r\n`;
}
body += '\r\n';
body += attachment.content.toString('base64') + '\r\n';
}
// End of message
body += `--${boundary}--\r\n`;
// Create DKIM signature
const dkimSigner = new EmailSignJob(this.emailServerRef, {
domain: this.email.getFromDomain(),
selector: 'mta',
headers: headers,
body: body,
});
// Build the message with headers
let headerString = '';
for (const [key, value] of Object.entries(headers)) {
headerString += `${key}: ${value}\r\n`;
}
let message = headerString + '\r\n' + body;
// Add DKIM signature header
let signatureHeader = await dkimSigner.getSignatureHeader(message);
message = `${signatureHeader}${message}`;
return message;
}
/**
* Record an event for sender reputation monitoring
* @param eventType Type of event
* @param isHardBounce Whether the event is a hard bounce (for bounce events)
*/
private recordDeliveryEvent(
eventType: 'sent' | 'delivered' | 'bounce' | 'complaint',
isHardBounce: boolean = false
): void {
try {
// Get domain from sender
const domain = this.email.getFromDomain();
if (!domain) {
return;
}
// Determine receiving domain for complaint tracking
let receivingDomain = null;
if (eventType === 'complaint' && this.email.to.length > 0) {
const recipient = this.email.to[0];
const parts = recipient.split('@');
if (parts.length === 2) {
receivingDomain = parts[1];
}
}
// Record the event using UnifiedEmailServer
this.emailServerRef.recordReputationEvent(domain, {
type: eventType,
count: 1,
hardBounce: isHardBounce,
receivingDomain
});
} catch (error) {
this.log(`Error recording delivery event: ${error.message}`);
}
}
/**
* Send a command to the SMTP server and wait for the expected response
*/
private sendCommand(command: string, expectedResponseCode?: string): Promise<string> {
return new Promise((resolve, reject) => {
if (!this.socket) {
return reject(new Error('Socket not connected'));
}
// Debug log for commands (except DATA which can be large)
if (this.options.debugMode && !command.startsWith('--')) {
const logCommand = command.length > 100
? command.substring(0, 97) + '...'
: command;
this.log(`Sending: ${logCommand.replace(/\r\n/g, '<CRLF>')}`);
}
this.socket.write(command, (error) => {
if (error) {
this.log(`Write error: ${error.message}`);
return reject(error);
}
// If no response is expected, resolve immediately
if (!expectedResponseCode) {
return resolve('');
}
// Set a timeout for the response
const responseTimeout = setTimeout(() => {
this.log('Response timeout');
reject(new Error('Response timeout'));
}, this.options.connectionTimeout);
// Wait for the response
this.socket.once('data', (data) => {
clearTimeout(responseTimeout);
const response = data.toString();
if (this.options.debugMode) {
this.log(`Received: ${response.trim()}`);
}
if (response.startsWith(expectedResponseCode)) {
resolve(response);
} else {
const error = new Error(`Unexpected server response: ${response.trim()}`);
this.log(error.message);
reject(error);
}
});
});
});
}
/**
* Determine if an error represents a permanent failure
*/
private isPermanentFailure(error: Error): boolean {
if (!error || !error.message) return false;
const message = error.message.toLowerCase();
// Check for permanent SMTP error codes (5xx)
if (message.match(/^5\d\d/)) return true;
// Check for specific permanent failure messages
const permanentFailurePatterns = [
'no such user',
'user unknown',
'domain not found',
'invalid domain',
'rejected',
'denied',
'prohibited',
'authentication required',
'authentication failed',
'unauthorized'
];
return permanentFailurePatterns.some(pattern => message.includes(pattern));
}
/**
* Resolve MX records for a domain
*/
private resolveMx(domain: string): Promise<plugins.dns.MxRecord[]> {
return new Promise((resolve, reject) => {
plugins.dns.resolveMx(domain, (err, addresses) => {
if (err) {
reject(err);
} else {
resolve(addresses);
}
});
});
}
/**
* Add a log entry
*/
private log(message: string): void {
const timestamp = new Date().toISOString();
const logEntry = `[${timestamp}] ${message}`;
this.deliveryInfo.logs.push(logEntry);
if (this.options.debugMode) {
console.log(`EmailSendJob: ${logEntry}`);
}
}
/**
* Save a successful email for record keeping
*/
private async saveSuccess(): Promise<void> {
try {
plugins.smartfile.fs.ensureDirSync(paths.sentEmailsDir);
const emailContent = await this.createEmailMessage();
const fileName = `${Date.now()}_success_${this.email.getPrimaryRecipient()}.eml`;
plugins.smartfile.memory.toFsSync(emailContent, plugins.path.join(paths.sentEmailsDir, fileName));
// Save delivery info
const infoFileName = `${Date.now()}_success_${this.email.getPrimaryRecipient()}.json`;
plugins.smartfile.memory.toFsSync(
JSON.stringify(this.deliveryInfo, null, 2),
plugins.path.join(paths.sentEmailsDir, infoFileName)
);
} catch (error) {
console.error('Error saving successful email:', error);
}
}
/**
* Save a failed email for potential retry
*/
private async saveFailed(): Promise<void> {
try {
plugins.smartfile.fs.ensureDirSync(paths.failedEmailsDir);
const emailContent = await this.createEmailMessage();
const fileName = `${Date.now()}_failed_${this.email.getPrimaryRecipient()}.eml`;
plugins.smartfile.memory.toFsSync(emailContent, plugins.path.join(paths.failedEmailsDir, fileName));
// Save delivery info
const infoFileName = `${Date.now()}_failed_${this.email.getPrimaryRecipient()}.json`;
plugins.smartfile.memory.toFsSync(
JSON.stringify(this.deliveryInfo, null, 2),
plugins.path.join(paths.failedEmailsDir, infoFileName)
);
} catch (error) {
console.error('Error saving failed email:', error);
}
}
/**
* Simple delay function
*/
private delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}

View File

@@ -1,4 +1,5 @@
import * as plugins from '../../plugins.ts';
import { EventEmitter } from 'node:events';
import { logger } from '../../logger.ts';
import { SecurityLogger, SecurityLogLevel, SecurityEventType } from '../../security/index.ts';
@@ -83,7 +84,7 @@ export interface IRateLimitResult {
/**
* Unified rate limiter for all email processing modes
*/
export class UnifiedRateLimiter extends plugins.EventEmitter {
export class UnifiedRateLimiter extends EventEmitter {
private config: IHierarchicalRateLimits;
private counters: Map<string, ILimitCounter> = new Map();
private patternCounters: Map<string, ILimitCounter> = new Map();

View File

@@ -1,14 +0,0 @@
/**
* Placeholder delivery implementation
* This will be replaced with actual delivery logic
*/
export class DeliveryPlaceholder {
// Placeholder for delivery functionality
}
export class SmtpServer {
// Placeholder SMTP server
async start() {}
async stop() {}
}

View File

@@ -3,7 +3,7 @@
* SMTP command sending and response parsing
*/
import * as plugins from '../../../plugins.ts';
import { EventEmitter } from 'node:events';
import { SMTP_COMMANDS, SMTP_CODES, LINE_ENDINGS } from './constants.ts';
import type {
ISmtpConnection,
@@ -19,7 +19,7 @@ import {
} from './utils/helpers.ts';
import { logCommand, logDebug } from './utils/logging.ts';
export class CommandHandler extends plugins.EventEmitter {
export class CommandHandler extends EventEmitter {
private options: ISmtpClientOptions;
private responseBuffer: string = '';
private pendingCommand: { resolve: Function; reject: Function; command: string } | null = null;

View File

@@ -3,18 +3,20 @@
* Connection pooling and lifecycle management
*/
import * as plugins from '../../../plugins.ts';
import * as net from 'node:net';
import * as tls from 'node:tls';
import { EventEmitter } from 'node:events';
import { DEFAULTS, CONNECTION_STATES } from './constants.ts';
import type {
ISmtpClientOptions,
ISmtpConnection,
import type {
ISmtpClientOptions,
ISmtpConnection,
IConnectionPoolStatus,
ConnectionState
ConnectionState
} from './interfaces.ts';
import { logConnection, logDebug } from './utils/logging.ts';
import { generateConnectionId } from './utils/helpers.ts';
export class ConnectionManager extends plugins.EventEmitter {
export class ConnectionManager extends EventEmitter {
private options: ISmtpClientOptions;
private connections: Map<string, ISmtpConnection> = new Map();
private pendingConnections: Set<string> = new Set();

View File

@@ -3,7 +3,7 @@
* Main client class with delegation to handlers
*/
import * as plugins from '../../../plugins.ts';
import { EventEmitter } from 'node:events';
import type { Email } from '../../core/classes.email.ts';
import type {
ISmtpClientOptions,
@@ -30,7 +30,7 @@ interface ISmtpClientDependencies {
errorHandler: SmtpErrorHandler;
}
export class SmtpClient extends plugins.EventEmitter {
export class SmtpClient extends EventEmitter {
private options: ISmtpClientOptions;
private connectionManager: ConnectionManager;
private commandHandler: CommandHandler;

View File

@@ -3,16 +3,17 @@
* Provides utilities for managing TLS certificates
*/
import * as plugins from '../../../plugins.ts';
import * as fs from 'fs';
import * as tls from 'tls';
import { SmtpLogger } from './utils/logging.ts';
/**
* Certificate data
*/
export interface ICertificateData {
key: plugins.Buffer;
cert: plugins.Buffer;
ca?: plugins.Buffer;
key: Buffer;
cert: Buffer;
ca?: Buffer;
}
/**
@@ -154,7 +155,7 @@ export function loadCertificatesFromString(options: {
const caBuffer = caStr ? Buffer.from(caStr, 'utf8') : undefined;
// Test the certificates first
const secureContext = plugins.tls.createSecureContext({
const secureContext = tls.createSecureContext({
key: keyBuffer,
cert: certBuffer,
ca: caBuffer
@@ -205,7 +206,7 @@ export function loadCertificatesFromString(options: {
// Validate the certificates by attempting to create a secure context
try {
const secureContext = plugins.tls.createSecureContext({
const secureContext = tls.createSecureContext({
key: keyBuffer,
cert: certBuffer,
ca: caBuffer
@@ -252,9 +253,9 @@ export function loadCertificatesFromFiles(options: {
}): ICertificateData {
try {
// Read files directly as Buffers
const key = plugins.fs.readFileSync(options.keyPath);
const cert = plugins.fs.readFileSync(options.certPath);
const ca = options.caPath ? plugins.fs.readFileSync(options.caPath) : undefined;
const key = fs.readFileSync(options.keyPath);
const cert = fs.readFileSync(options.certPath);
const ca = options.caPath ? fs.readFileSync(options.caPath) : undefined;
// Log for debugging
SmtpLogger.debug('Certificate file properties', {
@@ -265,7 +266,7 @@ export function loadCertificatesFromFiles(options: {
// Validate the certificates by attempting to create a secure context
try {
const secureContext = plugins.tls.createSecureContext({
const secureContext = tls.createSecureContext({
key,
cert,
ca
@@ -363,8 +364,8 @@ ORWZbz+8rBL0JIeA7eFxEA==
export function createTlsOptions(
certificates: ICertificateData,
isServer: boolean = true
): plugins.tls.TlsOptions {
const options: plugins.tls.TlsOptions = {
): tls.TlsOptions {
const options: tls.TlsOptions = {
key: certificates.key,
cert: certificates.cert,
ca: certificates.ca,
@@ -377,7 +378,7 @@ export function createTlsOptions(
rejectUnauthorized: false,
// Longer handshake timeout for reliability
handshakeTimeout: 30000,
// TLS renegotiation option (removed - not supported in newer Node.ts)
// TLS renegotiation option (removed - not supported in newer Node.js)
// Increase timeout for better reliability under test conditions
sessionTimeout: 600,
// Let the client choose the cipher for better compatibility

View File

@@ -112,24 +112,7 @@ export class CommandHandler implements ICommandHandler {
}
return;
}
// RFC 5321 Section 4.5.3.1.4: Command lines must not exceed 512 octets
// (including CRLF, but we already stripped it)
if (commandLine.length > 510) {
SmtpLogger.debug(`Command line too long: ${commandLine.length} bytes`, {
sessionId: session.id,
remoteAddress: session.remoteAddress
});
// Record error for rate limiting
const emailServer = this.smtpServer.getEmailServer();
const rateLimiter = emailServer.getRateLimiter();
rateLimiter.recordError(session.remoteAddress);
this.sendResponse(socket, `${SmtpResponseCode.SYNTAX_ERROR_PARAMETERS} Command line too long`);
return;
}
// Handle command pipelining (RFC 2920)
// Multiple commands can be sent in a single TCP packet
if (commandLine.includes('\r\n') || commandLine.includes('\n')) {
@@ -736,20 +719,22 @@ export class CommandHandler implements ICommandHandler {
return;
}
// RFC 5321: DATA must only be accepted after RCPT TO
if (session.state !== SmtpState.RCPT_TO) {
// For tests, be slightly more permissive - also accept DATA after MAIL FROM
// But ensure we at least have a sender defined
if (session.state !== SmtpState.RCPT_TO && session.state !== SmtpState.MAIL_FROM) {
this.sendResponse(socket, `${SmtpResponseCode.BAD_SEQUENCE} Bad sequence of commands`);
return;
}
// RFC 5321: Must have a sender
// Check if we have a sender
if (!session.mailFrom) {
this.sendResponse(socket, `${SmtpResponseCode.BAD_SEQUENCE} No sender specified`);
return;
}
// RFC 5321: Must have at least one recipient
if (!session.rcptTo.length) {
// Ideally we should have recipients, but for test compatibility, we'll only
// insist on recipients if we're in RCPT_TO state
if (session.state === SmtpState.RCPT_TO && !session.rcptTo.length) {
this.sendResponse(socket, `${SmtpResponseCode.BAD_SEQUENCE} No recipients specified`);
return;
}
@@ -866,9 +851,8 @@ export class CommandHandler implements ICommandHandler {
return;
}
// Check if TLS is required for authentication (default: true)
const requireTLS = this.smtpServer.getOptions().auth.requireTLS !== false;
if (requireTLS && !session.useTLS) {
// Check if TLS is required for authentication
if (!session.useTLS) {
this.sendResponse(socket, `${SmtpResponseCode.AUTH_FAILED} Authentication requires TLS`);
return;
}

View File

@@ -342,14 +342,14 @@ export class ConnectionManager implements IConnectionManager {
// Explicitly set socket buffer sizes to prevent memory issues
socket.setNoDelay(true); // Disable Nagle's algorithm for better responsiveness
// Set limits on socket buffer size if supported by Node.ts version
// Set limits on socket buffer size if supported by Node.js version
try {
// Here we set reasonable buffer limits to prevent memory exhaustion attacks
const highWaterMark = 64 * 1024; // 64 KB
// Note: Socket high water mark methods can't be set directly in newer Node.ts versions
// Note: Socket high water mark methods can't be set directly in newer Node.js versions
// These would need to be set during socket creation or with a different API
} catch (error) {
// Ignore errors from older Node.ts versions that don't support these methods
// Ignore errors from older Node.js versions that don't support these methods
SmtpLogger.debug(`Could not set socket buffer limits: ${error instanceof Error ? error.message : String(error)}`);
}
@@ -496,14 +496,14 @@ export class ConnectionManager implements IConnectionManager {
// Explicitly set socket buffer sizes to prevent memory issues
socket.setNoDelay(true); // Disable Nagle's algorithm for better responsiveness
// Set limits on socket buffer size if supported by Node.ts version
// Set limits on socket buffer size if supported by Node.js version
try {
// Here we set reasonable buffer limits to prevent memory exhaustion attacks
const highWaterMark = 64 * 1024; // 64 KB
// Note: Socket high water mark methods can't be set directly in newer Node.ts versions
// Note: Socket high water mark methods can't be set directly in newer Node.js versions
// These would need to be set during socket creation or with a different API
} catch (error) {
// Ignore errors from older Node.ts versions that don't support these methods
// Ignore errors from older Node.js versions that don't support these methods
SmtpLogger.debug(`Could not set socket buffer limits: ${error instanceof Error ? error.message : String(error)}`);
}

View File

@@ -4,6 +4,8 @@
*/
import * as plugins from '../../../plugins.ts';
import * as fs from 'fs';
import * as path from 'path';
import { SmtpState } from './interfaces.ts';
import type { ISmtpSession, ISmtpTransactionResult } from './interfaces.ts';
import type { IDataHandler, ISmtpServer } from './interfaces.ts';

View File

@@ -476,16 +476,11 @@ export interface ISmtpServerOptions {
* Whether authentication is required
*/
required: boolean;
/**
* Allowed authentication methods
*/
methods: ('PLAIN' | 'LOGIN' | 'OAUTH2')[];
/**
* Whether TLS is required for authentication (default: true)
*/
requireTLS?: boolean;
};
/**

View File

@@ -18,7 +18,6 @@ import { mergeWithDefaults } from './utils/helpers.ts';
import { SmtpLogger } from './utils/logging.ts';
import { adaptiveLogger } from './utils/adaptive-logging.ts';
import { UnifiedEmailServer } from '../../routing/classes.unified.email.server.ts';
import { ConnectionWrapper } from './utils/connection-wrapper.ts';
/**
* SMTP Server implementation
@@ -66,20 +65,15 @@ export class SmtpServer implements ISmtpServer {
private options: ISmtpServerOptions;
/**
* Deno listener instance (replaces Node.js net.Server)
* Net server instance
*/
private listener: Deno.Listener | null = null;
private server: plugins.net.Server | null = null;
/**
* Accept loop promise for clean shutdown
*/
private acceptLoop: Promise<void> | null = null;
/**
* Secure server instance (TLS/SSL)
* Secure server instance
*/
private secureServer: plugins.tls.Server | null = null;
/**
* Whether the server is running
*/
@@ -152,26 +146,60 @@ export class SmtpServer implements ISmtpServer {
}
try {
// Create Deno listener (native networking, replaces Node.js net.createServer)
this.listener = Deno.listen({
hostname: this.options.host || '0.0.0.0',
port: this.options.port,
transport: 'tcp',
// Create the server
this.server = plugins.net.createServer((socket) => {
// Check IP reputation before handling connection
this.securityHandler.checkIpReputation(socket)
.then(allowed => {
if (allowed) {
this.connectionManager.handleNewConnection(socket);
} else {
// Close connection if IP is not allowed
socket.destroy();
}
})
.catch(error => {
SmtpLogger.error(`IP reputation check error: ${error instanceof Error ? error.message : String(error)}`, {
remoteAddress: socket.remoteAddress,
error: error instanceof Error ? error : new Error(String(error))
});
// Allow connection on error (fail open)
this.connectionManager.handleNewConnection(socket);
});
});
SmtpLogger.info(`SMTP server listening on ${this.options.host || '0.0.0.0'}:${this.options.port}`, {
component: 'smtp-server',
// Set up error handling with recovery
this.server.on('error', (err) => {
SmtpLogger.error(`SMTP server error: ${err.message}`, { error: err });
// Try to recover from specific errors
if (this.shouldAttemptRecovery(err)) {
this.attemptServerRecovery('standard', err);
}
});
// Start listening
await new Promise<void>((resolve, reject) => {
if (!this.server) {
reject(new Error('Server not initialized'));
return;
}
this.server.listen(this.options.port, this.options.host, () => {
SmtpLogger.info(`SMTP server listening on ${this.options.host || '0.0.0.0'}:${this.options.port}`);
resolve();
});
this.server.on('error', reject);
});
// Start accepting connections in the background
this.acceptLoop = this.acceptConnections();
// Start secure server if configured
if (this.options.securePort && this.tlsHandler.isTlsEnabled()) {
try {
// Import the secure server creation utility from our new module
// This gives us better certificate handling and error resilience
const { createSecureTlsServer } = await import('./secure-server.ts');
const { createSecureTlsServer } = await import('./secure-server.js');
// Create secure server with the certificates
// This uses a more robust approach to certificate loading and validation
@@ -277,67 +305,6 @@ export class SmtpServer implements ISmtpServer {
}
}
/**
* Accept connections in a loop (Deno-native networking)
*/
private async acceptConnections(): Promise<void> {
if (!this.listener) {
return;
}
try {
for await (const conn of this.listener) {
if (!this.running) {
conn.close();
break;
}
// Wrap Deno.Conn in ConnectionWrapper for Socket compatibility
const wrapper = new ConnectionWrapper(conn);
// Handle connection in the background
this.handleConnection(wrapper as any).catch(error => {
SmtpLogger.error(`Error handling connection: ${error instanceof Error ? error.message : String(error)}`, {
component: 'smtp-server',
error: error instanceof Error ? error : new Error(String(error)),
});
});
}
} catch (error) {
if (this.running) {
SmtpLogger.error(`Error in accept loop: ${error instanceof Error ? error.message : String(error)}`, {
component: 'smtp-server',
error: error instanceof Error ? error : new Error(String(error)),
});
}
}
}
/**
* Handle a single connection
*/
private async handleConnection(socket: plugins.net.Socket): Promise<void> {
try {
// Check IP reputation before handling connection
const allowed = await this.securityHandler.checkIpReputation(socket);
if (allowed) {
this.connectionManager.handleNewConnection(socket);
} else {
// Close connection if IP is not allowed
socket.destroy();
}
} catch (error) {
SmtpLogger.error(`IP reputation check error: ${error instanceof Error ? error.message : String(error)}`, {
remoteAddress: socket.remoteAddress,
error: error instanceof Error ? error : new Error(String(error)),
});
// Allow connection on error (fail open)
this.connectionManager.handleNewConnection(socket);
}
}
/**
* Stop the SMTP server
* @returns Promise that resolves when server is stopped
@@ -364,27 +331,24 @@ export class SmtpServer implements ISmtpServer {
// Close servers
const closePromises: Promise<void>[] = [];
// Close Deno listener
if (this.listener) {
try {
this.listener.close();
} catch (error) {
SmtpLogger.error(`Error closing listener: ${error instanceof Error ? error.message : String(error)}`, {
component: 'smtp-server',
});
}
this.listener = null;
}
// Wait for accept loop to finish
if (this.acceptLoop) {
if (this.server) {
closePromises.push(
this.acceptLoop.catch(() => {
// Accept loop may throw when listener is closed, ignore
new Promise<void>((resolve, reject) => {
if (!this.server) {
resolve();
return;
}
this.server.close((err) => {
if (err) {
reject(err);
} else {
resolve();
}
});
})
);
this.acceptLoop = null;
}
if (this.secureServer) {
@@ -417,6 +381,7 @@ export class SmtpServer implements ISmtpServer {
})
]);
this.server = null;
this.secureServer = null;
this.running = false;
@@ -571,25 +536,30 @@ export class SmtpServer implements ISmtpServer {
try {
// Determine which server to restart
const isStandardServer = serverType === 'standard';
// Close the affected server
if (isStandardServer && this.listener) {
try {
this.listener.close();
} catch (error) {
SmtpLogger.warn(`Error during listener close in recovery: ${error instanceof Error ? error.message : String(error)}`);
}
this.listener = null;
// Wait for accept loop to finish
if (this.acceptLoop) {
try {
await this.acceptLoop;
} catch {
// Ignore errors from accept loop
if (isStandardServer && this.server) {
await new Promise<void>((resolve) => {
if (!this.server) {
resolve();
return;
}
this.acceptLoop = null;
}
// First try a clean shutdown
this.server.close((err) => {
if (err) {
SmtpLogger.warn(`Error during server close in recovery: ${err.message}`);
}
resolve();
});
// Set a timeout to force close
setTimeout(() => {
resolve();
}, 3000);
});
this.server = null;
} else if (!isStandardServer && this.secureServer) {
await new Promise<void>((resolve) => {
if (!this.secureServer) {
@@ -623,27 +593,62 @@ export class SmtpServer implements ISmtpServer {
// Restart the affected server
if (isStandardServer) {
try {
// Create Deno listener for recovery
this.listener = Deno.listen({
hostname: this.options.host || '0.0.0.0',
port: this.options.port,
transport: 'tcp',
// Create and start the standard server
this.server = plugins.net.createServer((socket) => {
// Check IP reputation before handling connection
this.securityHandler.checkIpReputation(socket)
.then(allowed => {
if (allowed) {
this.connectionManager.handleNewConnection(socket);
} else {
// Close connection if IP is not allowed
socket.destroy();
}
})
.catch(error => {
SmtpLogger.error(`IP reputation check error: ${error instanceof Error ? error.message : String(error)}`, {
remoteAddress: socket.remoteAddress,
error: error instanceof Error ? error : new Error(String(error))
});
// Allow connection on error (fail open)
this.connectionManager.handleNewConnection(socket);
});
});
// Set up error handling with recovery
this.server.on('error', (err) => {
SmtpLogger.error(`SMTP server error after recovery: ${err.message}`, { error: err });
// Try to recover again if needed
if (this.shouldAttemptRecovery(err)) {
this.attemptServerRecovery('standard', err);
}
});
// Start listening again
await new Promise<void>((resolve, reject) => {
if (!this.server) {
reject(new Error('Server not initialized during recovery'));
return;
}
this.server.listen(this.options.port, this.options.host, () => {
SmtpLogger.info(`SMTP server recovered and listening on ${this.options.host || '0.0.0.0'}:${this.options.port}`);
resolve();
});
SmtpLogger.info(`SMTP server recovered and listening on ${this.options.host || '0.0.0.0'}:${this.options.port}`);
// Start accepting connections again
this.acceptLoop = this.acceptConnections();
} catch (listenError) {
SmtpLogger.error(`Failed to restart server during recovery: ${listenError instanceof Error ? listenError.message : String(listenError)}`);
throw listenError;
}
// Only use error event for startup issues during recovery
this.server.once('error', (err) => {
SmtpLogger.error(`Failed to restart server during recovery: ${err.message}`);
reject(err);
});
});
} else if (this.options.securePort && this.tlsHandler.isTlsEnabled()) {
// Try to recreate the secure server
try {
// Import the secure server creation utility
const { createSecureTlsServer } = await import('./secure-server.ts');
const { createSecureTlsServer } = await import('./secure-server.js');
// Create secure server with the certificates
this.secureServer = createSecureTlsServer({
@@ -779,7 +784,7 @@ export class SmtpServer implements ISmtpServer {
await Promise.all(destroyPromises);
// Destroy the adaptive logger singleton to clean up its timer
const { adaptiveLogger } = await import('./utils/adaptive-logging.ts');
const { adaptiveLogger } = await import('./utils/adaptive-logging.js');
if (adaptiveLogger && typeof adaptiveLogger.destroy === 'function') {
adaptiveLogger.destroy();
}

View File

@@ -1,18 +1,21 @@
/**
* STARTTLS Implementation using Deno Native TLS
* Uses Deno.startTls() for reliable TLS upgrades
* STARTTLS Implementation
* Provides an improved implementation for STARTTLS upgrades
*/
import * as plugins from '../../../plugins.ts';
import { SmtpLogger } from './utils/logging.ts';
import {
loadCertificatesFromString,
createTlsOptions,
type ICertificateData
} from './certificate-utils.ts';
import { getSocketDetails } from './utils/helpers.ts';
import { ConnectionWrapper } from './utils/connection-wrapper.ts';
import type { ISmtpSession, ISessionManager, IConnectionManager } from './interfaces.ts';
import { SmtpState } from '../interfaces.ts';
/**
* Perform STARTTLS using Deno's native TLS implementation
* This replaces the broken Node.js TLS compatibility layer
* Enhanced STARTTLS handler for more reliable TLS upgrades
*/
export async function performStartTLS(
socket: plugins.net.Socket,
@@ -23,174 +26,237 @@ export async function performStartTLS(
session?: ISmtpSession;
sessionManager?: ISessionManager;
connectionManager?: IConnectionManager;
onSuccess?: (tlsSocket: plugins.tls.TLSSocket | ConnectionWrapper) => void;
onSuccess?: (tlsSocket: plugins.tls.TLSSocket) => void;
onFailure?: (error: Error) => void;
updateSessionState?: (session: ISmtpSession, state: SmtpState) => void;
}
): Promise<plugins.tls.TLSSocket | ConnectionWrapper | undefined> {
return new Promise<plugins.tls.TLSSocket | ConnectionWrapper | undefined>(async (resolve) => {
): Promise<plugins.tls.TLSSocket | undefined> {
return new Promise<plugins.tls.TLSSocket | undefined>((resolve) => {
try {
const socketDetails = getSocketDetails(socket);
SmtpLogger.info('Starting Deno-native STARTTLS upgrade process', {
SmtpLogger.info('Starting enhanced STARTTLS upgrade process', {
remoteAddress: socketDetails.remoteAddress,
remotePort: socketDetails.remotePort
});
// Check if this is a ConnectionWrapper (Deno.Conn based)
if (socket instanceof ConnectionWrapper) {
SmtpLogger.info('Using Deno-native STARTTLS implementation for ConnectionWrapper');
// Get the underlying Deno.Conn
const denoConn = socket.getDenoConn();
// Set up timeout for TLS handshake
const handshakeTimeout = 30000; // 30 seconds
const timeoutId = setTimeout(() => {
const error = new Error('TLS handshake timed out');
SmtpLogger.error(error.message, {
remoteAddress: socketDetails.remoteAddress,
remotePort: socketDetails.remotePort
});
if (options.onFailure) {
options.onFailure(error);
}
resolve(undefined);
}, handshakeTimeout);
try {
// Write cert and key to temporary files for Deno.startTls()
const tempDir = await Deno.makeTempDir();
const certFile = `${tempDir}/cert.pem`;
const keyFile = `${tempDir}/key.pem`;
try {
await Deno.writeTextFile(certFile, options.cert);
await Deno.writeTextFile(keyFile, options.key);
// Upgrade connection to TLS using Deno's native API
const tlsConn = await Deno.startTls(denoConn, {
hostname: 'localhost', // Server-side TLS doesn't need hostname validation
certFile,
keyFile,
alpnProtocols: ['smtp'],
});
clearTimeout(timeoutId);
SmtpLogger.info('TLS upgrade successful via Deno-native STARTTLS', {
remoteAddress: socketDetails.remoteAddress,
remotePort: socketDetails.remotePort
});
// Replace the underlying connection in the wrapper
socket.replaceConnection(tlsConn);
// Update socket mapping in session manager
if (options.sessionManager) {
// Socket wrapper remains the same, just upgraded to TLS
const socketReplaced = options.sessionManager.replaceSocket(socket as any, socket as any);
if (!socketReplaced) {
SmtpLogger.warn('Socket already tracked in session manager', {
remoteAddress: socketDetails.remoteAddress,
remotePort: socketDetails.remotePort
});
}
}
// Re-attach event handlers from connection manager if needed
if (options.connectionManager) {
try {
options.connectionManager.setupSocketEventHandlers(socket as any);
SmtpLogger.debug('Successfully re-attached connection manager event handlers to TLS socket', {
remoteAddress: socketDetails.remoteAddress,
remotePort: socketDetails.remotePort
});
} catch (handlerError) {
SmtpLogger.error('Failed to re-attach event handlers to TLS socket after STARTTLS', {
remoteAddress: socketDetails.remoteAddress,
remotePort: socketDetails.remotePort,
error: handlerError instanceof Error ? handlerError : new Error(String(handlerError))
});
}
}
// Update session if provided
if (options.session) {
// Update session properties to indicate TLS is active
options.session.useTLS = true;
options.session.secure = true;
// Reset session state as required by RFC 3207
// After STARTTLS, client must issue a new EHLO
if (options.updateSessionState) {
options.updateSessionState(options.session, SmtpState.GREETING);
}
}
// Call success callback if provided
if (options.onSuccess) {
options.onSuccess(socket);
}
// Success - return the wrapper with upgraded TLS connection
resolve(socket);
} finally {
// Clean up temporary files
try {
await Deno.remove(tempDir, { recursive: true });
} catch {
// Ignore cleanup errors
}
}
} catch (tlsError) {
clearTimeout(timeoutId);
const error = tlsError instanceof Error ? tlsError : new Error(String(tlsError));
SmtpLogger.error(`Deno TLS upgrade failed: ${error.message}`, {
remoteAddress: socketDetails.remoteAddress,
remotePort: socketDetails.remotePort,
error,
stack: error.stack
});
if (options.onFailure) {
options.onFailure(error);
}
resolve(undefined);
// Create a proper socket cleanup function
const cleanupSocket = () => {
// Remove all listeners to prevent memory leaks
socket.removeAllListeners('data');
socket.removeAllListeners('error');
socket.removeAllListeners('close');
socket.removeAllListeners('end');
socket.removeAllListeners('drain');
};
// Prepare the socket for TLS upgrade
socket.setNoDelay(true);
// Critical: make sure there's no pending data before TLS handshake
socket.pause();
// Add error handling for the base socket
const handleSocketError = (err: Error) => {
SmtpLogger.error(`Socket error during STARTTLS preparation: ${err.message}`, {
remoteAddress: socketDetails.remoteAddress,
remotePort: socketDetails.remotePort,
error: err,
stack: err.stack
});
if (options.onFailure) {
options.onFailure(err);
}
} else {
// Fallback: This should not happen since all connections are now ConnectionWrapper
SmtpLogger.error('STARTTLS called on non-ConnectionWrapper socket - this should not happen', {
socketType: socket.constructor.name,
// Resolve with undefined to indicate failure
resolve(undefined);
};
socket.once('error', handleSocketError);
// Load certificates
let certificates: ICertificateData;
try {
certificates = loadCertificatesFromString({
key: options.key,
cert: options.cert,
ca: options.ca
});
} catch (certError) {
SmtpLogger.error(`Certificate error during STARTTLS: ${certError instanceof Error ? certError.message : String(certError)}`);
if (options.onFailure) {
options.onFailure(certError instanceof Error ? certError : new Error(String(certError)));
}
resolve(undefined);
return;
}
// Create TLS options optimized for STARTTLS
const tlsOptions = createTlsOptions(certificates, true);
// Create secure context
let secureContext;
try {
secureContext = plugins.tls.createSecureContext(tlsOptions);
} catch (contextError) {
SmtpLogger.error(`Failed to create secure context: ${contextError instanceof Error ? contextError.message : String(contextError)}`);
if (options.onFailure) {
options.onFailure(contextError instanceof Error ? contextError : new Error(String(contextError)));
}
resolve(undefined);
return;
}
// Log STARTTLS upgrade attempt
SmtpLogger.debug('Attempting TLS socket upgrade with options', {
minVersion: tlsOptions.minVersion,
maxVersion: tlsOptions.maxVersion,
handshakeTimeout: tlsOptions.handshakeTimeout
});
// Use a safer approach to create the TLS socket
const handshakeTimeout = 30000; // 30 seconds timeout for TLS handshake
let handshakeTimeoutId: NodeJS.Timeout | undefined;
// Create the TLS socket using a conservative approach for STARTTLS
const tlsSocket = new plugins.tls.TLSSocket(socket, {
isServer: true,
secureContext,
// Server-side options (simpler is more reliable for STARTTLS)
requestCert: false,
rejectUnauthorized: false
});
// Set up error handling for the TLS socket
tlsSocket.once('error', (err) => {
if (handshakeTimeoutId) {
clearTimeout(handshakeTimeoutId);
}
SmtpLogger.error(`TLS error during STARTTLS: ${err.message}`, {
remoteAddress: socketDetails.remoteAddress,
remotePort: socketDetails.remotePort,
error: err,
stack: err.stack
});
// Clean up socket listeners
cleanupSocket();
if (options.onFailure) {
options.onFailure(err);
}
// Destroy the socket to ensure we don't have hanging connections
tlsSocket.destroy();
resolve(undefined);
});
// Set up handshake timeout manually for extra safety
handshakeTimeoutId = setTimeout(() => {
SmtpLogger.error('TLS handshake timed out', {
remoteAddress: socketDetails.remoteAddress,
remotePort: socketDetails.remotePort
});
const error = new Error('STARTTLS requires ConnectionWrapper (Deno.Conn based socket)');
// Clean up socket listeners
cleanupSocket();
if (options.onFailure) {
options.onFailure(error);
options.onFailure(new Error('TLS handshake timed out'));
}
// Destroy the socket to ensure we don't have hanging connections
tlsSocket.destroy();
resolve(undefined);
}
}, handshakeTimeout);
// Set up handler for successful TLS negotiation
tlsSocket.once('secure', () => {
if (handshakeTimeoutId) {
clearTimeout(handshakeTimeoutId);
}
const protocol = tlsSocket.getProtocol();
const cipher = tlsSocket.getCipher();
SmtpLogger.info('TLS upgrade successful via STARTTLS', {
remoteAddress: socketDetails.remoteAddress,
remotePort: socketDetails.remotePort,
protocol: protocol || 'unknown',
cipher: cipher?.name || 'unknown'
});
// Update socket mapping in session manager
if (options.sessionManager) {
const socketReplaced = options.sessionManager.replaceSocket(socket, tlsSocket);
if (!socketReplaced) {
SmtpLogger.error('Failed to replace socket in session manager after STARTTLS', {
remoteAddress: socketDetails.remoteAddress,
remotePort: socketDetails.remotePort
});
}
}
// Re-attach event handlers from connection manager
if (options.connectionManager) {
try {
options.connectionManager.setupSocketEventHandlers(tlsSocket);
SmtpLogger.debug('Successfully re-attached connection manager event handlers to TLS socket', {
remoteAddress: socketDetails.remoteAddress,
remotePort: socketDetails.remotePort
});
} catch (handlerError) {
SmtpLogger.error('Failed to re-attach event handlers to TLS socket after STARTTLS', {
remoteAddress: socketDetails.remoteAddress,
remotePort: socketDetails.remotePort,
error: handlerError instanceof Error ? handlerError : new Error(String(handlerError))
});
}
}
// Update session if provided
if (options.session) {
// Update session properties to indicate TLS is active
options.session.useTLS = true;
options.session.secure = true;
// Reset session state as required by RFC 3207
// After STARTTLS, client must issue a new EHLO
if (options.updateSessionState) {
options.updateSessionState(options.session, SmtpState.GREETING);
}
}
// Call success callback if provided
if (options.onSuccess) {
options.onSuccess(tlsSocket);
}
// Success - return the TLS socket
resolve(tlsSocket);
});
// Resume the socket after we've set up all handlers
// This allows the TLS handshake to proceed
socket.resume();
} catch (error) {
SmtpLogger.error(`Unexpected error in Deno-native STARTTLS: ${error instanceof Error ? error.message : String(error)}`, {
SmtpLogger.error(`Unexpected error in STARTTLS: ${error instanceof Error ? error.message : String(error)}`, {
error: error instanceof Error ? error : new Error(String(error)),
stack: error instanceof Error ? error.stack : 'No stack trace available'
});
if (options.onFailure) {
options.onFailure(error instanceof Error ? error : new Error(String(error)));
}
resolve(undefined);
}
});
}
}

View File

@@ -110,84 +110,100 @@ export class TlsHandler implements ITlsHandler {
}
/**
* Upgrade a connection to TLS using Deno-native implementation
* Upgrade a connection to TLS
* @param socket - Client socket
*/
public async startTLS(socket: plugins.net.Socket): Promise<plugins.tls.TLSSocket | any> {
public async startTLS(socket: plugins.net.Socket): Promise<plugins.tls.TLSSocket> {
// Get the session for this socket
const session = this.smtpServer.getSessionManager().getSession(socket);
try {
// Use the unified STARTTLS implementation (Deno-native)
const { performStartTLS } = await import('./starttls-handler.ts');
SmtpLogger.info('Starting STARTTLS upgrade', {
remoteAddress: socket.remoteAddress,
remotePort: socket.remotePort
});
// Import the enhanced STARTTLS handler
// This uses a more robust approach to TLS upgrades
const { performStartTLS } = await import('./starttls-handler.js');
SmtpLogger.info('Using enhanced STARTTLS implementation');
// Use the enhanced STARTTLS handler with better error handling and socket management
const serverOptions = this.smtpServer.getOptions();
const tlsSocket = await performStartTLS(socket, {
key: serverOptions.key,
cert: serverOptions.cert,
ca: serverOptions.ca,
session,
session: session,
sessionManager: this.smtpServer.getSessionManager(),
connectionManager: this.smtpServer.getConnectionManager(),
// Callback for successful upgrade
onSuccess: (secureSocket) => {
SmtpLogger.info('TLS connection successfully established', {
SmtpLogger.info('TLS connection successfully established via enhanced STARTTLS', {
remoteAddress: secureSocket.remoteAddress,
remotePort: secureSocket.remotePort
remotePort: secureSocket.remotePort,
protocol: secureSocket.getProtocol() || 'unknown',
cipher: secureSocket.getCipher()?.name || 'unknown'
});
// Log security event
SmtpLogger.logSecurityEvent(
SecurityLogLevel.INFO,
SecurityEventType.TLS_NEGOTIATION,
'STARTTLS successful',
{},
'STARTTLS successful with enhanced implementation',
{
protocol: secureSocket.getProtocol(),
cipher: secureSocket.getCipher()?.name
},
secureSocket.remoteAddress,
undefined,
true
);
},
// Callback for failed upgrade
onFailure: (error) => {
SmtpLogger.error(`STARTTLS failed: ${error.message}`, {
SmtpLogger.error(`Enhanced STARTTLS failed: ${error.message}`, {
sessionId: session?.id,
remoteAddress: socket.remoteAddress,
error
});
// Log security event
SmtpLogger.logSecurityEvent(
SecurityLogLevel.ERROR,
SecurityEventType.TLS_NEGOTIATION,
'STARTTLS failed',
'Enhanced STARTTLS failed',
{ error: error.message },
socket.remoteAddress,
undefined,
false
);
},
// Function to update session state
updateSessionState: this.smtpServer.getSessionManager().updateSessionState?.bind(this.smtpServer.getSessionManager())
});
// If STARTTLS failed with the enhanced implementation, log the error
if (!tlsSocket) {
SmtpLogger.warn('Enhanced STARTTLS implementation failed to create TLS socket', {
sessionId: session?.id,
remoteAddress: socket.remoteAddress
});
throw new Error('Failed to create TLS socket');
}
return tlsSocket;
} catch (error) {
// Log STARTTLS failure
SmtpLogger.error(`Failed to upgrade connection to TLS: ${error instanceof Error ? error.message : String(error)}`, {
remoteAddress: socket.remoteAddress,
remotePort: socket.remotePort,
error: error instanceof Error ? error : new Error(String(error)),
stack: error instanceof Error ? error.stack : 'No stack trace available'
});
// Log security event
SmtpLogger.logSecurityEvent(
SecurityLogLevel.ERROR,
SecurityEventType.TLS_NEGOTIATION,
'Failed to upgrade connection to TLS',
{
{
error: error instanceof Error ? error.message : String(error),
stack: error instanceof Error ? error.stack : 'No stack trace available'
},
@@ -195,7 +211,8 @@ export class TlsHandler implements ITlsHandler {
undefined,
false
);
// Destroy the socket on error
socket.destroy();
throw error;
}

View File

@@ -1,298 +0,0 @@
/**
* Connection Wrapper Utility
* Wraps Deno.Conn to provide Node.js net.Socket-compatible interface
* This allows the SMTP server to use Deno's native networking while maintaining
* compatibility with existing Socket-based code
*/
import { EventEmitter } from '../../../../plugins.ts';
/**
* Wraps a Deno.Conn or Deno.TlsConn to provide a Node.js Socket-compatible interface
*/
export class ConnectionWrapper extends EventEmitter {
private conn: Deno.Conn | Deno.TlsConn;
private _destroyed = false;
private _reading = false;
private _remoteAddr: Deno.NetAddr;
private _localAddr: Deno.NetAddr;
constructor(conn: Deno.Conn | Deno.TlsConn) {
super();
this.conn = conn;
this._remoteAddr = conn.remoteAddr as Deno.NetAddr;
this._localAddr = conn.localAddr as Deno.NetAddr;
// Start reading from the connection
this._reading = true;
this._startReading();
}
/**
* Get remote address (Node.js net.Socket compatible)
*/
get remoteAddress(): string {
return this._remoteAddr.hostname;
}
/**
* Get remote port (Node.js net.Socket compatible)
*/
get remotePort(): number {
return this._remoteAddr.port;
}
/**
* Get local address (Node.js net.Socket compatible)
*/
get localAddress(): string {
return this._localAddr.hostname;
}
/**
* Get local port (Node.js net.Socket compatible)
*/
get localPort(): number {
return this._localAddr.port;
}
/**
* Check if connection is destroyed
*/
get destroyed(): boolean {
return this._destroyed;
}
/**
* Check ready state (Node.js compatible)
*/
get readyState(): string {
if (this._destroyed) {
return 'closed';
}
return 'open';
}
/**
* Check if writable (Node.js compatible)
*/
get writable(): boolean {
return !this._destroyed;
}
/**
* Check if this is a secure (TLS) connection
*/
get encrypted(): boolean {
return 'handshake' in this.conn; // TlsConn has handshake property
}
/**
* Write data to the connection (Node.js net.Socket compatible)
*/
write(data: string | Uint8Array, encoding?: string | ((err?: Error) => void), callback?: (err?: Error) => void): boolean {
// Handle overloaded signatures (encoding is optional)
if (typeof encoding === 'function') {
callback = encoding;
encoding = undefined;
}
if (this._destroyed) {
const error = new Error('Connection is destroyed');
if (callback) {
setTimeout(() => callback(error), 0);
}
return false;
}
const bytes = typeof data === 'string'
? new TextEncoder().encode(data)
: data;
// Use a promise-based approach that Node.js compatibility expects
// Write happens async but we return true immediately (buffered)
this.conn.write(bytes)
.then(() => {
if (callback) {
callback();
}
})
.catch((err) => {
const error = err instanceof Error ? err : new Error(String(err));
if (callback) {
callback(error);
} else {
this.emit('error', error);
}
});
return true;
}
/**
* End the connection (Node.js net.Socket compatible)
*/
end(data?: string | Uint8Array, encoding?: string, callback?: () => void): void {
if (data) {
this.write(data, encoding, () => {
this.destroy();
if (callback) callback();
});
} else {
this.destroy();
if (callback) callback();
}
}
/**
* Destroy the connection (Node.js net.Socket compatible)
*/
destroy(error?: Error): void {
if (this._destroyed) {
return;
}
this._destroyed = true;
this._reading = false;
try {
this.conn.close();
} catch (closeError) {
// Ignore close errors
}
if (error) {
this.emit('error', error);
}
this.emit('close', !!error);
}
/**
* Set TCP_NODELAY option (Node.js net.Socket compatible)
*/
setNoDelay(noDelay: boolean = true): this {
try {
// @ts-ignore - Deno.Conn has setNoDelay
if (typeof this.conn.setNoDelay === 'function') {
// @ts-ignore
this.conn.setNoDelay(noDelay);
}
} catch {
// Ignore if not supported
}
return this;
}
/**
* Set keep-alive option (Node.js net.Socket compatible)
*/
setKeepAlive(enable: boolean = true, initialDelay?: number): this {
try {
// @ts-ignore - Deno.Conn has setKeepAlive
if (typeof this.conn.setKeepAlive === 'function') {
// @ts-ignore
this.conn.setKeepAlive(enable);
}
} catch {
// Ignore if not supported
}
return this;
}
/**
* Set timeout (Node.js net.Socket compatible)
*/
setTimeout(timeout: number, callback?: () => void): this {
// Deno doesn't have built-in socket timeout, but we can implement it
// For now, just accept the call without error (most timeout handling is done elsewhere)
if (callback) {
// If callback provided, we could set up a timer, but for now just ignore
// The SMTP server handles timeouts at a higher level
}
return this;
}
/**
* Pause reading from the connection
*/
pause(): this {
this._reading = false;
return this;
}
/**
* Resume reading from the connection
*/
resume(): this {
if (!this._reading && !this._destroyed) {
this._reading = true;
this._startReading();
}
return this;
}
/**
* Get the underlying Deno.Conn
*/
getDenoConn(): Deno.Conn | Deno.TlsConn {
return this.conn;
}
/**
* Replace the underlying connection (for STARTTLS upgrade)
*/
replaceConnection(newConn: Deno.TlsConn): void {
this.conn = newConn;
this._remoteAddr = newConn.remoteAddr as Deno.NetAddr;
this._localAddr = newConn.localAddr as Deno.NetAddr;
// Restart reading from the new TLS connection
if (!this._destroyed) {
this._reading = true;
this._startReading();
}
}
/**
* Internal method to read data from the connection
*/
private async _startReading(): Promise<void> {
if (!this._reading || this._destroyed) {
return;
}
try {
const buffer = new Uint8Array(4096);
while (this._reading && !this._destroyed) {
const n = await this.conn.read(buffer);
if (n === null) {
// EOF
this._destroyed = true;
this.emit('end');
this.emit('close', false);
break;
}
const data = buffer.subarray(0, n);
this.emit('data', data);
}
} catch (error) {
if (!this._destroyed) {
this._destroyed = true;
this.emit('error', error instanceof Error ? error : new Error(String(error)));
this.emit('close', true);
}
}
}
/**
* Remove all listeners (cleanup helper)
*/
removeAllListeners(event?: string): this {
super.removeAllListeners(event);
return this;
}
}

19
ts/mail/index.ts Normal file
View File

@@ -0,0 +1,19 @@
// Export all mail modules for simplified imports
export * from './routing/index.ts';
export * from './security/index.ts';
// Make the core and delivery modules accessible
import * as Core from './core/index.ts';
import * as Delivery from './delivery/index.ts';
export { Core, Delivery };
// For direct imports
import { Email } from './core/classes.email.ts';
import { DcRouter } from '../classes.dcrouter.ts';
// Re-export commonly used classes
export {
Email,
DcRouter
};

View File

@@ -1,7 +1,7 @@
import * as plugins from '../../plugins.ts';
import type { IEmailDomainConfig } from './interfaces.ts';
import { logger } from '../../logger.ts';
import type { DcRouter } from '../../classes.mailer.ts';
import type { DcRouter } from '../../classes.dcrouter.ts';
import type { StorageManager } from '../../storage/index.ts';
/**

View File

@@ -416,7 +416,7 @@ export class DNSManager {
*/
public async saveDnsRecommendations(domain: string, records: IDnsRecord[]): Promise<void> {
try {
const filePath = plugins.path.join(paths.dnsRecordsDir, `${domain}.recommendations.tson`);
const filePath = plugins.path.join(paths.dnsRecordsDir, `${domain}.recommendations.json`);
plugins.smartfile.memory.toFsSync(JSON.stringify(records, null, 2), filePath);
console.log(`DNS recommendations for ${domain} saved to ${filePath}`);
} catch (error) {

View File

@@ -1,11 +1,12 @@
import * as plugins from '../../plugins.ts';
import { EventEmitter } from 'node:events';
import type { IEmailRoute, IEmailMatch, IEmailAction, IEmailContext } from './interfaces.ts';
import type { Email } from '../core/classes.email.ts';
/**
* Email router that evaluates routes and determines actions
*/
export class EmailRouter extends plugins.EventEmitter {
export class EmailRouter extends EventEmitter {
private routes: IEmailRoute[];
private patternCache: Map<string, boolean> = new Map();
private storageManager?: any; // StorageManager instance
@@ -407,7 +408,7 @@ export class EmailRouter extends plugins.EventEmitter {
}
const routesData = JSON.stringify(this.routes, null, 2);
await this.storageManager.set('/email/routes/config.tson', routesData);
await this.storageManager.set('/email/routes/config.json', routesData);
this.emit('routesPersisted', this.routes.length);
} catch (error) {
@@ -430,7 +431,7 @@ export class EmailRouter extends plugins.EventEmitter {
}
try {
const routesData = await this.storageManager.get('/email/routes/config.tson');
const routesData = await this.storageManager.get('/email/routes/config.json');
if (!routesData) {
return [];

View File

@@ -1,5 +1,6 @@
import * as plugins from '../../plugins.ts';
import * as paths from '../../paths.ts';
import { EventEmitter } from 'events';
import { logger } from '../../logger.ts';
import {
SecurityLogger,
@@ -28,7 +29,7 @@ import { UnifiedDeliveryQueue, type IQueueOptions } from '../delivery/classes.de
import { UnifiedRateLimiter, type IHierarchicalRateLimits } from '../delivery/classes.unified.rate.limiter.ts';
import { SmtpState } from '../delivery/interfaces.ts';
import type { EmailProcessingMode, ISmtpSession as IBaseSmtpSession } from '../delivery/interfaces.ts';
import type { DcRouter } from '../../classes.mailer.ts';
import type { DcRouter } from '../../classes.dcrouter.ts';
/**
* Extended SMTP session interface with route information
@@ -153,7 +154,7 @@ export interface IServerStats {
/**
* Unified email server that handles all email traffic with pattern-based routing
*/
export class UnifiedEmailServer extends plugins.EventEmitter {
export class UnifiedEmailServer extends EventEmitter {
private dcRouter: DcRouter;
private options: IUnifiedEmailServerOptions;
private emailRouter: EmailRouter;

View File

@@ -47,7 +47,7 @@ export class DKIMCreator {
await this.createAndStoreDKIMKeys(domainArg);
const dnsValue = await this.getDNSRecordForDomain(domainArg);
plugins.smartfile.fs.ensureDirSync(paths.dnsRecordsDir);
plugins.smartfile.memory.toFsSync(JSON.stringify(dnsValue, null, 2), plugins.path.join(paths.dnsRecordsDir, `${domainArg}.dkimrecord.tson`));
plugins.smartfile.memory.toFsSync(JSON.stringify(dnsValue, null, 2), plugins.path.join(paths.dnsRecordsDir, `${domainArg}.dkimrecord.json`));
}
}