import * as plugins from '../plugins.js';
import { EventEmitter } from 'node:events';
import * as fs from 'node:fs';
import * as path from 'node:path';
import { logger } from '../logger.js';
import { type EmailProcessingMode, type IDomainRule } from './classes.email.config.js';

/**
 * Lifecycle states of a queue item.
 *
 * - `pending`: enqueued, not yet picked up
 * - `processing`: a delivery attempt is in flight
 * - `delivered`: terminal success
 * - `failed`: terminal failure (retries exhausted)
 * - `deferred`: a delivery attempt failed; waiting for `nextAttempt`
 */
export type QueueItemStatus = 'pending' | 'processing' | 'delivered' | 'failed' | 'deferred';

/**
 * A single entry in the delivery queue.
 */
export interface IQueueItem {
  /** Unique identifier; also used as the file name (`<id>.json`) for disk storage. */
  id: string;
  processingMode: EmailProcessingMode;
  /** Opaque payload supplied by the caller of `enqueue`. */
  processingResult: any;
  rule: IDomainRule;
  status: QueueItemStatus;
  /** Number of times the item has been marked as processing. */
  attempts: number;
  /** Earliest time at which a deferred item becomes ready again. */
  nextAttempt: Date;
  lastError?: string;
  createdAt: Date;
  updatedAt: Date;
  deliveredAt?: Date;
}

/**
 * Options accepted by {@link UnifiedDeliveryQueue}. All fields are optional;
 * defaults are applied in the constructor.
 */
export interface IQueueOptions {
  // Storage options
  /** `'memory'` (default) or `'disk'`. */
  storageType?: 'memory' | 'disk';
  /** Directory for persisted items (default: `<cwd>/email-queue`). */
  persistentPath?: string;

  // Queue behavior
  /** Interval between queue scans in milliseconds (default: 30000). */
  checkInterval?: number;
  /** Maximum number of items held in the queue (default: 10000). */
  maxQueueSize?: number;
  /** Default: 100. Stored in the options but not enforced by this class. */
  maxPerDestination?: number;

  // Delivery attempts
  /** Attempts allowed before an item is marked failed (default: 5; 0 disables retries). */
  maxRetries?: number;
  /** Base delay for exponential backoff in milliseconds (default: 60000). */
  baseRetryDelay?: number;
  /** Upper bound for the backoff delay in milliseconds (default: 3600000). */
  maxRetryDelay?: number;
}

/**
 * Aggregated queue statistics.
 */
export interface IQueueStats {
  queueSize: number;
  status: {
    pending: number;
    processing: number;
    delivered: number;
    failed: number;
    deferred: number;
  };
  modes: {
    forward: number;
    mta: number;
    process: number;
  };
  oldestItem?: Date;
  newestItem?: Date;
  averageAttempts: number;
  totalProcessed: number;
  processingActive: boolean;
}

/**
 * Turn an arbitrary thrown value into a printable message.
 * Catch variables are `unknown` under strict settings, so they are narrowed here.
 */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * A unified queue for all email modes.
 *
 * The queue does not deliver anything itself: on every tick it emits
 * `itemsReady` with the items that are due, and the consumer reports the
 * outcome via `markProcessing` / `markDelivered` / `markFailed`.
 *
 * Emitted events: `initialized`, `processingStarted`, `processingStopped`,
 * `itemsReady`, `itemEnqueued`, `itemProcessing`, `itemDelivered`,
 * `itemDeferred`, `itemFailed`, `itemRemoved`, `statsUpdated`, `shutdown`
 * and `error` (only when at least one `error` listener is registered).
 */
export class UnifiedDeliveryQueue extends EventEmitter {
  private options: Required<IQueueOptions>;
  private queue: Map<string, IQueueItem> = new Map();
  private checkTimer?: NodeJS.Timeout;
  private stats: IQueueStats;
  private processing: boolean = false;
  private totalProcessed: number = 0;

