Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 04e73c366c | |||
| 8851d61466 | |||
| b465b01790 | |||
| 6ed3252485 | |||
| fc88555790 | |||
| 4eb2fe7934 |
23
changelog.md
23
changelog.md
@@ -1,5 +1,28 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
|
## 2026-03-02 - 5.3.1 - fix(mail)
|
||||||
|
add periodic cleanup timers and proper shutdown handling for bounce manager and delivery queue; avoid mutating maps during iteration and prune stale rate-limiter stats to prevent memory growth
|
||||||
|
|
||||||
|
- BounceManager: add cleanupInterval to periodically remove bounce records older than 7 days and log removals; add stop() to clear the interval and prevent leaks
|
||||||
|
- UnifiedDeliveryQueue: introduce cleanupTimer started in startProcessing() and cleared in stopProcessing(); cleanupOldItems now collects IDs first to avoid mutating the Map while iterating and logs cleaned items; shutdown now relies on stopProcessing to clear timers
|
||||||
|
- UnifiedRateLimiter: prune stale stats.byIp and stats.byPattern entries for IPs/patterns that no longer have active counters or blocks to reduce memory usage and keep stats accurate
|
||||||
|
- Auto-cleanup tasks log errors rather than throwing to avoid crashing processing loops
|
||||||
|
|
||||||
|
## 2026-02-26 - 5.3.0 - feat(mailer-bin)
|
||||||
|
use mimalloc as the global allocator for mailer-bin
|
||||||
|
|
||||||
|
- Add mimalloc dependency to workspace Cargo.toml
|
||||||
|
- Enable workspace mimalloc in rust/crates/mailer-bin/Cargo.toml
|
||||||
|
- Register mimalloc as the #[global_allocator] in mailer-bin/src/main.rs
|
||||||
|
- Update Cargo.lock with new mimalloc and libmimalloc-sys entries
|
||||||
|
|
||||||
|
## 2026-02-26 - 5.2.6 - fix(postinstall)
|
||||||
|
remove legacy postinstall binary installer and packaging entry
|
||||||
|
|
||||||
|
- Deleted scripts/install-binary.js (legacy postinstall script that downloaded platform-specific binaries).
|
||||||
|
- Removed reference to scripts/install-binary.js from package.json "files" array so the installer is no longer included in published packages.
|
||||||
|
- This prevents automatic binary downloads during npm install and reduces package size; recommend a patch version bump.
|
||||||
|
|
||||||
## 2026-02-26 - 5.2.5 - fix(package)
|
## 2026-02-26 - 5.2.5 - fix(package)
|
||||||
remove CLI bin wrapper and exclude bin/ from published files
|
remove CLI bin wrapper and exclude bin/ from published files
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@push.rocks/smartmta",
|
"name": "@push.rocks/smartmta",
|
||||||
"version": "5.2.5",
|
"version": "5.3.1",
|
||||||
"description": "A high-performance, enterprise-grade Mail Transfer Agent (MTA) built from scratch in TypeScript with Rust acceleration.",
|
"description": "A high-performance, enterprise-grade Mail Transfer Agent (MTA) built from scratch in TypeScript with Rust acceleration.",
|
||||||
"keywords": [
|
"keywords": [
|
||||||
"mta",
|
"mta",
|
||||||
@@ -58,7 +58,6 @@
|
|||||||
"files": [
|
"files": [
|
||||||
"ts/**/*",
|
"ts/**/*",
|
||||||
"dist_ts/**/*",
|
"dist_ts/**/*",
|
||||||
"scripts/install-binary.js",
|
|
||||||
"dist_rust/**/*",
|
"dist_rust/**/*",
|
||||||
"readme.md",
|
"readme.md",
|
||||||
"license",
|
"license",
|
||||||
|
|||||||
20
rust/Cargo.lock
generated
20
rust/Cargo.lock
generated
@@ -894,6 +894,16 @@ version = "0.2.181"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "459427e2af2b9c839b132acb702a1c654d95e10f8c326bfc2ad11310e458b1c5"
|
checksum = "459427e2af2b9c839b132acb702a1c654d95e10f8c326bfc2ad11310e458b1c5"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "libmimalloc-sys"
|
||||||
|
version = "0.1.44"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "667f4fec20f29dfc6bc7357c582d91796c169ad7e2fce709468aefeb2c099870"
|
||||||
|
dependencies = [
|
||||||
|
"cc",
|
||||||
|
"libc",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "linux-raw-sys"
|
name = "linux-raw-sys"
|
||||||
version = "0.11.0"
|
version = "0.11.0"
|
||||||
@@ -982,6 +992,7 @@ dependencies = [
|
|||||||
"mailer-core",
|
"mailer-core",
|
||||||
"mailer-security",
|
"mailer-security",
|
||||||
"mailer-smtp",
|
"mailer-smtp",
|
||||||
|
"mimalloc",
|
||||||
"rustls",
|
"rustls",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
@@ -1063,6 +1074,15 @@ version = "2.8.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79"
|
checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "mimalloc"
|
||||||
|
version = "0.1.48"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "e1ee66a4b64c74f4ef288bcbb9192ad9c3feaad75193129ac8509af543894fd8"
|
||||||
|
dependencies = [
|
||||||
|
"libmimalloc-sys",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "miniz_oxide"
|
name = "miniz_oxide"
|
||||||
version = "0.8.9"
|
version = "0.8.9"
|
||||||
|
|||||||
@@ -32,3 +32,4 @@ clap = { version = "4", features = ["derive"] }
|
|||||||
sha2 = "0.10"
|
sha2 = "0.10"
|
||||||
hmac = "0.12"
|
hmac = "0.12"
|
||||||
pbkdf2 = { version = "0.12", default-features = false }
|
pbkdf2 = { version = "0.12", default-features = false }
|
||||||
|
mimalloc = "0.1"
|
||||||
|
|||||||
@@ -22,3 +22,4 @@ dashmap.workspace = true
|
|||||||
base64.workspace = true
|
base64.workspace = true
|
||||||
uuid.workspace = true
|
uuid.workspace = true
|
||||||
rustls = { version = "0.23", default-features = false, features = ["ring", "std"] }
|
rustls = { version = "0.23", default-features = false, features = ["ring", "std"] }
|
||||||
|
mimalloc = { workspace = true }
|
||||||
|
|||||||
@@ -5,6 +5,9 @@
|
|||||||
//! 2. **Management mode** (`--management`) — JSON-over-stdin/stdout IPC for
|
//! 2. **Management mode** (`--management`) — JSON-over-stdin/stdout IPC for
|
||||||
//! integration with `@push.rocks/smartrust` from TypeScript
|
//! integration with `@push.rocks/smartrust` from TypeScript
|
||||||
|
|
||||||
|
#[global_allocator]
|
||||||
|
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
|
||||||
|
|
||||||
use clap::{Parser, Subcommand};
|
use clap::{Parser, Subcommand};
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|||||||
@@ -1,230 +0,0 @@
|
|||||||
#!/usr/bin/env node
|
|
||||||
|
|
||||||
/**
|
|
||||||
* MAILER npm postinstall script
|
|
||||||
* Downloads the appropriate binary for the current platform from GitHub releases
|
|
||||||
*/
|
|
||||||
|
|
||||||
import { platform, arch } from 'os';
|
|
||||||
import { existsSync, mkdirSync, writeFileSync, chmodSync, unlinkSync } from 'fs';
|
|
||||||
import { join, dirname } from 'path';
|
|
||||||
import { fileURLToPath } from 'url';
|
|
||||||
import https from 'https';
|
|
||||||
import { pipeline } from 'stream';
|
|
||||||
import { promisify } from 'util';
|
|
||||||
import { createWriteStream } from 'fs';
|
|
||||||
|
|
||||||
const __filename = fileURLToPath(import.meta.url);
|
|
||||||
const __dirname = dirname(__filename);
|
|
||||||
const streamPipeline = promisify(pipeline);
|
|
||||||
|
|
||||||
// Configuration
|
|
||||||
const REPO_BASE = 'https://code.foss.global/serve.zone/mailer';
|
|
||||||
const VERSION = process.env.npm_package_version || '1.0.0';
|
|
||||||
|
|
||||||
function getBinaryInfo() {
|
|
||||||
const plat = platform();
|
|
||||||
const architecture = arch();
|
|
||||||
|
|
||||||
const platformMap = {
|
|
||||||
'darwin': 'macos',
|
|
||||||
'linux': 'linux',
|
|
||||||
'win32': 'windows'
|
|
||||||
};
|
|
||||||
|
|
||||||
const archMap = {
|
|
||||||
'x64': 'x64',
|
|
||||||
'arm64': 'arm64'
|
|
||||||
};
|
|
||||||
|
|
||||||
const mappedPlatform = platformMap[plat];
|
|
||||||
const mappedArch = archMap[architecture];
|
|
||||||
|
|
||||||
if (!mappedPlatform || !mappedArch) {
|
|
||||||
return { supported: false, platform: plat, arch: architecture };
|
|
||||||
}
|
|
||||||
|
|
||||||
let binaryName = `mailer-${mappedPlatform}-${mappedArch}`;
|
|
||||||
if (plat === 'win32') {
|
|
||||||
binaryName += '.exe';
|
|
||||||
}
|
|
||||||
|
|
||||||
return {
|
|
||||||
supported: true,
|
|
||||||
platform: mappedPlatform,
|
|
||||||
arch: mappedArch,
|
|
||||||
binaryName,
|
|
||||||
originalPlatform: plat
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
function downloadFile(url, destination) {
|
|
||||||
return new Promise((resolve, reject) => {
|
|
||||||
console.log(`Downloading from: ${url}`);
|
|
||||||
|
|
||||||
// Follow redirects
|
|
||||||
const download = (url, redirectCount = 0) => {
|
|
||||||
if (redirectCount > 5) {
|
|
||||||
reject(new Error('Too many redirects'));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
https.get(url, (response) => {
|
|
||||||
if (response.statusCode === 301 || response.statusCode === 302) {
|
|
||||||
console.log(`Following redirect to: ${response.headers.location}`);
|
|
||||||
download(response.headers.location, redirectCount + 1);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (response.statusCode !== 200) {
|
|
||||||
reject(new Error(`Failed to download: ${response.statusCode} ${response.statusMessage}`));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const totalSize = parseInt(response.headers['content-length'], 10);
|
|
||||||
let downloadedSize = 0;
|
|
||||||
let lastProgress = 0;
|
|
||||||
|
|
||||||
response.on('data', (chunk) => {
|
|
||||||
downloadedSize += chunk.length;
|
|
||||||
const progress = Math.round((downloadedSize / totalSize) * 100);
|
|
||||||
|
|
||||||
// Only log every 10% to reduce noise
|
|
||||||
if (progress >= lastProgress + 10) {
|
|
||||||
console.log(`Download progress: ${progress}%`);
|
|
||||||
lastProgress = progress;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
const file = createWriteStream(destination);
|
|
||||||
|
|
||||||
pipeline(response, file, (err) => {
|
|
||||||
if (err) {
|
|
||||||
reject(err);
|
|
||||||
} else {
|
|
||||||
console.log('Download complete!');
|
|
||||||
resolve();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}).on('error', reject);
|
|
||||||
};
|
|
||||||
|
|
||||||
download(url);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
async function main() {
|
|
||||||
console.log('===========================================');
|
|
||||||
console.log(' MAILER - Binary Installation');
|
|
||||||
console.log('===========================================');
|
|
||||||
console.log('');
|
|
||||||
|
|
||||||
const binaryInfo = getBinaryInfo();
|
|
||||||
|
|
||||||
if (!binaryInfo.supported) {
|
|
||||||
console.error(`❌ Error: Unsupported platform/architecture: ${binaryInfo.platform}/${binaryInfo.arch}`);
|
|
||||||
console.error('');
|
|
||||||
console.error('Supported platforms:');
|
|
||||||
console.error(' • Linux (x64, arm64)');
|
|
||||||
console.error(' • macOS (x64, arm64)');
|
|
||||||
console.error(' • Windows (x64)');
|
|
||||||
console.error('');
|
|
||||||
console.error('If you believe your platform should be supported, please file an issue:');
|
|
||||||
console.error(' https://code.foss.global/serve.zone/mailer/issues');
|
|
||||||
process.exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
console.log(`Platform: ${binaryInfo.platform} (${binaryInfo.originalPlatform})`);
|
|
||||||
console.log(`Architecture: ${binaryInfo.arch}`);
|
|
||||||
console.log(`Binary: ${binaryInfo.binaryName}`);
|
|
||||||
console.log(`Version: ${VERSION}`);
|
|
||||||
console.log('');
|
|
||||||
|
|
||||||
// Create dist/binaries directory if it doesn't exist
|
|
||||||
const binariesDir = join(__dirname, '..', 'dist', 'binaries');
|
|
||||||
if (!existsSync(binariesDir)) {
|
|
||||||
console.log('Creating binaries directory...');
|
|
||||||
mkdirSync(binariesDir, { recursive: true });
|
|
||||||
}
|
|
||||||
|
|
||||||
const binaryPath = join(binariesDir, binaryInfo.binaryName);
|
|
||||||
|
|
||||||
// Check if binary already exists and skip download
|
|
||||||
if (existsSync(binaryPath)) {
|
|
||||||
console.log('✓ Binary already exists, skipping download');
|
|
||||||
} else {
|
|
||||||
// Construct download URL
|
|
||||||
// Try release URL first, fall back to raw branch if needed
|
|
||||||
const releaseUrl = `${REPO_BASE}/releases/download/v${VERSION}/${binaryInfo.binaryName}`;
|
|
||||||
const fallbackUrl = `${REPO_BASE}/raw/branch/main/dist/binaries/${binaryInfo.binaryName}`;
|
|
||||||
|
|
||||||
console.log('Downloading platform-specific binary...');
|
|
||||||
console.log('This may take a moment depending on your connection speed.');
|
|
||||||
console.log('');
|
|
||||||
|
|
||||||
try {
|
|
||||||
// Try downloading from release
|
|
||||||
await downloadFile(releaseUrl, binaryPath);
|
|
||||||
} catch (err) {
|
|
||||||
console.log(`Release download failed: ${err.message}`);
|
|
||||||
console.log('Trying fallback URL...');
|
|
||||||
|
|
||||||
try {
|
|
||||||
// Try fallback URL
|
|
||||||
await downloadFile(fallbackUrl, binaryPath);
|
|
||||||
} catch (fallbackErr) {
|
|
||||||
console.error(`❌ Error: Failed to download binary`);
|
|
||||||
console.error(` Primary URL: ${releaseUrl}`);
|
|
||||||
console.error(` Fallback URL: ${fallbackUrl}`);
|
|
||||||
console.error('');
|
|
||||||
console.error('This might be because:');
|
|
||||||
console.error('1. The release has not been created yet');
|
|
||||||
console.error('2. Network connectivity issues');
|
|
||||||
console.error('3. The version specified does not exist');
|
|
||||||
console.error('');
|
|
||||||
console.error('You can try:');
|
|
||||||
console.error('1. Installing from source: https://code.foss.global/serve.zone/mailer');
|
|
||||||
console.error('2. Downloading the binary manually from the releases page');
|
|
||||||
|
|
||||||
// Clean up partial download
|
|
||||||
if (existsSync(binaryPath)) {
|
|
||||||
unlinkSync(binaryPath);
|
|
||||||
}
|
|
||||||
|
|
||||||
process.exit(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
console.log(`✓ Binary downloaded successfully`);
|
|
||||||
}
|
|
||||||
|
|
||||||
// On Unix-like systems, ensure the binary is executable
|
|
||||||
if (binaryInfo.originalPlatform !== 'win32') {
|
|
||||||
try {
|
|
||||||
console.log('Setting executable permissions...');
|
|
||||||
chmodSync(binaryPath, 0o755);
|
|
||||||
console.log('✓ Binary permissions updated');
|
|
||||||
} catch (err) {
|
|
||||||
console.error(`⚠️ Warning: Could not set executable permissions: ${err.message}`);
|
|
||||||
console.error(' You may need to manually run:');
|
|
||||||
console.error(` chmod +x ${binaryPath}`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
console.log('');
|
|
||||||
console.log('✅ MAILER installation completed successfully!');
|
|
||||||
console.log('');
|
|
||||||
console.log('You can now use MAILER by running:');
|
|
||||||
console.log(' mailer --help');
|
|
||||||
console.log('');
|
|
||||||
console.log('For initial setup, run:');
|
|
||||||
console.log(' sudo mailer service enable');
|
|
||||||
console.log('');
|
|
||||||
console.log('===========================================');
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run the installation
|
|
||||||
main().catch(err => {
|
|
||||||
console.error(`❌ Installation failed: ${err.message}`);
|
|
||||||
process.exit(1);
|
|
||||||
});
|
|
||||||
@@ -3,6 +3,6 @@
|
|||||||
*/
|
*/
|
||||||
export const commitinfo = {
|
export const commitinfo = {
|
||||||
name: '@push.rocks/smartmta',
|
name: '@push.rocks/smartmta',
|
||||||
version: '5.2.5',
|
version: '5.3.1',
|
||||||
description: 'A high-performance, enterprise-grade Mail Transfer Agent (MTA) built from scratch in TypeScript with Rust acceleration.'
|
description: 'A high-performance, enterprise-grade Mail Transfer Agent (MTA) built from scratch in TypeScript with Rust acceleration.'
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -89,6 +89,9 @@ export class BounceManager {
|
|||||||
// Store of bounced emails
|
// Store of bounced emails
|
||||||
private bounceStore: BounceRecord[] = [];
|
private bounceStore: BounceRecord[] = [];
|
||||||
|
|
||||||
|
// Periodic cleanup timer for old bounce records
|
||||||
|
private cleanupInterval?: NodeJS.Timeout;
|
||||||
|
|
||||||
// Cache of recently bounced email addresses to avoid sending to known bad addresses
|
// Cache of recently bounced email addresses to avoid sending to known bad addresses
|
||||||
private bounceCache: LRUCache<string, {
|
private bounceCache: LRUCache<string, {
|
||||||
lastBounce: number;
|
lastBounce: number;
|
||||||
@@ -135,6 +138,15 @@ export class BounceManager {
|
|||||||
this.loadSuppressionList().catch(error => {
|
this.loadSuppressionList().catch(error => {
|
||||||
logger.log('error', `Failed to load suppression list on startup: ${error.message}`);
|
logger.log('error', `Failed to load suppression list on startup: ${error.message}`);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Start periodic cleanup of old bounce records (every 1 hour, removes records older than 7 days)
|
||||||
|
this.cleanupInterval = setInterval(() => {
|
||||||
|
const sevenDaysAgo = Date.now() - 7 * 24 * 60 * 60 * 1000;
|
||||||
|
const removed = this.clearOldBounceRecords(sevenDaysAgo);
|
||||||
|
if (removed > 0) {
|
||||||
|
logger.log('info', `Auto-cleanup removed ${removed} old bounce records`);
|
||||||
|
}
|
||||||
|
}, 60 * 60 * 1000);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -728,4 +740,14 @@ export class BounceManager {
|
|||||||
|
|
||||||
return removed;
|
return removed;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stop the bounce manager and clear cleanup timers
|
||||||
|
*/
|
||||||
|
public stop(): void {
|
||||||
|
if (this.cleanupInterval) {
|
||||||
|
clearInterval(this.cleanupInterval);
|
||||||
|
this.cleanupInterval = undefined;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@@ -78,6 +78,7 @@ export class UnifiedDeliveryQueue extends EventEmitter {
|
|||||||
private options: Required<IQueueOptions>;
|
private options: Required<IQueueOptions>;
|
||||||
private queue: Map<string, IQueueItem> = new Map();
|
private queue: Map<string, IQueueItem> = new Map();
|
||||||
private checkTimer?: NodeJS.Timeout;
|
private checkTimer?: NodeJS.Timeout;
|
||||||
|
private cleanupTimer?: NodeJS.Timeout;
|
||||||
private stats: IQueueStats;
|
private stats: IQueueStats;
|
||||||
private processing: boolean = false;
|
private processing: boolean = false;
|
||||||
private totalProcessed: number = 0;
|
private totalProcessed: number = 0;
|
||||||
@@ -160,6 +161,17 @@ export class UnifiedDeliveryQueue extends EventEmitter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
this.checkTimer = setInterval(() => this.processQueue(), this.options.checkInterval);
|
this.checkTimer = setInterval(() => this.processQueue(), this.options.checkInterval);
|
||||||
|
|
||||||
|
// Start periodic cleanup of delivered/failed items (every 30 minutes)
|
||||||
|
if (this.cleanupTimer) {
|
||||||
|
clearInterval(this.cleanupTimer);
|
||||||
|
}
|
||||||
|
this.cleanupTimer = setInterval(() => {
|
||||||
|
this.cleanupOldItems(24 * 60 * 60 * 1000).catch((err) => {
|
||||||
|
logger.log('error', `Auto-cleanup failed: ${err.message}`);
|
||||||
|
});
|
||||||
|
}, 30 * 60 * 1000);
|
||||||
|
|
||||||
this.processing = true;
|
this.processing = true;
|
||||||
this.stats.processingActive = true;
|
this.stats.processingActive = true;
|
||||||
this.emit('processingStarted');
|
this.emit('processingStarted');
|
||||||
@@ -174,6 +186,10 @@ export class UnifiedDeliveryQueue extends EventEmitter {
|
|||||||
clearInterval(this.checkTimer);
|
clearInterval(this.checkTimer);
|
||||||
this.checkTimer = undefined;
|
this.checkTimer = undefined;
|
||||||
}
|
}
|
||||||
|
if (this.cleanupTimer) {
|
||||||
|
clearInterval(this.cleanupTimer);
|
||||||
|
this.cleanupTimer = undefined;
|
||||||
|
}
|
||||||
|
|
||||||
this.processing = false;
|
this.processing = false;
|
||||||
this.stats.processingActive = false;
|
this.stats.processingActive = false;
|
||||||
@@ -590,19 +606,24 @@ export class UnifiedDeliveryQueue extends EventEmitter {
|
|||||||
*/
|
*/
|
||||||
public async cleanupOldItems(maxAge: number = 7 * 24 * 60 * 60 * 1000): Promise<number> {
|
public async cleanupOldItems(maxAge: number = 7 * 24 * 60 * 60 * 1000): Promise<number> {
|
||||||
const cutoff = new Date(Date.now() - maxAge);
|
const cutoff = new Date(Date.now() - maxAge);
|
||||||
let removedCount = 0;
|
|
||||||
|
|
||||||
// Find old items
|
// Collect IDs first to avoid modifying the Map during iteration
|
||||||
|
const idsToRemove: string[] = [];
|
||||||
for (const item of this.queue.values()) {
|
for (const item of this.queue.values()) {
|
||||||
if (['delivered', 'failed'].includes(item.status) && item.updatedAt < cutoff) {
|
if (['delivered', 'failed'].includes(item.status) && item.updatedAt < cutoff) {
|
||||||
// Remove item
|
idsToRemove.push(item.id);
|
||||||
await this.removeItem(item.id);
|
|
||||||
removedCount++;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.log('info', `Cleaned up ${removedCount} old items`);
|
// Remove collected items
|
||||||
return removedCount;
|
for (const id of idsToRemove) {
|
||||||
|
await this.removeItem(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (idsToRemove.length > 0) {
|
||||||
|
logger.log('info', `Cleaned up ${idsToRemove.length} old items from delivery queue`);
|
||||||
|
}
|
||||||
|
return idsToRemove.length;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -611,15 +632,9 @@ export class UnifiedDeliveryQueue extends EventEmitter {
|
|||||||
public async shutdown(): Promise<void> {
|
public async shutdown(): Promise<void> {
|
||||||
logger.log('info', 'Shutting down UnifiedDeliveryQueue');
|
logger.log('info', 'Shutting down UnifiedDeliveryQueue');
|
||||||
|
|
||||||
// Stop processing
|
// Stop processing (clears both check and cleanup timers)
|
||||||
this.stopProcessing();
|
this.stopProcessing();
|
||||||
|
|
||||||
// Clear the check timer to prevent memory leaks
|
|
||||||
if (this.checkTimer) {
|
|
||||||
clearInterval(this.checkTimer);
|
|
||||||
this.checkTimer = undefined;
|
|
||||||
}
|
|
||||||
|
|
||||||
// If using disk storage, make sure all items are persisted
|
// If using disk storage, make sure all items are persisted
|
||||||
if (this.options.storageType === 'disk') {
|
if (this.options.storageType === 'disk') {
|
||||||
const pendingWrites: Promise<void>[] = [];
|
const pendingWrites: Promise<void>[] = [];
|
||||||
|
|||||||
@@ -232,6 +232,20 @@ export class UnifiedRateLimiter extends EventEmitter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Clean stale stats.byIp entries for IPs that no longer have active counters or blocks
|
||||||
|
for (const ip of Object.keys(this.stats.byIp)) {
|
||||||
|
if (!this.ipCounters.has(ip) && !(this.config.blocks && ip in this.config.blocks)) {
|
||||||
|
delete this.stats.byIp[ip];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clean stale stats.byPattern entries for patterns that no longer have active counters
|
||||||
|
for (const pattern of Object.keys(this.stats.byPattern)) {
|
||||||
|
if (!this.patternCounters.has(pattern)) {
|
||||||
|
delete this.stats.byPattern[pattern];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Update statistics
|
// Update statistics
|
||||||
this.updateStats();
|
this.updateStats();
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user