This commit is contained in:
2025-05-27 15:06:44 +00:00
parent 073c8378c7
commit cfea44742a
5 changed files with 232 additions and 1942 deletions

View File

@ -22,16 +22,15 @@ import type {
} from './classes.email.config.js';
import { Email } from '../core/classes.email.js';
import { BounceManager, BounceType, BounceCategory } from '../core/classes.bouncemanager.js';
import * as net from 'node:net';
import * as tls from 'node:tls';
import * as stream from 'node:stream';
import { createSmtpServer } from '../delivery/smtpserver/index.js';
import { createPooledSmtpClient } from '../delivery/smtpclient/create-client.js';
import type { SmtpClient } from '../delivery/smtpclient/smtp-client.js';
import { MultiModeDeliverySystem, type IMultiModeDeliveryOptions } from '../delivery/classes.delivery.system.js';
import { UnifiedDeliveryQueue, type IQueueOptions } from '../delivery/classes.delivery.queue.js';
import { UnifiedRateLimiter, type IHierarchicalRateLimits } from '../delivery/classes.unified.rate.limiter.js';
import { SmtpState } from '../delivery/interfaces.js';
import type { EmailProcessingMode, ISmtpSession as IBaseSmtpSession } from '../delivery/interfaces.js';
import { smtpClientMod } from '../delivery/index.js';
import type { DcRouter } from '../../classes.dcrouter.js';
/**
@ -180,6 +179,7 @@ export class UnifiedEmailServer extends EventEmitter {
public deliverySystem: MultiModeDeliverySystem;
private rateLimiter: UnifiedRateLimiter;
private dkimKeys: Map<string, string> = new Map(); // domain -> private key
private smtpClients: Map<string, SmtpClient> = new Map(); // host:port -> client
constructor(dcRouter: DcRouter, options: IUnifiedEmailServerOptions) {
super();
@ -303,6 +303,37 @@ export class UnifiedEmailServer extends EventEmitter {
// We'll create the SMTP servers during the start() method
}
/**
* Get or create an SMTP client for the given host and port
* Uses connection pooling for efficiency
*/
public getSmtpClient(host: string, port: number = 25): SmtpClient {
const clientKey = `${host}:${port}`;
// Check if we already have a client for this destination
let client = this.smtpClients.get(clientKey);
if (!client) {
// Create a new pooled SMTP client
client = createPooledSmtpClient({
host,
port,
secure: port === 465,
connectionTimeout: this.options.outbound?.connectionTimeout || 30000,
socketTimeout: this.options.outbound?.socketTimeout || 120000,
maxConnections: this.options.outbound?.maxConnections || 10,
maxMessages: 1000, // Messages per connection before reconnect
pool: true,
debug: false
});
this.smtpClients.set(clientKey, client);
logger.log('info', `Created new SMTP client pool for ${clientKey}`);
}
return client;
}
/**
* Start the unified email server
*/
@ -469,6 +500,17 @@ export class UnifiedEmailServer extends EventEmitter {
logger.log('info', 'Email delivery queue shut down');
}
// Close all SMTP client connections
for (const [clientKey, client] of this.smtpClients) {
try {
await client.close();
logger.log('info', `Closed SMTP client pool for ${clientKey}`);
} catch (error) {
logger.log('warn', `Error closing SMTP client for ${clientKey}: ${error.message}`);
}
}
this.smtpClients.clear();
logger.log('info', 'UnifiedEmailServer stopped successfully');
this.emit('stopped');
} catch (error) {
@ -480,89 +522,6 @@ export class UnifiedEmailServer extends EventEmitter {
/**
* Handle incoming email data (stub implementation)
*/
private onData(stream: stream.Readable, session: IExtendedSmtpSession, callback: (err?: Error) => void): void {
logger.log('info', `Processing email data for session ${session.id}`);
const startTime = Date.now();
const chunks: Buffer[] = [];
stream.on('data', (chunk: Buffer) => {
chunks.push(chunk);
});
stream.on('end', async () => {
try {
const data = Buffer.concat(chunks);
const mode = session.processingMode || this.options.defaultMode;
// Determine processing mode based on matched rule
const processedEmail = await this.processEmailByMode(data, session, mode);
// Update statistics
this.stats.messages.processed++;
this.stats.messages.delivered++;
// Calculate processing time
const processingTime = Date.now() - startTime;
this.processingTimes.push(processingTime);
this.updateProcessingTimeStats();
// Emit event for delivery queue
this.emit('emailProcessed', processedEmail, mode, session.matchedRule);
logger.log('info', `Email processed successfully in ${processingTime}ms, mode: ${mode}`);
callback();
} catch (error) {
logger.log('error', `Error processing email: ${error.message}`);
// Update statistics
this.stats.messages.processed++;
this.stats.messages.failed++;
// Calculate processing time for failed attempts too
const processingTime = Date.now() - startTime;
this.processingTimes.push(processingTime);
this.updateProcessingTimeStats();
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.ERROR,
type: SecurityEventType.EMAIL_PROCESSING,
message: 'Email processing failed',
ipAddress: session.remoteAddress,
details: {
error: error.message,
sessionId: session.id,
mode: session.processingMode,
processingTime
},
success: false
});
callback(error);
}
});
stream.on('error', (err) => {
logger.log('error', `Stream error: ${err.message}`);
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.ERROR,
type: SecurityEventType.EMAIL_PROCESSING,
message: 'Email stream error',
ipAddress: session.remoteAddress,
details: {
error: err.message,
sessionId: session.id
},
success: false
});
callback(err);
});
}
/**
* Update processing time statistics
@ -636,8 +595,7 @@ export class UnifiedEmailServer extends EventEmitter {
// Process based on mode
switch (mode) {
case 'forward':
await this.handleForwardMode(email, session);
break;
throw new Error('Forward mode is not implemented');
case 'mta':
await this.handleMtaMode(email, session);
@ -655,99 +613,6 @@ export class UnifiedEmailServer extends EventEmitter {
return email;
}
/**
* Handle email in forward mode (SMTP proxy)
*/
private async handleForwardMode(email: Email, session: IExtendedSmtpSession): Promise<void> {
logger.log('info', `Handling email in forward mode for session ${session.id}`);
// Get target server information
const rule = session.matchedRule;
const targetServer = rule?.target?.server || this.options.defaultServer;
const targetPort = rule?.target?.port || this.options.defaultPort || 25;
const useTls = rule?.target?.useTls ?? this.options.defaultTls ?? false;
if (!targetServer) {
throw new Error('No target server configured for forward mode');
}
logger.log('info', `Forwarding email to ${targetServer}:${targetPort}, TLS: ${useTls}`);
try {
// Create a simple SMTP client connection to the target server
const client = new net.Socket();
await new Promise<void>((resolve, reject) => {
// Connect to the target server
client.connect({
host: targetServer,
port: targetPort
});
client.on('data', (data) => {
const response = data.toString().trim();
logger.log('debug', `SMTP response: ${response}`);
// Handle SMTP response codes
if (response.startsWith('2')) {
// Success response
resolve();
} else if (response.startsWith('5')) {
// Permanent error
reject(new Error(`SMTP error: ${response}`));
}
});
client.on('error', (err) => {
logger.log('error', `SMTP client error: ${err.message}`);
reject(err);
});
// SMTP client commands would go here in a full implementation
// For now, just finish the connection
client.end();
resolve();
});
logger.log('info', `Email forwarded successfully to ${targetServer}:${targetPort}`);
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.INFO,
type: SecurityEventType.EMAIL_FORWARDING,
message: 'Email forwarded',
ipAddress: session.remoteAddress,
details: {
sessionId: session.id,
targetServer,
targetPort,
useTls,
ruleName: rule?.pattern || 'default',
subject: email.subject
},
success: true
});
} catch (error) {
logger.log('error', `Failed to forward email: ${error.message}`);
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.ERROR,
type: SecurityEventType.EMAIL_FORWARDING,
message: 'Email forwarding failed',
ipAddress: session.remoteAddress,
details: {
sessionId: session.id,
targetServer,
targetPort,
useTls,
ruleName: rule?.pattern || 'default',
error: error.message
},
success: false
});
throw error;
}
}
/**
* Handle email in MTA mode (programmatic processing)
@ -948,24 +813,7 @@ export class UnifiedEmailServer extends EventEmitter {
return filename.substring(filename.lastIndexOf('.')).toLowerCase();
}
/**
* Handle server errors
*/
private onError(err: Error): void {
logger.log('error', `Server error: ${err.message}`);
this.emit('error', err);
}
/**
* Handle server close
*/
private onClose(): void {
logger.log('info', 'Server closed');
this.emit('close');
// Update statistics
this.stats.connections.current = 0;
}
/**
* Set up automatic DKIM configuration with DNS server
@ -1025,25 +873,6 @@ export class UnifiedEmailServer extends EventEmitter {
}
}
/**
* Get SMTP client for a specific destination
*/
public getSmtpClient(host: string, port: number): smtpClientMod.SmtpClient {
const key = `${host}:${port}`;
if (!this.smtpClients.has(key)) {
// Create a new client for this destination
const client = smtpClientMod.createSmtpClient({
...this.smtpClientConfig,
host,
port
});
this.smtpClients.set(key, client);
}
return this.smtpClients.get(key)!;
}
/**
* Generate SmartProxy routes for email ports