import { test } from '@git.zone/tstest/tapbundle';
import { createTestServer, createSmtpClient } from '../../helpers/utils.js';
import { Email } from '../../../ts/mail/core/classes.email.js';
import * as fs from 'fs';
import * as path from 'path';

/**
 * CREL-03: observational reliability checks for SMTP client queue persistence.
 *
 * Every scenario starts a local test server, creates one or more clients that
 * are handed queue-related options, sends mail and then inspects the queue
 * file on disk. The scenarios only log what they observe; they do not assert.
 *
 * NOTE(review): whether `createSmtpClient` honours options such as
 * `persistQueue`, `resumeQueue`, `maxQueueSize` or `queueLocking` is not
 * visible from this file - confirm against the client implementation.
 */

/** Client type as produced by the shared test helper. */
type SmtpClient = ReturnType<typeof createSmtpClient>;

/** Size limit (in bytes) passed to the client in the size-limit scenario. */
const QUEUE_SIZE_LIMIT_BYTES = 1024;

/** Number of emails each client sends in the concurrent-access scenario. */
const EMAILS_PER_CONCURRENT_CLIENT = 4;

/**
 * Turns a caught value into a printable message.
 *
 * @param error - Value received in a `catch` clause; may be anything.
 * @returns The `message` of an `Error`, otherwise the stringified value.
 */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * Deletes a file and treats "does not exist" as success.
 *
 * @param filePath - Path of the file to remove.
 */
function removeIfExists(filePath: string): void {
  fs.rmSync(filePath, { force: true });
}

/**
 * Resolves after the given number of milliseconds.
 *
 * @param ms - Delay in milliseconds.
 */
function delay(ms: number): Promise<void> {
  return new Promise<void>((resolve) => setTimeout(resolve, ms));
}

/**
 * Scenario 1: sends a batch through a first client, closes it mid-flight,
 * inspects the queue file and then creates a second client pointing at the
 * same queue file.
 *
 * @param tempDir - Directory in which the queue file is created.
 */
async function runRestartPersistenceScenario(tempDir: string): Promise<void> {
  console.log('\n๐Ÿ”„ Testing queue state persistence across client restarts...');

  let messageCount = 0;
  const processedMessages: string[] = [];

  const testServer = await createTestServer({
    responseDelay: 100,
    onData: (data: string) => {
      // The regex already requires the header name, so no separate pre-check is needed.
      const messageId = data.match(/Message-ID:\s*<([^>]+)>/)?.[1];
      if (messageId !== undefined) {
        messageCount++;
        processedMessages.push(messageId);
        console.log(` [Server] Processed message ${messageCount}: ${messageId}`);
      }
    },
  });

  const queueFile = path.join(tempDir, 'test-queue-1.json');
  // Tracks whichever client is currently open so `finally` can close it.
  let activeClient: SmtpClient | undefined;

  try {
    console.log(' Phase 1: Creating first client instance with queue...');
    // Remove any existing queue file
    removeIfExists(queueFile);

    const firstClient = createSmtpClient({
      host: testServer.hostname,
      port: testServer.port,
      secure: false,
      pool: true,
      maxConnections: 1,
      maxMessages: 100,
      // Queue persistence settings
      queuePath: queueFile,
      persistQueue: true,
      retryDelay: 200,
      retries: 3,
    });
    activeClient = firstClient;

    console.log(' Creating emails for persistence test...');
    const emails: Email[] = [];
    for (let i = 0; i < 6; i++) {
      emails.push(
        new Email({
          from: 'sender@persistence.test',
          to: [`recipient${i}@persistence.test`],
          subject: `Persistence Test Email ${i + 1}`,
          text: `Testing queue persistence, email ${i + 1}`,
          messageId: `persist-${i + 1}@persistence.test`,
        }),
      );
    }

    console.log(' Sending emails to build up queue...');
    // Each send handles its own failure, so none of these promises rejects.
    const sendPromises = emails.map(async (email, index) => {
      try {
        await firstClient.sendMail(email);
        console.log(` ๐Ÿ“ค Email ${index + 1} queued successfully`);
      } catch (error: unknown) {
        console.log(` โŒ Email ${index + 1} failed: ${describeError(error)}`);
      }
    });

    // Allow some emails to be queued
    await delay(150);

    console.log(' Phase 2: Simulating client restart by closing first instance...');
    firstClient.close();
    activeClient = undefined;

    // Wait for queue file to be written
    await delay(300);

    console.log(' Checking queue persistence file...');
    const queueExists = fs.existsSync(queueFile);
    console.log(` Queue file exists: ${queueExists}`);

    if (queueExists) {
      const queueData = fs.readFileSync(queueFile, 'utf8');
      // String length counts UTF-16 code units; report the real byte size.
      console.log(` Queue file size: ${Buffer.byteLength(queueData, 'utf8')} bytes`);

      try {
        const parsedQueue: unknown = JSON.parse(queueData);
        console.log(` Persisted queue items: ${Array.isArray(parsedQueue) ? parsedQueue.length : 'Unknown format'}`);
      } catch (parseError: unknown) {
        console.log(` Queue file parse error: ${describeError(parseError)}`);
      }
    }

    console.log(' Phase 3: Creating second client instance to resume queue...');
    activeClient = createSmtpClient({
      host: testServer.hostname,
      port: testServer.port,
      secure: false,
      pool: true,
      maxConnections: 1,
      maxMessages: 100,
      queuePath: queueFile,
      persistQueue: true,
      resumeQueue: true, // Resume from persisted queue
      retryDelay: 200,
      retries: 3,
    });

    console.log(' Waiting for queue resumption and processing...');
    await delay(1000);

    // Wait for the original sends to settle; allSettled itself never rejects.
    await Promise.allSettled(sendPromises);

    console.log(' Phase 4: Verifying queue recovery results...');
    console.log(` Total messages processed by server: ${messageCount}`);
    console.log(` Processed message IDs: ${processedMessages.join(', ')}`);
    console.log(` Expected emails: ${emails.length}`);
    console.log(` Queue persistence success: ${messageCount >= emails.length - 2 ? 'Good' : 'Partial'}`);
  } finally {
    // Runs on failure too, so no client, queue file or server is left behind.
    activeClient?.close();
    removeIfExists(queueFile);
    testServer.close();
  }
}

