// File listing metadata: 272 lines, 6.9 KiB, TypeScript
|
import * as plugins from '../plugins.js';
|
||
|
import { DeliveryQueue } from './classes.delivery.queue.js';
|
||
|
import type { IQueueItem } from './classes.delivery.queue.js';
|
||
|
import type { IProcessingResult, IRoutingDecision } from './classes.email.processor.js';
|
||
|
import { EventEmitter } from 'node:events';
|
||
|
import { Readable } from 'node:stream';
|
||
|
|
||
|
/**
 * Result of a single delivery attempt.
 */
export interface IDeliveryResult {
  /** Identifier of the delivery; the delivery system fills it from the routing decision's id. */
  id: string;
  /** True when the attempt succeeded, false otherwise. */
  success: boolean;
  /** Human-readable failure reason; only set when `success` is false. */
  error?: string;
  /** Moment at which the attempt finished. */
  timestamp: Date;
  /** Target of the attempt, formatted by the delivery system as `host:port`. */
  destination: string;
  /** Message id assigned to the delivered email; only set when `success` is true. */
  messageId?: string;
}
|
||
|
|
||
|
/**
 * Delivery system statistics.
 */
export interface IDeliveryStats {
  /** Number of delivery attempts that succeeded. */
  delivered: number;
  /** Number of failed deliveries. */
  failed: number;
  /** Number of queue items whose status is 'pending' or 'deferred'. */
  pending: number;
  /** Number of delivery attempts currently running. */
  inProgress: number;
  /** Total number of delivery attempts started so far. */
  totalAttempts: number;
}
|
||
|
|
||
|
/**
 * Email delivery system with retry logic.
 *
 * Listens for `itemsReady` events on a {@link DeliveryQueue}, delivers the
 * announced items with bounded concurrency and reports the outcome back to the
 * queue through `markDelivered` / `markFailed`. Whether a failed item is
 * retried is decided by the queue, not by this class.
 *
 * Emitted events:
 * - `started` / `stopped` — lifecycle changes
 * - `delivered` — `{ item, result }` after a successful delivery
 * - `deliveryFailed` — `{ item, result }` after a delivery that reported failure
 * - `statsUpdated` — a snapshot of {@link IDeliveryStats}
 */
export class DeliverySystem extends EventEmitter {
  private readonly queue: DeliveryQueue;
  private isRunning: boolean = false;
  private stats: IDeliveryStats = {
    delivered: 0,
    failed: 0,
    pending: 0,
    inProgress: 0,
    totalAttempts: 0
  };
  // Open transport connections keyed by an identifier; closed in stop().
  private connections: Map<string, { close?: () => unknown }> = new Map();
  private maxConcurrent: number = 5;

  /**
   * Create a new delivery system
   * @param queue Delivery queue to process
   * @param maxConcurrent Maximum concurrent deliveries
   */
  constructor(queue: DeliveryQueue, maxConcurrent: number = 5) {
    super();
    this.queue = queue;
    this.maxConcurrent = maxConcurrent;

    // Listen for queue events
    this.setupQueueListeners();
  }

  /**
   * Extract a usable message from a caught value.
   * @param error Value caught by a `catch` clause (may be anything)
   * @param fallback Text to use when no message can be extracted
   * @returns The error message, or `fallback`
   */
  private static describeError(error: unknown, fallback: string): string {
    if (error instanceof Error && error.message) {
      return error.message;
    }
    if (typeof error === 'string' && error) {
      return error;
    }
    return fallback;
  }

  /**
   * Set up queue event listeners
   */
  private setupQueueListeners(): void {
    // Items announced while the system is stopped are ignored here.
    this.queue.on('itemsReady', (items: IQueueItem[]) => {
      if (!this.isRunning) {
        return;
      }
      void this.processItems(items).catch((err: unknown) => {
        console.error('Error processing queue items:', err);
      });
    });
  }

  /**
   * Start the delivery system.
   * Marks the system as running, emits `started` and publishes fresh stats.
   */
  public async start(): Promise<void> {
    this.isRunning = true;
    this.emit('started');

    this.updateStats();
  }

  /**
   * Stop the delivery system.
   * Marks the system as stopped, closes and forgets every tracked connection
   * and emits `stopped`. A connection that fails to close is logged and does
   * not prevent the remaining ones from being closed.
   */
  public async stop(): Promise<void> {
    this.isRunning = false;

    for (const connection of this.connections.values()) {
      try {
        if (connection.close) {
          await connection.close();
        }
      } catch (error) {
        console.error('Error closing connection:', error);
      }
    }

    this.connections.clear();

    this.emit('stopped');
  }