  /**
   * Create a new unified delivery queue.
   * @param options Queue options; missing values fall back to defaults
   */
  constructor(options: IQueueOptions) {
    super();

    // Set default options
    this.options = {
      storageType: options.storageType || 'memory',
      persistentPath: options.persistentPath || path.join(process.cwd(), 'email-queue'),
      checkInterval: options.checkInterval || 30000, // 30 seconds
      maxQueueSize: options.maxQueueSize || 10000,
      maxPerDestination: options.maxPerDestination || 100,
      // `??` rather than `||`: an explicit 0 ("never retry") must be honoured.
      maxRetries: options.maxRetries ?? 5,
      baseRetryDelay: options.baseRetryDelay || 60000, // 1 minute
      maxRetryDelay: options.maxRetryDelay || 3600000 // 1 hour
    };

    // Initialize statistics
    this.stats = {
      queueSize: 0,
      status: {
        pending: 0,
        processing: 0,
        delivered: 0,
        failed: 0,
        deferred: 0
      },
      modes: {
        forward: 0,
        mta: 0,
        process: 0
      },
      averageAttempts: 0,
      totalProcessed: 0,
      processingActive: false
    };
  }

  /**
   * Initialize the queue: prepare disk storage (if configured), load any
   * persisted items and start the periodic queue scan.
   * @throws Rethrows any error raised while preparing storage or loading items
   */
  public async initialize(): Promise<void> {
    logger.log('info', 'Initializing UnifiedDeliveryQueue');

    try {
      // Create persistent storage directory if using disk storage
      if (this.options.storageType === 'disk') {
        // `recursive: true` makes this a no-op when the directory already exists.
        await fs.promises.mkdir(this.options.persistentPath, { recursive: true });

        // Load existing items from disk
        await this.loadFromDisk();
      }

      // Start the queue processing timer
      this.startProcessing();

      // Emit initialized event
      this.emit('initialized');
      logger.log('info', 'UnifiedDeliveryQueue initialized successfully');
    } catch (error: unknown) {
      logger.log('error', `Failed to initialize queue: ${describeError(error)}`);
      throw error;
    }
  }

  /**
   * Emit an `error` event only when somebody is listening.
   * An `error` event without listeners makes EventEmitter throw, which would
   * turn a logged, best-effort failure into a rejected promise.
   */
  private emitError(error: unknown): void {
    if (this.listenerCount('error') > 0) {
      this.emit('error', error);
    }
  }

  /**
   * Start queue processing (restarts the timer if one is already running).
   */
  private startProcessing(): void {
    if (this.checkTimer) {
      clearInterval(this.checkTimer);
    }

    this.checkTimer = setInterval(() => {
      // processQueue handles its own errors; the catch is a last resort so a
      // throwing listener can never surface as an unhandled rejection.
      void this.processQueue().catch((error: unknown) => {
        logger.log('error', `Error processing queue: ${describeError(error)}`);
      });
    }, this.options.checkInterval);

    this.processing = true;
    this.stats.processingActive = true;
    this.emit('processingStarted');
    logger.log('info', 'Queue processing started');
  }

  /**
   * Stop queue processing.
   */
  private stopProcessing(): void {
    if (this.checkTimer) {
      clearInterval(this.checkTimer);
      this.checkTimer = undefined;
    }

    this.processing = false;
    this.stats.processingActive = false;
    this.emit('processingStopped');
    logger.log('info', 'Queue processing stopped');
  }