/**
 * Scenario 2: writes a syntactically invalid queue file, then sends one email
 * through a client pointed at that file and reports the file's state afterwards.
 *
 * @param tempDir - Directory in which the queue file is created.
 */
async function runCorruptionRecoveryScenario(tempDir: string): Promise<void> {
  console.log('\n๐Ÿ› ๏ธ Testing queue corruption recovery mechanisms...');

  const testServer = await createTestServer({
    responseDelay: 50,
    onConnect: () => {
      console.log(' [Server] Connection established for corruption test');
    },
  });

  const queueFile = path.join(tempDir, 'corrupted-queue.json');
  let smtpClient: SmtpClient | undefined;

  try {
    console.log(' Creating corrupted queue file...');
    // Create a corrupted JSON file
    fs.writeFileSync(queueFile, '{"invalid": json, "missing_bracket": true');
    console.log(' Corrupted queue file created');

    console.log(' Testing client behavior with corrupted queue...');
    const client = createSmtpClient({
      host: testServer.hostname,
      port: testServer.port,
      secure: false,
      pool: true,
      maxConnections: 1,
      queuePath: queueFile,
      persistQueue: true,
      resumeQueue: true,
      corruptionRecovery: true, // Enable corruption recovery
    });
    smtpClient = client;

    const email = new Email({
      from: 'sender@corruption.test',
      to: ['recipient@corruption.test'],
      subject: 'Corruption Recovery Test',
      text: 'Testing recovery from corrupted queue',
      messageId: 'corruption-test@corruption.test',
    });

    console.log(' Sending email with corrupted queue present...');
    try {
      const result = await client.sendMail(email);
      console.log(' โœ“ Email sent successfully despite corrupted queue');
      console.log(` Message ID: ${result.messageId}`);
    } catch (error: unknown) {
      console.log(' โœ— Email failed to send');
      console.log(` Error: ${describeError(error)}`);
    }

    console.log(' Checking queue file after corruption recovery...');
    if (fs.existsSync(queueFile)) {
      try {
        JSON.parse(fs.readFileSync(queueFile, 'utf8')); // Try to parse
        console.log(' โœ“ Queue file recovered and is valid JSON');
      } catch {
        console.log(' โš ๏ธ Queue file still corrupted or replaced');
      }
    } else {
      console.log(' โ„น๏ธ Corrupted queue file was removed/replaced');
    }
  } finally {
    smtpClient?.close();
    removeIfExists(queueFile);
    testServer.close();
  }
}