  /**
   * Process items from the queue
   * @param items Queue items to process
   */
  private async processItems(items: IQueueItem[]): Promise<void> {
    if (!this.isRunning) {
      return;
    }

    // Slots already taken are derived from the queue's own item status.
    const inProgress = Array.from(this.queue.getAllItems()).filter(
      item => item.status === 'processing'
    ).length;
    const availableSlots = Math.max(0, this.maxConcurrent - inProgress);

    if (availableSlots === 0) {
      return;
    }

    for (const item of items.slice(0, availableSlots)) {
      // Flag the item first so later slot calculations see it as busy.
      await this.queue.updateItem(item.id, {
        status: 'processing'
      });

      // Deliveries run concurrently; each one reports its own outcome.
      void this.deliverItem(item).catch((error: unknown) => {
        console.error(`Error delivering item ${item.id}:`, error);
      });
    }

    this.updateStats();
  }

  /**
   * Deliver a single queue item and record the outcome.
   *
   * `stats.inProgress` is decremented exactly once per attempt, and
   * `stats.failed` counts every attempt that did not end in a recorded
   * delivery (including attempts the queue may retry later). Events are
   * emitted only after the queue bookkeeping is done, so an exception thrown
   * by an event listener can no longer turn a delivered item into a failed one.
   *
   * @param item Queue item to deliver
   */
  private async deliverItem(item: IQueueItem): Promise<void> {
    this.stats.inProgress++;
    this.stats.totalAttempts++;

    let outcome: IDeliveryResult | undefined;
    try {
      outcome = await this.deliverEmail(item.processingResult);

      if (outcome.success) {
        await this.queue.markDelivered(item.id);
        this.stats.delivered++;
      } else {
        // Mark as failed (will retry if attempts < maxRetries)
        await this.queue.markFailed(item.id, outcome.error || 'Unknown error');
        this.stats.failed++;
      }
    } catch (error) {
      console.error(`Error in deliverItem for ${item.id}:`, error);

      // The bookkeeping itself broke: report no event, only a failure.
      outcome = undefined;
      this.stats.failed++;
      await this.queue.markFailed(
        item.id,
        DeliverySystem.describeError(error, 'Internal error')
      );
    } finally {
      this.stats.inProgress--;
    }

    try {
      if (outcome) {
        this.emit(outcome.success ? 'delivered' : 'deliveryFailed', {
          item,
          result: outcome
        });
      }
    } finally {
      // Publish stats even when a listener above threw.
      this.updateStats();
    }
  }

  /**
   * Deliver an email to its destination.
   *
   * This is currently a simulation: it waits 100 ms and fails randomly in
   * roughly 10% of the calls. Failures are reported through the returned
   * result rather than by throwing.
   *
   * @param result Processing result containing the email to deliver
   * @returns Outcome of the attempt
   */
  private async deliverEmail(result: IProcessingResult): Promise<IDeliveryResult> {
    const { id, targetServer, port, useTls } = result.routing;
    const destination = `${targetServer}:${port}`;

    try {
      // Create a transport for delivery
      // In a real implementation, this would use nodemailer or a similar library
      console.log(`Delivering email ${id} to ${targetServer}:${port} (TLS: ${useTls})`);

      // Simulate delivery
      await new Promise<void>(resolve => setTimeout(resolve, 100));

      // Simulate success
      // In a real implementation, we would actually send the email
      const success = Math.random() > 0.1; // 90% success rate for simulation

      if (!success) {
        throw new Error('Simulated delivery failure');
      }

      return {
        id,
        success: true,
        timestamp: new Date(),
        destination,
        messageId: `${id}@example.com`
      };
    } catch (error) {
      console.error(`Delivery error for ${id}:`, error);

      return {
        id,
        success: false,
        error: DeliverySystem.describeError(error, 'Unknown error'),
        timestamp: new Date(),
        destination
      };
    }
  }

  /**
   * Update delivery system statistics.
   * Recounts pending items from the queue and emits `statsUpdated`.
   */
  private updateStats(): void {
    this.stats.pending = Array.from(this.queue.getAllItems()).filter(
      item => item.status === 'pending' || item.status === 'deferred'
    ).length;

    this.emit('statsUpdated', this.getStats());
  }

  /**
   * Get current delivery statistics
   * @returns A copy of the current counters; mutating it does not affect the system
   */
  public getStats(): IDeliveryStats {
    return { ...this.stats };
  }
}
|