642 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			TypeScript
		
	
	
	
	
	
			
		
		
	
	
			642 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			TypeScript
		
	
	
	
	
	
| import * as plugins from '../../plugins.ts';
 | |
| import { logger } from '../../logger.ts';
 | |
| import { type EmailProcessingMode } from '../routing/classes.email.config.ts';
 | |
| import type { IEmailRoute } from '../routing/interfaces.ts';
 | |
| 
 | |
| /**
 | |
|  * Queue item status
 | |
|  */
 | |
| export type QueueItemStatus = 'pending' | 'processing' | 'delivered' | 'failed' | 'deferred';
 | |
| 
 | |
| /**
 | |
|  * Queue item interface
 | |
|  */
 | |
| export interface IQueueItem {
 | |
|   id: string;
 | |
|   processingMode: EmailProcessingMode;
 | |
|   processingResult: any;
 | |
|   route: IEmailRoute;
 | |
|   status: QueueItemStatus;
 | |
|   attempts: number;
 | |
|   nextAttempt: Date;
 | |
|   lastError?: string;
 | |
|   createdAt: Date;
 | |
|   updatedAt: Date;
 | |
|   deliveredAt?: Date;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Queue options interface
 | |
|  */
 | |
| export interface IQueueOptions {
 | |
|   // Storage options
 | |
|   storageType?: 'memory' | 'disk';
 | |
|   persistentPath?: string;
 | |
|   
 | |
|   // Queue behavior
 | |
|   checkInterval?: number;
 | |
|   maxQueueSize?: number;
 | |
|   maxPerDestination?: number;
 | |
|   
 | |
|   // Delivery attempts
 | |
|   maxRetries?: number;
 | |
|   baseRetryDelay?: number;
 | |
|   maxRetryDelay?: number;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Queue statistics interface
 | |
|  */
 | |
| export interface IQueueStats {
 | |
|   queueSize: number;
 | |
|   status: {
 | |
|     pending: number;
 | |
|     processing: number;
 | |
|     delivered: number;
 | |
|     failed: number;
 | |
|     deferred: number;
 | |
|   };
 | |
|   modes: {
 | |
|     forward: number;
 | |
|     mta: number;
 | |
|     process: number;
 | |
|   };
 | |
|   oldestItem?: Date;
 | |
|   newestItem?: Date;
 | |
|   averageAttempts: number;
 | |
|   totalProcessed: number;
 | |
|   processingActive: boolean;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * A unified queue for all email modes
 | |
|  */
 | |
| export class UnifiedDeliveryQueue extends plugins.EventEmitter {
 | |
|   private options: Required<IQueueOptions>;
 | |
|   private queue: Map<string, IQueueItem> = new Map();
 | |
|   private checkTimer?: NodeJS.Timeout;
 | |
|   private stats: IQueueStats;
 | |
|   private processing: boolean = false;
 | |
|   private totalProcessed: number = 0;
 | |
|   
 | |
|   /**
 | |
|    * Create a new unified delivery queue
 | |
|    * @param options Queue options
 | |
|    */
 | |
|   constructor(options: IQueueOptions) {
 | |
|     super();
 | |
|     
 | |
|     // Set default options
 | |
|     this.options = {
 | |
|       storageType: options.storageType || 'memory',
 | |
|       persistentPath: options.persistentPath || path.join(process.cwd(), 'email-queue'),
 | |
|       checkInterval: options.checkInterval || 30000, // 30 seconds
 | |
|       maxQueueSize: options.maxQueueSize || 10000,
 | |
|       maxPerDestination: options.maxPerDestination || 100,
 | |
|       maxRetries: options.maxRetries || 5,
 | |
|       baseRetryDelay: options.baseRetryDelay || 60000, // 1 minute
 | |
|       maxRetryDelay: options.maxRetryDelay || 3600000 // 1 hour
 | |
|     };
 | |
|     
 | |
|     // Initialize statistics
 | |
|     this.stats = {
 | |
|       queueSize: 0,
 | |
|       status: {
 | |
|         pending: 0,
 | |
|         processing: 0,
 | |
|         delivered: 0,
 | |
|         failed: 0,
 | |
|         deferred: 0
 | |
|       },
 | |
|       modes: {
 | |
|         forward: 0,
 | |
|         mta: 0,
 | |
|         process: 0
 | |
|       },
 | |
|       averageAttempts: 0,
 | |
|       totalProcessed: 0,
 | |
|       processingActive: false
 | |
|     };
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Initialize the queue
 | |
|    */
 | |
|   public async initialize(): Promise<void> {
 | |
|     logger.log('info', 'Initializing UnifiedDeliveryQueue');
 | |
|     
 | |
|     try {
 | |
|       // Create persistent storage directory if using disk storage
 | |
|       if (this.options.storageType === 'disk') {
 | |
|         if (!fs.existsSync(this.options.persistentPath)) {
 | |
|           fs.mkdirSync(this.options.persistentPath, { recursive: true });
 | |
|         }
 | |
|         
 | |
|         // Load existing items from disk
 | |
|         await this.loadFromDisk();
 | |
|       }
 | |
|       
 | |
|       // Start the queue processing timer
 | |
|       this.startProcessing();
 | |
|       
 | |
|       // Emit initialized event
 | |
|       this.emit('initialized');
 | |
|       logger.log('info', 'UnifiedDeliveryQueue initialized successfully');
 | |
|     } catch (error) {
 | |
|       logger.log('error', `Failed to initialize queue: ${error.message}`);
 | |
|       throw error;
 | |
|     }
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Start queue processing
 | |
|    */
 | |
|   private startProcessing(): void {
 | |
|     if (this.checkTimer) {
 | |
|       clearInterval(this.checkTimer);
 | |
|     }
 | |
|     
 | |
|     this.checkTimer = setInterval(() => this.processQueue(), this.options.checkInterval);
 | |
|     this.processing = true;
 | |
|     this.stats.processingActive = true;
 | |
|     this.emit('processingStarted');
 | |
|     logger.log('info', 'Queue processing started');
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Stop queue processing
 | |
|    */
 | |
|   private stopProcessing(): void {
 | |
|     if (this.checkTimer) {
 | |
|       clearInterval(this.checkTimer);
 | |
|       this.checkTimer = undefined;
 | |
|     }
 | |
|     
 | |
|     this.processing = false;
 | |
|     this.stats.processingActive = false;
 | |
|     this.emit('processingStopped');
 | |
|     logger.log('info', 'Queue processing stopped');
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Check for items that need to be processed
 | |
|    */
 | |
|   private async processQueue(): Promise<void> {
 | |
|     try {
 | |
|       const now = new Date();
 | |
|       let readyItems: IQueueItem[] = [];
 | |
|       
 | |
|       // Find items ready for processing
 | |
|       for (const item of this.queue.values()) {
 | |
|         if (item.status === 'pending' || (item.status === 'deferred' && item.nextAttempt <= now)) {
 | |
|           readyItems.push(item);
 | |
|         }
 | |
|       }
 | |
|       
 | |
|       if (readyItems.length === 0) {
 | |
|         return;
 | |
|       }
 | |
|       
 | |
|       // Sort by oldest first
 | |
|       readyItems.sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime());
 | |
|       
 | |
|       // Emit event for ready items
 | |
|       this.emit('itemsReady', readyItems);
 | |
|       logger.log('info', `Found ${readyItems.length} items ready for processing`);
 | |
|       
 | |
|       // Update statistics
 | |
|       this.updateStats();
 | |
|     } catch (error) {
 | |
|       logger.log('error', `Error processing queue: ${error.message}`);
 | |
|       this.emit('error', error);
 | |
|     }
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Add an item to the queue
 | |
|    * @param processingResult Processing result to queue
 | |
|    * @param mode Processing mode
 | |
|    * @param route Email route
 | |
|    */
 | |
|   public async enqueue(processingResult: any, mode: EmailProcessingMode, route: IEmailRoute): Promise<string> {
 | |
|     // Check if queue is full
 | |
|     if (this.queue.size >= this.options.maxQueueSize) {
 | |
|       throw new Error('Queue is full');
 | |
|     }
 | |
|     
 | |
|     // Generate a unique ID
 | |
|     const id = `${Date.now()}-${Math.random().toString(36).substring(2, 15)}`;
 | |
|     
 | |
|     // Create queue item
 | |
|     const item: IQueueItem = {
 | |
|       id,
 | |
|       processingMode: mode,
 | |
|       processingResult,
 | |
|       route,
 | |
|       status: 'pending',
 | |
|       attempts: 0,
 | |
|       nextAttempt: new Date(),
 | |
|       createdAt: new Date(),
 | |
|       updatedAt: new Date()
 | |
|     };
 | |
|     
 | |
|     // Add to queue
 | |
|     this.queue.set(id, item);
 | |
|     
 | |
|     // Persist to disk if using disk storage
 | |
|     if (this.options.storageType === 'disk') {
 | |
|       await this.persistItem(item);
 | |
|     }
 | |
|     
 | |
|     // Update statistics
 | |
|     this.updateStats();
 | |
|     
 | |
|     // Emit event
 | |
|     this.emit('itemEnqueued', item);
 | |
|     logger.log('info', `Item enqueued with ID ${id}, mode: ${mode}`);
 | |
|     
 | |
|     return id;
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Get an item from the queue
 | |
|    * @param id Item ID
 | |
|    */
 | |
|   public getItem(id: string): IQueueItem | undefined {
 | |
|     return this.queue.get(id);
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Mark an item as being processed
 | |
|    * @param id Item ID
 | |
|    */
 | |
|   public async markProcessing(id: string): Promise<boolean> {
 | |
|     const item = this.queue.get(id);
 | |
|     
 | |
|     if (!item) {
 | |
|       return false;
 | |
|     }
 | |
|     
 | |
|     // Update status
 | |
|     item.status = 'processing';
 | |
|     item.attempts++;
 | |
|     item.updatedAt = new Date();
 | |
|     
 | |
|     // Persist changes if using disk storage
 | |
|     if (this.options.storageType === 'disk') {
 | |
|       await this.persistItem(item);
 | |
|     }
 | |
|     
 | |
|     // Update statistics
 | |
|     this.updateStats();
 | |
|     
 | |
|     // Emit event
 | |
|     this.emit('itemProcessing', item);
 | |
|     logger.log('info', `Item ${id} marked as processing, attempt ${item.attempts}`);
 | |
|     
 | |
|     return true;
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Mark an item as delivered
 | |
|    * @param id Item ID
 | |
|    */
 | |
|   public async markDelivered(id: string): Promise<boolean> {
 | |
|     const item = this.queue.get(id);
 | |
|     
 | |
|     if (!item) {
 | |
|       return false;
 | |
|     }
 | |
|     
 | |
|     // Update status
 | |
|     item.status = 'delivered';
 | |
|     item.updatedAt = new Date();
 | |
|     item.deliveredAt = new Date();
 | |
|     
 | |
|     // Persist changes if using disk storage
 | |
|     if (this.options.storageType === 'disk') {
 | |
|       await this.persistItem(item);
 | |
|     }
 | |
|     
 | |
|     // Update statistics
 | |
|     this.totalProcessed++;
 | |
|     this.updateStats();
 | |
|     
 | |
|     // Emit event
 | |
|     this.emit('itemDelivered', item);
 | |
|     logger.log('info', `Item ${id} marked as delivered after ${item.attempts} attempts`);
 | |
|     
 | |
|     return true;
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Mark an item as failed
 | |
|    * @param id Item ID
 | |
|    * @param error Error message
 | |
|    */
 | |
|   public async markFailed(id: string, error: string): Promise<boolean> {
 | |
|     const item = this.queue.get(id);
 | |
|     
 | |
|     if (!item) {
 | |
|       return false;
 | |
|     }
 | |
|     
 | |
|     // Determine if we should retry
 | |
|     if (item.attempts < this.options.maxRetries) {
 | |
|       // Calculate next retry time with exponential backoff
 | |
|       const delay = Math.min(
 | |
|         this.options.baseRetryDelay * Math.pow(2, item.attempts - 1),
 | |
|         this.options.maxRetryDelay
 | |
|       );
 | |
|       
 | |
|       // Update status
 | |
|       item.status = 'deferred';
 | |
|       item.lastError = error;
 | |
|       item.nextAttempt = new Date(Date.now() + delay);
 | |
|       item.updatedAt = new Date();
 | |
|       
 | |
|       // Persist changes if using disk storage
 | |
|       if (this.options.storageType === 'disk') {
 | |
|         await this.persistItem(item);
 | |
|       }
 | |
|       
 | |
|       // Emit event
 | |
|       this.emit('itemDeferred', item);
 | |
|       logger.log('info', `Item ${id} deferred for ${delay}ms, attempt ${item.attempts}, error: ${error}`);
 | |
|     } else {
 | |
|       // Mark as permanently failed
 | |
|       item.status = 'failed';
 | |
|       item.lastError = error;
 | |
|       item.updatedAt = new Date();
 | |
|       
 | |
|       // Persist changes if using disk storage
 | |
|       if (this.options.storageType === 'disk') {
 | |
|         await this.persistItem(item);
 | |
|       }
 | |
|       
 | |
|       // Update statistics
 | |
|       this.totalProcessed++;
 | |
|       
 | |
|       // Emit event
 | |
|       this.emit('itemFailed', item);
 | |
|       logger.log('warn', `Item ${id} permanently failed after ${item.attempts} attempts, error: ${error}`);
 | |
|     }
 | |
|     
 | |
|     // Update statistics
 | |
|     this.updateStats();
 | |
|     
 | |
|     return true;
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Remove an item from the queue
 | |
|    * @param id Item ID
 | |
|    */
 | |
|   public async removeItem(id: string): Promise<boolean> {
 | |
|     const item = this.queue.get(id);
 | |
|     
 | |
|     if (!item) {
 | |
|       return false;
 | |
|     }
 | |
|     
 | |
|     // Remove from queue
 | |
|     this.queue.delete(id);
 | |
|     
 | |
|     // Remove from disk if using disk storage
 | |
|     if (this.options.storageType === 'disk') {
 | |
|       await this.removeItemFromDisk(id);
 | |
|     }
 | |
|     
 | |
|     // Update statistics
 | |
|     this.updateStats();
 | |
|     
 | |
|     // Emit event
 | |
|     this.emit('itemRemoved', item);
 | |
|     logger.log('info', `Item ${id} removed from queue`);
 | |
|     
 | |
|     return true;
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Persist an item to disk
 | |
|    * @param item Item to persist
 | |
|    */
 | |
|   private async persistItem(item: IQueueItem): Promise<void> {
 | |
|     try {
 | |
|       const filePath = path.join(this.options.persistentPath, `${item.id}.tson`);
 | |
|       await fs.promises.writeFile(filePath, JSON.stringify(item, null, 2), 'utf8');
 | |
|     } catch (error) {
 | |
|       logger.log('error', `Failed to persist item ${item.id}: ${error.message}`);
 | |
|       this.emit('error', error);
 | |
|     }
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Remove an item from disk
 | |
|    * @param id Item ID
 | |
|    */
 | |
|   private async removeItemFromDisk(id: string): Promise<void> {
 | |
|     try {
 | |
|       const filePath = path.join(this.options.persistentPath, `${id}.tson`);
 | |
|       
 | |
|       if (fs.existsSync(filePath)) {
 | |
|         await fs.promises.unlink(filePath);
 | |
|       }
 | |
|     } catch (error) {
 | |
|       logger.log('error', `Failed to remove item ${id} from disk: ${error.message}`);
 | |
|       this.emit('error', error);
 | |
|     }
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Load queue items from disk
 | |
|    */
 | |
|   private async loadFromDisk(): Promise<void> {
 | |
|     try {
 | |
|       // Check if directory exists
 | |
|       if (!fs.existsSync(this.options.persistentPath)) {
 | |
|         return;
 | |
|       }
 | |
|       
 | |
|       // Get all JSON files
 | |
|       const files = fs.readdirSync(this.options.persistentPath).filter(file => file.endsWith('.tson'));
 | |
|       
 | |
|       // Load each file
 | |
|       for (const file of files) {
 | |
|         try {
 | |
|           const filePath = path.join(this.options.persistentPath, file);
 | |
|           const data = await fs.promises.readFile(filePath, 'utf8');
 | |
|           const item = JSON.parse(data) as IQueueItem;
 | |
|           
 | |
|           // Convert date strings to Date objects
 | |
|           item.createdAt = new Date(item.createdAt);
 | |
|           item.updatedAt = new Date(item.updatedAt);
 | |
|           item.nextAttempt = new Date(item.nextAttempt);
 | |
|           if (item.deliveredAt) {
 | |
|             item.deliveredAt = new Date(item.deliveredAt);
 | |
|           }
 | |
|           
 | |
|           // Add to queue
 | |
|           this.queue.set(item.id, item);
 | |
|         } catch (error) {
 | |
|           logger.log('error', `Failed to load item from ${file}: ${error.message}`);
 | |
|         }
 | |
|       }
 | |
|       
 | |
|       // Update statistics
 | |
|       this.updateStats();
 | |
|       
 | |
|       logger.log('info', `Loaded ${this.queue.size} items from disk`);
 | |
|     } catch (error) {
 | |
|       logger.log('error', `Failed to load items from disk: ${error.message}`);
 | |
|       throw error;
 | |
|     }
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Update queue statistics
 | |
|    */
 | |
|   private updateStats(): void {
 | |
|     // Reset counters
 | |
|     this.stats.queueSize = this.queue.size;
 | |
|     this.stats.status = {
 | |
|       pending: 0,
 | |
|       processing: 0,
 | |
|       delivered: 0,
 | |
|       failed: 0,
 | |
|       deferred: 0
 | |
|     };
 | |
|     this.stats.modes = {
 | |
|       forward: 0,
 | |
|       mta: 0,
 | |
|       process: 0
 | |
|     };
 | |
|     
 | |
|     let totalAttempts = 0;
 | |
|     let oldestTime = Date.now();
 | |
|     let newestTime = 0;
 | |
|     
 | |
|     // Count by status and mode
 | |
|     for (const item of this.queue.values()) {
 | |
|       // Count by status
 | |
|       this.stats.status[item.status]++;
 | |
|       
 | |
|       // Count by mode
 | |
|       this.stats.modes[item.processingMode]++;
 | |
|       
 | |
|       // Track total attempts
 | |
|       totalAttempts += item.attempts;
 | |
|       
 | |
|       // Track oldest and newest
 | |
|       const itemTime = item.createdAt.getTime();
 | |
|       if (itemTime < oldestTime) {
 | |
|         oldestTime = itemTime;
 | |
|       }
 | |
|       if (itemTime > newestTime) {
 | |
|         newestTime = itemTime;
 | |
|       }
 | |
|     }
 | |
|     
 | |
|     // Calculate average attempts
 | |
|     this.stats.averageAttempts = this.queue.size > 0 ? totalAttempts / this.queue.size : 0;
 | |
|     
 | |
|     // Set oldest and newest
 | |
|     this.stats.oldestItem = this.queue.size > 0 ? new Date(oldestTime) : undefined;
 | |
|     this.stats.newestItem = this.queue.size > 0 ? new Date(newestTime) : undefined;
 | |
|     
 | |
|     // Set total processed
 | |
|     this.stats.totalProcessed = this.totalProcessed;
 | |
|     
 | |
|     // Set processing active
 | |
|     this.stats.processingActive = this.processing;
 | |
|     
 | |
|     // Emit statistics event
 | |
|     this.emit('statsUpdated', this.stats);
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Get queue statistics
 | |
|    */
 | |
|   public getStats(): IQueueStats {
 | |
|     return { ...this.stats };
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Pause queue processing
 | |
|    */
 | |
|   public pause(): void {
 | |
|     if (this.processing) {
 | |
|       this.stopProcessing();
 | |
|       logger.log('info', 'Queue processing paused');
 | |
|     }
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Resume queue processing
 | |
|    */
 | |
|   public resume(): void {
 | |
|     if (!this.processing) {
 | |
|       this.startProcessing();
 | |
|       logger.log('info', 'Queue processing resumed');
 | |
|     }
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Clean up old delivered and failed items
 | |
|    * @param maxAge Maximum age in milliseconds (default: 7 days)
 | |
|    */
 | |
|   public async cleanupOldItems(maxAge: number = 7 * 24 * 60 * 60 * 1000): Promise<number> {
 | |
|     const cutoff = new Date(Date.now() - maxAge);
 | |
|     let removedCount = 0;
 | |
|     
 | |
|     // Find old items
 | |
|     for (const item of this.queue.values()) {
 | |
|       if (['delivered', 'failed'].includes(item.status) && item.updatedAt < cutoff) {
 | |
|         // Remove item
 | |
|         await this.removeItem(item.id);
 | |
|         removedCount++;
 | |
|       }
 | |
|     }
 | |
|     
 | |
|     logger.log('info', `Cleaned up ${removedCount} old items`);
 | |
|     return removedCount;
 | |
|   }
 | |
|   
 | |
|   /**
 | |
|    * Shutdown the queue
 | |
|    */
 | |
|   public async shutdown(): Promise<void> {
 | |
|     logger.log('info', 'Shutting down UnifiedDeliveryQueue');
 | |
|     
 | |
|     // Stop processing
 | |
|     this.stopProcessing();
 | |
|     
 | |
|     // Clear the check timer to prevent memory leaks
 | |
|     if (this.checkTimer) {
 | |
|       clearInterval(this.checkTimer);
 | |
|       this.checkTimer = undefined;
 | |
|     }
 | |
|     
 | |
|     // If using disk storage, make sure all items are persisted
 | |
|     if (this.options.storageType === 'disk') {
 | |
|       const pendingWrites: Promise<void>[] = [];
 | |
|       
 | |
|       for (const item of this.queue.values()) {
 | |
|         pendingWrites.push(this.persistItem(item));
 | |
|       }
 | |
|       
 | |
|       // Wait for all writes to complete
 | |
|       await Promise.all(pendingWrites);
 | |
|     }
 | |
|     
 | |
|     // Clear the queue (memory only)
 | |
|     this.queue.clear();
 | |
|     
 | |
|     // Update statistics
 | |
|     this.updateStats();
 | |
|     
 | |
|     // Emit shutdown event
 | |
|     this.emit('shutdown');
 | |
|     logger.log('info', 'UnifiedDeliveryQueue shut down successfully');
 | |
|   }
 | |
| } |