platformservice/ts/dcrouter/classes.delivery.queue.ts

453 lines
11 KiB
TypeScript

import * as plugins from '../plugins.js';
import type { IQueueConfig } from './classes.smtp.config.js';
import type { IProcessingResult } from './classes.email.processor.js';
import { EventEmitter } from 'node:events';
import * as fs from 'node:fs';
import * as path from 'node:path';
/**
* Queue item status
*/
export type QueueItemStatus = 'pending' | 'processing' | 'delivered' | 'failed' | 'deferred';
/**
* Queue item
*/
export interface IQueueItem {
id: string;
processingResult: IProcessingResult;
status: QueueItemStatus;
attempts: number;
nextAttempt: Date;
lastError?: string;
createdAt: Date;
updatedAt: Date;
deliveredAt?: Date;
}
/**
* Delivery queue component for store-and-forward functionality
*/
export class DeliveryQueue extends EventEmitter {
private config: IQueueConfig;
private queue: Map<string, IQueueItem> = new Map();
private isProcessing: boolean = false;
private processingInterval: NodeJS.Timeout | null = null;
private persistenceTimer: NodeJS.Timeout | null = null;
/**
* Create a new delivery queue
* @param config Queue configuration
*/
constructor(config: IQueueConfig) {
super();
this.config = {
storageType: 'memory',
maxRetries: 5,
baseRetryDelay: 60000, // 1 minute
maxRetryDelay: 3600000, // 1 hour
maxQueueSize: 10000,
...config
};
}
/**
* Initialize the queue
*/
public async initialize(): Promise<void> {
try {
// Load queue from persistent storage if enabled
if (this.config.storageType === 'disk' && this.config.persistentPath) {
await this.load();
}
// Set up processing interval
this.startProcessing();
// Set up persistence interval if using disk storage
if (this.config.storageType === 'disk' && this.config.persistentPath) {
this.persistenceTimer = setInterval(() => {
this.save().catch(err => {
console.error('Error saving queue:', err);
});
}, 60000); // Save every minute
}
this.emit('initialized');
} catch (error) {
console.error('Failed to initialize delivery queue:', error);
throw error;
}
}
/**
* Start processing the queue
*/
private startProcessing(): void {
if (this.processingInterval) {
clearInterval(this.processingInterval);
}
this.processingInterval = setInterval(() => {
this.processQueue().catch(err => {
console.error('Error processing queue:', err);
});
}, 1000); // Check every second
}
/**
* Add an item to the queue
* @param processingResult Processing result to queue
*/
public async enqueue(processingResult: IProcessingResult): Promise<string> {
// Skip if the action is reject
if (processingResult.action === 'reject') {
throw new Error('Cannot queue a rejected message');
}
// Check if queue is full
if (this.config.maxQueueSize && this.queue.size >= this.config.maxQueueSize) {
throw new Error('Queue is full');
}
// Create queue item
const queueItem: IQueueItem = {
id: processingResult.id,
processingResult,
status: 'pending',
attempts: 0,
nextAttempt: new Date(),
createdAt: new Date(),
updatedAt: new Date()
};
// Add to queue
this.queue.set(queueItem.id, queueItem);
// Save queue if using disk storage
if (this.config.storageType === 'disk' && this.config.persistentPath) {
await this.saveItem(queueItem);
}
this.emit('enqueued', queueItem);
return queueItem.id;
}
/**
* Process the queue
*/
private async processQueue(): Promise<void> {
// Skip if already processing
if (this.isProcessing) {
return;
}
this.isProcessing = true;
try {
// Get items that are ready for delivery
const now = new Date();
const readyItems: IQueueItem[] = [];
for (const item of this.queue.values()) {
if (item.status === 'pending' && item.nextAttempt <= now) {
readyItems.push(item);
}
}
// If no items are ready, skip processing
if (!readyItems.length) {
return;
}
// Emit event with ready items
this.emit('itemsReady', readyItems);
} finally {
this.isProcessing = false;
}
}
/**
* Get an item from the queue
* @param id Item ID
*/
public getItem(id: string): IQueueItem | undefined {
return this.queue.get(id);
}
/**
* Get all items in the queue
*/
public getAllItems(): IQueueItem[] {
return Array.from(this.queue.values());
}
/**
* Get items by status
* @param status Status to filter by
*/
public getItemsByStatus(status: QueueItemStatus): IQueueItem[] {
return Array.from(this.queue.values()).filter(item => item.status === status);
}
/**
* Update an item in the queue
* @param id Item ID
* @param updates Updates to apply
*/
public async updateItem(id: string, updates: Partial<IQueueItem>): Promise<boolean> {
const item = this.queue.get(id);
if (!item) {
return false;
}
// Apply updates
Object.assign(item, {
...updates,
updatedAt: new Date()
});
// Save queue if using disk storage
if (this.config.storageType === 'disk' && this.config.persistentPath) {
await this.saveItem(item);
}
this.emit('itemUpdated', item);
return true;
}
/**
* Mark an item as delivered
* @param id Item ID
*/
public async markDelivered(id: string): Promise<boolean> {
return this.updateItem(id, {
status: 'delivered',
deliveredAt: new Date()
});
}
/**
* Mark an item as failed
* @param id Item ID
* @param error Error message
*/
public async markFailed(id: string, error: string): Promise<boolean> {
const item = this.queue.get(id);
if (!item) {
return false;
}
// Check if max retries reached
if (item.attempts >= (this.config.maxRetries || 5)) {
return this.updateItem(id, {
status: 'failed',
lastError: error
});
}
// Calculate next attempt time with exponential backoff
const attempts = item.attempts + 1;
const baseDelay = this.config.baseRetryDelay || 60000; // 1 minute
const maxDelay = this.config.maxRetryDelay || 3600000; // 1 hour
const delay = Math.min(
baseDelay * Math.pow(2, attempts - 1),
maxDelay
);
const nextAttempt = new Date(Date.now() + delay);
return this.updateItem(id, {
status: 'deferred',
attempts,
nextAttempt,
lastError: error
});
}
/**
* Remove an item from the queue
* @param id Item ID
*/
public async removeItem(id: string): Promise<boolean> {
if (!this.queue.has(id)) {
return false;
}
this.queue.delete(id);
// Remove from disk if using disk storage
if (this.config.storageType === 'disk' && this.config.persistentPath) {
await this.removeItemFile(id);
}
this.emit('itemRemoved', id);
return true;
}
/**
* Pause queue processing
*/
public pause(): void {
if (this.processingInterval) {
clearInterval(this.processingInterval);
this.processingInterval = null;
}
this.emit('paused');
}
/**
* Resume queue processing
*/
public resume(): void {
if (!this.processingInterval) {
this.startProcessing();
}
this.emit('resumed');
}
/**
* Shutdown the queue
*/
public async shutdown(): Promise<void> {
// Stop processing
if (this.processingInterval) {
clearInterval(this.processingInterval);
this.processingInterval = null;
}
// Stop persistence timer
if (this.persistenceTimer) {
clearInterval(this.persistenceTimer);
this.persistenceTimer = null;
}
// Save queue if using disk storage
if (this.config.storageType === 'disk' && this.config.persistentPath) {
await this.save();
}
this.emit('shutdown');
}
/**
* Load queue from disk
*/
private async load(): Promise<void> {
if (!this.config.persistentPath) {
return;
}
try {
// Create directory if it doesn't exist
if (!fs.existsSync(this.config.persistentPath)) {
fs.mkdirSync(this.config.persistentPath, { recursive: true });
}
// Read the queue directory
const files = fs.readdirSync(this.config.persistentPath);
// Load each item
for (const file of files) {
if (file.endsWith('.json')) {
try {
const filePath = path.join(this.config.persistentPath, file);
const data = fs.readFileSync(filePath, 'utf8');
const item = JSON.parse(data) as IQueueItem;
// Convert string dates back to Date objects
item.nextAttempt = new Date(item.nextAttempt);
item.createdAt = new Date(item.createdAt);
item.updatedAt = new Date(item.updatedAt);
if (item.deliveredAt) {
item.deliveredAt = new Date(item.deliveredAt);
}
// Add to queue
this.queue.set(item.id, item);
} catch (err) {
console.error(`Error loading queue item ${file}:`, err);
}
}
}
console.log(`Loaded ${this.queue.size} items from queue`);
} catch (error) {
console.error('Error loading queue:', error);
throw error;
}
}
/**
* Save queue to disk
*/
private async save(): Promise<void> {
if (!this.config.persistentPath) {
return;
}
try {
// Create directory if it doesn't exist
if (!fs.existsSync(this.config.persistentPath)) {
fs.mkdirSync(this.config.persistentPath, { recursive: true });
}
// Save each item
const savePromises = Array.from(this.queue.values()).map(item => this.saveItem(item));
await Promise.all(savePromises);
} catch (error) {
console.error('Error saving queue:', error);
throw error;
}
}
/**
* Save a single item to disk
* @param item Queue item to save
*/
private async saveItem(item: IQueueItem): Promise<void> {
if (!this.config.persistentPath) {
return;
}
try {
const filePath = path.join(this.config.persistentPath, `${item.id}.json`);
const data = JSON.stringify(item, null, 2);
await fs.promises.writeFile(filePath, data, 'utf8');
} catch (error) {
console.error(`Error saving queue item ${item.id}:`, error);
throw error;
}
}
/**
* Remove a single item file from disk
* @param id Item ID
*/
private async removeItemFile(id: string): Promise<void> {
if (!this.config.persistentPath) {
return;
}
try {
const filePath = path.join(this.config.persistentPath, `${id}.json`);
if (fs.existsSync(filePath)) {
await fs.promises.unlink(filePath);
}
} catch (error) {
console.error(`Error removing queue item file ${id}:`, error);
throw error;
}
}
}