/**
 * Scenario 3: queues 15 emails in quick succession against a slow server
 * while a queue size limit is configured, logging the queue file size along
 * the way and the success/rejection counts at the end.
 *
 * @param tempDir - Directory in which the queue file is created.
 */
async function runQueueSizeLimitScenario(tempDir: string): Promise<void> {
  console.log('\n๐Ÿ“ Testing queue size limits and rotation...');

  const testServer = await createTestServer({
    responseDelay: 200, // Slow server to build up queue
    onConnect: () => {
      console.log(' [Server] Slow connection established');
    },
  });

  const queueFile = path.join(tempDir, 'size-limit-queue.json');
  let smtpClient: SmtpClient | undefined;

  try {
    removeIfExists(queueFile);

    console.log(' Creating client with queue size limits...');
    const client = createSmtpClient({
      host: testServer.hostname,
      port: testServer.port,
      secure: false,
      pool: true,
      maxConnections: 1,
      maxMessages: 5,
      queuePath: queueFile,
      persistQueue: true,
      maxQueueSize: QUEUE_SIZE_LIMIT_BYTES, // 1KB queue size limit
      queueRotation: true,
    });
    smtpClient = client;

    console.log(' Creating many emails to test queue limits...');
    const emails: Email[] = [];
    for (let i = 0; i < 15; i++) {
      emails.push(
        new Email({
          from: 'sender@sizelimit.test',
          to: [`recipient${i}@sizelimit.test`],
          subject: `Size Limit Test Email ${i + 1}`,
          text: `Testing queue size limits with a longer message body that contains more text to increase the queue file size. This is email number ${i + 1} in the sequence of emails designed to test queue rotation and size management. Adding more content here to make the queue file larger.`,
          messageId: `sizelimit-${i + 1}@sizelimit.test`,
        }),
      );
    }

    let successCount = 0;
    let rejectCount = 0;

    console.log(' Sending emails rapidly to test queue limits...');
    for (const [i, email] of emails.entries()) {
      try {
        const pending = client.sendMail(email);
        console.log(` ๐Ÿ“ค Email ${i + 1} queued`);

        // Don't wait for completion, just queue them rapidly.
        // The trailing catch makes this fire-and-forget chain rejection-safe.
        void pending
          .then(() => {
            successCount++;
            console.log(` โœ“ Email ${i + 1} sent successfully`);
          })
          .catch((error: unknown) => {
            rejectCount++;
            console.log(` โŒ Email ${i + 1} rejected: ${describeError(error)}`);
          });
      } catch (error: unknown) {
        // Only reached when sendMail throws synchronously.
        rejectCount++;
        console.log(` โŒ Email ${i + 1} immediate rejection: ${describeError(error)}`);
      }

      // Check queue file size periodically
      if (i % 5 === 0 && fs.existsSync(queueFile)) {
        const stats = fs.statSync(queueFile);
        console.log(` Queue file size: ${stats.size} bytes`);
      }

      await delay(20);
    }

    console.log(' Waiting for queue processing to complete...');
    await delay(2000);

    // Final queue file check
    if (fs.existsSync(queueFile)) {
      const finalStats = fs.statSync(queueFile);
      console.log(` Final queue file size: ${finalStats.size} bytes`);
      console.log(` Size limit respected: ${finalStats.size <= QUEUE_SIZE_LIMIT_BYTES ? 'Yes' : 'No'}`);
    }

    console.log(` Success count: ${successCount}`);
    console.log(` Reject count: ${rejectCount}`);
    console.log(` Total processed: ${successCount + rejectCount}/${emails.length}`);
    console.log(` Queue management: ${rejectCount > 0 ? 'Enforced limits' : 'No limits hit'}`);
  } finally {
    smtpClient?.close();
    removeIfExists(queueFile);
    testServer.close();
  }
}

/**
 * Scenario 4: three clients configured with the same queue file send four
 * emails each in parallel; afterwards the queue file is checked for valid JSON.
 *
 * @param tempDir - Directory in which the queue file is created.
 */
