645 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			TypeScript
		
	
	
	
	
	
		
		
			
		
	
	
			645 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			TypeScript
		
	
	
	
	
	
|  | import * as plugins from '../../plugins.ts'; | ||
|  | import { EventEmitter } from 'node:events'; | ||
|  | import * as fs from 'node:fs'; | ||
|  | import * as path from 'node:path'; | ||
|  | import { logger } from '../../logger.ts'; | ||
|  | import { type EmailProcessingMode } from '../routing/classes.email.config.ts'; | ||
|  | import type { IEmailRoute } from '../routing/interfaces.ts'; | ||
|  | 
 | ||
|  | /** | ||
|  |  * Queue item status | ||
|  |  */ | ||
|  | export type QueueItemStatus = 'pending' | 'processing' | 'delivered' | 'failed' | 'deferred'; | ||
|  | 
 | ||
|  | /** | ||
|  |  * Queue item interface | ||
|  |  */ | ||
|  | export interface IQueueItem { | ||
|  |   id: string; | ||
|  |   processingMode: EmailProcessingMode; | ||
|  |   processingResult: any; | ||
|  |   route: IEmailRoute; | ||
|  |   status: QueueItemStatus; | ||
|  |   attempts: number; | ||
|  |   nextAttempt: Date; | ||
|  |   lastError?: string; | ||
|  |   createdAt: Date; | ||
|  |   updatedAt: Date; | ||
|  |   deliveredAt?: Date; | ||
|  | } | ||
|  | 
 | ||
|  | /** | ||
|  |  * Queue options interface | ||
|  |  */ | ||
|  | export interface IQueueOptions { | ||
|  |   // Storage options
 | ||
|  |   storageType?: 'memory' | 'disk'; | ||
|  |   persistentPath?: string; | ||
|  |    | ||
|  |   // Queue behavior
 | ||
|  |   checkInterval?: number; | ||
|  |   maxQueueSize?: number; | ||
|  |   maxPerDestination?: number; | ||
|  |    | ||
|  |   // Delivery attempts
 | ||
|  |   maxRetries?: number; | ||
|  |   baseRetryDelay?: number; | ||
|  |   maxRetryDelay?: number; | ||
|  | } | ||
|  | 
 | ||
|  | /** | ||
|  |  * Queue statistics interface | ||
|  |  */ | ||
|  | export interface IQueueStats { | ||
|  |   queueSize: number; | ||
|  |   status: { | ||
|  |     pending: number; | ||
|  |     processing: number; | ||
|  |     delivered: number; | ||
|  |     failed: number; | ||
|  |     deferred: number; | ||
|  |   }; | ||
|  |   modes: { | ||
|  |     forward: number; | ||
|  |     mta: number; | ||
|  |     process: number; | ||
|  |   }; | ||
|  |   oldestItem?: Date; | ||
|  |   newestItem?: Date; | ||
|  |   averageAttempts: number; | ||
|  |   totalProcessed: number; | ||
|  |   processingActive: boolean; | ||
|  | } | ||
|  | 
 | ||
