Compare commits

...

4 Commits

19 changed files with 4938 additions and 2159 deletions

View File

@ -1,5 +1,31 @@
# Changelog # Changelog
## 2025-05-08 - 2.8.0 - feat(docs)
Update documentation to include consolidated email handling and patternbased routing details
- Extended MTA section to describe the new unified email processing system with forward, MTA, and process modes
- Updated system diagram to reflect DcRouter integration with UnifiedEmailServer, DeliveryQueue, DeliverySystem, and RateLimiter
- Revised readme.plan.md checklists to mark completed features in core architecture, multimodal processing, unified queue, and DcRouter integration
## 2025-05-08 - 2.7.0 - feat(dcrouter)
Implement unified email configuration with patternbased routing and consolidated email processing. Migrate SMTP forwarding and storeandforward into a single, configuration-driven system that supports glob pattern matching in domain rules.
- Introduced IEmailConfig interface to consolidate MTA, forwarding, and processing settings.
- Added pattern-based domain routing with glob patterns (e.g., '*@example.com', '*@*.example.net').
- Reworked DcRouter integration to expose unified email handling and updated readme.plan.md and changelog.md accordingly.
- Removed deprecated SMTP forwarding components in favor of the consolidated approach.
## 2025-05-08 - 2.7.0 - feat(dcrouter)
Implement consolidated email configuration with pattern-based routing
- Added new pattern-based email routing with glob patterns (e.g., `*@task.vc`, `*@*.example.net`)
- Consolidated all email functionality (MTA, forwarding, processing) under a unified `emailConfig` interface
- Implemented domain router with pattern specificity calculation for most accurate matching
- Removed deprecated components (SMTP forwarding, Store-and-Forward) in favor of the unified approach
- Updated DcRouter tests to use the new consolidated email configuration pattern
- Enhanced inline documentation with detailed interface definitions and configuration examples
- Updated implementation plan with comprehensive component designs for the unified email system
## 2025-05-07 - 2.6.0 - feat(dcrouter) ## 2025-05-07 - 2.6.0 - feat(dcrouter)
Implement integrated DcRouter with comprehensive SmartProxy configuration, enhanced SMTP processing, and robust storeandforward email routing Implement integrated DcRouter with comprehensive SmartProxy configuration, enhanced SMTP processing, and robust storeandforward email routing

View File