async function runConcurrentAccessScenario(tempDir: string): Promise<void> {
  console.log('\n๐Ÿ”’ Testing concurrent queue access safety...');

  const testServer = await createTestServer({
    responseDelay: 30,
  });

  const queueFile = path.join(tempDir, 'concurrent-queue.json');
  const clients: SmtpClient[] = [];

  try {
    removeIfExists(queueFile);

    console.log(' Creating multiple client instances sharing same queue file...');
    for (let i = 0; i < 3; i++) {
      clients.push(
        createSmtpClient({
          host: testServer.hostname,
          port: testServer.port,
          secure: false,
          pool: true,
          maxConnections: 1,
          queuePath: queueFile,
          persistQueue: true,
          queueLocking: true, // Enable file locking
          lockTimeout: 1000,
        }),
      );
    }

    console.log(' Creating emails for concurrent access test...');
    const allEmails = clients.flatMap((client, clientId) =>
      Array.from({ length: EMAILS_PER_CONCURRENT_CLIENT }, (_, emailId) => ({
        client,
        email: new Email({
          from: `sender${clientId}@concurrent.test`,
          to: [`recipient${clientId}-${emailId}@concurrent.test`],
          subject: `Concurrent Test Client ${clientId + 1} Email ${emailId + 1}`,
          text: `Testing concurrent queue access from client ${clientId + 1}`,
          messageId: `concurrent-${clientId}-${emailId}@concurrent.test`,
        }),
        clientId,
        emailId,
      })),
    );

    console.log(' Sending emails concurrently from multiple clients...');
    const startTime = Date.now();

    // Each entry resolves to true on success and false on failure; none rejects.
    const outcomes = await Promise.all(
      allEmails.map(async ({ client, email, clientId, emailId }): Promise<boolean> => {
        try {
          await client.sendMail(email);
          console.log(` โœ“ Client ${clientId + 1} Email ${emailId + 1} sent`);
          return true;
        } catch (error: unknown) {
          console.log(` โœ— Client ${clientId + 1} Email ${emailId + 1} failed: ${describeError(error)}`);
          return false;
        }
      }),
    );
    const endTime = Date.now();

    const successful = outcomes.filter((ok) => ok).length;
    const failed = outcomes.length - successful;

    console.log(` Concurrent operations completed in ${endTime - startTime}ms`);
    console.log(` Total emails: ${allEmails.length}`);
    console.log(` Successful: ${successful}, Failed: ${failed}`);
    console.log(` Success rate: ${((successful / allEmails.length) * 100).toFixed(1)}%`);

    // Check queue file integrity
    if (fs.existsSync(queueFile)) {
      try {
        JSON.parse(fs.readFileSync(queueFile, 'utf8'));
        console.log(' โœ“ Queue file integrity maintained during concurrent access');
      } catch {
        console.log(' โŒ Queue file corrupted during concurrent access');
      }
    }
  } finally {
    // Close all clients, including any created before a mid-setup failure.
    for (const client of clients) {
      client.close();
    }
    removeIfExists(queueFile);
    testServer.close();
  }
}

/**
 * Scenario 5: sends plain-text, HTML and non-ASCII emails one after another
 * and re-parses the queue file after each send and once more at the end.
 *
 * @param tempDir - Directory in which the queue file is created.
 */