|  | /** | ||
|  |  * A unified queue for all email modes | ||
|  |  */ | ||
|  | export class UnifiedDeliveryQueue extends EventEmitter { | ||
|  |   private options: Required<IQueueOptions>; | ||
|  |   private queue: Map<string, IQueueItem> = new Map(); | ||
|  |   private checkTimer?: NodeJS.Timeout; | ||
|  |   private stats: IQueueStats; | ||
|  |   private processing: boolean = false; | ||
|  |   private totalProcessed: number = 0; | ||
|  |    | ||
|  |   /** | ||
|  |    * Create a new unified delivery queue | ||
|  |    * @param options Queue options | ||
|  |    */ | ||
|  |   constructor(options: IQueueOptions) { | ||
|  |     super(); | ||
|  |      | ||
|  |     // Set default options
 | ||
|  |     this.options = { | ||
|  |       storageType: options.storageType || 'memory', | ||
|  |       persistentPath: options.persistentPath || path.join(process.cwd(), 'email-queue'), | ||
|  |       checkInterval: options.checkInterval || 30000, // 30 seconds
 | ||
|  |       maxQueueSize: options.maxQueueSize || 10000, | ||
|  |       maxPerDestination: options.maxPerDestination || 100, | ||
|  |       maxRetries: options.maxRetries || 5, | ||
|  |       baseRetryDelay: options.baseRetryDelay || 60000, // 1 minute
 | ||
|  |       maxRetryDelay: options.maxRetryDelay || 3600000 // 1 hour
 | ||
|  |     }; | ||
|  |      | ||
|  |     // Initialize statistics
 | ||
|  |     this.stats = { | ||
|  |       queueSize: 0, | ||
|  |       status: { | ||
|  |         pending: 0, | ||
|  |         processing: 0, | ||
|  |         delivered: 0, | ||
|  |         failed: 0, | ||
|  |         deferred: 0 | ||
|  |       }, | ||
|  |       modes: { | ||
|  |         forward: 0, | ||
|  |         mta: 0, | ||
|  |         process: 0 | ||
|  |       }, | ||
|  |       averageAttempts: 0, | ||
|  |       totalProcessed: 0, | ||
|  |       processingActive: false | ||
|  |     }; | ||
|  |   } | ||
|  |    | ||
|  |   /** | ||
|  |    * Initialize the queue | ||
|  |    */ | ||
|  |   public async initialize(): Promise<void> { | ||
|  |     logger.log('info', 'Initializing UnifiedDeliveryQueue'); | ||
|  |      | ||
|  |     try { | ||
|  |       // Create persistent storage directory if using disk storage
 | ||
|  |       if (this.options.storageType === 'disk') { | ||
|  |         if (!fs.existsSync(this.options.persistentPath)) { | ||
|  |           fs.mkdirSync(this.options.persistentPath, { recursive: true }); | ||
|  |         } | ||
|  |          | ||
|  |         // Load existing items from disk
 | ||
|  |         await this.loadFromDisk(); | ||
|  |       } | ||
|  |        | ||
|  |       // Start the queue processing timer
 | ||
|  |       this.startProcessing(); | ||
|  |        | ||
|  |       // Emit initialized event
 | ||
|  |       this.emit('initialized'); | ||
|  |       logger.log('info', 'UnifiedDeliveryQueue initialized successfully'); | ||
|  |     } catch (error) { | ||
|  |       logger.log('error', `Failed to initialize queue: ${error.message}`); | ||
|  |       throw error; | ||
|  |     } | ||
|  |   } | ||
|  |    | ||
|  |   /** | ||
|  |    * Start queue processing | ||
|  |    */ | ||
|  |   private startProcessing(): void { | ||
|  |     if (this.checkTimer) { | ||
|  |       clearInterval(this.checkTimer); | ||
|  |     } | ||
|  |      | ||
|  |     this.checkTimer = setInterval(() => this.processQueue(), this.options.checkInterval); | ||
|  |     this.processing = true; | ||
|  |     this.stats.processingActive = true; | ||
|  |     this.emit('processingStarted'); | ||
|  |     logger.log('info', 'Queue processing started'); | ||
|  |   } | ||
|  |    | ||
|  |   /** | ||
|  |    * Stop queue processing | ||
|  |    */ | ||
|  |   private stopProcessing(): void { | ||
|  |     if (this.checkTimer) { | ||
|  |       clearInterval(this.checkTimer); | ||
|  |       this.checkTimer = undefined; | ||
|  |     } | ||
|  |      | ||
|  |     this.processing = false; | ||
|  |     this.stats.processingActive = false; | ||
|  |     this.emit('processingStopped'); | ||
|  |     logger.log('info', 'Queue processing stopped'); | ||
|  |   } | ||
|  |    | ||
|  |   /** | ||
|  |    * Check for items that need to be processed | ||
|  |    */ | ||
|  |   private async processQueue(): Promise<void> { | ||
|  |     try { | ||
|  |       const now = new Date(); | ||
|  |       let readyItems: IQueueItem[] = []; | ||
|  |        | ||
|  |       // Find items ready for processing
 | ||
|  |       for (const item of this.queue.values()) { | ||
|  |         if (item.status === 'pending' || (item.status === 'deferred' && item.nextAttempt <= now)) { | ||
|  |           readyItems.push(item); | ||
|  |         } | ||
|  |       } | ||
|  |        | ||
|  |       if (readyItems.length === 0) { | ||
|  |         return; | ||
|  |       } | ||
|  |        | ||
|  |       // Sort by oldest first
 | ||
|  |       readyItems.sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime()); | ||
|  |        | ||
|  |       // Emit event for ready items
 | ||
|  |       this.emit('itemsReady', readyItems); | ||
|  |       logger.log('info', `Found ${readyItems.length} items ready for processing`); | ||
|  |        | ||
|  |       // Update statistics
 | ||
|  |       this.updateStats(); | ||
|  |     } catch (error) { | ||
|  |       logger.log('error', `Error processing queue: ${error.message}`); | ||
|  |       this.emit('error', error); | ||
|  |     } | ||
|  |   } | ||
|  |    | ||
|  |   /** | ||
|  |    * Add an item to the queue | ||
|  |    * @param processingResult Processing result to queue | ||
|  |    * @param mode Processing mode | ||
|  |    * @param route Email route | ||
|  |    */ | ||
|  |   public async enqueue(processingResult: any, mode: EmailProcessingMode, route: IEmailRoute): Promise<string> { | ||
|  |     // Check if queue is full
 | ||
|  |     if (this.queue.size >= this.options.maxQueueSize) { | ||
|  |       throw new Error('Queue is full'); | ||
|  |     } | ||
|  |      | ||
|  |     // Generate a unique ID
 | ||
|  |     const id = `${Date.now()}-${Math.random().toString(36).substring(2, 15)}`; | ||
|  |      | ||
|  |     // Create queue item
 | ||
|  |     const item: IQueueItem = { | ||
|  |       id, | ||
|  |       processingMode: mode, | ||
|  |       processingResult, | ||
|  |       route, | ||
|  |       status: 'pending', | ||
|  |       attempts: 0, | ||
|  |       nextAttempt: new Date(), | ||
|  |       createdAt: new Date(), | ||
|  |       updatedAt: new Date() | ||
|  |     }; | ||
|  |      | ||
|  |     // Add to queue
 | ||
|  |     this.queue.set(id, item); | ||
|  |      | ||
|  |     // Persist to disk if using disk storage
 | ||
|  |     if (this.options.storageType === 'disk') { | ||
|  |       await this.persistItem(item); | ||
|  |     } | ||
|  |      | ||
|  |     // Update statistics
 | ||
|  |     this.updateStats(); | ||
|  |      | ||
|  |     // Emit event
 | ||
|  |     this.emit('itemEnqueued', item); | ||
|  |     logger.log('info', `Item enqueued with ID ${id}, mode: ${mode}`); | ||
|  |      | ||
|  |     return id; | ||
|  |   } | ||
|  |    | ||
|  |   /** | ||
|  |    * Get an item from the queue | ||
|  |    * @param id Item ID | ||
|  |    */ | ||
|  |   public getItem(id: string): IQueueItem | undefined { | ||
|  |     return this.queue.get(id); | ||
|  |   } | ||
|  |    | ||
|  |   /** | ||
|  |    * Mark an item as being processed | ||
|  |    * @param id Item ID | ||
|  |    */ | ||
|  |   public async markProcessing(id: string): Promise<boolean> { | ||
|  |     const item = this.queue.get(id); | ||
|  |      | ||
|  |     if (!item) { | ||
|  |       return false; | ||
|  |     } | ||
|  |      | ||
|  |     // Update status
 | ||
|  |     item.status = 'processing'; | ||
|  |     item.attempts++; | ||
|  |     item.updatedAt = new Date(); | ||
|  |      | ||
|  |     // Persist changes if using disk storage
 | ||
|  |     if (this.options.storageType === 'disk') { | ||
|  |       await this.persistItem(item); | ||
|  |     } | ||
|  |      | ||
|  |     // Update statistics
 | ||
|  |     this.updateStats(); | ||
|  |      | ||
|  |     // Emit event
 | ||
|  |     this.emit('itemProcessing', item); | ||
|  |     logger.log('info', `Item ${id} marked as processing, attempt ${item.attempts}`); | ||
|  |      | ||
|  |     return true; | ||
|  |   } | ||
|  |    | ||
|  |   /** | ||
|  |    * Mark an item as delivered | ||
|  |    * @param id Item ID | ||
|  |    */ | ||
|  |   public async markDelivered(id: string): Promise<boolean> { | ||
|  |     const item = this.queue.get(id); | ||
|  |      | ||
|  |     if (!item) { | ||
|  |       return false; | ||
|  |     } | ||
|  |      | ||
|  |     // Update status
 | ||
|  |     item.status = 'delivered'; | ||
|  |     item.updatedAt = new Date(); | ||
|  |     item.deliveredAt = new Date(); | ||
|  |      | ||
|  |     // Persist changes if using disk storage
 | ||
|  |     if (this.options.storageType === 'disk') { | ||
|  |       await this.persistItem(item); | ||
|  |     } | ||
|  |      | ||
|  |     // Update statistics
 | ||
|  |     this.totalProcessed++; | ||
|  |     this.updateStats(); | ||
|  |      | ||
|  |     // Emit event
 | ||
|  |     this.emit('itemDelivered', item); | ||
|  |     logger.log('info', `Item ${id} marked as delivered after ${item.attempts} attempts`); | ||
|  |      | ||
|  |     return true; | ||
|  |   } | ||
|  |    | ||
|  |   /** | ||
|  |    * Mark an item as failed | ||
|  |    * @param id Item ID | ||
|  |    * @param error Error message | ||
|  |    */ | ||
|  |   public async markFailed(id: string, error: string): Promise<boolean> { | ||
|  |     const item = this.queue.get(id); | ||
|  |      | ||
|  |     if (!item) { | ||
|  |       return false; | ||
|  |     } | ||
|  |      | ||
|  |     // Determine if we should retry
 | ||
|  |     if (item.attempts < this.options.maxRetries) { | ||
|  |       // Calculate next retry time with exponential backoff
 | ||
|  |       const delay = Math.min( | ||
|  |         this.options.baseRetryDelay * Math.pow(2, item.attempts - 1), | ||
|  |         this.options.maxRetryDelay | ||
|  |       ); | ||
|  |        | ||
|  |       // Update status
 | ||
|  |       item.status = 'deferred'; | ||
|  |       item.lastError = error; | ||
|  |       item.nextAttempt = new Date(Date.now() + delay); | ||
|  |       item.updatedAt = new Date(); | ||
|  |        | ||
|  |       // Persist changes if using disk storage
 | ||
|  |       if (this.options.storageType === 'disk') { | ||
|  |         await this.persistItem(item); | ||
|  |       } | ||
|  |        | ||
|  |       // Emit event
 | ||
|  |       this.emit('itemDeferred', item); | ||
|  |       logger.log('info', `Item ${id} deferred for ${delay}ms, attempt ${item.attempts}, error: ${error}`); | ||
|  |     } else { | ||
|  |       // Mark as permanently failed
 | ||
|  |       item.status = 'failed'; | ||
|  |       item.lastError = error; | ||
|  |       item.updatedAt = new Date(); | ||
|  |        | ||
|  |       // Persist changes if using disk storage
 | ||
|  |       if (this.options.storageType === 'disk') { | ||
|  |         await this.persistItem(item); | ||
|  |       } | ||
|  |        | ||
|  |       // Update statistics
 | ||
|  |       this.totalProcessed++; | ||
|  |        | ||
|  |       // Emit event
 | ||
|  |       this.emit('itemFailed', item); | ||
|  |       logger.log('warn', `Item ${id} permanently failed after ${item.attempts} attempts, error: ${error}`); | ||
|  |     } | ||
|  |      | ||
|  |     // Update statistics
 | ||
|  |     this.updateStats(); | ||
|  |      | ||
|  |     return true; | ||
|  |   } | ||
|  |    | ||
|  |   /** | ||
|  |    * Remove an item from the queue | ||
|  |    * @param id Item ID | ||
|  |    */ | ||
|  |   public async removeItem(id: string): Promise<boolean> { | ||
|  |     const item = this.queue.get(id); | ||
|  |      | ||
|  |     if (!item) { | ||
|  |       return false; | ||
|  |     } | ||
|  |      | ||
|  |     // Remove from queue
 | ||
|  |     this.queue.delete(id); | ||
|  |      | ||
|  |     // Remove from disk if using disk storage
 | ||
|  |     if (this.options.storageType === 'disk') { | ||
|  |       await this.removeItemFromDisk(id); | ||
|  |     } | ||
|  |      | ||
|  |     // Update statistics
 | ||
|  |     this.updateStats(); | ||
|  |      | ||
|  |     // Emit event
 | ||
|  |     this.emit('itemRemoved', item); | ||
|  |     logger.log('info', `Item ${id} removed from queue`); | ||
|  |      | ||
|  |     return true; | ||
|  |   } | ||
|  |    | ||
|  |   /** | ||
|  |    * Persist an item to disk | ||
|  |    * @param item Item to persist | ||
|  |    */ | ||
|  |   private async persistItem(item: IQueueItem): Promise<void> { | ||
|  |     try { | ||
|  |       const filePath = path.join(this.options.persistentPath, `${item.id}.tson`); | ||
|  |       await fs.promises.writeFile(filePath, JSON.stringify(item, null, 2), 'utf8'); | ||
|  |     } catch (error) { | ||
|  |       logger.log('error', `Failed to persist item ${item.id}: ${error.message}`); | ||
|  |       this.emit('error', error); | ||
|  |     } | ||
|  |   } | ||
|  |    | ||
|  |   /** | ||
|  |    * Remove an item from disk | ||
|  |    * @param id Item ID | ||
|  |    */ | ||
|  |   private async removeItemFromDisk(id: string): Promise<void> { | ||
|  |     try { | ||
|  |       const filePath = path.join(this.options.persistentPath, `${id}.tson`); | ||
|  |        | ||
|  |       if (fs.existsSync(filePath)) { | ||
|  |         await fs.promises.unlink(filePath); | ||
|  |       } | ||
|  |     } catch (error) { | ||
|  |       logger.log('error', `Failed to remove item ${id} from disk: ${error.message}`); | ||
|  |       this.emit('error', error); | ||
|  |     } | ||
|  |   } | ||
|  |    | ||
|  |   /** | ||
|  |    * Load queue items from disk | ||
|  |    */ | ||
|  |   private async loadFromDisk(): Promise<void> { | ||
|  |     try { | ||
|  |       // Check if directory exists
 | ||
|  |       if (!fs.existsSync(this.options.persistentPath)) { | ||
|  |         return; | ||
|  |       } | ||
|  |        | ||
|  |       // Get all JSON files
 | ||
|  |       const files = fs.readdirSync(this.options.persistentPath).filter(file => file.endsWith('.tson')); | ||
|  |        | ||
|  |       // Load each file
 | ||
|  |       for (const file of files) { | ||
|  |         try { | ||
|  |           const filePath = path.join(this.options.persistentPath, file); | ||
|  |           const data = await fs.promises.readFile(filePath, 'utf8'); | ||
|  |           const item = JSON.parse(data) as IQueueItem; | ||
|  |            | ||
|  |           // Convert date strings to Date objects
 | ||
|  |           item.createdAt = new Date(item.createdAt); | ||
|  |           item.updatedAt = new Date(item.updatedAt); | ||
|  |           item.nextAttempt = new Date(item.nextAttempt); | ||
|  |           if (item.deliveredAt) { | ||
|  |             item.deliveredAt = new Date(item.deliveredAt); | ||
|  |           } | ||
|  |            | ||
|  |           // Add to queue
 | ||
|  |           this.queue.set(item.id, item); | ||
|  |         } catch (error) { | ||
|  |           logger.log('error', `Failed to load item from ${file}: ${error.message}`); | ||
|  |         } | ||
|  |       } | ||
|  |        | ||
|  |       // Update statistics
 | ||
|  |       this.updateStats(); | ||
|  |        | ||
|  |       logger.log('info', `Loaded ${this.queue.size} items from disk`); | ||
|  |     } catch (error) { | ||
|  |       logger.log('error', `Failed to load items from disk: ${error.message}`); | ||
|  |       throw error; | ||
|  |     } | ||
|  |   } | ||
|  |    | ||
|  |   /** | ||
|  |    * Update queue statistics | ||
|  |    */ | ||
|  |   private updateStats(): void { | ||
|  |     // Reset counters
 | ||
|  |     this.stats.queueSize = this.queue.size; | ||
|  |     this.stats.status = { | ||
|  |       pending: 0, | ||
|  |       processing: 0, | ||
|  |       delivered: 0, | ||
|  |       failed: 0, | ||
|  |       deferred: 0 | ||
|  |     }; | ||
|  |     this.stats.modes = { | ||
|  |       forward: 0, | ||
|  |       mta: 0, | ||
|  |       process: 0 | ||
|  |     }; | ||
|  |      | ||
|  |     let totalAttempts = 0; | ||
|  |     let oldestTime = Date.now(); | ||
|  |     let newestTime = 0; | ||
|  |      | ||
|  |     // Count by status and mode
 | ||
|  |     for (const item of this.queue.values()) { | ||
|  |       // Count by status
 | ||
|  |       this.stats.status[item.status]++; | ||
|  |        | ||
|  |       // Count by mode
 | ||
|  |       this.stats.modes[item.processingMode]++; | ||
|  |        | ||
|  |       // Track total attempts
 | ||
|  |       totalAttempts += item.attempts; | ||
|  |        | ||
|  |       // Track oldest and newest
 | ||
|  |       const itemTime = item.createdAt.getTime(); | ||
|  |       if (itemTime < oldestTime) { | ||
|  |         oldestTime = itemTime; | ||
|  |       } | ||
|  |       if (itemTime > newestTime) { | ||
|  |         newestTime = itemTime; | ||
|  |       } | ||
|  |     } | ||
|  |      | ||
|  |     // Calculate average attempts
 | ||
|  |     this.stats.averageAttempts = this.queue.size > 0 ? totalAttempts / this.queue.size : 0; | ||
|  |      | ||
|  |     // Set oldest and newest
 | ||
|  |     this.stats.oldestItem = this.queue.size > 0 ? new Date(oldestTime) : undefined; | ||
|  |     this.stats.newestItem = this.queue.size > 0 ? new Date(newestTime) : undefined; | ||
|  |      | ||
|  |     // Set total processed
 | ||
|  |     this.stats.totalProcessed = this.totalProcessed; | ||
|  |      | ||
|  |     // Set processing active
 | ||
|  |     this.stats.processingActive = this.processing; | ||
|  |      | ||
|  |     // Emit statistics event
 | ||
|  |     this.emit('statsUpdated', this.stats); | ||
|  |   } | ||
|  |    | ||
|  |   /** | ||
|  |    * Get queue statistics | ||
|  |    */ | ||
|  |   public getStats(): IQueueStats { | ||
|  |     return { ...this.stats }; | ||
|  |   } | ||
|  |    | ||
|  |   /** | ||
|  |    * Pause queue processing | ||
|  |    */ | ||
|  |   public pause(): void { | ||
|  |     if (this.processing) { | ||
|  |       this.stopProcessing(); | ||
|  |       logger.log('info', 'Queue processing paused'); | ||
|  |     } | ||
|  |   } | ||
|  |    | ||
|  |   /** | ||
|  |    * Resume queue processing | ||
|  |    */ | ||
|  |   public resume(): void { | ||
|  |     if (!this.processing) { | ||
|  |       this.startProcessing(); | ||
|  |       logger.log('info', 'Queue processing resumed'); | ||
|  |     } | ||
|  |   } | ||
|  |    | ||
|  |   /** | ||
|  |    * Clean up old delivered and failed items | ||
|  |    * @param maxAge Maximum age in milliseconds (default: 7 days) | ||
|  |    */ | ||
|  |   public async cleanupOldItems(maxAge: number = 7 * 24 * 60 * 60 * 1000): Promise<number> { | ||
|  |     const cutoff = new Date(Date.now() - maxAge); | ||
|  |     let removedCount = 0; | ||
|  |      | ||
|  |     // Find old items
 | ||
|  |     for (const item of this.queue.values()) { | ||
|  |       if (['delivered', 'failed'].includes(item.status) && item.updatedAt < cutoff) { | ||
|  |         // Remove item
 | ||
|  |         await this.removeItem(item.id); | ||
|  |         removedCount++; | ||
|  |       } | ||
|  |     } | ||
|  |      | ||
|  |     logger.log('info', `Cleaned up ${removedCount} old items`); | ||
|  |     return removedCount; | ||
|  |   } | ||
|  |    | ||
|  |   /** | ||
|  |    * Shutdown the queue | ||
|  |    */ | ||
|  |   public async shutdown(): Promise<void> { | ||
|  |     logger.log('info', 'Shutting down UnifiedDeliveryQueue'); | ||
|  |      | ||
|  |     // Stop processing
 | ||
|  |     this.stopProcessing(); | ||
|  |      | ||
|  |     // Clear the check timer to prevent memory leaks
 | ||
|  |     if (this.checkTimer) { | ||
|  |       clearInterval(this.checkTimer); | ||
|  |       this.checkTimer = undefined; | ||
|  |     } | ||
|  |      | ||
|  |     // If using disk storage, make sure all items are persisted
 | ||
|  |     if (this.options.storageType === 'disk') { | ||
|  |       const pendingWrites: Promise<void>[] = []; | ||
|  |        | ||
|  |       for (const item of this.queue.values()) { | ||
|  |         pendingWrites.push(this.persistItem(item)); | ||
|  |       } | ||
|  |        | ||
|  |       // Wait for all writes to complete
 | ||
|  |       await Promise.all(pendingWrites); | ||
|  |     } | ||
|  |      | ||
|  |     // Clear the queue (memory only)
 | ||
|  |     this.queue.clear(); | ||
|  |      | ||
|  |     // Update statistics
 | ||
|  |     this.updateStats(); | ||
|  |      | ||
|  |     // Emit shutdown event
 | ||
|  |     this.emit('shutdown'); | ||
|  |     logger.log('info', 'UnifiedDeliveryQueue shut down successfully'); | ||
|  |   } | ||
|  | } |