From 8851d614664cb1d65377a73e8bf2234b775d5a25 Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Mon, 2 Mar 2026 14:06:47 +0000 Subject: [PATCH] fix(mail): add periodic cleanup timers and proper shutdown handling for bounce manager and delivery queue; avoid mutating maps during iteration and prune stale rate-limiter stats to prevent memory growth --- changelog.md | 8 +++ ts/00_commitinfo_data.ts | 2 +- ts/mail/core/classes.bouncemanager.ts | 28 ++++++++-- ts/mail/delivery/classes.delivery.queue.ts | 51 ++++++++++++------- .../delivery/classes.unified.rate.limiter.ts | 16 +++++- 5 files changed, 82 insertions(+), 23 deletions(-) diff --git a/changelog.md b/changelog.md index 6c89002..be81a44 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,13 @@ # Changelog +## 2026-03-02 - 5.3.1 - fix(mail) +add periodic cleanup timers and proper shutdown handling for bounce manager and delivery queue; avoid mutating maps during iteration and prune stale rate-limiter stats to prevent memory growth + +- BounceManager: add cleanupInterval to periodically remove bounce records older than 7 days and log removals; add stop() to clear the interval and prevent leaks +- UnifiedDeliveryQueue: introduce cleanupTimer started in startProcessing() and cleared in stopProcessing(); cleanupOldItems now collects IDs first to avoid mutating the Map while iterating and logs cleaned items; shutdown now relies on stopProcessing to clear timers +- UnifiedRateLimiter: prune stale stats.byIp and stats.byPattern entries for IPs/patterns that no longer have active counters or blocks to reduce memory usage and keep stats accurate +- Auto-cleanup tasks log errors rather than throwing to avoid crashing processing loops + ## 2026-02-26 - 5.3.0 - feat(mailer-bin) use mimalloc as the global allocator for mailer-bin diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 19ff7aa..fa0f489 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartmta', - version: '5.3.0', + version: '5.3.1', description: 'A high-performance, enterprise-grade Mail Transfer Agent (MTA) built from scratch in TypeScript with Rust acceleration.' } diff --git a/ts/mail/core/classes.bouncemanager.ts b/ts/mail/core/classes.bouncemanager.ts index a94e4ac..b2860b3 100644 --- a/ts/mail/core/classes.bouncemanager.ts +++ b/ts/mail/core/classes.bouncemanager.ts @@ -88,7 +88,10 @@ export class BounceManager { // Store of bounced emails private bounceStore: BounceRecord[] = []; - + + // Periodic cleanup timer for old bounce records + private cleanupInterval?: NodeJS.Timeout; + // Cache of recently bounced email addresses to avoid sending to known bad addresses private bounceCache: LRUCache { logger.log('error', `Failed to load suppression list on startup: ${error.message}`); }); + + // Start periodic cleanup of old bounce records (every 1 hour, removes records older than 7 days) + this.cleanupInterval = setInterval(() => { + const sevenDaysAgo = Date.now() - 7 * 24 * 60 * 60 * 1000; + const removed = this.clearOldBounceRecords(sevenDaysAgo); + if (removed > 0) { + logger.log('info', `Auto-cleanup removed ${removed} old bounce records`); + } + }, 60 * 60 * 1000); } /** @@ -717,7 +729,7 @@ export class BounceManager { */ public clearOldBounceRecords(olderThan: number): number { let removed = 0; - + this.bounceStore = this.bounceStore.filter(bounce => { if (bounce.timestamp < olderThan) { removed++; @@ -725,7 +737,17 @@ export class BounceManager { } return true; }); - + return removed; } + + /** + * Stop the bounce manager and clear cleanup timers + */ + public stop(): void { + if (this.cleanupInterval) { + clearInterval(this.cleanupInterval); + this.cleanupInterval = undefined; + } + } } \ No newline at end of file diff --git a/ts/mail/delivery/classes.delivery.queue.ts b/ts/mail/delivery/classes.delivery.queue.ts index 5728a8f..6b86c89 100644 --- a/ts/mail/delivery/classes.delivery.queue.ts +++ b/ts/mail/delivery/classes.delivery.queue.ts @@ -78,6 +78,7 @@ export class UnifiedDeliveryQueue extends EventEmitter { private options: Required; private queue: Map = new Map(); private checkTimer?: NodeJS.Timeout; + private cleanupTimer?: NodeJS.Timeout; private stats: IQueueStats; private processing: boolean = false; private totalProcessed: number = 0; @@ -158,8 +159,19 @@ export class UnifiedDeliveryQueue extends EventEmitter { if (this.checkTimer) { clearInterval(this.checkTimer); } - + this.checkTimer = setInterval(() => this.processQueue(), this.options.checkInterval); + + // Start periodic cleanup of delivered/failed items (every 30 minutes) + if (this.cleanupTimer) { + clearInterval(this.cleanupTimer); + } + this.cleanupTimer = setInterval(() => { + this.cleanupOldItems(24 * 60 * 60 * 1000).catch((err) => { + logger.log('error', `Auto-cleanup failed: ${err.message}`); + }); + }, 30 * 60 * 1000); + this.processing = true; this.stats.processingActive = true; this.emit('processingStarted'); @@ -174,7 +186,11 @@ export class UnifiedDeliveryQueue extends EventEmitter { clearInterval(this.checkTimer); this.checkTimer = undefined; } - + if (this.cleanupTimer) { + clearInterval(this.cleanupTimer); + this.cleanupTimer = undefined; + } + this.processing = false; this.stats.processingActive = false; this.emit('processingStopped'); @@ -590,19 +606,24 @@ export class UnifiedDeliveryQueue extends EventEmitter { */ public async cleanupOldItems(maxAge: number = 7 * 24 * 60 * 60 * 1000): Promise { const cutoff = new Date(Date.now() - maxAge); - let removedCount = 0; - - // Find old items + + // Collect IDs first to avoid modifying the Map during iteration + const idsToRemove: string[] = []; for (const item of this.queue.values()) { if (['delivered', 'failed'].includes(item.status) && item.updatedAt < cutoff) { - // Remove item - await this.removeItem(item.id); - removedCount++; + idsToRemove.push(item.id); } } - - logger.log('info', `Cleaned up ${removedCount} old items`); - return removedCount; + + // Remove collected items + for (const id of idsToRemove) { + await this.removeItem(id); + } + + if (idsToRemove.length > 0) { + logger.log('info', `Cleaned up ${idsToRemove.length} old items from delivery queue`); + } + return idsToRemove.length; } /** @@ -611,15 +632,9 @@ export class UnifiedDeliveryQueue extends EventEmitter { public async shutdown(): Promise { logger.log('info', 'Shutting down UnifiedDeliveryQueue'); - // Stop processing + // Stop processing (clears both check and cleanup timers) this.stopProcessing(); - // Clear the check timer to prevent memory leaks - if (this.checkTimer) { - clearInterval(this.checkTimer); - this.checkTimer = undefined; - } - // If using disk storage, make sure all items are persisted if (this.options.storageType === 'disk') { const pendingWrites: Promise[] = []; diff --git a/ts/mail/delivery/classes.unified.rate.limiter.ts b/ts/mail/delivery/classes.unified.rate.limiter.ts index 5ea8385..a637f0b 100644 --- a/ts/mail/delivery/classes.unified.rate.limiter.ts +++ b/ts/mail/delivery/classes.unified.rate.limiter.ts @@ -231,7 +231,21 @@ export class UnifiedRateLimiter extends EventEmitter { this.domainCounters.delete(key); } } - + + // Clean stale stats.byIp entries for IPs that no longer have active counters or blocks + for (const ip of Object.keys(this.stats.byIp)) { + if (!this.ipCounters.has(ip) && !(this.config.blocks && ip in this.config.blocks)) { + delete this.stats.byIp[ip]; + } + } + + // Clean stale stats.byPattern entries for patterns that no longer have active counters + for (const pattern of Object.keys(this.stats.byPattern)) { + if (!this.patternCounters.has(pattern)) { + delete this.stats.byPattern[pattern]; + } + } + // Update statistics this.updateStats(); }