fix(mail): add periodic cleanup timers and proper shutdown handling for bounce manager and delivery queue; avoid mutating maps during iteration and prune stale rate-limiter stats to prevent memory growth
This commit is contained in:
@@ -78,6 +78,7 @@ export class UnifiedDeliveryQueue extends EventEmitter {
|
||||
private options: Required<IQueueOptions>;
|
||||
private queue: Map<string, IQueueItem> = new Map();
|
||||
private checkTimer?: NodeJS.Timeout;
|
||||
private cleanupTimer?: NodeJS.Timeout;
|
||||
private stats: IQueueStats;
|
||||
private processing: boolean = false;
|
||||
private totalProcessed: number = 0;
|
||||
@@ -158,8 +159,19 @@ export class UnifiedDeliveryQueue extends EventEmitter {
|
||||
if (this.checkTimer) {
|
||||
clearInterval(this.checkTimer);
|
||||
}
|
||||
|
||||
|
||||
this.checkTimer = setInterval(() => this.processQueue(), this.options.checkInterval);
|
||||
|
||||
// Start periodic cleanup of delivered/failed items (every 30 minutes)
|
||||
if (this.cleanupTimer) {
|
||||
clearInterval(this.cleanupTimer);
|
||||
}
|
||||
this.cleanupTimer = setInterval(() => {
|
||||
this.cleanupOldItems(24 * 60 * 60 * 1000).catch((err) => {
|
||||
logger.log('error', `Auto-cleanup failed: ${err.message}`);
|
||||
});
|
||||
}, 30 * 60 * 1000);
|
||||
|
||||
this.processing = true;
|
||||
this.stats.processingActive = true;
|
||||
this.emit('processingStarted');
|
||||
@@ -174,7 +186,11 @@ export class UnifiedDeliveryQueue extends EventEmitter {
|
||||
clearInterval(this.checkTimer);
|
||||
this.checkTimer = undefined;
|
||||
}
|
||||
|
||||
if (this.cleanupTimer) {
|
||||
clearInterval(this.cleanupTimer);
|
||||
this.cleanupTimer = undefined;
|
||||
}
|
||||
|
||||
this.processing = false;
|
||||
this.stats.processingActive = false;
|
||||
this.emit('processingStopped');
|
||||
@@ -590,19 +606,24 @@ export class UnifiedDeliveryQueue extends EventEmitter {
|
||||
*/
|
||||
public async cleanupOldItems(maxAge: number = 7 * 24 * 60 * 60 * 1000): Promise<number> {
|
||||
const cutoff = new Date(Date.now() - maxAge);
|
||||
let removedCount = 0;
|
||||
|
||||
// Find old items
|
||||
|
||||
// Collect IDs first to avoid modifying the Map during iteration
|
||||
const idsToRemove: string[] = [];
|
||||
for (const item of this.queue.values()) {
|
||||
if (['delivered', 'failed'].includes(item.status) && item.updatedAt < cutoff) {
|
||||
// Remove item
|
||||
await this.removeItem(item.id);
|
||||
removedCount++;
|
||||
idsToRemove.push(item.id);
|
||||
}
|
||||
}
|
||||
|
||||
logger.log('info', `Cleaned up ${removedCount} old items`);
|
||||
return removedCount;
|
||||
|
||||
// Remove collected items
|
||||
for (const id of idsToRemove) {
|
||||
await this.removeItem(id);
|
||||
}
|
||||
|
||||
if (idsToRemove.length > 0) {
|
||||
logger.log('info', `Cleaned up ${idsToRemove.length} old items from delivery queue`);
|
||||
}
|
||||
return idsToRemove.length;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -611,15 +632,9 @@ export class UnifiedDeliveryQueue extends EventEmitter {
|
||||
public async shutdown(): Promise<void> {
|
||||
logger.log('info', 'Shutting down UnifiedDeliveryQueue');
|
||||
|
||||
// Stop processing
|
||||
// Stop processing (clears both check and cleanup timers)
|
||||
this.stopProcessing();
|
||||
|
||||
// Clear the check timer to prevent memory leaks
|
||||
if (this.checkTimer) {
|
||||
clearInterval(this.checkTimer);
|
||||
this.checkTimer = undefined;
|
||||
}
|
||||
|
||||
// If using disk storage, make sure all items are persisted
|
||||
if (this.options.storageType === 'disk') {
|
||||
const pendingWrites: Promise<void>[] = [];
|
||||
|
||||
Reference in New Issue
Block a user