async function runDataIntegrityScenario(tempDir: string): Promise<void> {
  console.log('\n๐Ÿ” Testing queue data integrity and validation...');

  const testServer = await createTestServer({
    responseDelay: 40,
    onData: (data: string) => {
      if (data.includes('Subject: Integrity Test')) {
        console.log(' [Server] Received integrity test email');
      }
    },
  });

  const queueFile = path.join(tempDir, 'integrity-queue.json');
  let smtpClient: SmtpClient | undefined;

  try {
    removeIfExists(queueFile);

    console.log(' Creating client with queue integrity checking...');
    const client = createSmtpClient({
      host: testServer.hostname,
      port: testServer.port,
      secure: false,
      pool: true,
      maxConnections: 1,
      queuePath: queueFile,
      persistQueue: true,
      integrityChecks: true,
      checksumValidation: true,
    });
    smtpClient = client;

    console.log(' Creating test emails with various content types...');
    const emails: Email[] = [
      new Email({
        from: 'sender@integrity.test',
        to: ['recipient1@integrity.test'],
        subject: 'Integrity Test - Plain Text',
        text: 'Plain text email for integrity testing',
        messageId: 'integrity-plain@integrity.test',
      }),
      new Email({
        from: 'sender@integrity.test',
        to: ['recipient2@integrity.test'],
        subject: 'Integrity Test - HTML',
        // NOTE(review): the markup around these two text runs was missing from the
        // source under review (the literal was unterminated); tags are reconstructed.
        html: '<h1>HTML Email</h1><p>Testing integrity with HTML content</p>',
        messageId: 'integrity-html@integrity.test',
      }),
      new Email({
        from: 'sender@integrity.test',
        to: ['recipient3@integrity.test'],
        subject: 'Integrity Test - Special Characters',
        text: 'Testing with special characters: รฑรกรฉรญรณรบ, ไธญๆ–‡, ุงู„ุนุฑุจูŠุฉ, ั€ัƒััะบะธะน',
        messageId: 'integrity-special@integrity.test',
      }),
    ];

    console.log(' Sending emails and monitoring queue integrity...');
    for (const [i, email] of emails.entries()) {
      try {
        await client.sendMail(email);
        console.log(` โœ“ Email ${i + 1} sent and queued`);

        // Check queue file after each email
        if (fs.existsSync(queueFile)) {
          const queueData = fs.readFileSync(queueFile, 'utf8');
          try {
            const parsed: unknown = JSON.parse(queueData);
            console.log(` ๐Ÿ“Š Queue contains ${Array.isArray(parsed) ? parsed.length : 'unknown'} items`);
          } catch {
            console.log(' โŒ Queue file parsing failed - integrity compromised');
          }
        }
      } catch (error: unknown) {
        console.log(` โœ— Email ${i + 1} failed: ${describeError(error)}`);
      }

      await delay(100);
    }

    console.log(' Performing final integrity validation...');
    // Manual integrity check
    if (fs.existsSync(queueFile)) {
      const fileContent = fs.readFileSync(queueFile, 'utf8');
      // Byte size, not string length: the queue may hold multi-byte characters.
      const fileSize = Buffer.byteLength(fileContent, 'utf8');

      try {
        const queueData: unknown = JSON.parse(fileContent);
        // Arrays are objects too; the null check keeps a bare JSON `null` from passing.
        const hasValidStructure = queueData !== null && typeof queueData === 'object';

        console.log(` Queue file size: ${fileSize} bytes`);
        console.log(` Valid JSON structure: ${hasValidStructure ? 'Yes' : 'No'}`);
        console.log(` Data integrity: ${hasValidStructure && fileSize > 0 ? 'Maintained' : 'Compromised'}`);
      } catch {
        console.log(' โŒ Final integrity check failed: Invalid JSON');
      }
    } else {
      console.log(' โ„น๏ธ Queue file not found (may have been processed completely)');
    }
  } finally {
    smtpClient?.close();
    removeIfExists(queueFile);
    testServer.close();
  }
}

test('CREL-03: Queue Persistence Reliability Tests', async () => {
  console.log('\n๐Ÿ’พ Testing SMTP Client Queue Persistence Reliability');
  console.log('='.repeat(60));

  const tempDir = path.join(process.cwd(), '.nogit', 'test-queue-persistence');

  // Ensure test directory exists (a no-op when it is already there)
  fs.mkdirSync(tempDir, { recursive: true });

  try {
    // Scenario 1: Queue State Persistence Across Restarts
    await test.test('Scenario 1: Queue State Persistence Across Restarts', () =>
      runRestartPersistenceScenario(tempDir),
    );

    // Scenario 2: Queue Corruption Recovery
    await test.test('Scenario 2: Queue Corruption Recovery', () =>
      runCorruptionRecoveryScenario(tempDir),
    );

    // Scenario 3: Queue Size Limits and Rotation
    await test.test('Scenario 3: Queue Size Limits and Rotation', () =>
      runQueueSizeLimitScenario(tempDir),
    );

    // Scenario 4: Concurrent Queue Access Safety
    await test.test('Scenario 4: Concurrent Queue Access Safety', () =>
      runConcurrentAccessScenario(tempDir),
    );

    // Scenario 5: Queue Data Integrity and Validation
    await test.test('Scenario 5: Queue Data Integrity and Validation', () =>
      runDataIntegrityScenario(tempDir),
    );
  } finally {
    // Cleanup test directory, even when a scenario threw; failure here is only a warning.
    try {
      fs.rmSync(tempDir, { recursive: true, force: true });
    } catch (error: unknown) {
      console.log(` Warning: Could not clean up test directory: ${describeError(error)}`);
    }
  }

  console.log('\nโœ… CREL-03: Queue Persistence Reliability Tests completed');
  console.log('๐Ÿ’พ All queue persistence scenarios tested successfully');
});