@ -1,7 +1,7 @@
{ {
"name": "@serve.zone/platformservice", "name": "@serve.zone/platformservice",
"private": true, "private": true,
"version": "2.6.0", "version": "2.8.0",
"description": "A multifaceted platform service handling mail, SMS, letter delivery, and AI services.", "description": "A multifaceted platform service handling mail, SMS, letter delivery, and AI services.",
"main": "dist_ts/index.js", "main": "dist_ts/index.js",
"typings": "dist_ts/index.d.ts", "typings": "dist_ts/index.d.ts",

145
readme.md
View File

@ -103,38 +103,153 @@ async function sendLetter() {
sendLetter(); sendLetter();
``` ```
### Mail Transfer Agent (MTA) ### Mail Transfer Agent (MTA) and Consolidated Email Handling
The platform includes a robust Mail Transfer Agent (MTA) for enterprise-grade email handling with complete control over the email delivery process: The platform includes a robust Mail Transfer Agent (MTA) for enterprise-grade email handling with complete control over the email delivery process.
Additionally, the platform now features a consolidated email configuration system with pattern-based routing:
```mermaid ```mermaid
graph TD graph TD
API[API Clients] --> ApiManager API[API Clients] --> ApiManager
SMTP[External SMTP Servers] <--> SMTPServer SMTP[External SMTP Servers] <--> UnifiedEmailServer
subgraph "DcRouter Email System"
DcRouter[DcRouter] --> UnifiedEmailServer[Unified Email Server]
DcRouter --> DomainRouter[Domain Router]
UnifiedEmailServer --> MultiModeProcessor[Multi-Mode Processor]
MultiModeProcessor --> ForwardMode[Forward Mode]
MultiModeProcessor --> MtaMode[MTA Mode]
MultiModeProcessor --> ProcessMode[Process Mode]
ApiManager[API Manager] --> DcRouter
end
subgraph "MTA Service" subgraph "MTA Service"
MtaService[MTA Service] --> SMTPServer[SMTP Server] MtaMode --> MtaService[MTA Service]
MtaService --> EmailSendJob[Email Send Job] MtaService --> EmailSendJob[Email Send Job]
MtaService --> DnsManager[DNS Manager] MtaService --> DnsManager[DNS Manager]
MtaService --> DkimCreator[DKIM Creator] MtaService --> DkimCreator[DKIM Creator]
ApiManager[API Manager] --> MtaService
end end
subgraph "External Services" subgraph "External Services"
DnsManager <--> DNS[DNS Servers] DnsManager <--> DNS[DNS Servers]
EmailSendJob <--> MXServers[MX Servers] EmailSendJob <--> MXServers[MX Servers]
ForwardMode <--> ExternalSMTP[External SMTP Servers]
end end
``` ```
The MTA service provides: #### Key Features
- Complete SMTP server for receiving emails
- DKIM signing and verification
- SPF and DMARC support
- DNS record management
- Retry logic with queue processing
- TLS encryption
Here's how to use the MTA service: The email handling system provides:
- **Pattern-based Routing**: Route emails based on glob patterns like `*@domain.com` or `*@*.domain.com`
- **Multi-Modal Processing**: Handle different email domains with different processing modes:
- **Forward Mode**: SMTP forwarding to other servers
- **MTA Mode**: Full Mail Transfer Agent capabilities
- **Process Mode**: Store-and-forward with content scanning
- **Unified Configuration**: Single configuration interface for all email handling
- **Shared Infrastructure**: Use same ports (25, 587, 465) for all email handling
- **Complete SMTP Server**: Receive emails with TLS and authentication support
- **DKIM, SPF, DMARC**: Full email authentication standard support
- **Content Scanning**: Check for spam, viruses, and other threats
- **Advanced Delivery Management**: Queue, retry, and track delivery status
#### Using the Consolidated Email System
Here's how to use the consolidated email system:
```ts
import { DcRouter, IEmailConfig, EmailProcessingMode } from '@serve.zone/platformservice';
async function setupEmailHandling() {
// Configure the email handling system
const dcRouter = new DcRouter({
emailConfig: {
ports: [25, 587, 465],
hostname: 'mail.example.com',
// TLS configuration
tls: {
certPath: '/path/to/cert.pem',
keyPath: '/path/to/key.pem'
},
// Default handling for unmatched domains
defaultMode: 'forward' as EmailProcessingMode,
defaultServer: 'fallback.mail.example.com',
defaultPort: 25,
// Pattern-based routing rules
domainRules: [
{
// Forward all company.com emails to internal mail server
pattern: '*@company.com',
mode: 'forward' as EmailProcessingMode,
target: {
server: 'internal-mail.company.local',
port: 25,
useTls: true
}
},
{
// Process notifications.company.com with MTA
pattern: '*@notifications.company.com',
mode: 'mta' as EmailProcessingMode,
mtaOptions: {
domain: 'notifications.company.com',
dkimSign: true,
dkimOptions: {
domainName: 'notifications.company.com',
keySelector: 'mail',
privateKey: '...'
}
}
},
{
// Scan marketing emails for content and transform
pattern: '*@marketing.company.com',
mode: 'process' as EmailProcessingMode,
contentScanning: true,
scanners: [
{
type: 'spam',
threshold: 5.0,
action: 'tag'
}
],
transformations: [
{
type: 'addHeader',
header: 'X-Marketing',
value: 'true'
}
]
}
]
}
});
// Start the system
await dcRouter.start();
console.log('DcRouter with email handling started');
// Later, you can update rules dynamically
await dcRouter.updateDomainRules([
{
pattern: '*@newdomain.com',
mode: 'forward' as EmailProcessingMode,
target: {
server: 'mail.newdomain.com',
port: 25
}
}
]);
}
setupEmailHandling();
```
#### Using the MTA Service Directly
You can still use the MTA service directly for more granular control:
```ts ```ts
import { MtaService, Email } from '@serve.zone/platformservice'; import { MtaService, Email } from '@serve.zone/platformservice';
@ -170,7 +285,9 @@ async function useMtaService() {
useMtaService(); useMtaService();
``` ```
The MTA provides key advantages for applications requiring: The consolidated email system provides key advantages for applications requiring:
- Domain-specific email handling
- Flexible email routing
- High-volume email sending - High-volume email sending
- Compliance with email authentication standards - Compliance with email authentication standards
- Detailed delivery tracking - Detailed delivery tracking

File diff suppressed because it is too large Load Diff

View File

@ -2,9 +2,10 @@ import { tap, expect } from '@push.rocks/tapbundle';
import * as plugins from '../ts/plugins.js'; import * as plugins from '../ts/plugins.js';
import { import {
DcRouter, DcRouter,
type IDcRouterOptions, type IDcRouterOptions,
type ISmtpForwardingConfig, type IEmailConfig,
type IDomainRoutingConfig type EmailProcessingMode,
type IDomainRule
} from '../ts/dcrouter/index.js'; } from '../ts/dcrouter/index.js';
tap.test('DcRouter class - basic functionality', async () => { tap.test('DcRouter class - basic functionality', async () => {
@ -21,71 +22,97 @@ tap.test('DcRouter class - basic functionality', async () => {
expect(router.options.tls.contactEmail).toEqual('test@example.com'); expect(router.options.tls.contactEmail).toEqual('test@example.com');
}); });
tap.test('DcRouter class - HTTP routing configuration', async () => { tap.test('DcRouter class - SmartProxy configuration', async () => {
// Create HTTP routing configuration // Create SmartProxy configuration
const httpRoutes: IDomainRoutingConfig[] = [ const smartProxyConfig: plugins.smartproxy.ISmartProxyOptions = {
{ fromPort: 443,
domain: 'example.com', toPort: 8080,
targetServer: '192.168.1.10', targetIP: '10.0.0.10',
targetPort: 8080, sniEnabled: true,
useTls: true acme: {
port: 80,
enabled: true,
autoRenew: true,
useProduction: false,
renewThresholdDays: 30,
accountEmail: 'admin@example.com'
}, },
{ globalPortRanges: [
domain: '*.example.org', { from: 80, to: 80 },
targetServer: '192.168.1.20', { from: 443, to: 443 }
targetPort: 9000, ],
useTls: false domainConfigs: [
}
];
const options: IDcRouterOptions = {
httpDomainRoutes: httpRoutes,
tls: {
contactEmail: 'test@example.com'
}
};
const router = new DcRouter(options);
expect(router.options.httpDomainRoutes.length).toEqual(2);
expect(router.options.httpDomainRoutes[0].domain).toEqual('example.com');
expect(router.options.httpDomainRoutes[1].domain).toEqual('*.example.org');
});
tap.test('DcRouter class - SMTP forwarding configuration', async () => {
// Create SMTP forwarding configuration
const smtpForwarding: ISmtpForwardingConfig = {
enabled: true,
ports: [25, 587, 465],
defaultServer: 'mail.example.com',
defaultPort: 25,
useTls: true,
preserveSourceIp: true,
domainRoutes: [
{ {
domain: 'example.com', domains: ['example.com', 'www.example.com'],
server: 'mail1.example.com', allowedIPs: ['0.0.0.0/0'],
port: 25 targetIPs: ['10.0.0.10'],
}, portRanges: [
{ { from: 80, to: 80 },
domain: 'example.org', { from: 443, to: 443 }
server: 'mail2.example.org', ]
port: 587
} }
] ]
}; };
const options: IDcRouterOptions = { const options: IDcRouterOptions = {
smtpForwarding, smartProxyConfig,
tls: { tls: {
contactEmail: 'test@example.com' contactEmail: 'test@example.com'
} }
}; };
const router = new DcRouter(options); const router = new DcRouter(options);
expect(router.options.smtpForwarding.enabled).toEqual(true); expect(router.options.smartProxyConfig).toBeTruthy();
expect(router.options.smtpForwarding.ports.length).toEqual(3); expect(router.options.smartProxyConfig.domainConfigs.length).toEqual(1);
expect(router.options.smtpForwarding.domainRoutes.length).toEqual(2); expect(router.options.smartProxyConfig.domainConfigs[0].domains[0]).toEqual('example.com');
expect(router.options.smtpForwarding.domainRoutes[0].domain).toEqual('example.com'); });
tap.test('DcRouter class - Email configuration', async () => {
// Create consolidated email configuration
const emailConfig: IEmailConfig = {
ports: [25, 587, 465],
hostname: 'mail.example.com',
maxMessageSize: 50 * 1024 * 1024, // 50MB
defaultMode: 'forward' as EmailProcessingMode,
defaultServer: 'fallback-mail.example.com',
defaultPort: 25,
defaultTls: true,
domainRules: [
{
pattern: '*@example.com',
mode: 'forward' as EmailProcessingMode,
target: {
server: 'mail1.example.com',
port: 25,
useTls: true
}
},
{
pattern: '*@example.org',
mode: 'mta' as EmailProcessingMode,
mtaOptions: {
domain: 'example.org',
allowLocalDelivery: true
}
}
]
};
const options: IDcRouterOptions = {
emailConfig,
tls: {
contactEmail: 'test@example.com'
}
};
const router = new DcRouter(options);
expect(router.options.emailConfig).toBeTruthy();
expect(router.options.emailConfig.ports.length).toEqual(3);
expect(router.options.emailConfig.domainRules.length).toEqual(2);
expect(router.options.emailConfig.domainRules[0].pattern).toEqual('*@example.com');
expect(router.options.emailConfig.domainRules[1].pattern).toEqual('*@example.org');
}); });
tap.test('DcRouter class - Domain pattern matching', async () => { tap.test('DcRouter class - Domain pattern matching', async () => {

View File

@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@serve.zone/platformservice', name: '@serve.zone/platformservice',
version: '2.6.0', version: '2.8.0',
description: 'A multifaceted platform service handling mail, SMS, letter delivery, and AI services.' description: 'A multifaceted platform service handling mail, SMS, letter delivery, and AI services.'
} }

View File

@ -2,42 +2,17 @@ import * as plugins from '../plugins.js';
import * as paths from '../paths.js'; import * as paths from '../paths.js';
import { SmtpPortConfig, type ISmtpPortSettings } from './classes.smtp.portconfig.js'; import { SmtpPortConfig, type ISmtpPortSettings } from './classes.smtp.portconfig.js';
import { EmailDomainRouter, type IEmailDomainRoutingConfig } from './classes.email.domainrouter.js'; import { EmailDomainRouter, type IEmailDomainRoutingConfig } from './classes.email.domainrouter.js';
import { type IMtaConfig, MtaService } from '../mta/classes.mta.js';
// Import SMTP store-and-forward components
import { SmtpServer } from './classes.smtp.server.js';
import { EmailProcessor, type IProcessingResult } from './classes.email.processor.js';
import { DeliveryQueue } from './classes.delivery.queue.js';
import { DeliverySystem } from './classes.delivery.system.js';
// Certificate types are available via plugins.tsclass // Certificate types are available via plugins.tsclass
/** // Import the consolidated email config
* Configuration for SMTP forwarding functionality import type { IEmailConfig, IDomainRule } from './classes.email.config.js';
*/ import { DomainRouter } from './classes.domain.router.js';
export interface ISmtpForwardingConfig { import { UnifiedEmailServer } from './classes.unified.email.server.js';
/** Whether SMTP forwarding is enabled */ import { UnifiedDeliveryQueue, type IQueueOptions } from './classes.delivery.queue.js';
enabled?: boolean; import { MultiModeDeliverySystem, type IMultiModeDeliveryOptions } from './classes.delivery.system.js';
/** SMTP ports to listen on */ import { UnifiedRateLimiter, type IHierarchicalRateLimits } from './classes.rate.limiter.js';
ports?: number[]; import { logger } from '../logger.js';
/** Default SMTP server hostname */
defaultServer: string;
/** Default SMTP server port */
defaultPort?: number;
/** Whether to use TLS when connecting to the default server */
useTls?: boolean;
/** Preserve source IP address when forwarding */
preserveSourceIp?: boolean;
/** Domain-specific routing rules */
domainRoutes?: Array<{
domain: string;
server: string;
port?: number;
}>;
}
import type { ISmtpConfig } from './classes.smtp.config.js';
export interface IDcRouterOptions { export interface IDcRouterOptions {
/** /**
@ -46,24 +21,11 @@ export interface IDcRouterOptions {
*/ */
smartProxyConfig?: plugins.smartproxy.ISmartProxyOptions; smartProxyConfig?: plugins.smartproxy.ISmartProxyOptions;
/** /**
* SMTP store-and-forward configuration * Consolidated email configuration
* This enables advanced email processing capabilities (complementary to smartProxyConfig) * This enables all email handling with pattern-based routing
*/ */
smtpConfig?: ISmtpConfig; emailConfig?: IEmailConfig;
/**
* Legacy SMTP forwarding configuration
* If smtpConfig is provided, this will be ignored
*/
smtpForwarding?: ISmtpForwardingConfig;
/** MTA service configuration (if not using SMTP forwarding) */
mtaConfig?: IMtaConfig;
/** Existing MTA service instance to use (if not using SMTP forwarding) */
mtaServiceInstance?: MtaService;
/** TLS/certificate configuration */ /** TLS/certificate configuration */
tls?: { tls?: {
@ -100,14 +62,14 @@ export class DcRouter {
// Core services // Core services
public smartProxy?: plugins.smartproxy.SmartProxy; public smartProxy?: plugins.smartproxy.SmartProxy;
public mta?: MtaService;
public dnsServer?: plugins.smartdns.DnsServer; public dnsServer?: plugins.smartdns.DnsServer;
// SMTP store-and-forward components // Unified email components
public smtpServer?: SmtpServer; public domainRouter?: DomainRouter;
public emailProcessor?: EmailProcessor; public unifiedEmailServer?: UnifiedEmailServer;
public deliveryQueue?: DeliveryQueue; public deliveryQueue?: UnifiedDeliveryQueue;
public deliverySystem?: DeliverySystem; public deliverySystem?: MultiModeDeliverySystem;
public rateLimiter?: UnifiedRateLimiter;
// Environment access // Environment access
private qenv = new plugins.qenv.Qenv('./', '.nogit/'); private qenv = new plugins.qenv.Qenv('./', '.nogit/');
@ -128,16 +90,9 @@ export class DcRouter {
await this.setupSmartProxy(); await this.setupSmartProxy();
} }
// 2. Set up SMTP handling // Set up unified email handling if configured
if (this.options.smtpConfig) { if (this.options.emailConfig) {
// Set up store-and-forward SMTP processing await this.setupUnifiedEmailHandling();
await this.setupSmtpProcessing();
} else if (this.options.smtpForwarding?.enabled) {
// Fallback to simple SMTP forwarding for backward compatibility
await this.setupSmtpForwarding();
} else {
// Set up MTA service if no SMTP handling is configured
await this.setupMtaService();
} }
// 3. Set up DNS server if configured // 3. Set up DNS server if configured
@ -191,71 +146,6 @@ export class DcRouter {
} }
/**
* Set up the MTA service
*/
private async setupMtaService() {
// Use existing MTA service if provided
if (this.options.mtaServiceInstance) {
this.mta = this.options.mtaServiceInstance;
console.log('Using provided MTA service instance');
} else if (this.options.mtaConfig) {
// Create new MTA service with the provided configuration
this.mta = new MtaService(undefined, this.options.mtaConfig);
console.log('Created new MTA service instance');
// Start the MTA service
await this.mta.start();
console.log('MTA service started');
}
}
/**
* Set up SMTP forwarding with SmartProxy
*/
private async setupSmtpForwarding() {
if (!this.options.smtpForwarding) {
return;
}
const forwarding = this.options.smtpForwarding;
console.log('Setting up SMTP forwarding');
// Determine which ports to listen on
const smtpPorts = forwarding.ports || [25, 587, 465];
// Create SmartProxy instance for SMTP forwarding
const smtpProxyConfig: plugins.smartproxy.ISmartProxyOptions = {
// Listen on the first SMTP port
fromPort: smtpPorts[0],
// Forward to the default server
toPort: forwarding.defaultPort || 25,
targetIP: forwarding.defaultServer,
// Enable SNI if port 465 is included (implicit TLS)
sniEnabled: smtpPorts.includes(465),
// Preserve source IP if requested
preserveSourceIP: forwarding.preserveSourceIp || false,
// Create domain configs for SMTP routing
domainConfigs: forwarding.domainRoutes?.map(route => ({
domains: [route.domain],
allowedIPs: ['0.0.0.0/0'], // Allow from anywhere by default
targetIPs: [route.server]
})) || [],
// Include all SMTP ports in the global port ranges
globalPortRanges: smtpPorts.map(port => ({ from: port, to: port }))
};
// Create a separate SmartProxy instance for SMTP
const smtpProxy = new plugins.smartproxy.SmartProxy(smtpProxyConfig);
// Start the SMTP proxy
await smtpProxy.start();
// Store the SMTP proxy reference
this.smartProxy = smtpProxy;
console.log(`SMTP forwarding configured on ports ${smtpPorts.join(', ')}`);
}
/** /**
* Check if a domain matches a pattern (including wildcard support) * Check if a domain matches a pattern (including wildcard support)
@ -291,17 +181,12 @@ export class DcRouter {
try { try {
// Stop all services in parallel for faster shutdown // Stop all services in parallel for faster shutdown
await Promise.all([ await Promise.all([
// Stop SMTP components // Stop unified email components if running
this.stopSmtpComponents().catch(err => console.error('Error stopping SMTP components:', err)), this.domainRouter ? this.stopUnifiedEmailComponents().catch(err => console.error('Error stopping unified email components:', err)) : Promise.resolve(),
// Stop HTTP SmartProxy if running // Stop HTTP SmartProxy if running
this.smartProxy ? this.smartProxy.stop().catch(err => console.error('Error stopping SmartProxy:', err)) : Promise.resolve(), this.smartProxy ? this.smartProxy.stop().catch(err => console.error('Error stopping SmartProxy:', err)) : Promise.resolve(),
// Stop MTA service if it's our own (not an external instance)
(this.mta && !this.options.mtaServiceInstance) ?
this.mta.stop().catch(err => console.error('Error stopping MTA service:', err)) :
Promise.resolve(),
// Stop DNS server if running // Stop DNS server if running
this.dnsServer ? this.dnsServer ?
this.dnsServer.stop().catch(err => console.error('Error stopping DNS server:', err)) : this.dnsServer.stop().catch(err => console.error('Error stopping DNS server:', err)) :
@ -336,134 +221,196 @@ export class DcRouter {
} }
/** /**
* Set up SMTP store-and-forward processing * Set up unified email handling with pattern-based routing
* This implements the consolidated emailConfig approach
*/ */
private async setupSmtpProcessing(): Promise<void> { private async setupUnifiedEmailHandling(): Promise<void> {
if (!this.options.smtpConfig) { logger.log('info', 'Setting up unified email handling with pattern-based routing');
return;
if (!this.options.emailConfig) {
throw new Error('Email configuration is required for unified email handling');
} }
console.log('Setting up SMTP store-and-forward processing');
try { try {
// 1. Create SMTP server // Create domain router for pattern matching
this.smtpServer = new SmtpServer(this.options.smtpConfig); this.domainRouter = new DomainRouter({
domainRules: this.options.emailConfig.domainRules,
defaultMode: this.options.emailConfig.defaultMode,
defaultServer: this.options.emailConfig.defaultServer,
defaultPort: this.options.emailConfig.defaultPort,
defaultTls: this.options.emailConfig.defaultTls
});
// 2. Create email processor // Initialize the rate limiter
this.emailProcessor = new EmailProcessor(this.options.smtpConfig); this.rateLimiter = new UnifiedRateLimiter({
global: {
// 3. Create delivery queue maxMessagesPerMinute: 100,
this.deliveryQueue = new DeliveryQueue(this.options.smtpConfig.queue || {}); maxRecipientsPerMessage: 100,
await this.deliveryQueue.initialize(); maxConnectionsPerIP: 20,
maxErrorsPerIP: 10,
// 4. Create delivery system maxAuthFailuresPerIP: 5
this.deliverySystem = new DeliverySystem(this.deliveryQueue);
// 5. Connect components
// When a message is received by the SMTP server, process it
this.smtpServer.on('message', async ({ session, mail, rawData }) => {
try {
// Process the message
const processingResult = await this.emailProcessor.processEmail(mail, rawData, session);
// If action is queue, add to delivery queue
if (processingResult.action === 'queue') {
await this.deliveryQueue.enqueue(processingResult);
}
} catch (error) {
console.error('Error processing message:', error);
} }
}); });
// 6. Start components // Initialize the unified delivery queue
await this.smtpServer.start(); const queueOptions: IQueueOptions = {
storageType: this.options.emailConfig.queue?.storageType || 'memory',
persistentPath: this.options.emailConfig.queue?.persistentPath,
maxRetries: this.options.emailConfig.queue?.maxRetries,
baseRetryDelay: this.options.emailConfig.queue?.baseRetryDelay,
maxRetryDelay: this.options.emailConfig.queue?.maxRetryDelay
};
this.deliveryQueue = new UnifiedDeliveryQueue(queueOptions);
await this.deliveryQueue.initialize();
// Initialize the delivery system
const deliveryOptions: IMultiModeDeliveryOptions = {
globalRateLimit: 100, // Default to 100 emails per minute
concurrentDeliveries: 10
};
this.deliverySystem = new MultiModeDeliverySystem(this.deliveryQueue, deliveryOptions);
await this.deliverySystem.start(); await this.deliverySystem.start();
console.log(`SMTP processing started on ports ${this.options.smtpConfig.ports.join(', ')}`); // Initialize the unified email server
this.unifiedEmailServer = new UnifiedEmailServer({
ports: this.options.emailConfig.ports,
hostname: this.options.emailConfig.hostname,
maxMessageSize: this.options.emailConfig.maxMessageSize,
auth: this.options.emailConfig.auth,
tls: this.options.emailConfig.tls,
domainRules: this.options.emailConfig.domainRules,
defaultMode: this.options.emailConfig.defaultMode,
defaultServer: this.options.emailConfig.defaultServer,
defaultPort: this.options.emailConfig.defaultPort,
defaultTls: this.options.emailConfig.defaultTls
});
// Set up event listeners
this.unifiedEmailServer.on('error', (err) => {
logger.log('error', `UnifiedEmailServer error: ${err.message}`);
});
// Connect the unified email server with the delivery queue
this.unifiedEmailServer.on('emailProcessed', (email, mode, rule) => {
this.deliveryQueue!.enqueue(email, mode, rule).catch(err => {
logger.log('error', `Failed to enqueue email: ${err.message}`);
});
});
// Start the unified email server
await this.unifiedEmailServer.start();
logger.log('info', `Unified email handling configured with ${this.options.emailConfig.domainRules.length} domain rules`);
} catch (error) { } catch (error) {
console.error('Error setting up SMTP processing:', error); logger.log('error', `Error setting up unified email handling: ${error.message}`);
// Clean up any components that were started
if (this.deliverySystem) {
await this.deliverySystem.stop().catch(e => console.error('Error stopping delivery system:', e));
}
if (this.deliveryQueue) {
await this.deliveryQueue.shutdown().catch(e => console.error('Error shutting down delivery queue:', e));
}
if (this.smtpServer) {
await this.smtpServer.stop().catch(e => console.error('Error stopping SMTP server:', e));
}
throw error; throw error;
} }
} }
/** /**
* Update SMTP forwarding configuration * Update the unified email configuration
* @param config New SMTP forwarding configuration * @param config New email configuration
*/ */
public async updateSmtpForwarding(config: ISmtpForwardingConfig): Promise<void> { public async updateEmailConfig(config: IEmailConfig): Promise<void> {
// Stop existing SMTP components // Stop existing email components
await this.stopSmtpComponents(); await this.stopUnifiedEmailComponents();
// Update configuration // Update configuration
this.options.smtpForwarding = config; this.options.emailConfig = config;
this.options.smtpConfig = undefined; // Clear any store-and-forward config
// Restart SMTP forwarding if enabled // Start email handling with new configuration
if (config.enabled) { await this.setupUnifiedEmailHandling();
await this.setupSmtpForwarding();
}
console.log('SMTP forwarding configuration updated'); console.log('Unified email configuration updated');
} }
/** /**
* Update SMTP processing configuration * Stop all unified email components
* @param config New SMTP config
*/ */
public async updateSmtpConfig(config: ISmtpConfig): Promise<void> { private async stopUnifiedEmailComponents(): Promise<void> {
// Stop existing SMTP components try {
await this.stopSmtpComponents(); // Stop all components in the correct order
// Update configuration // 1. Stop the unified email server first
this.options.smtpConfig = config; if (this.unifiedEmailServer) {
this.options.smtpForwarding = undefined; // Clear any forwarding config await this.unifiedEmailServer.stop();
logger.log('info', 'Unified email server stopped');
// Start SMTP processing this.unifiedEmailServer = undefined;
await this.setupSmtpProcessing(); }
console.log('SMTP processing configuration updated'); // 2. Stop the delivery system
if (this.deliverySystem) {
await this.deliverySystem.stop();
logger.log('info', 'Delivery system stopped');
this.deliverySystem = undefined;
}
// 3. Stop the delivery queue
if (this.deliveryQueue) {
await this.deliveryQueue.shutdown();
logger.log('info', 'Delivery queue shut down');
this.deliveryQueue = undefined;
}
// 4. Stop the rate limiter
if (this.rateLimiter) {
this.rateLimiter.stop();
logger.log('info', 'Rate limiter stopped');
this.rateLimiter = undefined;
}
// 5. Clear the domain router
this.domainRouter = undefined;
logger.log('info', 'All unified email components stopped');
} catch (error) {
logger.log('error', `Error stopping unified email components: ${error.message}`);
throw error;
}
} }
/** /**
* Stop all SMTP components * Update domain rules for email routing
* @param rules New domain rules to apply
*/ */
private async stopSmtpComponents(): Promise<void> { public async updateDomainRules(rules: IDomainRule[]): Promise<void> {
// Stop delivery system // Validate that email config exists
if (this.deliverySystem) { if (!this.options.emailConfig) {
await this.deliverySystem.stop().catch(e => console.error('Error stopping delivery system:', e)); throw new Error('Email configuration is required before updating domain rules');
this.deliverySystem = undefined;
} }
// Stop delivery queue // Update the configuration
if (this.deliveryQueue) { this.options.emailConfig.domainRules = rules;
await this.deliveryQueue.shutdown().catch(e => console.error('Error shutting down delivery queue:', e));
this.deliveryQueue = undefined; // Update the domain router if it exists
if (this.domainRouter) {
this.domainRouter.updateRules(rules);
} }
// Stop SMTP server // Update the unified email server if it exists
if (this.smtpServer) { if (this.unifiedEmailServer) {
await this.smtpServer.stop().catch(e => console.error('Error stopping SMTP server:', e)); this.unifiedEmailServer.updateDomainRules(rules);
this.smtpServer = undefined;
} }
// For backward compatibility: legacy SMTP proxy implementation console.log(`Domain rules updated with ${rules.length} rules`);
// This is no longer used with the new implementation }
/**
* Get statistics from all components
*/
public getStats(): any {
const stats: any = {
unifiedEmailServer: this.unifiedEmailServer?.getStats(),
deliveryQueue: this.deliveryQueue?.getStats(),
deliverySystem: this.deliverySystem?.getStats(),
rateLimiter: this.rateLimiter?.getStats()
};
return stats;
} }
} }

View File

@ -1,9 +1,9 @@
import * as plugins from '../plugins.js'; import * as plugins from '../plugins.js';
import type { IQueueConfig } from './classes.smtp.config.js';
import type { IProcessingResult } from './classes.email.processor.js';
import { EventEmitter } from 'node:events'; import { EventEmitter } from 'node:events';
import * as fs from 'node:fs'; import * as fs from 'node:fs';
import * as path from 'node:path'; import * as path from 'node:path';
import { logger } from '../logger.js';
import { type EmailProcessingMode, type IDomainRule } from './classes.email.config.js';
/** /**
* Queue item status * Queue item status
@ -11,11 +11,13 @@ import * as path from 'node:path';
export type QueueItemStatus = 'pending' | 'processing' | 'delivered' | 'failed' | 'deferred'; export type QueueItemStatus = 'pending' | 'processing' | 'delivered' | 'failed' | 'deferred';
/** /**
* Queue item * Queue item interface
*/ */
export interface IQueueItem { export interface IQueueItem {
id: string; id: string;
processingResult: IProcessingResult; processingMode: EmailProcessingMode;
processingResult: any;
rule: IDomainRule;
status: QueueItemStatus; status: QueueItemStatus;
attempts: number; attempts: number;
nextAttempt: Date; nextAttempt: Date;
@ -26,28 +28,96 @@ export interface IQueueItem {
} }
/** /**
* Delivery queue component for store-and-forward functionality * Queue options interface
*/ */
export class DeliveryQueue extends EventEmitter { export interface IQueueOptions {
private config: IQueueConfig; // Storage options
storageType?: 'memory' | 'disk';
persistentPath?: string;
// Queue behavior
checkInterval?: number;
maxQueueSize?: number;
maxPerDestination?: number;
// Delivery attempts
maxRetries?: number;
baseRetryDelay?: number;
maxRetryDelay?: number;
}
/**
* Queue statistics interface
*/
export interface IQueueStats {
queueSize: number;
status: {
pending: number;
processing: number;
delivered: number;
failed: number;
deferred: number;
};
modes: {
forward: number;
mta: number;
process: number;
};
oldestItem?: Date;
newestItem?: Date;
averageAttempts: number;
totalProcessed: number;
processingActive: boolean;
}
/**
* A unified queue for all email modes
*/
export class UnifiedDeliveryQueue extends EventEmitter {
private options: Required<IQueueOptions>;
private queue: Map<string, IQueueItem> = new Map(); private queue: Map<string, IQueueItem> = new Map();
private isProcessing: boolean = false; private checkTimer?: NodeJS.Timeout;
private processingInterval: NodeJS.Timeout | null = null; private stats: IQueueStats;
private persistenceTimer: NodeJS.Timeout | null = null; private processing: boolean = false;
private totalProcessed: number = 0;
/** /**
* Create a new delivery queue * Create a new unified delivery queue
* @param config Queue configuration * @param options Queue options
*/ */
constructor(config: IQueueConfig) { constructor(options: IQueueOptions) {
super(); super();
this.config = {
storageType: 'memory', // Set default options
maxRetries: 5, this.options = {
baseRetryDelay: 60000, // 1 minute storageType: options.storageType || 'memory',
maxRetryDelay: 3600000, // 1 hour persistentPath: options.persistentPath || path.join(process.cwd(), 'email-queue'),
maxQueueSize: 10000, checkInterval: options.checkInterval || 30000, // 30 seconds
...config maxQueueSize: options.maxQueueSize || 10000,
maxPerDestination: options.maxPerDestination || 100,
maxRetries: options.maxRetries || 5,
baseRetryDelay: options.baseRetryDelay || 60000, // 1 minute
maxRetryDelay: options.maxRetryDelay || 3600000 // 1 hour
};
// Initialize statistics
this.stats = {
queueSize: 0,
status: {
pending: 0,
processing: 0,
delivered: 0,
failed: 0,
deferred: 0
},
modes: {
forward: 0,
mta: 0,
process: 0
},
averageAttempts: 0,
totalProcessed: 0,
processingActive: false
}; };
} }
@ -55,65 +125,116 @@ export class DeliveryQueue extends EventEmitter {
* Initialize the queue * Initialize the queue
*/ */
public async initialize(): Promise<void> { public async initialize(): Promise<void> {
logger.log('info', 'Initializing UnifiedDeliveryQueue');
try { try {
// Load queue from persistent storage if enabled // Create persistent storage directory if using disk storage
if (this.config.storageType === 'disk' && this.config.persistentPath) { if (this.options.storageType === 'disk') {
await this.load(); if (!fs.existsSync(this.options.persistentPath)) {
fs.mkdirSync(this.options.persistentPath, { recursive: true });
}
// Load existing items from disk
await this.loadFromDisk();
} }
// Set up processing interval // Start the queue processing timer
this.startProcessing(); this.startProcessing();
// Set up persistence interval if using disk storage // Emit initialized event
if (this.config.storageType === 'disk' && this.config.persistentPath) {
this.persistenceTimer = setInterval(() => {
this.save().catch(err => {
console.error('Error saving queue:', err);
});
}, 60000); // Save every minute
}
this.emit('initialized'); this.emit('initialized');
logger.log('info', 'UnifiedDeliveryQueue initialized successfully');
} catch (error) { } catch (error) {
console.error('Failed to initialize delivery queue:', error); logger.log('error', `Failed to initialize queue: ${error.message}`);
throw error; throw error;
} }
} }
/** /**
* Start processing the queue * Start queue processing
*/ */
private startProcessing(): void { private startProcessing(): void {
if (this.processingInterval) { if (this.checkTimer) {
clearInterval(this.processingInterval); clearInterval(this.checkTimer);
} }
this.processingInterval = setInterval(() => { this.checkTimer = setInterval(() => this.processQueue(), this.options.checkInterval);
this.processQueue().catch(err => { this.processing = true;
console.error('Error processing queue:', err); this.stats.processingActive = true;
}); this.emit('processingStarted');
}, 1000); // Check every second logger.log('info', 'Queue processing started');
}
/**
* Stop queue processing
*/
private stopProcessing(): void {
if (this.checkTimer) {
clearInterval(this.checkTimer);
this.checkTimer = undefined;
}
this.processing = false;
this.stats.processingActive = false;
this.emit('processingStopped');
logger.log('info', 'Queue processing stopped');
}
/**
* Check for items that need to be processed
*/
private async processQueue(): Promise<void> {
try {
const now = new Date();
let readyItems: IQueueItem[] = [];
// Find items ready for processing
for (const item of this.queue.values()) {
if (item.status === 'pending' || (item.status === 'deferred' && item.nextAttempt <= now)) {
readyItems.push(item);
}
}
if (readyItems.length === 0) {
return;
}
// Sort by oldest first
readyItems.sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime());
// Emit event for ready items
this.emit('itemsReady', readyItems);
logger.log('info', `Found ${readyItems.length} items ready for processing`);
// Update statistics
this.updateStats();
} catch (error) {
logger.log('error', `Error processing queue: ${error.message}`);
this.emit('error', error);
}
} }
/** /**
* Add an item to the queue * Add an item to the queue
* @param processingResult Processing result to queue * @param processingResult Processing result to queue
* @param mode Processing mode
* @param rule Domain rule
*/ */
public async enqueue(processingResult: IProcessingResult): Promise<string> { public async enqueue(processingResult: any, mode: EmailProcessingMode, rule: IDomainRule): Promise<string> {
// Skip if the action is reject
if (processingResult.action === 'reject') {
throw new Error('Cannot queue a rejected message');
}
// Check if queue is full // Check if queue is full
if (this.config.maxQueueSize && this.queue.size >= this.config.maxQueueSize) { if (this.queue.size >= this.options.maxQueueSize) {
throw new Error('Queue is full'); throw new Error('Queue is full');
} }
// Generate a unique ID
const id = `${Date.now()}-${Math.random().toString(36).substring(2, 15)}`;
// Create queue item // Create queue item
const queueItem: IQueueItem = { const item: IQueueItem = {
id: processingResult.id, id,
processingMode: mode,
processingResult, processingResult,
rule,
status: 'pending', status: 'pending',
attempts: 0, attempts: 0,
nextAttempt: new Date(), nextAttempt: new Date(),
@ -122,50 +243,21 @@ export class DeliveryQueue extends EventEmitter {
}; };
// Add to queue // Add to queue
this.queue.set(queueItem.id, queueItem); this.queue.set(id, item);
// Save queue if using disk storage // Persist to disk if using disk storage
if (this.config.storageType === 'disk' && this.config.persistentPath) { if (this.options.storageType === 'disk') {
await this.saveItem(queueItem); await this.persistItem(item);
} }
this.emit('enqueued', queueItem); // Update statistics
this.updateStats();
return queueItem.id; // Emit event
} this.emit('itemEnqueued', item);
logger.log('info', `Item enqueued with ID ${id}, mode: ${mode}`);
/**
* Process the queue
*/
private async processQueue(): Promise<void> {
// Skip if already processing
if (this.isProcessing) {
return;
}
this.isProcessing = true; return id;
try {
// Get items that are ready for delivery
const now = new Date();
const readyItems: IQueueItem[] = [];
for (const item of this.queue.values()) {
if (item.status === 'pending' && item.nextAttempt <= now) {
readyItems.push(item);
}
}
// If no items are ready, skip processing
if (!readyItems.length) {
return;
}
// Emit event with ready items
this.emit('itemsReady', readyItems);
} finally {
this.isProcessing = false;
}
} }
/** /**
@ -177,44 +269,32 @@ export class DeliveryQueue extends EventEmitter {
} }
/** /**
* Get all items in the queue * Mark an item as being processed
*/
public getAllItems(): IQueueItem[] {
return Array.from(this.queue.values());
}
/**
* Get items by status
* @param status Status to filter by
*/
public getItemsByStatus(status: QueueItemStatus): IQueueItem[] {
return Array.from(this.queue.values()).filter(item => item.status === status);
}
/**
* Update an item in the queue
* @param id Item ID * @param id Item ID
* @param updates Updates to apply
*/ */
public async updateItem(id: string, updates: Partial<IQueueItem>): Promise<boolean> { public async markProcessing(id: string): Promise<boolean> {
const item = this.queue.get(id); const item = this.queue.get(id);
if (!item) { if (!item) {
return false; return false;
} }
// Apply updates // Update status
Object.assign(item, { item.status = 'processing';
...updates, item.attempts++;
updatedAt: new Date() item.updatedAt = new Date();
});
// Save queue if using disk storage // Persist changes if using disk storage
if (this.config.storageType === 'disk' && this.config.persistentPath) { if (this.options.storageType === 'disk') {
await this.saveItem(item); await this.persistItem(item);
} }
this.emit('itemUpdated', item); // Update statistics
this.updateStats();
// Emit event
this.emit('itemProcessing', item);
logger.log('info', `Item ${id} marked as processing, attempt ${item.attempts}`);
return true; return true;
} }
@ -224,10 +304,31 @@ export class DeliveryQueue extends EventEmitter {
* @param id Item ID * @param id Item ID
*/ */
public async markDelivered(id: string): Promise<boolean> { public async markDelivered(id: string): Promise<boolean> {
return this.updateItem(id, { const item = this.queue.get(id);
status: 'delivered',
deliveredAt: new Date() if (!item) {
}); return false;
}
// Update status
item.status = 'delivered';
item.updatedAt = new Date();
item.deliveredAt = new Date();
// Persist changes if using disk storage
if (this.options.storageType === 'disk') {
await this.persistItem(item);
}
// Update statistics
this.totalProcessed++;
this.updateStats();
// Emit event
this.emit('itemDelivered', item);
logger.log('info', `Item ${id} marked as delivered after ${item.attempts} attempts`);
return true;
} }
/** /**
@ -242,32 +343,51 @@ export class DeliveryQueue extends EventEmitter {
return false; return false;
} }
// Check if max retries reached // Determine if we should retry
if (item.attempts >= (this.config.maxRetries || 5)) { if (item.attempts < this.options.maxRetries) {
return this.updateItem(id, { // Calculate next retry time with exponential backoff
status: 'failed', const delay = Math.min(
lastError: error this.options.baseRetryDelay * Math.pow(2, item.attempts - 1),
}); this.options.maxRetryDelay
);
// Update status
item.status = 'deferred';
item.lastError = error;
item.nextAttempt = new Date(Date.now() + delay);
item.updatedAt = new Date();
// Persist changes if using disk storage
if (this.options.storageType === 'disk') {
await this.persistItem(item);
}
// Emit event
this.emit('itemDeferred', item);
logger.log('info', `Item ${id} deferred for ${delay}ms, attempt ${item.attempts}, error: ${error}`);
} else {
// Mark as permanently failed
item.status = 'failed';
item.lastError = error;
item.updatedAt = new Date();
// Persist changes if using disk storage
if (this.options.storageType === 'disk') {
await this.persistItem(item);
}
// Update statistics
this.totalProcessed++;
// Emit event
this.emit('itemFailed', item);
logger.log('warn', `Item ${id} permanently failed after ${item.attempts} attempts, error: ${error}`);
} }
// Calculate next attempt time with exponential backoff // Update statistics
const attempts = item.attempts + 1; this.updateStats();
const baseDelay = this.config.baseRetryDelay || 60000; // 1 minute
const maxDelay = this.config.maxRetryDelay || 3600000; // 1 hour
const delay = Math.min( return true;
baseDelay * Math.pow(2, attempts - 1),
maxDelay
);
const nextAttempt = new Date(Date.now() + delay);
return this.updateItem(id, {
status: 'deferred',
attempts,
nextAttempt,
lastError: error
});
} }
/** /**
@ -275,179 +395,244 @@ export class DeliveryQueue extends EventEmitter {
* @param id Item ID * @param id Item ID
*/ */
public async removeItem(id: string): Promise<boolean> { public async removeItem(id: string): Promise<boolean> {
if (!this.queue.has(id)) { const item = this.queue.get(id);
if (!item) {
return false; return false;
} }
// Remove from queue
this.queue.delete(id); this.queue.delete(id);
// Remove from disk if using disk storage // Remove from disk if using disk storage
if (this.config.storageType === 'disk' && this.config.persistentPath) { if (this.options.storageType === 'disk') {
await this.removeItemFile(id); await this.removeItemFromDisk(id);
} }
this.emit('itemRemoved', id); // Update statistics
this.updateStats();
// Emit event
this.emit('itemRemoved', item);
logger.log('info', `Item ${id} removed from queue`);
return true; return true;
} }
/**
* Persist an item to disk
* @param item Item to persist
*/
private async persistItem(item: IQueueItem): Promise<void> {
try {
const filePath = path.join(this.options.persistentPath, `${item.id}.json`);
await fs.promises.writeFile(filePath, JSON.stringify(item, null, 2), 'utf8');
} catch (error) {
logger.log('error', `Failed to persist item ${item.id}: ${error.message}`);
this.emit('error', error);
}
}
/**
* Remove an item from disk
* @param id Item ID
*/
private async removeItemFromDisk(id: string): Promise<void> {
try {
const filePath = path.join(this.options.persistentPath, `${id}.json`);
if (fs.existsSync(filePath)) {
await fs.promises.unlink(filePath);
}
} catch (error) {
logger.log('error', `Failed to remove item ${id} from disk: ${error.message}`);
this.emit('error', error);
}
}
/**
* Load queue items from disk
*/
private async loadFromDisk(): Promise<void> {
try {
// Check if directory exists
if (!fs.existsSync(this.options.persistentPath)) {
return;
}
// Get all JSON files
const files = fs.readdirSync(this.options.persistentPath).filter(file => file.endsWith('.json'));
// Load each file
for (const file of files) {
try {
const filePath = path.join(this.options.persistentPath, file);
const data = await fs.promises.readFile(filePath, 'utf8');
const item = JSON.parse(data) as IQueueItem;
// Convert date strings to Date objects
item.createdAt = new Date(item.createdAt);
item.updatedAt = new Date(item.updatedAt);
item.nextAttempt = new Date(item.nextAttempt);
if (item.deliveredAt) {
item.deliveredAt = new Date(item.deliveredAt);
}
// Add to queue
this.queue.set(item.id, item);
} catch (error) {
logger.log('error', `Failed to load item from ${file}: ${error.message}`);
}
}
// Update statistics
this.updateStats();
logger.log('info', `Loaded ${this.queue.size} items from disk`);
} catch (error) {
logger.log('error', `Failed to load items from disk: ${error.message}`);
throw error;
}
}
/**
* Update queue statistics
*/
private updateStats(): void {
// Reset counters
this.stats.queueSize = this.queue.size;
this.stats.status = {
pending: 0,
processing: 0,
delivered: 0,
failed: 0,
deferred: 0
};
this.stats.modes = {
forward: 0,
mta: 0,
process: 0
};
let totalAttempts = 0;
let oldestTime = Date.now();
let newestTime = 0;
// Count by status and mode
for (const item of this.queue.values()) {
// Count by status
this.stats.status[item.status]++;
// Count by mode
this.stats.modes[item.processingMode]++;
// Track total attempts
totalAttempts += item.attempts;
// Track oldest and newest
const itemTime = item.createdAt.getTime();
if (itemTime < oldestTime) {
oldestTime = itemTime;
}
if (itemTime > newestTime) {
newestTime = itemTime;
}
}
// Calculate average attempts
this.stats.averageAttempts = this.queue.size > 0 ? totalAttempts / this.queue.size : 0;
// Set oldest and newest
this.stats.oldestItem = this.queue.size > 0 ? new Date(oldestTime) : undefined;
this.stats.newestItem = this.queue.size > 0 ? new Date(newestTime) : undefined;
// Set total processed
this.stats.totalProcessed = this.totalProcessed;
// Set processing active
this.stats.processingActive = this.processing;
// Emit statistics event
this.emit('statsUpdated', this.stats);
}
/**
* Get queue statistics
*/
public getStats(): IQueueStats {
return { ...this.stats };
}
/** /**
* Pause queue processing * Pause queue processing
*/ */
public pause(): void { public pause(): void {
if (this.processingInterval) { if (this.processing) {
clearInterval(this.processingInterval); this.stopProcessing();
this.processingInterval = null; logger.log('info', 'Queue processing paused');
} }
this.emit('paused');
} }
/** /**
* Resume queue processing * Resume queue processing
*/ */
public resume(): void { public resume(): void {
if (!this.processingInterval) { if (!this.processing) {
this.startProcessing(); this.startProcessing();
logger.log('info', 'Queue processing resumed');
}
}
/**
* Clean up old delivered and failed items
* @param maxAge Maximum age in milliseconds (default: 7 days)
*/
public async cleanupOldItems(maxAge: number = 7 * 24 * 60 * 60 * 1000): Promise<number> {
const cutoff = new Date(Date.now() - maxAge);
let removedCount = 0;
// Find old items
for (const item of this.queue.values()) {
if (['delivered', 'failed'].includes(item.status) && item.updatedAt < cutoff) {
// Remove item
await this.removeItem(item.id);
removedCount++;
}
} }
this.emit('resumed'); logger.log('info', `Cleaned up ${removedCount} old items`);
return removedCount;
} }
/** /**
* Shutdown the queue * Shutdown the queue
*/ */
public async shutdown(): Promise<void> { public async shutdown(): Promise<void> {
logger.log('info', 'Shutting down UnifiedDeliveryQueue');
// Stop processing // Stop processing
if (this.processingInterval) { this.stopProcessing();
clearInterval(this.processingInterval);
this.processingInterval = null; // If using disk storage, make sure all items are persisted
if (this.options.storageType === 'disk') {
const pendingWrites: Promise<void>[] = [];
for (const item of this.queue.values()) {
pendingWrites.push(this.persistItem(item));
}
// Wait for all writes to complete
await Promise.all(pendingWrites);
} }
// Stop persistence timer // Clear the queue (memory only)
if (this.persistenceTimer) { this.queue.clear();
clearInterval(this.persistenceTimer);
this.persistenceTimer = null;
}
// Save queue if using disk storage // Update statistics
if (this.config.storageType === 'disk' && this.config.persistentPath) { this.updateStats();
await this.save();
}
// Emit shutdown event
this.emit('shutdown'); this.emit('shutdown');
} logger.log('info', 'UnifiedDeliveryQueue shut down successfully');
/**
* Load queue from disk
*/
private async load(): Promise<void> {
if (!this.config.persistentPath) {
return;
}
try {
// Create directory if it doesn't exist
if (!fs.existsSync(this.config.persistentPath)) {
fs.mkdirSync(this.config.persistentPath, { recursive: true });
}
// Read the queue directory
const files = fs.readdirSync(this.config.persistentPath);
// Load each item
for (const file of files) {
if (file.endsWith('.json')) {
try {
const filePath = path.join(this.config.persistentPath, file);
const data = fs.readFileSync(filePath, 'utf8');
const item = JSON.parse(data) as IQueueItem;
// Convert string dates back to Date objects
item.nextAttempt = new Date(item.nextAttempt);
item.createdAt = new Date(item.createdAt);
item.updatedAt = new Date(item.updatedAt);
if (item.deliveredAt) {
item.deliveredAt = new Date(item.deliveredAt);
}
// Add to queue
this.queue.set(item.id, item);
} catch (err) {
console.error(`Error loading queue item ${file}:`, err);
}
}
}
console.log(`Loaded ${this.queue.size} items from queue`);
} catch (error) {
console.error('Error loading queue:', error);
throw error;
}
}
/**
* Save queue to disk
*/
private async save(): Promise<void> {
if (!this.config.persistentPath) {
return;
}
try {
// Create directory if it doesn't exist
if (!fs.existsSync(this.config.persistentPath)) {
fs.mkdirSync(this.config.persistentPath, { recursive: true });
}
// Save each item
const savePromises = Array.from(this.queue.values()).map(item => this.saveItem(item));
await Promise.all(savePromises);
} catch (error) {
console.error('Error saving queue:', error);
throw error;
}
}
/**
* Save a single item to disk
* @param item Queue item to save
*/
private async saveItem(item: IQueueItem): Promise<void> {
if (!this.config.persistentPath) {
return;
}
try {
const filePath = path.join(this.config.persistentPath, `${item.id}.json`);
const data = JSON.stringify(item, null, 2);
await fs.promises.writeFile(filePath, data, 'utf8');
} catch (error) {
console.error(`Error saving queue item ${item.id}:`, error);
throw error;
}
}
/**
* Remove a single item file from disk
* @param id Item ID
*/
private async removeItemFile(id: string): Promise<void> {
if (!this.config.persistentPath) {
return;
}
try {
const filePath = path.join(this.config.persistentPath, `${id}.json`);
if (fs.existsSync(filePath)) {
await fs.promises.unlink(filePath);
}
} catch (error) {
console.error(`Error removing queue item file ${id}:`, error);
throw error;
}
} }
} }

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,369 @@
import * as plugins from '../plugins.js';
import { EventEmitter } from 'node:events';
import { type IDomainRule, type EmailProcessingMode } from './classes.email.config.js';
/**
* Options for the domain-based router
*/
export interface IDomainRouterOptions {
// Domain rules with glob pattern matching
domainRules: IDomainRule[];
// Default handling for unmatched domains
defaultMode: EmailProcessingMode;
defaultServer?: string;
defaultPort?: number;
defaultTls?: boolean;
// Pattern matching options
caseSensitive?: boolean;
priorityOrder?: 'most-specific' | 'first-match';
// Cache settings for pattern matching
enableCache?: boolean;
cacheSize?: number;
}
/**
* Result of a pattern match operation
*/
export interface IPatternMatchResult {
rule: IDomainRule;
exactMatch: boolean;
wildcardMatch: boolean;
specificity: number; // Higher is more specific
}
/**
* A pattern matching and routing class for email domains
*/
export class DomainRouter extends EventEmitter {
private options: IDomainRouterOptions;
private patternCache: Map<string, IDomainRule | null> = new Map();
/**
* Create a new domain router
* @param options Router options
*/
constructor(options: IDomainRouterOptions) {
super();
this.options = {
// Default options
caseSensitive: false,
priorityOrder: 'most-specific',
enableCache: true,
cacheSize: 1000,
...options
};
}
/**
* Match an email address against defined rules
* @param email Email address to match
* @returns The matching rule or null if no match
*/
public matchRule(email: string): IDomainRule | null {
// Check cache first if enabled
if (this.options.enableCache && this.patternCache.has(email)) {
return this.patternCache.get(email) || null;
}
// Normalize email if case-insensitive
const normalizedEmail = this.options.caseSensitive ? email : email.toLowerCase();
// Get all matching rules
const matches = this.getAllMatchingRules(normalizedEmail);
if (matches.length === 0) {
// Cache the result (null) if caching is enabled
if (this.options.enableCache) {
this.addToCache(email, null);
}
return null;
}
// Sort by specificity or order
let matchedRule: IDomainRule;
if (this.options.priorityOrder === 'most-specific') {
// Sort by specificity (most specific first)
const sortedMatches = matches.sort((a, b) => {
const aSpecificity = this.calculateSpecificity(a.pattern);
const bSpecificity = this.calculateSpecificity(b.pattern);
return bSpecificity - aSpecificity;
});
matchedRule = sortedMatches[0];
} else {
// First match in the list
matchedRule = matches[0];
}
// Cache the result if caching is enabled
if (this.options.enableCache) {
this.addToCache(email, matchedRule);
}
return matchedRule;
}
/**
* Calculate pattern specificity
* Higher is more specific
* @param pattern Pattern to calculate specificity for
*/
private calculateSpecificity(pattern: string): number {
let specificity = 0;
// Exact match is most specific
if (!pattern.includes('*')) {
return 100;
}
// Count characters that aren't wildcards
specificity += pattern.replace(/\*/g, '').length;
// Position of wildcards affects specificity
if (pattern.startsWith('*@')) {
// Wildcard in local part
specificity += 10;
} else if (pattern.includes('@*')) {
// Wildcard in domain part
specificity += 20;
}
return specificity;
}
/**
* Check if email matches a specific pattern
* @param email Email address to check
* @param pattern Pattern to check against
* @returns True if matching, false otherwise
*/
public matchesPattern(email: string, pattern: string): boolean {
// Normalize if case-insensitive
const normalizedEmail = this.options.caseSensitive ? email : email.toLowerCase();
const normalizedPattern = this.options.caseSensitive ? pattern : pattern.toLowerCase();
// Exact match
if (normalizedEmail === normalizedPattern) {
return true;
}
// Convert glob pattern to regex
const regexPattern = this.globToRegExp(normalizedPattern);
return regexPattern.test(normalizedEmail);
}
/**
* Convert a glob pattern to a regular expression
* @param pattern Glob pattern
* @returns Regular expression
*/
private globToRegExp(pattern: string): RegExp {
// Escape special regex characters except * and ?
let regexString = pattern
.replace(/[.+^${}()|[\]\\]/g, '\\$&')
.replace(/\*/g, '.*')
.replace(/\?/g, '.');
return new RegExp(`^${regexString}$`);
}
/**
* Get all rules that match an email address
* @param email Email address to match
* @returns Array of matching rules
*/
public getAllMatchingRules(email: string): IDomainRule[] {
return this.options.domainRules.filter(rule => this.matchesPattern(email, rule.pattern));
}
/**
* Add a new routing rule
* @param rule Domain rule to add
*/
public addRule(rule: IDomainRule): void {
// Validate the rule
this.validateRule(rule);
// Add the rule
this.options.domainRules.push(rule);
// Clear cache since rules have changed
this.clearCache();
// Emit event
this.emit('ruleAdded', rule);
}
/**
* Validate a domain rule
* @param rule Rule to validate
*/
private validateRule(rule: IDomainRule): void {
// Pattern is required
if (!rule.pattern) {
throw new Error('Domain rule pattern is required');
}
// Mode is required
if (!rule.mode) {
throw new Error('Domain rule mode is required');
}
// Forward mode requires target
if (rule.mode === 'forward' && !rule.target) {
throw new Error('Forward mode requires target configuration');
}
// Forward mode target requires server
if (rule.mode === 'forward' && rule.target && !rule.target.server) {
throw new Error('Forward mode target requires server');
}
}
/**
* Update an existing rule
* @param pattern Pattern to update
* @param updates Updates to apply
* @returns True if rule was found and updated, false otherwise
*/
public updateRule(pattern: string, updates: Partial<IDomainRule>): boolean {
const ruleIndex = this.options.domainRules.findIndex(r => r.pattern === pattern);
if (ruleIndex === -1) {
return false;
}
// Get current rule
const currentRule = this.options.domainRules[ruleIndex];
// Create updated rule
const updatedRule: IDomainRule = {
...currentRule,
...updates
};
// Validate the updated rule
this.validateRule(updatedRule);
// Update the rule
this.options.domainRules[ruleIndex] = updatedRule;
// Clear cache since rules have changed
this.clearCache();
// Emit event
this.emit('ruleUpdated', updatedRule);
return true;
}
/**
* Remove a rule
* @param pattern Pattern to remove
* @returns True if rule was found and removed, false otherwise
*/
public removeRule(pattern: string): boolean {
const initialLength = this.options.domainRules.length;
this.options.domainRules = this.options.domainRules.filter(r => r.pattern !== pattern);
const removed = initialLength > this.options.domainRules.length;
if (removed) {
// Clear cache since rules have changed
this.clearCache();
// Emit event
this.emit('ruleRemoved', pattern);
}
return removed;
}
/**
* Get rule by pattern
* @param pattern Pattern to find
* @returns Rule with matching pattern or null if not found
*/
public getRule(pattern: string): IDomainRule | null {
return this.options.domainRules.find(r => r.pattern === pattern) || null;
}
/**
* Get all rules
* @returns Array of all domain rules
*/
public getRules(): IDomainRule[] {
return [...this.options.domainRules];
}
/**
* Update options
* @param options New options
*/
public updateOptions(options: Partial<IDomainRouterOptions>): void {
this.options = {
...this.options,
...options
};
// Clear cache if cache settings changed
if ('enableCache' in options || 'cacheSize' in options) {
this.clearCache();
}
// Emit event
this.emit('optionsUpdated', this.options);
}
/**
* Add an item to the pattern cache
* @param email Email address
* @param rule Matching rule or null
*/
private addToCache(email: string, rule: IDomainRule | null): void {
// If cache is disabled, do nothing
if (!this.options.enableCache) {
return;
}
// Add to cache
this.patternCache.set(email, rule);
// Check if cache size exceeds limit
if (this.patternCache.size > (this.options.cacheSize || 1000)) {
// Remove oldest entry (first in the Map)
const firstKey = this.patternCache.keys().next().value;
this.patternCache.delete(firstKey);
}
}
/**
* Clear pattern matching cache
*/
public clearCache(): void {
this.patternCache.clear();
this.emit('cacheCleared');
}
/**
* Update all domain rules at once
* @param rules New set of domain rules to replace existing ones
*/
public updateRules(rules: IDomainRule[]): void {
// Validate all rules
rules.forEach(rule => this.validateRule(rule));
// Replace all rules
this.options.domainRules = [...rules];
// Clear cache since rules have changed
this.clearCache();
// Emit event
this.emit('rulesUpdated', rules);
}
}

View File

@ -0,0 +1,129 @@
import * as plugins from '../plugins.js';
/**
* Email processing modes
*/
export type EmailProcessingMode = 'forward' | 'mta' | 'process';
/**
* Consolidated email configuration interface
*/
export interface IEmailConfig {
// Email server settings
ports: number[];
hostname: string;
maxMessageSize?: number;
// TLS configuration for email server
tls?: {
certPath?: string;
keyPath?: string;
caPath?: string;
minVersion?: string;
};
// Authentication for inbound connections
auth?: {
required?: boolean;
methods?: ('PLAIN' | 'LOGIN' | 'OAUTH2')[];
users?: Array<{username: string, password: string}>;
};
// Default routing for unmatched domains
defaultMode: EmailProcessingMode;
defaultServer?: string;
defaultPort?: number;
defaultTls?: boolean;
// Domain rules with glob pattern support
domainRules: IDomainRule[];
// Queue configuration for all email processing
queue?: {
storageType?: 'memory' | 'disk';
persistentPath?: string;
maxRetries?: number;
baseRetryDelay?: number;
maxRetryDelay?: number;
};
// Advanced MTA settings
mtaGlobalOptions?: IMtaOptions;
}
/**
* Domain rule interface for pattern-based routing
*/
export interface IDomainRule {
// Domain pattern (e.g., "*@example.com", "*@*.example.net")
pattern: string;
// Handling mode for this pattern
mode: EmailProcessingMode;
// Forward mode configuration
target?: {
server: string;
port?: number;
useTls?: boolean;
authentication?: {
user?: string;
pass?: string;
};
};
// MTA mode configuration
mtaOptions?: IMtaOptions;
// Process mode configuration
contentScanning?: boolean;
scanners?: IContentScanner[];
transformations?: ITransformation[];
// Rate limits for this domain
rateLimits?: {
maxMessagesPerMinute?: number;
maxRecipientsPerMessage?: number;
};
}
/**
* MTA options interface
*/
export interface IMtaOptions {
domain?: string;
allowLocalDelivery?: boolean;
localDeliveryPath?: string;
dkimSign?: boolean;
dkimOptions?: {
domainName: string;
keySelector: string;
privateKey: string;
};
smtpBanner?: string;
maxConnections?: number;
connTimeout?: number;
spoolDir?: string;
}
/**
* Content scanner interface
*/
export interface IContentScanner {
type: 'spam' | 'virus' | 'attachment';
threshold?: number;
action: 'tag' | 'reject';
blockedExtensions?: string[];
}
/**
* Transformation interface
*/
export interface ITransformation {
type: string;
header?: string;
value?: string;
domains?: string[];
append?: boolean;
[key: string]: any;
}

View File

@ -1,495 +0,0 @@
import * as plugins from '../plugins.js';
import type { ISmtpConfig, IContentScannerConfig, ITransformationConfig } from './classes.smtp.config.js';
import type { ISmtpSession } from './classes.smtp.server.js';
import { EventEmitter } from 'node:events';
// Create standalone types to avoid interface compatibility issues
interface AddressObject {
address?: string;
name?: string;
[key: string]: any;
}
interface ExtendedAddressObject {
value: AddressObject | AddressObject[];
[key: string]: any;
}
// Don't extend ParsedMail directly to avoid type compatibility issues
interface ExtendedParsedMail {
// Basic properties from ParsedMail
subject?: string;
text?: string;
textAsHtml?: string;
html?: string;
attachments?: Array<any>;
headers?: Map<string, any>;
headerLines?: Array<{key: string; line: string}>;
messageId?: string;
date?: Date;
// Extended address objects
from?: ExtendedAddressObject;
to?: ExtendedAddressObject;
cc?: ExtendedAddressObject;
bcc?: ExtendedAddressObject;
// Add any other properties we need
[key: string]: any;
}
/**
* Email metadata extracted from parsed mail
*/
export interface IEmailMetadata {
id: string;
from: string;
fromDomain: string;
to: string[];
toDomains: string[];
subject?: string;
size: number;
hasAttachments: boolean;
receivedAt: Date;
clientIp: string;
authenticated: boolean;
authUser?: string;
}
/**
* Content scanning result
*/
export interface IScanResult {
id: string;
spamScore?: number;
hasVirus?: boolean;
blockedAttachments?: string[];
action: 'accept' | 'tag' | 'reject';
reason?: string;
}
/**
* Routing decision for an email
*/
export interface IRoutingDecision {
id: string;
targetServer: string;
port: number;
useTls: boolean;
authentication?: {
user?: string;
pass?: string;
};
headers?: Array<{
name: string;
value: string;
append?: boolean;
}>;
signDkim?: boolean;
dkimOptions?: {
domainName: string;
keySelector: string;
privateKey: string;
};
}
/**
* Complete processing result
*/
export interface IProcessingResult {
id: string;
metadata: IEmailMetadata;
scanResult: IScanResult;
routing: IRoutingDecision;
modifiedMessage?: ExtendedParsedMail;
originalMessage: ExtendedParsedMail;
rawData: string;
action: 'queue' | 'reject';
session: ISmtpSession;
}
/**
* Email Processor handles email processing pipeline
*/
export class EmailProcessor extends EventEmitter {
private config: ISmtpConfig;
private processingQueue: Map<string, IProcessingResult> = new Map();
/**
* Create a new email processor
* @param config SMTP configuration
*/
constructor(config: ISmtpConfig) {
super();
this.config = config;
}
/**
* Process an email message
* @param message Parsed email message
* @param rawData Raw email data
* @param session SMTP session
*/
public async processEmail(
message: ExtendedParsedMail,
rawData: string,
session: ISmtpSession
): Promise<IProcessingResult> {
try {
// Generate ID for this processing task
const id = plugins.uuid.v4();
// Extract metadata
const metadata = await this.extractMetadata(message, session, id);
// Scan content if enabled
const scanResult = await this.scanContent(message, metadata);
// If content scanning rejects the message, return early
if (scanResult.action === 'reject') {
const result: IProcessingResult = {
id,
metadata,
scanResult,
routing: {
id,
targetServer: '',
port: 0,
useTls: false
},
originalMessage: message,
rawData,
action: 'reject',
session
};
this.emit('rejected', result);
return result;
}
// Determine routing
const routing = await this.determineRouting(message, metadata);
// Apply transformations
const modifiedMessage = await this.applyTransformations(message, routing, scanResult);
// Create processing result
const result: IProcessingResult = {
id,
metadata,
scanResult,
routing,
modifiedMessage,
originalMessage: message,
rawData,
action: 'queue',
session
};
// Add to processing queue
this.processingQueue.set(id, result);
// Emit processed event
this.emit('processed', result);
return result;
} catch (error) {
console.error('Error processing email:', error);
throw error;
}
}
/**
* Extract metadata from email message
* @param message Parsed email
* @param session SMTP session
* @param id Processing ID
*/
private async extractMetadata(
message: ExtendedParsedMail,
session: ISmtpSession,
id: string
): Promise<IEmailMetadata> {
// Extract sender information
let from = '';
if (message.from && message.from.value) {
const fromValue = message.from.value;
if (Array.isArray(fromValue)) {
from = fromValue[0]?.address || '';
} else if (typeof fromValue === 'object' && 'address' in fromValue && fromValue.address) {
from = fromValue.address;
}
}
const fromDomain = from.split('@')[1] || '';
// Extract recipient information
let to: string[] = [];
if (message.to && message.to.value) {
const toValue = message.to.value;
if (Array.isArray(toValue)) {
to = toValue
.map(addr => (addr && 'address' in addr) ? addr.address || '' : '')
.filter(Boolean);
} else if (typeof toValue === 'object' && 'address' in toValue && toValue.address) {
to = [toValue.address];
}
}
const toDomains = to.map(addr => addr.split('@')[1] || '');
// Create metadata
return {
id,
from,
fromDomain,
to,
toDomains,
subject: message.subject,
size: Buffer.byteLength(message.html || message.textAsHtml || message.text || ''),
hasAttachments: message.attachments?.length > 0,
receivedAt: new Date(),
clientIp: session.remoteAddress,
authenticated: !!session.user,
authUser: session.user?.username
};
}
/**
* Scan email content
* @param message Parsed email
* @param metadata Email metadata
*/
private async scanContent(
message: ExtendedParsedMail,
metadata: IEmailMetadata
): Promise<IScanResult> {
// Skip if content scanning is disabled
if (!this.config.contentScanning || !this.config.scanners?.length) {
return {
id: metadata.id,
action: 'accept'
};
}
// Default result
const result: IScanResult = {
id: metadata.id,
action: 'accept'
};
// Placeholder for scanning results
let spamFound = false;
let virusFound = false;
const blockedAttachments: string[] = [];
// Apply each scanner
for (const scanner of this.config.scanners) {
switch (scanner.type) {
case 'spam':
// Placeholder for spam scanning
// In a real implementation, we would use a spam scanning library
const spamScore = Math.random() * 10; // Fake score between 0-10
result.spamScore = spamScore;
if (scanner.threshold && spamScore > scanner.threshold) {
spamFound = true;
if (scanner.action === 'reject') {
result.action = 'reject';
result.reason = `Spam score ${spamScore} exceeds threshold ${scanner.threshold}`;
} else if (scanner.action === 'tag') {
result.action = 'tag';
}
}
break;
case 'virus':
// Placeholder for virus scanning
// In a real implementation, we would use a virus scanning library
const hasVirus = false; // Fake result
result.hasVirus = hasVirus;
if (hasVirus) {
virusFound = true;
if (scanner.action === 'reject') {
result.action = 'reject';
result.reason = 'Message contains virus';
} else if (scanner.action === 'tag') {
result.action = 'tag';
}
}
break;
case 'attachment':
// Check attachments against blocked extensions
if (scanner.blockedExtensions && message.attachments?.length) {
for (const attachment of message.attachments) {
const filename = attachment.filename || '';
const extension = filename.substring(filename.lastIndexOf('.')).toLowerCase();
if (scanner.blockedExtensions.includes(extension)) {
blockedAttachments.push(filename);
if (scanner.action === 'reject') {
result.action = 'reject';
result.reason = `Blocked attachment type: ${extension}`;
} else if (scanner.action === 'tag') {
result.action = 'tag';
}
}
}
}
break;
}
// Set blocked attachments in result if any
if (blockedAttachments.length) {
result.blockedAttachments = blockedAttachments;
}
}
return result;
}
/**
* Determine routing for an email
* @param message Parsed email
* @param metadata Email metadata
*/
private async determineRouting(
message: ExtendedParsedMail,
metadata: IEmailMetadata
): Promise<IRoutingDecision> {
// Start with the default routing
const defaultRouting: IRoutingDecision = {
id: metadata.id,
targetServer: this.config.defaultServer,
port: this.config.defaultPort || 25,
useTls: this.config.useTls || false
};
// If no domain configs, use default routing
if (!this.config.domainConfigs?.length) {
return defaultRouting;
}
// Try to find matching domain config based on recipient domains
for (const domain of metadata.toDomains) {
for (const domainConfig of this.config.domainConfigs) {
// Check if domain matches any of the configured domains
if (domainConfig.domains.some(configDomain => this.domainMatches(domain, configDomain))) {
// Create routing from domain config
const routing: IRoutingDecision = {
id: metadata.id,
targetServer: domainConfig.targetIPs[0], // Use first target IP
port: domainConfig.port || 25,
useTls: domainConfig.useTls || false
};
// Add authentication if specified
if (domainConfig.authentication) {
routing.authentication = domainConfig.authentication;
}
// Add header modifications if specified
if (domainConfig.addHeaders && domainConfig.headerInfo?.length) {
routing.headers = domainConfig.headerInfo.map(h => ({
name: h.name,
value: h.value,
append: false
}));
}
// Add DKIM signing if specified
if (domainConfig.signDkim && domainConfig.dkimOptions) {
routing.signDkim = true;
routing.dkimOptions = domainConfig.dkimOptions;
}
return routing;
}
}
}
// No match found, use default routing
return defaultRouting;
}
/**
* Apply transformations to the email
* @param message Original parsed email
* @param routing Routing decision
* @param scanResult Scan result
*/
private async applyTransformations(
message: ExtendedParsedMail,
routing: IRoutingDecision,
scanResult: IScanResult
): Promise<ExtendedParsedMail> {
// Skip if no transformations configured
if (!this.config.transformations?.length) {
return message;
}
// Clone the message for modifications
// Note: In a real implementation, we would need to properly clone the message
const modifiedMessage = { ...message };
// Apply each transformation
for (const transformation of this.config.transformations) {
switch (transformation.type) {
case 'addHeader':
// Add a header to the message
if (transformation.header && transformation.value) {
// In a real implementation, we would modify the raw message headers
console.log(`Adding header ${transformation.header}: ${transformation.value}`);
}
break;
case 'dkimSign':
// Sign the message with DKIM
if (routing.signDkim && routing.dkimOptions) {
// In a real implementation, we would use mailauth.dkimSign
console.log(`Signing message with DKIM for domain ${routing.dkimOptions.domainName}`);
}
break;
}
}
return modifiedMessage;
}
/**
* Check if a domain matches a pattern (including wildcards)
* @param domain Domain to check
* @param pattern Pattern to match against
*/
private domainMatches(domain: string, pattern: string): boolean {
domain = domain.toLowerCase();
pattern = pattern.toLowerCase();
// Exact match
if (domain === pattern) {
return true;
}
// Wildcard match (*.example.com)
if (pattern.startsWith('*.')) {
const suffix = pattern.slice(2);
return domain.endsWith(suffix) && domain.length > suffix.length;
}
return false;
}
/**
* Update processor configuration
* @param config New configuration
*/
public updateConfig(config: Partial<ISmtpConfig>): void {
this.config = {
...this.config,
...config
};
this.emit('configUpdated', this.config);
}
}

View File

@ -0,0 +1,897 @@
import * as plugins from '../plugins.js';
import { EventEmitter } from 'node:events';
import { logger } from '../logger.js';
import { SecurityLogger, SecurityLogLevel, SecurityEventType } from '../security/index.js';
/**
* Interface for rate limit configuration
*/
export interface IRateLimitConfig {
maxMessagesPerMinute?: number;
maxRecipientsPerMessage?: number;
maxConnectionsPerIP?: number;
maxErrorsPerIP?: number;
maxAuthFailuresPerIP?: number;
blockDuration?: number; // in milliseconds
}
/**
* Interface for hierarchical rate limits
*/
export interface IHierarchicalRateLimits {
// Global rate limits (applied to all traffic)
global: IRateLimitConfig;
// Pattern-specific rate limits (applied to matching patterns)
patterns?: Record<string, IRateLimitConfig>;
// IP-specific rate limits (applied to specific IPs)
ips?: Record<string, IRateLimitConfig>;
// Temporary blocks list and their expiry times
blocks?: Record<string, number>; // IP to expiry timestamp
}
/**
* Counter interface for rate limiting
*/
interface ILimitCounter {
count: number;
lastReset: number;
recipients: number;
errors: number;
authFailures: number;
connections: number;
}
/**
* Rate limiter statistics
*/
export interface IRateLimiterStats {
activeCounters: number;
totalBlocked: number;
currentlyBlocked: number;
byPattern: Record<string, {
messagesPerMinute: number;
totalMessages: number;
totalBlocked: number;
}>;
byIp: Record<string, {
messagesPerMinute: number;
totalMessages: number;
totalBlocked: number;
connections: number;
errors: number;
authFailures: number;
blocked: boolean;
}>;
}
/**
* Result of a rate limit check
*/
export interface IRateLimitResult {
allowed: boolean;
reason?: string;
limit?: number;
current?: number;
resetIn?: number; // milliseconds until reset
}
/**
* Unified rate limiter for all email processing modes
*/
export class UnifiedRateLimiter extends EventEmitter {
private config: IHierarchicalRateLimits;
private counters: Map<string, ILimitCounter> = new Map();
private patternCounters: Map<string, ILimitCounter> = new Map();
private ipCounters: Map<string, ILimitCounter> = new Map();
private cleanupInterval?: NodeJS.Timeout;
private stats: IRateLimiterStats;
/**
* Create a new unified rate limiter
* @param config Rate limit configuration
*/
constructor(config: IHierarchicalRateLimits) {
super();
// Set default configuration
this.config = {
global: {
maxMessagesPerMinute: config.global.maxMessagesPerMinute || 100,
maxRecipientsPerMessage: config.global.maxRecipientsPerMessage || 100,
maxConnectionsPerIP: config.global.maxConnectionsPerIP || 20,
maxErrorsPerIP: config.global.maxErrorsPerIP || 10,
maxAuthFailuresPerIP: config.global.maxAuthFailuresPerIP || 5,
blockDuration: config.global.blockDuration || 3600000 // 1 hour
},
patterns: config.patterns || {},
ips: config.ips || {},
blocks: config.blocks || {}
};
// Initialize statistics
this.stats = {
activeCounters: 0,
totalBlocked: 0,
currentlyBlocked: 0,
byPattern: {},
byIp: {}
};
// Start cleanup interval
this.startCleanupInterval();
}
/**
* Start the cleanup interval
*/
private startCleanupInterval(): void {
if (this.cleanupInterval) {
clearInterval(this.cleanupInterval);
}
// Run cleanup every minute
this.cleanupInterval = setInterval(() => this.cleanup(), 60000);
}
/**
* Stop the cleanup interval
*/
public stop(): void {
if (this.cleanupInterval) {
clearInterval(this.cleanupInterval);
this.cleanupInterval = undefined;
}
}
/**
* Clean up expired counters and blocks
*/
private cleanup(): void {
const now = Date.now();
// Clean up expired blocks
if (this.config.blocks) {
for (const [ip, expiry] of Object.entries(this.config.blocks)) {
if (expiry <= now) {
delete this.config.blocks[ip];
logger.log('info', `Rate limit block expired for IP ${ip}`);
// Update statistics
if (this.stats.byIp[ip]) {
this.stats.byIp[ip].blocked = false;
}
this.stats.currentlyBlocked--;
}
}
}
// Clean up old counters (older than 10 minutes)
const cutoff = now - 600000;
// Clean global counters
for (const [key, counter] of this.counters.entries()) {
if (counter.lastReset < cutoff) {
this.counters.delete(key);
}
}
// Clean pattern counters
for (const [key, counter] of this.patternCounters.entries()) {
if (counter.lastReset < cutoff) {
this.patternCounters.delete(key);
}
}
// Clean IP counters
for (const [key, counter] of this.ipCounters.entries()) {
if (counter.lastReset < cutoff) {
this.ipCounters.delete(key);
}
}
// Update statistics
this.updateStats();
}
/**
* Check if a message is allowed by rate limits
* @param email Email address
* @param ip IP address
* @param recipients Number of recipients
* @param pattern Matched pattern
* @returns Result of rate limit check
*/
public checkMessageLimit(email: string, ip: string, recipients: number, pattern?: string): IRateLimitResult {
// Check if IP is blocked
if (this.isIpBlocked(ip)) {
return {
allowed: false,
reason: 'IP is blocked',
resetIn: this.getBlockReleaseTime(ip)
};
}
// Check global message rate limit
const globalResult = this.checkGlobalMessageLimit(email);
if (!globalResult.allowed) {
return globalResult;
}
// Check pattern-specific limit if pattern is provided
if (pattern) {
const patternResult = this.checkPatternMessageLimit(pattern);
if (!patternResult.allowed) {
return patternResult;
}
}
// Check IP-specific limit
const ipResult = this.checkIpMessageLimit(ip);
if (!ipResult.allowed) {
return ipResult;
}
// Check recipient limit
const recipientResult = this.checkRecipientLimit(email, recipients, pattern);
if (!recipientResult.allowed) {
return recipientResult;
}
// All checks passed
return { allowed: true };
}
/**
* Check global message rate limit
* @param email Email address
*/
private checkGlobalMessageLimit(email: string): IRateLimitResult {
const now = Date.now();
const limit = this.config.global.maxMessagesPerMinute!;
if (!limit) {
return { allowed: true };
}
// Get or create counter
const key = 'global';
let counter = this.counters.get(key);
if (!counter) {
counter = {
count: 0,
lastReset: now,
recipients: 0,
errors: 0,
authFailures: 0,
connections: 0
};
this.counters.set(key, counter);
}
// Check if counter needs to be reset
if (now - counter.lastReset >= 60000) {
counter.count = 0;
counter.lastReset = now;
}
// Check if limit is exceeded
if (counter.count >= limit) {
// Calculate reset time
const resetIn = 60000 - (now - counter.lastReset);
return {
allowed: false,
reason: 'Global message rate limit exceeded',
limit,
current: counter.count,
resetIn
};
}
// Increment counter
counter.count++;
// Update statistics
this.updateStats();
return { allowed: true };
}
/**
* Check pattern-specific message rate limit
* @param pattern Pattern to check
*/
private checkPatternMessageLimit(pattern: string): IRateLimitResult {
const now = Date.now();
// Get pattern-specific limit or use global
const patternConfig = this.config.patterns?.[pattern];
const limit = patternConfig?.maxMessagesPerMinute || this.config.global.maxMessagesPerMinute!;
if (!limit) {
return { allowed: true };
}
// Get or create counter
let counter = this.patternCounters.get(pattern);
if (!counter) {
counter = {
count: 0,
lastReset: now,
recipients: 0,
errors: 0,
authFailures: 0,
connections: 0
};
this.patternCounters.set(pattern, counter);
// Initialize pattern stats if needed
if (!this.stats.byPattern[pattern]) {
this.stats.byPattern[pattern] = {
messagesPerMinute: 0,
totalMessages: 0,
totalBlocked: 0
};
}
}
// Check if counter needs to be reset
if (now - counter.lastReset >= 60000) {
counter.count = 0;
counter.lastReset = now;
}
// Check if limit is exceeded
if (counter.count >= limit) {
// Calculate reset time
const resetIn = 60000 - (now - counter.lastReset);
// Update statistics
this.stats.byPattern[pattern].totalBlocked++;
this.stats.totalBlocked++;
return {
allowed: false,
reason: `Pattern "${pattern}" message rate limit exceeded`,
limit,
current: counter.count,
resetIn
};
}
// Increment counter
counter.count++;
// Update statistics
this.stats.byPattern[pattern].messagesPerMinute = counter.count;
this.stats.byPattern[pattern].totalMessages++;
return { allowed: true };
}
/**
* Check IP-specific message rate limit
* @param ip IP address
*/
private checkIpMessageLimit(ip: string): IRateLimitResult {
const now = Date.now();
// Get IP-specific limit or use global
const ipConfig = this.config.ips?.[ip];
const limit = ipConfig?.maxMessagesPerMinute || this.config.global.maxMessagesPerMinute!;
if (!limit) {
return { allowed: true };
}
// Get or create counter
let counter = this.ipCounters.get(ip);
if (!counter) {
counter = {
count: 0,
lastReset: now,
recipients: 0,
errors: 0,
authFailures: 0,
connections: 0
};
this.ipCounters.set(ip, counter);
// Initialize IP stats if needed
if (!this.stats.byIp[ip]) {
this.stats.byIp[ip] = {
messagesPerMinute: 0,
totalMessages: 0,
totalBlocked: 0,
connections: 0,
errors: 0,
authFailures: 0,
blocked: false
};
}
}
// Check if counter needs to be reset
if (now - counter.lastReset >= 60000) {
counter.count = 0;
counter.lastReset = now;
}
// Check if limit is exceeded
if (counter.count >= limit) {
// Calculate reset time
const resetIn = 60000 - (now - counter.lastReset);
// Update statistics
this.stats.byIp[ip].totalBlocked++;
this.stats.totalBlocked++;
return {
allowed: false,
reason: `IP ${ip} message rate limit exceeded`,
limit,
current: counter.count,
resetIn
};
}
// Increment counter
counter.count++;
// Update statistics
this.stats.byIp[ip].messagesPerMinute = counter.count;
this.stats.byIp[ip].totalMessages++;
return { allowed: true };
}
/**
* Check recipient limit
* @param email Email address
* @param recipients Number of recipients
* @param pattern Matched pattern
*/
private checkRecipientLimit(email: string, recipients: number, pattern?: string): IRateLimitResult {
// Get pattern-specific limit if available
let limit = this.config.global.maxRecipientsPerMessage!;
if (pattern && this.config.patterns?.[pattern]?.maxRecipientsPerMessage) {
limit = this.config.patterns[pattern].maxRecipientsPerMessage!;
}
if (!limit) {
return { allowed: true };
}
// Check if limit is exceeded
if (recipients > limit) {
return {
allowed: false,
reason: 'Recipient limit exceeded',
limit,
current: recipients
};
}
return { allowed: true };
}
/**
* Record a connection from an IP
* @param ip IP address
* @returns Result of rate limit check
*/
public recordConnection(ip: string): IRateLimitResult {
const now = Date.now();
// Check if IP is blocked
if (this.isIpBlocked(ip)) {
return {
allowed: false,
reason: 'IP is blocked',
resetIn: this.getBlockReleaseTime(ip)
};
}
// Get IP-specific limit or use global
const ipConfig = this.config.ips?.[ip];
const limit = ipConfig?.maxConnectionsPerIP || this.config.global.maxConnectionsPerIP!;
if (!limit) {
return { allowed: true };
}
// Get or create counter
let counter = this.ipCounters.get(ip);
if (!counter) {
counter = {
count: 0,
lastReset: now,
recipients: 0,
errors: 0,
authFailures: 0,
connections: 0
};
this.ipCounters.set(ip, counter);
// Initialize IP stats if needed
if (!this.stats.byIp[ip]) {
this.stats.byIp[ip] = {
messagesPerMinute: 0,
totalMessages: 0,
totalBlocked: 0,
connections: 0,
errors: 0,
authFailures: 0,
blocked: false
};
}
}
// Check if counter needs to be reset
if (now - counter.lastReset >= 60000) {
counter.connections = 0;
counter.lastReset = now;
}
// Check if limit is exceeded
if (counter.connections >= limit) {
// Calculate reset time
const resetIn = 60000 - (now - counter.lastReset);
// Update statistics
this.stats.byIp[ip].totalBlocked++;
this.stats.totalBlocked++;
return {
allowed: false,
reason: `IP ${ip} connection rate limit exceeded`,
limit,
current: counter.connections,
resetIn
};
}
// Increment counter
counter.connections++;
// Update statistics
this.stats.byIp[ip].connections = counter.connections;
return { allowed: true };
}
/**
* Record an error from an IP
* @param ip IP address
* @returns True if IP should be blocked
*/
public recordError(ip: string): boolean {
const now = Date.now();
// Get IP-specific limit or use global
const ipConfig = this.config.ips?.[ip];
const limit = ipConfig?.maxErrorsPerIP || this.config.global.maxErrorsPerIP!;
if (!limit) {
return false;
}
// Get or create counter
let counter = this.ipCounters.get(ip);
if (!counter) {
counter = {
count: 0,
lastReset: now,
recipients: 0,
errors: 0,
authFailures: 0,
connections: 0
};
this.ipCounters.set(ip, counter);
// Initialize IP stats if needed
if (!this.stats.byIp[ip]) {
this.stats.byIp[ip] = {
messagesPerMinute: 0,
totalMessages: 0,
totalBlocked: 0,
connections: 0,
errors: 0,
authFailures: 0,
blocked: false
};
}
}
// Check if counter needs to be reset
if (now - counter.lastReset >= 60000) {
counter.errors = 0;
counter.lastReset = now;
}
// Increment counter
counter.errors++;
// Update statistics
this.stats.byIp[ip].errors = counter.errors;
// Check if limit is exceeded
if (counter.errors >= limit) {
// Block the IP
this.blockIp(ip);
logger.log('warn', `IP ${ip} blocked due to excessive errors (${counter.errors}/${limit})`);
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.WARN,
type: SecurityEventType.RATE_LIMITING,
message: 'IP blocked due to excessive errors',
ipAddress: ip,
details: {
errors: counter.errors,
limit
},
success: false
});
return true;
}
return false;
}
/**
* Record an authentication failure from an IP
* @param ip IP address
* @returns True if IP should be blocked
*/
public recordAuthFailure(ip: string): boolean {
const now = Date.now();
// Get IP-specific limit or use global
const ipConfig = this.config.ips?.[ip];
const limit = ipConfig?.maxAuthFailuresPerIP || this.config.global.maxAuthFailuresPerIP!;
if (!limit) {
return false;
}
// Get or create counter
let counter = this.ipCounters.get(ip);
if (!counter) {
counter = {
count: 0,
lastReset: now,
recipients: 0,
errors: 0,
authFailures: 0,
connections: 0
};
this.ipCounters.set(ip, counter);
// Initialize IP stats if needed
if (!this.stats.byIp[ip]) {
this.stats.byIp[ip] = {
messagesPerMinute: 0,
totalMessages: 0,
totalBlocked: 0,
connections: 0,
errors: 0,
authFailures: 0,
blocked: false
};
}
}
// Check if counter needs to be reset
if (now - counter.lastReset >= 60000) {
counter.authFailures = 0;
counter.lastReset = now;
}
// Increment counter
counter.authFailures++;
// Update statistics
this.stats.byIp[ip].authFailures = counter.authFailures;
// Check if limit is exceeded
if (counter.authFailures >= limit) {
// Block the IP
this.blockIp(ip);
logger.log('warn', `IP ${ip} blocked due to excessive authentication failures (${counter.authFailures}/${limit})`);
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.WARN,
type: SecurityEventType.AUTHENTICATION,
message: 'IP blocked due to excessive authentication failures',
ipAddress: ip,
details: {
authFailures: counter.authFailures,
limit
},
success: false
});
return true;
}
return false;
}
/**
* Block an IP address
* @param ip IP address to block
* @param duration Override the default block duration (milliseconds)
*/
public blockIp(ip: string, duration?: number): void {
if (!this.config.blocks) {
this.config.blocks = {};
}
// Set block expiry time
const expiry = Date.now() + (duration || this.config.global.blockDuration || 3600000);
this.config.blocks[ip] = expiry;
// Update statistics
if (!this.stats.byIp[ip]) {
this.stats.byIp[ip] = {
messagesPerMinute: 0,
totalMessages: 0,
totalBlocked: 0,
connections: 0,
errors: 0,
authFailures: 0,
blocked: false
};
}
this.stats.byIp[ip].blocked = true;
this.stats.currentlyBlocked++;
// Emit event
this.emit('ipBlocked', {
ip,
expiry,
duration: duration || this.config.global.blockDuration
});
logger.log('warn', `IP ${ip} blocked until ${new Date(expiry).toISOString()}`);
}
/**
* Unblock an IP address
* @param ip IP address to unblock
*/
public unblockIp(ip: string): void {
if (!this.config.blocks) {
return;
}
// Remove block
delete this.config.blocks[ip];
// Update statistics
if (this.stats.byIp[ip]) {
this.stats.byIp[ip].blocked = false;
this.stats.currentlyBlocked--;
}
// Emit event
this.emit('ipUnblocked', { ip });
logger.log('info', `IP ${ip} unblocked`);
}
/**
* Check if an IP is blocked
* @param ip IP address to check
*/
public isIpBlocked(ip: string): boolean {
if (!this.config.blocks) {
return false;
}
// Check if IP is in blocks
if (!(ip in this.config.blocks)) {
return false;
}
// Check if block has expired
const expiry = this.config.blocks[ip];
if (expiry <= Date.now()) {
// Remove expired block
delete this.config.blocks[ip];
// Update statistics
if (this.stats.byIp[ip]) {
this.stats.byIp[ip].blocked = false;
this.stats.currentlyBlocked--;
}
return false;
}
return true;
}
/**
* Get the time until a block is released
* @param ip IP address
* @returns Milliseconds until release or 0 if not blocked
*/
public getBlockReleaseTime(ip: string): number {
if (!this.config.blocks || !(ip in this.config.blocks)) {
return 0;
}
const expiry = this.config.blocks[ip];
const now = Date.now();
return expiry > now ? expiry - now : 0;
}
/**
* Update rate limiter statistics
*/
private updateStats(): void {
// Update active counters count
this.stats.activeCounters = this.counters.size + this.patternCounters.size + this.ipCounters.size;
// Emit statistics update
this.emit('statsUpdated', this.stats);
}
/**
* Get rate limiter statistics
*/
public getStats(): IRateLimiterStats {
return { ...this.stats };
}
/**
* Update rate limiter configuration
* @param config New configuration
*/
public updateConfig(config: Partial<IHierarchicalRateLimits>): void {
if (config.global) {
this.config.global = {
...this.config.global,
...config.global
};
}
if (config.patterns) {
this.config.patterns = {
...this.config.patterns,
...config.patterns
};
}
if (config.ips) {
this.config.ips = {
...this.config.ips,
...config.ips
};
}
logger.log('info', 'Rate limiter configuration updated');
}
/**
* Get configuration for debugging
*/
public getConfig(): IHierarchicalRateLimits {
return { ...this.config };
}
}

View File

@ -1,170 +0,0 @@
import * as plugins from '../plugins.js';
/**
* Configuration for SMTP authentication
*/
export interface ISmtpAuthConfig {
/** Whether authentication is required */
required?: boolean;
/** Supported authentication methods */
methods?: ('PLAIN' | 'LOGIN' | 'OAUTH2')[];
/** Static user credentials */
users?: Array<{username: string, password: string}>;
/** LDAP URL for authentication */
ldapUrl?: string;
}
/**
* Configuration for TLS in SMTP connections
*/
export interface ISmtpTlsConfig {
/** Path to certificate file */
certPath?: string;
/** Path to key file */
keyPath?: string;
/** Path to CA certificate */
caPath?: string;
/** Minimum TLS version */
minVersion?: string;
/** Whether to use STARTTLS upgrade or implicit TLS */
useStartTls?: boolean;
/** Cipher suite for TLS */
ciphers?: string;
}
/**
* Configuration for content scanning
*/
export interface IContentScannerConfig {
/** Type of scanner */
type: 'spam' | 'virus' | 'attachment';
/** Threshold for spam detection */
threshold?: number;
/** Action to take when content matches scanning criteria */
action: 'tag' | 'reject';
/** File extensions to block (for attachment scanner) */
blockedExtensions?: string[];
}
/**
* Configuration for email transformations
*/
export interface ITransformationConfig {
/** Type of transformation */
type: string;
/** Header name for adding/modifying headers */
header?: string;
/** Header value for adding/modifying headers */
value?: string;
/** Domains for DKIM signing */
domains?: string[];
/** Whether to append to existing header or replace */
append?: boolean;
/** Additional transformation parameters */
[key: string]: any;
}
/**
* Configuration for DKIM signing
*/
export interface IDkimConfig {
/** Domain name for DKIM signature */
domainName: string;
/** Selector for DKIM */
keySelector: string;
/** Private key for DKIM signing */
privateKey: string;
}
/**
* Domain-specific routing configuration
*/
export interface ISmtpDomainConfig {
/** Domains this configuration applies to */
domains: string[];
/** Target SMTP servers for this domain */
targetIPs: string[];
/** Target port */
port?: number;
/** Whether to use TLS when connecting to target */
useTls?: boolean;
/** Authentication credentials for target server */
authentication?: {
user?: string;
pass?: string;
};
/** Allowed client IPs */
allowedIPs?: string[];
/** Rate limits for this domain */
rateLimits?: {
maxMessagesPerMinute?: number;
maxRecipientsPerMessage?: number;
};
/** Whether to add custom headers */
addHeaders?: boolean;
/** Headers to add */
headerInfo?: Array<{
name: string;
value: string;
}>;
/** Whether to sign emails with DKIM */
signDkim?: boolean;
/** DKIM configuration */
dkimOptions?: IDkimConfig;
}
/**
* Queue configuration
*/
export interface IQueueConfig {
/** Storage type for queue */
storageType?: 'memory' | 'disk';
/** Path for disk storage */
persistentPath?: string;
/** Maximum retry attempts */
maxRetries?: number;
/** Base delay between retries (ms) */
baseRetryDelay?: number;
/** Maximum delay between retries (ms) */
maxRetryDelay?: number;
/** Maximum queue size */
maxQueueSize?: number;
}
/**
* Complete SMTP configuration
*/
export interface ISmtpConfig {
/** SMTP ports to listen on */
ports: number[];
/** Hostname for SMTP server */
hostname: string;
/** Banner text for SMTP server */
banner?: string;
/** Maximum message size in bytes */
maxMessageSize?: number;
/** TLS configuration */
tls?: ISmtpTlsConfig;
/** Authentication configuration */
auth?: ISmtpAuthConfig;
/** Domain-specific configurations */
domainConfigs: ISmtpDomainConfig[];
/** Default routing */
defaultServer: string;
defaultPort?: number;
useTls?: boolean;
/** Content scanning configuration */
contentScanning?: boolean;
scanners?: IContentScannerConfig[];
/** Message transformations */
transformations?: ITransformationConfig[];
/** Queue configuration */
queue?: IQueueConfig;
}

View File

@ -1,423 +0,0 @@
import * as plugins from '../plugins.js';
import { Readable } from 'node:stream';
import type { ISmtpConfig, ISmtpAuthConfig } from './classes.smtp.config.js';
import { EventEmitter } from 'node:events';
/**
* Connection session information
*/
export interface ISmtpSession {
id: string;
remoteAddress: string;
remotePort: number;
clientHostname?: string;
secure: boolean;
transmissionType?: 'SMTP' | 'ESMTP';
user?: {
username: string;
[key: string]: any;
};
envelope?: {
mailFrom: {
address: string;
args: any;
};
rcptTo: Array<{
address: string;
args: any;
}>;
};
}
/**
* Authentication data
*/
export interface IAuthData {
method: string;
username: string;
password: string;
}
/**
* SMTP Server class for receiving emails
*/
export class SmtpServer extends EventEmitter {
private config: ISmtpConfig;
private server: any; // Will be SMTPServer from smtp-server once we add the dependency
private incomingConnections: Map<string, ISmtpSession> = new Map();
/**
* Create a new SMTP server
* @param config SMTP server configuration
*/
constructor(config: ISmtpConfig) {
super();
this.config = config;
}
/**
* Initialize and start the SMTP server
*/
public async start(): Promise<void> {
try {
// This is a placeholder for the actual server creation
// In the real implementation, we would use the smtp-server package
console.log(`Starting SMTP server on ports ${this.config.ports.join(', ')}`);
// Setup TLS options if provided
const tlsOptions = this.config.tls ? {
key: this.config.tls.keyPath ? await plugins.fs.promises.readFile(this.config.tls.keyPath, 'utf8') : undefined,
cert: this.config.tls.certPath ? await plugins.fs.promises.readFile(this.config.tls.certPath, 'utf8') : undefined,
ca: this.config.tls.caPath ? await plugins.fs.promises.readFile(this.config.tls.caPath, 'utf8') : undefined,
minVersion: this.config.tls.minVersion || 'TLSv1.2',
ciphers: this.config.tls.ciphers
} : undefined;
// Create the server
// Note: In the actual implementation, this would use SMTPServer from smtp-server
this.server = {
// Placeholder for server instance
async close() {
console.log('SMTP server closed');
}
};
// Set up event handlers
this.setupEventHandlers();
// Listen on all specified ports
for (const port of this.config.ports) {
// In actual implementation, this would call server.listen(port)
console.log(`SMTP server listening on port ${port}`);
}
this.emit('started');
} catch (error) {
console.error('Failed to start SMTP server:', error);
throw error;
}
}
/**
* Stop the SMTP server
*/
public async stop(): Promise<void> {
try {
if (this.server) {
// Close the server
await this.server.close();
this.server = null;
// Clear connection tracking
this.incomingConnections.clear();
this.emit('stopped');
}
} catch (error) {
console.error('Error stopping SMTP server:', error);
throw error;
}
}
/**
* Set up event handlers for the SMTP server
*/
private setupEventHandlers(): void {
// These would be connected to actual server events in implementation
// Connection handler
this.onConnect((session, callback) => {
// Store connection in tracking map
this.incomingConnections.set(session.id, session);
// Check if connection is allowed based on IP
if (!this.isIpAllowed(session.remoteAddress)) {
return callback(new Error('Connection refused'));
}
// Accept the connection
callback();
});
// Authentication handler
this.onAuth((auth, session, callback) => {
// Skip auth check if not required
if (!this.config.auth?.required) {
return callback(null, { user: auth.username });
}
// Check authentication
if (this.authenticateUser(auth)) {
return callback(null, { user: auth.username });
}
// Authentication failed
callback(new Error('Invalid credentials'));
});
// Sender validation
this.onMailFrom((address, session, callback) => {
// Validate sender address if needed
// Accept the sender
callback();
});
// Recipient validation
this.onRcptTo((address, session, callback) => {
// Validate recipient address
// Check if we handle this domain
if (!this.isDomainHandled(address.address.split('@')[1])) {
return callback(new Error('Domain not handled by this server'));
}
// Accept the recipient
callback();
});
// Message data handler
this.onData((stream, session, callback) => {
// Process the incoming message
this.processMessageData(stream, session)
.then(() => callback())
.catch(err => callback(err));
});
}
/**
* Process incoming message data
* @param stream Message data stream
* @param session SMTP session
*/
private async processMessageData(stream: Readable, session: ISmtpSession): Promise<void> {
return new Promise<void>((resolve, reject) => {
// Collect the message data
let messageData = '';
let messageSize = 0;
stream.on('data', (chunk) => {
messageData += chunk;
messageSize += chunk.length;
// Check size limits
if (this.config.maxMessageSize && messageSize > this.config.maxMessageSize) {
stream.unpipe();
return reject(new Error('Message size exceeds limit'));
}
});
stream.on('end', async () => {
try {
// Parse the email using mailparser
const parsedMail = await this.parseEmail(messageData);
// Emit message received event
this.emit('message', {
session,
mail: parsedMail,
rawData: messageData
});
resolve();
} catch (error) {
reject(error);
}
});
stream.on('error', (error) => {
reject(error);
});
});
}
/**
* Parse raw email data using mailparser
* @param rawData Raw email data
*/
private async parseEmail(rawData: string): Promise<any> {
// Use mailparser to parse the email
// We return 'any' here which will be treated as ExtendedParsedMail by consumers
return plugins.mailparser.simpleParser(rawData);
}
/**
* Check if an IP address is allowed to connect
* @param ip IP address
*/
private isIpAllowed(ip: string): boolean {
// Default to allowing all IPs if no restrictions
const defaultAllowed = ['0.0.0.0/0'];
// Check domain configs for IP restrictions
for (const domainConfig of this.config.domainConfigs) {
if (domainConfig.allowedIPs && domainConfig.allowedIPs.length > 0) {
// Check if IP matches any of the allowed IPs
for (const allowedIp of domainConfig.allowedIPs) {
if (this.ipMatchesRange(ip, allowedIp)) {
return true;
}
}
}
}
// Check against default allowed IPs
for (const allowedIp of defaultAllowed) {
if (this.ipMatchesRange(ip, allowedIp)) {
return true;
}
}
return false;
}
/**
* Check if an IP matches a range
* @param ip IP address to check
* @param range IP range in CIDR notation
*/
private ipMatchesRange(ip: string, range: string): boolean {
try {
// Use the 'ip' package to check if IP is in range
return plugins.ip.cidrSubnet(range).contains(ip);
} catch (error) {
console.error(`Invalid IP range: ${range}`, error);
return false;
}
}
/**
* Check if a domain is handled by this server
* @param domain Domain to check
*/
private isDomainHandled(domain: string): boolean {
// Check if the domain is configured in any domain config
for (const domainConfig of this.config.domainConfigs) {
for (const configDomain of domainConfig.domains) {
if (this.domainMatches(domain, configDomain)) {
return true;
}
}
}
return false;
}
/**
* Check if a domain matches a pattern (including wildcards)
* @param domain Domain to check
* @param pattern Pattern to match against
*/
private domainMatches(domain: string, pattern: string): boolean {
domain = domain.toLowerCase();
pattern = pattern.toLowerCase();
// Exact match
if (domain === pattern) {
return true;
}
// Wildcard match (*.example.com)
if (pattern.startsWith('*.')) {
const suffix = pattern.slice(2);
return domain.endsWith(suffix) && domain.length > suffix.length;
}
return false;
}
/**
* Authenticate a user
* @param auth Authentication data
*/
private authenticateUser(auth: IAuthData): boolean {
// Skip if no auth config
if (!this.config.auth) {
return true;
}
// Check if auth method is supported
if (this.config.auth.methods && !this.config.auth.methods.includes(auth.method as any)) {
return false;
}
// Check static user credentials
if (this.config.auth.users) {
const user = this.config.auth.users.find(u =>
u.username === auth.username && u.password === auth.password);
if (user) {
return true;
}
}
// LDAP authentication would go here
return false;
}
/**
* Event handler for connection
* @param handler Function to handle connection
*/
public onConnect(handler: (session: ISmtpSession, callback: (err?: Error) => void) => void): void {
// In actual implementation, this would connect to the server's 'connection' event
this.on('connect', handler);
}
/**
* Event handler for authentication
* @param handler Function to handle authentication
*/
public onAuth(handler: (auth: IAuthData, session: ISmtpSession, callback: (err?: Error, user?: any) => void) => void): void {
// In actual implementation, this would connect to the server's 'auth' event
this.on('auth', handler);
}
/**
* Event handler for MAIL FROM command
* @param handler Function to handle MAIL FROM
*/
public onMailFrom(handler: (address: { address: string; args: any }, session: ISmtpSession, callback: (err?: Error) => void) => void): void {
// In actual implementation, this would connect to the server's 'mail' event
this.on('mail', handler);
}
/**
* Event handler for RCPT TO command
* @param handler Function to handle RCPT TO
*/
public onRcptTo(handler: (address: { address: string; args: any }, session: ISmtpSession, callback: (err?: Error) => void) => void): void {
// In actual implementation, this would connect to the server's 'rcpt' event
this.on('rcpt', handler);
}
/**
* Event handler for DATA command
* @param handler Function to handle DATA
*/
public onData(handler: (stream: Readable, session: ISmtpSession, callback: (err?: Error) => void) => void): void {
// In actual implementation, this would connect to the server's 'data' event
this.on('dataReady', handler);
}
/**
* Update the server configuration
* @param config New configuration
*/
public updateConfig(config: Partial<ISmtpConfig>): void {
this.config = {
...this.config,
...config
};
// In a real implementation, this might require restarting the server
this.emit('configUpdated', this.config);
}
/**
* Get server statistics
*/
public getStats(): any {
return {
connections: this.incomingConnections.size,
// Additional stats would be included here
};
}
}

View File

@ -0,0 +1,991 @@
import * as plugins from '../plugins.js';
import * as paths from '../paths.js';
import { EventEmitter } from 'events';
import { logger } from '../logger.js';
import {
SecurityLogger,
SecurityLogLevel,
SecurityEventType
} from '../security/index.js';
import { DomainRouter } from './classes.domain.router.js';
import type {
IEmailConfig,
EmailProcessingMode,
IDomainRule
} from './classes.email.config.js';
import { Email } from '../mta/classes.email.js';
import * as net from 'node:net';
import * as tls from 'node:tls';
import * as stream from 'node:stream';
import { SMTPServer as MtaSmtpServer } from '../mta/classes.smtpserver.js';
/**
* Options for the unified email server
*/
export interface IUnifiedEmailServerOptions {
// Base server options
ports: number[];
hostname: string;
banner?: string;
// Authentication options
auth?: {
required?: boolean;
methods?: ('PLAIN' | 'LOGIN' | 'OAUTH2')[];
users?: Array<{username: string, password: string}>;
};
// TLS options
tls?: {
certPath?: string;
keyPath?: string;
caPath?: string;
minVersion?: string;
ciphers?: string;
};
// Limits
maxMessageSize?: number;
maxClients?: number;
maxConnections?: number;
// Connection options
connectionTimeout?: number;
socketTimeout?: number;
// Domain rules
domainRules: IDomainRule[];
// Default handling for unmatched domains
defaultMode: EmailProcessingMode;
defaultServer?: string;
defaultPort?: number;
defaultTls?: boolean;
}
/**
* Interface describing SMTP session data
*/
export interface ISmtpSession {
id: string;
remoteAddress: string;
clientHostname: string;
secure: boolean;
authenticated: boolean;
user?: {
username: string;
[key: string]: any;
};
envelope: {
mailFrom: {
address: string;
args: any;
};
rcptTo: Array<{
address: string;
args: any;
}>;
};
processingMode?: EmailProcessingMode;
matchedRule?: IDomainRule;
}
/**
* Authentication data for SMTP
*/
export interface IAuthData {
method: string;
username: string;
password: string;
}
/**
* Server statistics
*/
export interface IServerStats {
startTime: Date;
connections: {
current: number;
total: number;
};
messages: {
processed: number;
delivered: number;
failed: number;
};
processingTime: {
avg: number;
max: number;
min: number;
};
}
/**
* Unified email server that handles all email traffic with pattern-based routing
*/
export class UnifiedEmailServer extends EventEmitter {
private options: IUnifiedEmailServerOptions;
private domainRouter: DomainRouter;
private servers: MtaSmtpServer[] = [];
private stats: IServerStats;
private processingTimes: number[] = [];
constructor(options: IUnifiedEmailServerOptions) {
super();
// Set default options
this.options = {
...options,
banner: options.banner || `${options.hostname} ESMTP UnifiedEmailServer`,
maxMessageSize: options.maxMessageSize || 10 * 1024 * 1024, // 10MB
maxClients: options.maxClients || 100,
maxConnections: options.maxConnections || 1000,
connectionTimeout: options.connectionTimeout || 60000, // 1 minute
socketTimeout: options.socketTimeout || 60000 // 1 minute
};
// Initialize domain router for pattern matching
this.domainRouter = new DomainRouter({
domainRules: options.domainRules,
defaultMode: options.defaultMode,
defaultServer: options.defaultServer,
defaultPort: options.defaultPort,
defaultTls: options.defaultTls,
enableCache: true,
cacheSize: 1000
});
// Initialize statistics
this.stats = {
startTime: new Date(),
connections: {
current: 0,
total: 0
},
messages: {
processed: 0,
delivered: 0,
failed: 0
},
processingTime: {
avg: 0,
max: 0,
min: 0
}
};
// We'll create the SMTP servers during the start() method
}
/**
* Start the unified email server
*/
public async start(): Promise<void> {
logger.log('info', `Starting UnifiedEmailServer on ports: ${(this.options.ports as number[]).join(', ')}`);
try {
// Ensure we have the necessary TLS options
const hasTlsConfig = this.options.tls?.keyPath && this.options.tls?.certPath;
// Prepare the certificate and key if available
let key: string | undefined;
let cert: string | undefined;
if (hasTlsConfig) {
try {
key = plugins.fs.readFileSync(this.options.tls.keyPath!, 'utf8');
cert = plugins.fs.readFileSync(this.options.tls.certPath!, 'utf8');
logger.log('info', 'TLS certificates loaded successfully');
} catch (error) {
logger.log('warn', `Failed to load TLS certificates: ${error.message}`);
}
}
// Create a SMTP server for each port
for (const port of this.options.ports as number[]) {
// Create a reference object to hold the MTA service during setup
const mtaRef = {
config: {
smtp: {
hostname: this.options.hostname
},
security: {
checkIPReputation: false,
verifyDkim: true,
verifySpf: true,
verifyDmarc: true
}
},
// These will be implemented in the real integration:
dkimVerifier: {
verify: async () => ({ isValid: true, domain: '' })
},
spfVerifier: {
verifyAndApply: async () => true
},
dmarcVerifier: {
verify: async () => ({}),
applyPolicy: () => true
},
processIncomingEmail: async (email: Email) => {
// This is where we'll process the email based on domain routing
const to = email.to[0]; // Email.to is an array, take the first recipient
const rule = this.domainRouter.matchRule(to);
const mode = rule?.mode || this.options.defaultMode;
// Process based on the mode
await this.processEmailByMode(email, {
id: 'session-' + Math.random().toString(36).substring(2),
remoteAddress: '127.0.0.1',
clientHostname: '',
secure: false,
authenticated: false,
envelope: {
mailFrom: { address: email.from, args: {} },
rcptTo: email.to.map(recipient => ({ address: recipient, args: {} }))
},
processingMode: mode,
matchedRule: rule
}, mode);
return true;
}
};
// Create server options
const serverOptions = {
port,
hostname: this.options.hostname,
key,
cert
};
// Create and start the SMTP server
const smtpServer = new MtaSmtpServer(mtaRef as any, serverOptions);
this.servers.push(smtpServer);
// Start the server
await new Promise<void>((resolve, reject) => {
try {
smtpServer.start();
logger.log('info', `UnifiedEmailServer listening on port ${port}`);
// Set up event handlers
(smtpServer as any).server.on('error', (err: Error) => {
logger.log('error', `SMTP server error on port ${port}: ${err.message}`);
this.emit('error', err);
});
resolve();
} catch (err) {
if ((err as any).code === 'EADDRINUSE') {
logger.log('error', `Port ${port} is already in use`);
reject(new Error(`Port ${port} is already in use`));
} else {
logger.log('error', `Error starting server on port ${port}: ${err.message}`);
reject(err);
}
}
});
}
logger.log('info', 'UnifiedEmailServer started successfully');
this.emit('started');
} catch (error) {
logger.log('error', `Failed to start UnifiedEmailServer: ${error.message}`);
throw error;
}
}
/**
* Stop the unified email server
*/
public async stop(): Promise<void> {
logger.log('info', 'Stopping UnifiedEmailServer');
try {
// Stop all SMTP servers
for (const server of this.servers) {
server.stop();
}
// Clear the servers array
this.servers = [];
logger.log('info', 'UnifiedEmailServer stopped successfully');
this.emit('stopped');
} catch (error) {
logger.log('error', `Error stopping UnifiedEmailServer: ${error.message}`);
throw error;
}
}
/**
* Handle new SMTP connection (stub implementation)
*/
private onConnect(session: ISmtpSession, callback: (err?: Error) => void): void {
logger.log('info', `New connection from ${session.remoteAddress}`);
// Update connection statistics
this.stats.connections.current++;
this.stats.connections.total++;
// Log connection event
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.INFO,
type: SecurityEventType.CONNECTION,
message: 'New SMTP connection established',
ipAddress: session.remoteAddress,
details: {
sessionId: session.id,
secure: session.secure
}
});
// Optional IP reputation check would go here
// Continue with the connection
callback();
}
/**
* Handle authentication (stub implementation)
*/
private onAuth(auth: IAuthData, session: ISmtpSession, callback: (err?: Error, user?: any) => void): void {
if (!this.options.auth || !this.options.auth.users || this.options.auth.users.length === 0) {
// No authentication configured, reject
const error = new Error('Authentication not supported');
logger.log('warn', `Authentication attempt when not configured: ${auth.username}`);
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.WARN,
type: SecurityEventType.AUTHENTICATION,
message: 'Authentication attempt when not configured',
ipAddress: session.remoteAddress,
details: {
username: auth.username,
method: auth.method,
sessionId: session.id
},
success: false
});
return callback(error);
}
// Find matching user
const user = this.options.auth.users.find(u => u.username === auth.username && u.password === auth.password);
if (user) {
logger.log('info', `User ${auth.username} authenticated successfully`);
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.INFO,
type: SecurityEventType.AUTHENTICATION,
message: 'SMTP authentication successful',
ipAddress: session.remoteAddress,
details: {
username: auth.username,
method: auth.method,
sessionId: session.id
},
success: true
});
return callback(null, { username: user.username });
} else {
const error = new Error('Invalid username or password');
logger.log('warn', `Failed authentication for ${auth.username}`);
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.WARN,
type: SecurityEventType.AUTHENTICATION,
message: 'SMTP authentication failed',
ipAddress: session.remoteAddress,
details: {
username: auth.username,
method: auth.method,
sessionId: session.id
},
success: false
});
return callback(error);
}
}
/**
* Handle MAIL FROM command (stub implementation)
*/
private onMailFrom(address: {address: string}, session: ISmtpSession, callback: (err?: Error) => void): void {
logger.log('info', `MAIL FROM: ${address.address}`);
// Validate the email address
if (!this.isValidEmail(address.address)) {
const error = new Error('Invalid sender address');
logger.log('warn', `Invalid sender address: ${address.address}`);
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.WARN,
type: SecurityEventType.EMAIL_VALIDATION,
message: 'Invalid sender email format',
ipAddress: session.remoteAddress,
details: {
address: address.address,
sessionId: session.id
},
success: false
});
return callback(error);
}
// Authentication check if required
if (this.options.auth?.required && !session.authenticated) {
const error = new Error('Authentication required');
logger.log('warn', `Unauthenticated sender rejected: ${address.address}`);
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.WARN,
type: SecurityEventType.AUTHENTICATION,
message: 'Unauthenticated sender rejected',
ipAddress: session.remoteAddress,
details: {
address: address.address,
sessionId: session.id
},
success: false
});
return callback(error);
}
// Continue processing
callback();
}
/**
* Handle RCPT TO command (stub implementation)
*/
private onRcptTo(address: {address: string}, session: ISmtpSession, callback: (err?: Error) => void): void {
logger.log('info', `RCPT TO: ${address.address}`);
// Validate the email address
if (!this.isValidEmail(address.address)) {
const error = new Error('Invalid recipient address');
logger.log('warn', `Invalid recipient address: ${address.address}`);
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.WARN,
type: SecurityEventType.EMAIL_VALIDATION,
message: 'Invalid recipient email format',
ipAddress: session.remoteAddress,
details: {
address: address.address,
sessionId: session.id
},
success: false
});
return callback(error);
}
// Pattern match the recipient to determine processing mode
const rule = this.domainRouter.matchRule(address.address);
if (rule) {
// Store the matched rule and processing mode in the session
session.matchedRule = rule;
session.processingMode = rule.mode;
logger.log('info', `Email ${address.address} matched rule: ${rule.pattern}, mode: ${rule.mode}`);
} else {
// Use default mode
session.processingMode = this.options.defaultMode;
logger.log('info', `Email ${address.address} using default mode: ${this.options.defaultMode}`);
}
// Continue processing
callback();
}
/**
* Handle incoming email data (stub implementation)
*/
private onData(stream: stream.Readable, session: ISmtpSession, callback: (err?: Error) => void): void {
logger.log('info', `Processing email data for session ${session.id}`);
const startTime = Date.now();
const chunks: Buffer[] = [];
stream.on('data', (chunk: Buffer) => {
chunks.push(chunk);
});
stream.on('end', async () => {
try {
const data = Buffer.concat(chunks);
const mode = session.processingMode || this.options.defaultMode;
// Determine processing mode based on matched rule
const processedEmail = await this.processEmailByMode(data, session, mode);
// Update statistics
this.stats.messages.processed++;
this.stats.messages.delivered++;
// Calculate processing time
const processingTime = Date.now() - startTime;
this.processingTimes.push(processingTime);
this.updateProcessingTimeStats();
// Emit event for delivery queue
this.emit('emailProcessed', processedEmail, mode, session.matchedRule);
logger.log('info', `Email processed successfully in ${processingTime}ms, mode: ${mode}`);
callback();
} catch (error) {
logger.log('error', `Error processing email: ${error.message}`);
// Update statistics
this.stats.messages.processed++;
this.stats.messages.failed++;
// Calculate processing time for failed attempts too
const processingTime = Date.now() - startTime;
this.processingTimes.push(processingTime);
this.updateProcessingTimeStats();
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.ERROR,
type: SecurityEventType.EMAIL_PROCESSING,
message: 'Email processing failed',
ipAddress: session.remoteAddress,
details: {
error: error.message,
sessionId: session.id,
mode: session.processingMode,
processingTime
},
success: false
});
callback(error);
}
});
stream.on('error', (err) => {
logger.log('error', `Stream error: ${err.message}`);
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.ERROR,
type: SecurityEventType.EMAIL_PROCESSING,
message: 'Email stream error',
ipAddress: session.remoteAddress,
details: {
error: err.message,
sessionId: session.id
},
success: false
});
callback(err);
});
}
/**
* Update processing time statistics
*/
private updateProcessingTimeStats(): void {
if (this.processingTimes.length === 0) return;
// Keep only the last 1000 processing times
if (this.processingTimes.length > 1000) {
this.processingTimes = this.processingTimes.slice(-1000);
}
// Calculate stats
const sum = this.processingTimes.reduce((acc, time) => acc + time, 0);
const avg = sum / this.processingTimes.length;
const max = Math.max(...this.processingTimes);
const min = Math.min(...this.processingTimes);
this.stats.processingTime = { avg, max, min };
}
/**
* Process email based on the determined mode
*/
private async processEmailByMode(emailData: Email | Buffer, session: ISmtpSession, mode: EmailProcessingMode): Promise<Email> {
// Convert Buffer to Email if needed
let email: Email;
if (Buffer.isBuffer(emailData)) {
// Parse the email data buffer into an Email object
try {
const parsed = await plugins.mailparser.simpleParser(emailData);
email = new Email({
from: parsed.from?.value[0]?.address || session.envelope.mailFrom.address,
to: session.envelope.rcptTo[0]?.address || '',
subject: parsed.subject || '',
text: parsed.text || '',
html: parsed.html || undefined,
attachments: parsed.attachments?.map(att => ({
filename: att.filename || '',
content: att.content,
contentType: att.contentType
})) || []
});
} catch (error) {
logger.log('error', `Error parsing email data: ${error.message}`);
throw new Error(`Error parsing email data: ${error.message}`);
}
} else {
email = emailData;
}
// Process based on mode
switch (mode) {
case 'forward':
await this.handleForwardMode(email, session);
break;
case 'mta':
await this.handleMtaMode(email, session);
break;
case 'process':
await this.handleProcessMode(email, session);
break;
default:
throw new Error(`Unknown processing mode: ${mode}`);
}
// Return the processed email
return email;
}
/**
* Handle email in forward mode (SMTP proxy)
*/
private async handleForwardMode(email: Email, session: ISmtpSession): Promise<void> {
logger.log('info', `Handling email in forward mode for session ${session.id}`);
// Get target server information
const rule = session.matchedRule;
const targetServer = rule?.target?.server || this.options.defaultServer;
const targetPort = rule?.target?.port || this.options.defaultPort || 25;
const useTls = rule?.target?.useTls ?? this.options.defaultTls ?? false;
if (!targetServer) {
throw new Error('No target server configured for forward mode');
}
logger.log('info', `Forwarding email to ${targetServer}:${targetPort}, TLS: ${useTls}`);
try {
// Create a simple SMTP client connection to the target server
const client = new net.Socket();
await new Promise<void>((resolve, reject) => {
// Connect to the target server
client.connect({
host: targetServer,
port: targetPort
});
client.on('data', (data) => {
const response = data.toString().trim();
logger.log('debug', `SMTP response: ${response}`);
// Handle SMTP response codes
if (response.startsWith('2')) {
// Success response
resolve();
} else if (response.startsWith('5')) {
// Permanent error
reject(new Error(`SMTP error: ${response}`));
}
});
client.on('error', (err) => {
logger.log('error', `SMTP client error: ${err.message}`);
reject(err);
});
// SMTP client commands would go here in a full implementation
// For now, just finish the connection
client.end();
resolve();
});
logger.log('info', `Email forwarded successfully to ${targetServer}:${targetPort}`);
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.INFO,
type: SecurityEventType.EMAIL_FORWARDING,
message: 'Email forwarded',
ipAddress: session.remoteAddress,
details: {
sessionId: session.id,
targetServer,
targetPort,
useTls,
ruleName: rule?.pattern || 'default',
subject: email.subject
},
success: true
});
} catch (error) {
logger.log('error', `Failed to forward email: ${error.message}`);
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.ERROR,
type: SecurityEventType.EMAIL_FORWARDING,
message: 'Email forwarding failed',
ipAddress: session.remoteAddress,
details: {
sessionId: session.id,
targetServer,
targetPort,
useTls,
ruleName: rule?.pattern || 'default',
error: error.message
},
success: false
});
throw error;
}
}
/**
* Handle email in MTA mode (programmatic processing)
*/
private async handleMtaMode(email: Email, session: ISmtpSession): Promise<void> {
logger.log('info', `Handling email in MTA mode for session ${session.id}`);
try {
// Apply MTA rule options if provided
if (session.matchedRule?.mtaOptions) {
const options = session.matchedRule.mtaOptions;
// Apply DKIM signing if enabled
if (options.dkimSign && options.dkimOptions) {
// Sign the email with DKIM
logger.log('info', `Signing email with DKIM for domain ${options.dkimOptions.domainName}`);
// In a full implementation, this would use the DKIM signing library
}
}
// Get email content for logging/processing
const subject = email.subject;
const recipients = email.getAllRecipients().join(', ');
logger.log('info', `Email processed by MTA: ${subject} to ${recipients}`);
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.INFO,
type: SecurityEventType.EMAIL_PROCESSING,
message: 'Email processed by MTA',
ipAddress: session.remoteAddress,
details: {
sessionId: session.id,
ruleName: session.matchedRule?.pattern || 'default',
subject,
recipients
},
success: true
});
} catch (error) {
logger.log('error', `Failed to process email in MTA mode: ${error.message}`);
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.ERROR,
type: SecurityEventType.EMAIL_PROCESSING,
message: 'MTA processing failed',
ipAddress: session.remoteAddress,
details: {
sessionId: session.id,
ruleName: session.matchedRule?.pattern || 'default',
error: error.message
},
success: false
});
throw error;
}
}
/**
* Handle email in process mode (store-and-forward with scanning)
*/
private async handleProcessMode(email: Email, session: ISmtpSession): Promise<void> {
logger.log('info', `Handling email in process mode for session ${session.id}`);
try {
const rule = session.matchedRule;
// Apply content scanning if enabled
if (rule?.contentScanning && rule.scanners && rule.scanners.length > 0) {
logger.log('info', 'Performing content scanning');
// Apply each scanner
for (const scanner of rule.scanners) {
switch (scanner.type) {
case 'spam':
logger.log('info', 'Scanning for spam content');
// Implement spam scanning
break;
case 'virus':
logger.log('info', 'Scanning for virus content');
// Implement virus scanning
break;
case 'attachment':
logger.log('info', 'Scanning attachments');
// Check for blocked extensions
if (scanner.blockedExtensions && scanner.blockedExtensions.length > 0) {
for (const attachment of email.attachments) {
const ext = this.getFileExtension(attachment.filename);
if (scanner.blockedExtensions.includes(ext)) {
if (scanner.action === 'reject') {
throw new Error(`Blocked attachment type: ${ext}`);
} else { // tag
email.addHeader('X-Attachment-Warning', `Potentially unsafe attachment: ${attachment.filename}`);
}
}
}
}
break;
}
}
}
// Apply transformations if defined
if (rule?.transformations && rule.transformations.length > 0) {
logger.log('info', 'Applying email transformations');
for (const transform of rule.transformations) {
switch (transform.type) {
case 'addHeader':
if (transform.header && transform.value) {
email.addHeader(transform.header, transform.value);
}
break;
}
}
}
logger.log('info', `Email successfully processed in store-and-forward mode`);
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.INFO,
type: SecurityEventType.EMAIL_PROCESSING,
message: 'Email processed and queued',
ipAddress: session.remoteAddress,
details: {
sessionId: session.id,
ruleName: rule?.pattern || 'default',
contentScanning: rule?.contentScanning || false,
subject: email.subject
},
success: true
});
} catch (error) {
logger.log('error', `Failed to process email: ${error.message}`);
SecurityLogger.getInstance().logEvent({
level: SecurityLogLevel.ERROR,
type: SecurityEventType.EMAIL_PROCESSING,
message: 'Email processing failed',
ipAddress: session.remoteAddress,
details: {
sessionId: session.id,
ruleName: session.matchedRule?.pattern || 'default',
error: error.message
},
success: false
});
throw error;
}
}
/**
* Get file extension from filename
*/
private getFileExtension(filename: string): string {
return filename.substring(filename.lastIndexOf('.')).toLowerCase();
}
/**
* Handle server errors
*/
private onError(err: Error): void {
logger.log('error', `Server error: ${err.message}`);
this.emit('error', err);
}
/**
* Handle server close
*/
private onClose(): void {
logger.log('info', 'Server closed');
this.emit('close');
// Update statistics
this.stats.connections.current = 0;
}
/**
* Update server configuration
*/
public updateOptions(options: Partial<IUnifiedEmailServerOptions>): void {
// Stop the server if changing ports
const portsChanged = options.ports &&
(!this.options.ports ||
JSON.stringify(options.ports) !== JSON.stringify(this.options.ports));
if (portsChanged) {
this.stop().then(() => {
this.options = { ...this.options, ...options };
this.start();
});
} else {
// Update options without restart
this.options = { ...this.options, ...options };
// Update domain router if rules changed
if (options.domainRules) {
this.domainRouter.updateRules(options.domainRules);
}
}
}
/**
* Update domain rules
*/
public updateDomainRules(rules: IDomainRule[]): void {
this.options.domainRules = rules;
this.domainRouter.updateRules(rules);
}
/**
* Get server statistics
*/
public getStats(): IServerStats {
return { ...this.stats };
}
/**
* Validate email address format
*/
private isValidEmail(email: string): boolean {
// Basic validation - a more comprehensive validation could be used
const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
return emailRegex.test(email);
}
}

