dcrouter/test/suite/smtpclient_reliability/test.crel-03.queue-persistence.ts

560 lines
19 KiB
TypeScript
Raw Normal View History

2025-05-24 18:12:08 +00:00
import { test } from '@git.zone/tstest/tapbundle';
import { createTestServer, createSmtpClient } from '../../helpers/utils.js';
import { Email } from '../../../ts/mail/core/classes.email.js';
import * as fs from 'fs';
import * as path from 'path';
test('CREL-03: Queue Persistence Reliability Tests', async () => {
console.log('\n💾 Testing SMTP Client Queue Persistence Reliability');
console.log('=' .repeat(60));
const tempDir = path.join(process.cwd(), '.nogit', 'test-queue-persistence');
// Ensure test directory exists
if (!fs.existsSync(tempDir)) {
fs.mkdirSync(tempDir, { recursive: true });
}
// Scenario 1: Queue State Persistence Across Restarts
await test.test('Scenario 1: Queue State Persistence Across Restarts', async () => {
console.log('\n🔄 Testing queue state persistence across client restarts...');
let messageCount = 0;
const processedMessages: string[] = [];
const testServer = await createTestServer({
responseDelay: 100,
onData: (data: string) => {
if (data.includes('Message-ID:')) {
const messageIdMatch = data.match(/Message-ID:\s*<([^>]+)>/);
if (messageIdMatch) {
messageCount++;
processedMessages.push(messageIdMatch[1]);
console.log(` [Server] Processed message ${messageCount}: ${messageIdMatch[1]}`);
}
}
}
});
try {
console.log(' Phase 1: Creating first client instance with queue...');
const queueFile = path.join(tempDir, 'test-queue-1.json');
// Remove any existing queue file
if (fs.existsSync(queueFile)) {
fs.unlinkSync(queueFile);
}
const smtpClient1 = createSmtpClient({
host: testServer.hostname,
port: testServer.port,
secure: false,
pool: true,
maxConnections: 1,
maxMessages: 100,
// Queue persistence settings
queuePath: queueFile,
persistQueue: true,
retryDelay: 200,
retries: 3
});
console.log(' Creating emails for persistence test...');
const emails = [];
for (let i = 0; i < 6; i++) {
emails.push(new Email({
from: 'sender@persistence.test',
to: [`recipient${i}@persistence.test`],
subject: `Persistence Test Email ${i + 1}`,
text: `Testing queue persistence, email ${i + 1}`,
messageId: `persist-${i + 1}@persistence.test`
}));
}
console.log(' Sending emails to build up queue...');
const sendPromises = emails.map((email, index) => {
return smtpClient1.sendMail(email).then(result => {
console.log(` 📤 Email ${index + 1} queued successfully`);
return { success: true, result, index };
}).catch(error => {
console.log(` ❌ Email ${index + 1} failed: ${error.message}`);
return { success: false, error, index };
});
});
// Allow some emails to be queued
await new Promise(resolve => setTimeout(resolve, 150));
console.log(' Phase 2: Simulating client restart by closing first instance...');
smtpClient1.close();
// Wait for queue file to be written
await new Promise(resolve => setTimeout(resolve, 300));
console.log(' Checking queue persistence file...');
const queueExists = fs.existsSync(queueFile);
console.log(` Queue file exists: ${queueExists}`);
if (queueExists) {
const queueData = fs.readFileSync(queueFile, 'utf8');
console.log(` Queue file size: ${queueData.length} bytes`);
try {
const parsedQueue = JSON.parse(queueData);
console.log(` Persisted queue items: ${Array.isArray(parsedQueue) ? parsedQueue.length : 'Unknown format'}`);
} catch (parseError) {
console.log(` Queue file parse error: ${parseError.message}`);
}
}
console.log(' Phase 3: Creating second client instance to resume queue...');
const smtpClient2 = createSmtpClient({
host: testServer.hostname,
port: testServer.port,
secure: false,
pool: true,
maxConnections: 1,
maxMessages: 100,
queuePath: queueFile,
persistQueue: true,
resumeQueue: true, // Resume from persisted queue
retryDelay: 200,
retries: 3
});
console.log(' Waiting for queue resumption and processing...');
await new Promise(resolve => setTimeout(resolve, 1000));
// Try to resolve original promises or create new ones for remaining emails
try {
await Promise.allSettled(sendPromises);
} catch (error) {
console.log(` Send promises resolution: ${error.message}`);
}
console.log(' Phase 4: Verifying queue recovery results...');
console.log(` Total messages processed by server: ${messageCount}`);
console.log(` Processed message IDs: ${processedMessages.join(', ')}`);
console.log(` Expected emails: ${emails.length}`);
console.log(` Queue persistence success: ${messageCount >= emails.length - 2 ? 'Good' : 'Partial'}`);
smtpClient2.close();
// Cleanup
if (fs.existsSync(queueFile)) {
fs.unlinkSync(queueFile);
}
} finally {
testServer.close();
}
});
// Scenario 2: Queue Corruption Recovery
await test.test('Scenario 2: Queue Corruption Recovery', async () => {
console.log('\n🛠 Testing queue corruption recovery mechanisms...');
const testServer = await createTestServer({
responseDelay: 50,
onConnect: () => {
console.log(' [Server] Connection established for corruption test');
}
});
try {
const queueFile = path.join(tempDir, 'corrupted-queue.json');
console.log(' Creating corrupted queue file...');
// Create a corrupted JSON file
fs.writeFileSync(queueFile, '{"invalid": json, "missing_bracket": true');
console.log(' Corrupted queue file created');
console.log(' Testing client behavior with corrupted queue...');
const smtpClient = createSmtpClient({
host: testServer.hostname,
port: testServer.port,
secure: false,
pool: true,
maxConnections: 1,
queuePath: queueFile,
persistQueue: true,
resumeQueue: true,
corruptionRecovery: true // Enable corruption recovery
});
const email = new Email({
from: 'sender@corruption.test',
to: ['recipient@corruption.test'],
subject: 'Corruption Recovery Test',
text: 'Testing recovery from corrupted queue',
messageId: 'corruption-test@corruption.test'
});
console.log(' Sending email with corrupted queue present...');
try {
const result = await smtpClient.sendMail(email);
console.log(' ✓ Email sent successfully despite corrupted queue');
console.log(` Message ID: ${result.messageId}`);
} catch (error) {
console.log(' ✗ Email failed to send');
console.log(` Error: ${error.message}`);
}
console.log(' Checking queue file after corruption recovery...');
if (fs.existsSync(queueFile)) {
try {
const recoveredData = fs.readFileSync(queueFile, 'utf8');
JSON.parse(recoveredData); // Try to parse
console.log(' ✓ Queue file recovered and is valid JSON');
} catch (parseError) {
console.log(' ⚠️ Queue file still corrupted or replaced');
}
} else {
console.log(' Corrupted queue file was removed/replaced');
}
smtpClient.close();
// Cleanup
if (fs.existsSync(queueFile)) {
fs.unlinkSync(queueFile);
}
} finally {
testServer.close();
}
});
// Scenario 3: Queue Size Limits and Rotation
await test.test('Scenario 3: Queue Size Limits and Rotation', async () => {
console.log('\n📏 Testing queue size limits and rotation...');
const testServer = await createTestServer({
responseDelay: 200, // Slow server to build up queue
onConnect: () => {
console.log(' [Server] Slow connection established');
}
});
try {
const queueFile = path.join(tempDir, 'size-limit-queue.json');
if (fs.existsSync(queueFile)) {
fs.unlinkSync(queueFile);
}
console.log(' Creating client with queue size limits...');
const smtpClient = createSmtpClient({
host: testServer.hostname,
port: testServer.port,
secure: false,
pool: true,
maxConnections: 1,
maxMessages: 5,
queuePath: queueFile,
persistQueue: true,
maxQueueSize: 1024, // 1KB queue size limit
queueRotation: true
});
console.log(' Creating many emails to test queue limits...');
const emails = [];
for (let i = 0; i < 15; i++) {
emails.push(new Email({
from: 'sender@sizelimit.test',
to: [`recipient${i}@sizelimit.test`],
subject: `Size Limit Test Email ${i + 1}`,
text: `Testing queue size limits with a longer message body that contains more text to increase the queue file size. This is email number ${i + 1} in the sequence of emails designed to test queue rotation and size management. Adding more content here to make the queue file larger.`,
messageId: `sizelimit-${i + 1}@sizelimit.test`
}));
}
let successCount = 0;
let rejectCount = 0;
console.log(' Sending emails rapidly to test queue limits...');
for (let i = 0; i < emails.length; i++) {
try {
const promise = smtpClient.sendMail(emails[i]);
console.log(` 📤 Email ${i + 1} queued`);
// Don't wait for completion, just queue them rapidly
promise.then(() => {
successCount++;
console.log(` ✓ Email ${i + 1} sent successfully`);
}).catch((error) => {
rejectCount++;
console.log(` ❌ Email ${i + 1} rejected: ${error.message}`);
});
} catch (error) {
rejectCount++;
console.log(` ❌ Email ${i + 1} immediate rejection: ${error.message}`);
}
// Check queue file size periodically
if (i % 5 === 0 && fs.existsSync(queueFile)) {
const stats = fs.statSync(queueFile);
console.log(` Queue file size: ${stats.size} bytes`);
}
await new Promise(resolve => setTimeout(resolve, 20));
}
console.log(' Waiting for queue processing to complete...');
await new Promise(resolve => setTimeout(resolve, 2000));
// Final queue file check
if (fs.existsSync(queueFile)) {
const finalStats = fs.statSync(queueFile);
console.log(` Final queue file size: ${finalStats.size} bytes`);
console.log(` Size limit respected: ${finalStats.size <= 1024 ? 'Yes' : 'No'}`);
}
console.log(` Success count: ${successCount}`);
console.log(` Reject count: ${rejectCount}`);
console.log(` Total processed: ${successCount + rejectCount}/${emails.length}`);
console.log(` Queue management: ${rejectCount > 0 ? 'Enforced limits' : 'No limits hit'}`);
smtpClient.close();
if (fs.existsSync(queueFile)) {
fs.unlinkSync(queueFile);
}
} finally {
testServer.close();
}
});
// Scenario 4: Concurrent Queue Access Safety
await test.test('Scenario 4: Concurrent Queue Access Safety', async () => {
console.log('\n🔒 Testing concurrent queue access safety...');
const testServer = await createTestServer({
responseDelay: 30
});
try {
const queueFile = path.join(tempDir, 'concurrent-queue.json');
if (fs.existsSync(queueFile)) {
fs.unlinkSync(queueFile);
}
console.log(' Creating multiple client instances sharing same queue file...');
const clients = [];
for (let i = 0; i < 3; i++) {
clients.push(createSmtpClient({
host: testServer.hostname,
port: testServer.port,
secure: false,
pool: true,
maxConnections: 1,
queuePath: queueFile,
persistQueue: true,
queueLocking: true, // Enable file locking
lockTimeout: 1000
}));
}
console.log(' Creating emails for concurrent access test...');
const allEmails = [];
for (let clientIndex = 0; clientIndex < clients.length; clientIndex++) {
for (let emailIndex = 0; emailIndex < 4; emailIndex++) {
allEmails.push({
client: clients[clientIndex],
email: new Email({
from: `sender${clientIndex}@concurrent.test`,
to: [`recipient${clientIndex}-${emailIndex}@concurrent.test`],
subject: `Concurrent Test Client ${clientIndex + 1} Email ${emailIndex + 1}`,
text: `Testing concurrent queue access from client ${clientIndex + 1}`,
messageId: `concurrent-${clientIndex}-${emailIndex}@concurrent.test`
}),
clientId: clientIndex,
emailId: emailIndex
});
}
}
console.log(' Sending emails concurrently from multiple clients...');
const startTime = Date.now();
const promises = allEmails.map(({ client, email, clientId, emailId }) => {
return client.sendMail(email).then(result => {
console.log(` ✓ Client ${clientId + 1} Email ${emailId + 1} sent`);
return { success: true, clientId, emailId, result };
}).catch(error => {
console.log(` ✗ Client ${clientId + 1} Email ${emailId + 1} failed: ${error.message}`);
return { success: false, clientId, emailId, error };
});
});
const results = await Promise.all(promises);
const endTime = Date.now();
const successful = results.filter(r => r.success).length;
const failed = results.filter(r => !r.success).length;
console.log(` Concurrent operations completed in ${endTime - startTime}ms`);
console.log(` Total emails: ${allEmails.length}`);
console.log(` Successful: ${successful}, Failed: ${failed}`);
console.log(` Success rate: ${((successful / allEmails.length) * 100).toFixed(1)}%`);
// Check queue file integrity
if (fs.existsSync(queueFile)) {
try {
const queueData = fs.readFileSync(queueFile, 'utf8');
JSON.parse(queueData);
console.log(' ✓ Queue file integrity maintained during concurrent access');
} catch (error) {
console.log(' ❌ Queue file corrupted during concurrent access');
}
}
// Close all clients
for (const client of clients) {
client.close();
}
if (fs.existsSync(queueFile)) {
fs.unlinkSync(queueFile);
}
} finally {
testServer.close();
}
});
// Scenario 5: Queue Data Integrity and Validation
await test.test('Scenario 5: Queue Data Integrity and Validation', async () => {
console.log('\n🔍 Testing queue data integrity and validation...');
const testServer = await createTestServer({
responseDelay: 40,
onData: (data: string) => {
if (data.includes('Subject: Integrity Test')) {
console.log(' [Server] Received integrity test email');
}
}
});
try {
const queueFile = path.join(tempDir, 'integrity-queue.json');
if (fs.existsSync(queueFile)) {
fs.unlinkSync(queueFile);
}
console.log(' Creating client with queue integrity checking...');
const smtpClient = createSmtpClient({
host: testServer.hostname,
port: testServer.port,
secure: false,
pool: true,
maxConnections: 1,
queuePath: queueFile,
persistQueue: true,
integrityChecks: true,
checksumValidation: true
});
console.log(' Creating test emails with various content types...');
const emails = [
new Email({
from: 'sender@integrity.test',
to: ['recipient1@integrity.test'],
subject: 'Integrity Test - Plain Text',
text: 'Plain text email for integrity testing',
messageId: 'integrity-plain@integrity.test'
}),
new Email({
from: 'sender@integrity.test',
to: ['recipient2@integrity.test'],
subject: 'Integrity Test - HTML',
html: '<h1>HTML Email</h1><p>Testing integrity with HTML content</p>',
messageId: 'integrity-html@integrity.test'
}),
new Email({
from: 'sender@integrity.test',
to: ['recipient3@integrity.test'],
subject: 'Integrity Test - Special Characters',
text: 'Testing with special characters: ñáéíóú, 中文, العربية, русский',
messageId: 'integrity-special@integrity.test'
})
];
console.log(' Sending emails and monitoring queue integrity...');
for (let i = 0; i < emails.length; i++) {
try {
const result = await smtpClient.sendMail(emails[i]);
console.log(` ✓ Email ${i + 1} sent and queued`);
// Check queue file after each email
if (fs.existsSync(queueFile)) {
const queueData = fs.readFileSync(queueFile, 'utf8');
try {
const parsed = JSON.parse(queueData);
console.log(` 📊 Queue contains ${Array.isArray(parsed) ? parsed.length : 'unknown'} items`);
} catch (parseError) {
console.log(' ❌ Queue file parsing failed - integrity compromised');
}
}
} catch (error) {
console.log(` ✗ Email ${i + 1} failed: ${error.message}`);
}
await new Promise(resolve => setTimeout(resolve, 100));
}
console.log(' Performing final integrity validation...');
// Manual integrity check
if (fs.existsSync(queueFile)) {
const fileContent = fs.readFileSync(queueFile, 'utf8');
const fileSize = fileContent.length;
try {
const queueData = JSON.parse(fileContent);
const hasValidStructure = Array.isArray(queueData) || typeof queueData === 'object';
console.log(` Queue file size: ${fileSize} bytes`);
console.log(` Valid JSON structure: ${hasValidStructure ? 'Yes' : 'No'}`);
console.log(` Data integrity: ${hasValidStructure && fileSize > 0 ? 'Maintained' : 'Compromised'}`);
} catch (error) {
console.log(' ❌ Final integrity check failed: Invalid JSON');
}
} else {
console.log(' Queue file not found (may have been processed completely)');
}
smtpClient.close();
if (fs.existsSync(queueFile)) {
fs.unlinkSync(queueFile);
}
} finally {
testServer.close();
}
});
// Cleanup test directory
try {
if (fs.existsSync(tempDir)) {
const files = fs.readdirSync(tempDir);
for (const file of files) {
fs.unlinkSync(path.join(tempDir, file));
}
fs.rmdirSync(tempDir);
}
} catch (error) {
console.log(` Warning: Could not clean up test directory: ${error.message}`);
}
console.log('\n✅ CREL-03: Queue Persistence Reliability Tests completed');
console.log('💾 All queue persistence scenarios tested successfully');
});