  /**
   * Check for items that need to be processed and announce them via the
   * `itemsReady` event (oldest first). Item state is not changed here.
   */
  private async processQueue(): Promise<void> {
    try {
      const now = new Date();
      const readyItems: IQueueItem[] = [];

      // Find items ready for processing
      for (const item of this.queue.values()) {
        if (item.status === 'pending' || (item.status === 'deferred' && item.nextAttempt <= now)) {
          readyItems.push(item);
        }
      }

      if (readyItems.length === 0) {
        return;
      }

      // Sort by oldest first
      readyItems.sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime());

      // Emit event for ready items
      this.emit('itemsReady', readyItems);
      logger.log('info', `Found ${readyItems.length} items ready for processing`);

      // Update statistics
      this.updateStats();
    } catch (error: unknown) {
      logger.log('error', `Error processing queue: ${describeError(error)}`);
      this.emitError(error);
    }
  }

  /**
   * Add an item to the queue.
   * @param processingResult Processing result to queue
   * @param mode Processing mode
   * @param rule Domain rule
   * @returns The generated item ID
   * @throws Error('Queue is full') when `maxQueueSize` has been reached
   */
  public async enqueue(processingResult: any, mode: EmailProcessingMode, rule: IDomainRule): Promise<string> {
    // Check if queue is full
    if (this.queue.size >= this.options.maxQueueSize) {
      throw new Error('Queue is full');
    }

    // Generate a unique ID
    const id = `${Date.now()}-${Math.random().toString(36).substring(2, 15)}`;

    // Create queue item
    const item: IQueueItem = {
      id,
      processingMode: mode,
      processingResult,
      rule,
      status: 'pending',
      attempts: 0,
      nextAttempt: new Date(),
      createdAt: new Date(),
      updatedAt: new Date()
    };

    // Add to queue
    this.queue.set(id, item);

    // Persist to disk if using disk storage
    if (this.options.storageType === 'disk') {
      await this.persistItem(item);
    }

    // Update statistics
    this.updateStats();

    // Emit event
    this.emit('itemEnqueued', item);
    logger.log('info', `Item enqueued with ID ${id}, mode: ${mode}`);

    return id;
  }

  /**
   * Get an item from the queue.
   * @param id Item ID
   * @returns The live queue item, or `undefined` if the ID is unknown
   */
  public getItem(id: string): IQueueItem | undefined {
    return this.queue.get(id);
  }

  /**
   * Mark an item as being processed and count the attempt.
   * @param id Item ID
   * @returns `false` if the ID is unknown, `true` otherwise
   */
  public async markProcessing(id: string): Promise<boolean> {
    const item = this.queue.get(id);

    if (!item) {
      return false;
    }

    // Update status
    item.status = 'processing';
    item.attempts++;
    item.updatedAt = new Date();

    // Persist changes if using disk storage
    if (this.options.storageType === 'disk') {
      await this.persistItem(item);
    }

    // Update statistics
    this.updateStats();

    // Emit event
    this.emit('itemProcessing', item);
    logger.log('info', `Item ${id} marked as processing, attempt ${item.attempts}`);

    return true;
  }

  /**
   * Mark an item as delivered.
   * @param id Item ID
   * @returns `false` if the ID is unknown, `true` otherwise
   */
  public async markDelivered(id: string): Promise<boolean> {
    const item = this.queue.get(id);

    if (!item) {
      return false;
    }

    // Update status
    item.status = 'delivered';
    item.updatedAt = new Date();
    item.deliveredAt = new Date();

    // Persist changes if using disk storage
    if (this.options.storageType === 'disk') {
      await this.persistItem(item);
    }

    // Update statistics
    this.totalProcessed++;
    this.updateStats();

    // Emit event
    this.emit('itemDelivered', item);
    logger.log('info', `Item ${id} marked as delivered after ${item.attempts} attempts`);

    return true;
  }

  /**
   * Record a failed delivery attempt. The item is deferred with exponential
   * backoff while `attempts < maxRetries`, otherwise it is marked as
   * permanently failed.
   * @param id Item ID
   * @param error Error message
   * @returns `false` if the ID is unknown, `true` otherwise
   */
  public async markFailed(id: string, error: string): Promise<boolean> {
    const item = this.queue.get(id);

    if (!item) {
      return false;
    }

    // Determine if we should retry
    if (item.attempts < this.options.maxRetries) {
      // Exponential backoff: base * 2^(attempts - 1), capped at maxRetryDelay.
      // The exponent is clamped at 0 so an item that was never marked as
      // processing (attempts === 0) still waits at least the base delay.
      const exponent = Math.max(0, item.attempts - 1);
      const delay = Math.min(
        this.options.baseRetryDelay * 2 ** exponent,
        this.options.maxRetryDelay
      );

      // Update status
      item.status = 'deferred';
      item.lastError = error;
      item.nextAttempt = new Date(Date.now() + delay);
      item.updatedAt = new Date();

      // Persist changes if using disk storage
      if (this.options.storageType === 'disk') {
        await this.persistItem(item);
      }

      // Emit event
      this.emit('itemDeferred', item);
      logger.log('info', `Item ${id} deferred for ${delay}ms, attempt ${item.attempts}, error: ${error}`);
    } else {
      // Mark as permanently failed
      item.status = 'failed';
      item.lastError = error;
      item.updatedAt = new Date();

      // Persist changes if using disk storage
      if (this.options.storageType === 'disk') {
        await this.persistItem(item);
      }

      // Update statistics
      this.totalProcessed++;

      // Emit event
      this.emit('itemFailed', item);
      logger.log('warn', `Item ${id} permanently failed after ${item.attempts} attempts, error: ${error}`);
    }

    // Update statistics
    this.updateStats();

    return true;
  }

  /**
   * Remove an item from the queue (and from disk when using disk storage).
   * @param id Item ID
   * @returns `false` if the ID is unknown, `true` otherwise
   */
  public async removeItem(id: string): Promise<boolean> {
    const item = this.queue.get(id);

    if (!item) {
      return false;
    }

    // Remove from queue
    this.queue.delete(id);

    // Remove from disk if using disk storage
    if (this.options.storageType === 'disk') {
      await this.removeItemFromDisk(id);
    }

    // Update statistics
    this.updateStats();

    // Emit event
    this.emit('itemRemoved', item);
    logger.log('info', `Item ${id} removed from queue`);

    return true;
  }

  /**
   * Persist an item to disk as `<id>.json`. Best effort: failures are logged
   * and reported via the `error` event, never thrown.
   * @param item Item to persist
   */
  private async persistItem(item: IQueueItem): Promise<void> {
    try {
      const filePath = path.join(this.options.persistentPath, `${item.id}.json`);
      await fs.promises.writeFile(filePath, JSON.stringify(item, null, 2), 'utf8');
    } catch (error: unknown) {
      logger.log('error', `Failed to persist item ${item.id}: ${describeError(error)}`);
      this.emitError(error);
    }
  }

  /**
   * Remove an item's file from disk. Best effort: failures are logged and
   * reported via the `error` event, never thrown.
   * @param id Item ID
   */
  private async removeItemFromDisk(id: string): Promise<void> {
    try {
      const filePath = path.join(this.options.persistentPath, `${id}.json`);

      // `force: true` ignores a missing file, avoiding an exists-then-unlink race.
      await fs.promises.rm(filePath, { force: true });
    } catch (error: unknown) {
      logger.log('error', `Failed to remove item ${id} from disk: ${describeError(error)}`);
      this.emitError(error);
    }
  }

  /**
   * Load queue items from disk. Files that cannot be read or parsed are
   * logged and skipped.
   * @throws Rethrows errors raised while listing the storage directory
   */
  private async loadFromDisk(): Promise<void> {
    try {
      // Check if directory exists
      if (!fs.existsSync(this.options.persistentPath)) {
        return;
      }

      // Get all JSON files
      const entries = await fs.promises.readdir(this.options.persistentPath);
      const files = entries.filter((file) => file.endsWith('.json'));

      // Load each file
      for (const file of files) {
        try {
          const filePath = path.join(this.options.persistentPath, file);
          const data = await fs.promises.readFile(filePath, 'utf8');
          const item = JSON.parse(data) as IQueueItem | null;

          // Reject files that do not carry a usable ID instead of inserting
          // an entry under an `undefined` key.
          if (!item || typeof item.id !== 'string') {
            throw new Error('missing or invalid item id');
          }

          // Convert date strings to Date objects
          item.createdAt = new Date(item.createdAt);
          item.updatedAt = new Date(item.updatedAt);
          item.nextAttempt = new Date(item.nextAttempt);
          if (item.deliveredAt) {
            item.deliveredAt = new Date(item.deliveredAt);
          }

          // An item persisted as 'processing' belonged to a previous run.
          // processQueue only selects pending/deferred items, so without this
          // reset it would never be picked up again.
          if (item.status === 'processing') {
            item.status = 'pending';
          }

          // Add to queue
          this.queue.set(item.id, item);
        } catch (error: unknown) {
          logger.log('error', `Failed to load item from ${file}: ${describeError(error)}`);
        }
      }

      // Update statistics
      this.updateStats();

      logger.log('info', `Loaded ${this.queue.size} items from disk`);
    } catch (error: unknown) {
      logger.log('error', `Failed to load items from disk: ${describeError(error)}`);
      throw error;
    }
  }

  /**
   * Recompute queue statistics from the current queue contents and emit
   * `statsUpdated`.
   */
  private updateStats(): void {
    // Reset counters
    this.stats.queueSize = this.queue.size;
    this.stats.status = {
      pending: 0,
      processing: 0,
      delivered: 0,
      failed: 0,
      deferred: 0
    };
    this.stats.modes = {
      forward: 0,
      mta: 0,
      process: 0
    };

    let totalAttempts = 0;
    // Seed with +/-Infinity so items dated in the future are still tracked correctly.
    let oldestTime = Number.POSITIVE_INFINITY;
    let newestTime = Number.NEGATIVE_INFINITY;

    // Count by status and mode
    for (const item of this.queue.values()) {
      // Count by status
      this.stats.status[item.status]++;

      // Count by mode
      this.stats.modes[item.processingMode]++;

      // Track total attempts
      totalAttempts += item.attempts;

      // Track oldest and newest
      const itemTime = item.createdAt.getTime();
      if (itemTime < oldestTime) {
        oldestTime = itemTime;
      }
      if (itemTime > newestTime) {
        newestTime = itemTime;
      }
    }

    const hasItems = this.queue.size > 0;

    // Calculate average attempts
    this.stats.averageAttempts = hasItems ? totalAttempts / this.queue.size : 0;

    // Set oldest and newest
    this.stats.oldestItem = hasItems ? new Date(oldestTime) : undefined;
    this.stats.newestItem = hasItems ? new Date(newestTime) : undefined;

    // Set total processed
    this.stats.totalProcessed = this.totalProcessed;

    // Set processing active
    this.stats.processingActive = this.processing;

    // Emit statistics event
    this.emit('statsUpdated', this.stats);
  }

  /**
   * Get a snapshot of the queue statistics.
   * @returns A copy whose nested counters are detached from internal state
   */
  public getStats(): IQueueStats {
    return {
      ...this.stats,
      status: { ...this.stats.status },
      modes: { ...this.stats.modes }
    };
  }

  /**
   * Pause queue processing (no-op when already paused).
   */
  public pause(): void {
    if (this.processing) {
      this.stopProcessing();
      logger.log('info', 'Queue processing paused');
    }
  }

  /**
   * Resume queue processing (no-op when already running).
   */
  public resume(): void {
    if (!this.processing) {
      this.startProcessing();
      logger.log('info', 'Queue processing resumed');
    }
  }

  /**
   * Clean up old delivered and failed items.
   * @param maxAge Maximum age in milliseconds (default: 7 days)
   * @returns Number of items removed
   */
  public async cleanupOldItems(maxAge: number = 7 * 24 * 60 * 60 * 1000): Promise<number> {
    const cutoff = new Date(Date.now() - maxAge);
    let removedCount = 0;

    // Snapshot the candidates first so the awaits below never interleave
    // with a live iteration over the map.
    const expired = [...this.queue.values()].filter(
      (item) => (item.status === 'delivered' || item.status === 'failed') && item.updatedAt < cutoff
    );

    for (const item of expired) {
      // removeItem returns false if the item vanished in the meantime.
      if (await this.removeItem(item.id)) {
        removedCount++;
      }
    }

    logger.log('info', `Cleaned up ${removedCount} old items`);
    return removedCount;
  }

  /**
   * Shutdown the queue: stop the timer, flush all items to disk (disk
   * storage only) and clear the in-memory queue.
   */
  public async shutdown(): Promise<void> {
    logger.log('info', 'Shutting down UnifiedDeliveryQueue');

    // Stop processing
    this.stopProcessing();

    // If using disk storage, make sure all items are persisted
    if (this.options.storageType === 'disk') {
      const pendingWrites: Promise<void>[] = [];

      for (const item of this.queue.values()) {
        pendingWrites.push(this.persistItem(item));
      }

      // Wait for all writes to complete
      await Promise.all(pendingWrites);
    }

    // Clear the queue (memory only)
    this.queue.clear();

    // Update statistics
    this.updateStats();

    // Emit shutdown event
    this.emit('shutdown');
    logger.log('info', 'UnifiedDeliveryQueue shut down successfully');
  }
}