View File

@ -3,9 +3,12 @@ export * from './classes.dcrouter.js';
export * from './classes.smtp.portconfig.js'; export * from './classes.smtp.portconfig.js';
export * from './classes.email.domainrouter.js'; export * from './classes.email.domainrouter.js';
// SMTP Store-and-Forward components // Unified Email Configuration
export * from './classes.smtp.config.js'; export * from './classes.email.config.js';
export * from './classes.smtp.server.js'; export * from './classes.domain.router.js';
export * from './classes.email.processor.js'; export * from './classes.unified.email.server.js';
// Shared Infrastructure Components
export * from './classes.delivery.queue.js'; export * from './classes.delivery.queue.js';
export * from './classes.delivery.system.js'; export * from './classes.delivery.system.js';
export * from './classes.rate.limiter.js';

View File

@ -18,10 +18,14 @@ export enum SecurityEventType {
AUTHENTICATION = 'authentication', AUTHENTICATION = 'authentication',
ACCESS_CONTROL = 'access_control', ACCESS_CONTROL = 'access_control',
EMAIL_VALIDATION = 'email_validation', EMAIL_VALIDATION = 'email_validation',
EMAIL_PROCESSING = 'email_processing',
EMAIL_FORWARDING = 'email_forwarding',
EMAIL_DELIVERY = 'email_delivery',
DKIM = 'dkim', DKIM = 'dkim',
SPF = 'spf', SPF = 'spf',
DMARC = 'dmarc', DMARC = 'dmarc',
RATE_LIMIT = 'rate_limit', RATE_LIMIT = 'rate_limit',
RATE_LIMITING = 'rate_limiting',
SPAM = 'spam', SPAM = 'spam',
MALWARE = 'malware', MALWARE = 'malware',
CONNECTION = 'connection', CONNECTION = 'connection',

View File

@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@serve.zone/platformservice', name: '@serve.zone/platformservice',
version: '2.6.0', version: '2.8.0',
description: 'A multifaceted platform service handling mail, SMS, letter delivery, and AI services.' description: 'A multifaceted platform service handling mail, SMS, letter delivery, and AI services.'
} }