dcrouter/test/suite/smtpclient_reliability/test.crel-06.concurrency-safety.ts
2025-05-26 14:50:55 +00:00

558 lines
20 KiB
TypeScript

import { tap, expect } from '@git.zone/tstest/tapbundle';
import * as net from 'net';
import { createTestSmtpClient } from '../../helpers/smtp.client.js';
import { Email } from '../../../ts/mail/core/classes.email.js';
tap.test('CREL-06: Simultaneous Connection Management', async () => {
console.log('\n⚡ Testing SMTP Client Concurrent Operation Safety');
console.log('=' .repeat(60));
console.log('\n🔗 Testing simultaneous connection management safety...');
let connectionCount = 0;
let activeConnections = 0;
const connectionLog: string[] = [];
// Create test server that tracks connections
const server = net.createServer(socket => {
connectionCount++;
activeConnections++;
const connId = `CONN-${connectionCount}`;
connectionLog.push(`${new Date().toISOString()}: ${connId} OPENED (active: ${activeConnections})`);
console.log(` [Server] ${connId} opened (total: ${connectionCount}, active: ${activeConnections})`);
socket.on('close', () => {
activeConnections--;
connectionLog.push(`${new Date().toISOString()}: ${connId} CLOSED (active: ${activeConnections})`);
console.log(` [Server] ${connId} closed (active: ${activeConnections})`);
});
socket.write('220 localhost SMTP Test Server\r\n');
socket.on('data', (data) => {
const lines = data.toString().split('\r\n');
lines.forEach(line => {
if (line.startsWith('EHLO') || line.startsWith('HELO')) {
socket.write('250-localhost\r\n');
socket.write('250 SIZE 10485760\r\n');
} else if (line.startsWith('MAIL FROM:')) {
socket.write('250 OK\r\n');
} else if (line.startsWith('RCPT TO:')) {
socket.write('250 OK\r\n');
} else if (line === 'DATA') {
socket.write('354 Send data\r\n');
} else if (line === '.') {
socket.write('250 OK Message accepted\r\n');
} else if (line === 'QUIT') {
socket.write('221 Bye\r\n');
socket.end();
}
});
});
});
await new Promise<void>((resolve) => {
server.listen(0, '127.0.0.1', () => {
resolve();
});
});
const port = (server.address() as net.AddressInfo).port;
try {
console.log(' Creating multiple SMTP clients with shared connection pool settings...');
const clients = [];
for (let i = 0; i < 5; i++) {
clients.push(createTestSmtpClient({
host: '127.0.0.1',
port: port,
secure: false,
maxConnections: 3, // Allow up to 3 connections
maxMessages: 10,
connectionTimeout: 2000
}));
}
console.log(' Launching concurrent email sending operations...');
const emailBatches = clients.map((client, clientIndex) => {
return Array.from({ length: 8 }, (_, emailIndex) => {
return new Email({
from: `sender${clientIndex}@concurrent.test`,
to: [`recipient${clientIndex}-${emailIndex}@concurrent.test`],
subject: `Concurrent Safety Test Client ${clientIndex + 1} Email ${emailIndex + 1}`,
text: `Testing concurrent operation safety from client ${clientIndex + 1}, email ${emailIndex + 1}`
});
});
});
const startTime = Date.now();
const allPromises: Promise<any>[] = [];
// Launch all email operations simultaneously
emailBatches.forEach((emails, clientIndex) => {
emails.forEach((email, emailIndex) => {
const promise = clients[clientIndex].sendMail(email).then(result => {
console.log(` ✓ Client ${clientIndex + 1} Email ${emailIndex + 1} sent`);
return { success: true, clientIndex, emailIndex, result };
}).catch(error => {
console.log(` ✗ Client ${clientIndex + 1} Email ${emailIndex + 1} failed: ${error.message}`);
return { success: false, clientIndex, emailIndex, error };
});
allPromises.push(promise);
});
});
const results = await Promise.all(allPromises);
const endTime = Date.now();
// Close all clients
clients.forEach(client => client.close());
// Wait for connections to close
await new Promise(resolve => setTimeout(resolve, 500));
const successful = results.filter(r => r.success).length;
const failed = results.filter(r => !r.success).length;
const totalEmails = emailBatches.flat().length;
console.log(`\n Concurrent operation results:`);
console.log(` Total operations: ${totalEmails}`);
console.log(` Successful: ${successful}, Failed: ${failed}`);
console.log(` Success rate: ${((successful / totalEmails) * 100).toFixed(1)}%`);
console.log(` Execution time: ${endTime - startTime}ms`);
console.log(` Peak connections: ${Math.max(...connectionLog.map(log => {
const match = log.match(/active: (\d+)/);
return match ? parseInt(match[1]) : 0;
}))}`);
console.log(` Connection management: ${activeConnections === 0 ? 'Clean' : 'Connections remaining'}`);
expect(successful).toBeGreaterThanOrEqual(totalEmails - 5); // Allow some failures
expect(activeConnections).toEqual(0); // All connections should be closed
} finally {
server.close();
}
});
tap.test('CREL-06: Concurrent Queue Operations', async () => {
console.log('\n🔒 Testing concurrent queue operations...');
let messageProcessingOrder: string[] = [];
// Create test server that tracks message processing order
const server = net.createServer(socket => {
socket.write('220 localhost SMTP Test Server\r\n');
let inData = false;
let currentData = '';
socket.on('data', (data) => {
const lines = data.toString().split('\r\n');
lines.forEach(line => {
if (inData) {
if (line === '.') {
// Extract Message-ID from email data
const messageIdMatch = currentData.match(/Message-ID:\s*<([^>]+)>/);
if (messageIdMatch) {
messageProcessingOrder.push(messageIdMatch[1]);
console.log(` [Server] Processing: ${messageIdMatch[1]}`);
}
socket.write('250 OK Message accepted\r\n');
inData = false;
currentData = '';
} else {
currentData += line + '\r\n';
}
} else {
if (line.startsWith('EHLO') || line.startsWith('HELO')) {
socket.write('250-localhost\r\n');
socket.write('250 SIZE 10485760\r\n');
} else if (line.startsWith('MAIL FROM:')) {
socket.write('250 OK\r\n');
} else if (line.startsWith('RCPT TO:')) {
socket.write('250 OK\r\n');
} else if (line === 'DATA') {
socket.write('354 Send data\r\n');
inData = true;
} else if (line === 'QUIT') {
socket.write('221 Bye\r\n');
socket.end();
}
}
});
});
});
await new Promise<void>((resolve) => {
server.listen(0, '127.0.0.1', () => {
resolve();
});
});
const port = (server.address() as net.AddressInfo).port;
try {
console.log(' Creating SMTP client for concurrent queue operations...');
const smtpClient = createTestSmtpClient({
host: '127.0.0.1',
port: port,
secure: false,
maxConnections: 2,
maxMessages: 50
});
console.log(' Launching concurrent queue operations...');
const operations: Promise<any>[] = [];
const emailGroups = ['A', 'B', 'C', 'D'];
// Create concurrent operations that use the queue
emailGroups.forEach((group, groupIndex) => {
// Add multiple emails per group concurrently
for (let i = 0; i < 6; i++) {
const email = new Email({
from: `sender${group}@queuetest.example`,
to: [`recipient${group}${i}@queuetest.example`],
subject: `Queue Safety Test Group ${group} Email ${i + 1}`,
text: `Testing queue safety for group ${group}, email ${i + 1}`
});
const operation = smtpClient.sendMail(email).then(result => {
return {
success: true,
group,
index: i,
messageId: result.messageId,
timestamp: Date.now()
};
}).catch(error => {
return {
success: false,
group,
index: i,
error: error.message
};
});
operations.push(operation);
}
});
const startTime = Date.now();
const results = await Promise.all(operations);
const endTime = Date.now();
// Wait for all processing to complete
await new Promise(resolve => setTimeout(resolve, 300));
const successful = results.filter(r => r.success).length;
const failed = results.filter(r => !r.success).length;
console.log(`\n Queue safety results:`);
console.log(` Total queue operations: ${operations.length}`);
console.log(` Successful: ${successful}, Failed: ${failed}`);
console.log(` Success rate: ${((successful / operations.length) * 100).toFixed(1)}%`);
console.log(` Processing time: ${endTime - startTime}ms`);
// Analyze processing order
const groupCounts = emailGroups.reduce((acc, group) => {
acc[group] = messageProcessingOrder.filter(id => id && id.includes(`${group}`)).length;
return acc;
}, {} as Record<string, number>);
console.log(` Processing distribution:`);
Object.entries(groupCounts).forEach(([group, count]) => {
console.log(` Group ${group}: ${count} emails processed`);
});
const totalProcessed = Object.values(groupCounts).reduce((a, b) => a + b, 0);
console.log(` Queue integrity: ${totalProcessed === successful ? 'Maintained' : 'Some messages lost'}`);
expect(successful).toBeGreaterThanOrEqual(operations.length - 2); // Allow minimal failures
smtpClient.close();
} finally {
server.close();
}
});
tap.test('CREL-06: Concurrent Error Handling', async () => {
console.log('\n❌ Testing concurrent error handling safety...');
let errorInjectionPhase = false;
let connectionAttempts = 0;
// Create test server that can inject errors
const server = net.createServer(socket => {
connectionAttempts++;
console.log(` [Server] Connection attempt ${connectionAttempts}`);
if (errorInjectionPhase && Math.random() < 0.4) {
console.log(` [Server] Injecting connection error ${connectionAttempts}`);
socket.destroy();
return;
}
socket.write('220 localhost SMTP Test Server\r\n');
socket.on('data', (data) => {
const lines = data.toString().split('\r\n');
lines.forEach(line => {
if (errorInjectionPhase && line.startsWith('MAIL FROM') && Math.random() < 0.3) {
console.log(' [Server] Injecting SMTP error');
socket.write('450 Temporary failure, please retry\r\n');
return;
}
if (line.startsWith('EHLO') || line.startsWith('HELO')) {
socket.write('250-localhost\r\n');
socket.write('250 SIZE 10485760\r\n');
} else if (line.startsWith('MAIL FROM:')) {
socket.write('250 OK\r\n');
} else if (line.startsWith('RCPT TO:')) {
socket.write('250 OK\r\n');
} else if (line === 'DATA') {
socket.write('354 Send data\r\n');
} else if (line === '.') {
socket.write('250 OK Message accepted\r\n');
} else if (line === 'QUIT') {
socket.write('221 Bye\r\n');
socket.end();
}
});
});
});
await new Promise<void>((resolve) => {
server.listen(0, '127.0.0.1', () => {
resolve();
});
});
const port = (server.address() as net.AddressInfo).port;
try {
console.log(' Creating multiple clients for concurrent error testing...');
const clients = [];
for (let i = 0; i < 4; i++) {
clients.push(createTestSmtpClient({
host: '127.0.0.1',
port: port,
secure: false,
maxConnections: 2,
connectionTimeout: 3000
}));
}
const emails = [];
for (let clientIndex = 0; clientIndex < clients.length; clientIndex++) {
for (let emailIndex = 0; emailIndex < 5; emailIndex++) {
emails.push({
client: clients[clientIndex],
email: new Email({
from: `sender${clientIndex}@errortest.example`,
to: [`recipient${clientIndex}-${emailIndex}@errortest.example`],
subject: `Concurrent Error Test Client ${clientIndex + 1} Email ${emailIndex + 1}`,
text: `Testing concurrent error handling ${clientIndex + 1}-${emailIndex + 1}`
}),
clientIndex,
emailIndex
});
}
}
console.log(' Phase 1: Normal operation...');
const phase1Results = [];
const phase1Emails = emails.slice(0, 8); // First 8 emails
const phase1Promises = phase1Emails.map(({ client, email, clientIndex, emailIndex }) => {
return client.sendMail(email).then(result => {
console.log(` ✓ Phase 1: Client ${clientIndex + 1} Email ${emailIndex + 1} sent`);
return { success: true, phase: 1, clientIndex, emailIndex };
}).catch(error => {
console.log(` ✗ Phase 1: Client ${clientIndex + 1} Email ${emailIndex + 1} failed`);
return { success: false, phase: 1, clientIndex, emailIndex, error: error.message };
});
});
const phase1Resolved = await Promise.all(phase1Promises);
phase1Results.push(...phase1Resolved);
console.log(' Phase 2: Error injection enabled...');
errorInjectionPhase = true;
const phase2Results = [];
const phase2Emails = emails.slice(8); // Remaining emails
const phase2Promises = phase2Emails.map(({ client, email, clientIndex, emailIndex }) => {
return client.sendMail(email).then(result => {
console.log(` ✓ Phase 2: Client ${clientIndex + 1} Email ${emailIndex + 1} recovered`);
return { success: true, phase: 2, clientIndex, emailIndex };
}).catch(error => {
console.log(` ✗ Phase 2: Client ${clientIndex + 1} Email ${emailIndex + 1} failed permanently`);
return { success: false, phase: 2, clientIndex, emailIndex, error: error.message };
});
});
const phase2Resolved = await Promise.all(phase2Promises);
phase2Results.push(...phase2Resolved);
// Close all clients
clients.forEach(client => client.close());
const phase1Success = phase1Results.filter(r => r.success).length;
const phase2Success = phase2Results.filter(r => r.success).length;
const totalSuccess = phase1Success + phase2Success;
const totalEmails = emails.length;
console.log(`\n Concurrent error handling results:`);
console.log(` Phase 1 (normal): ${phase1Success}/${phase1Results.length} successful`);
console.log(` Phase 2 (errors): ${phase2Success}/${phase2Results.length} successful`);
console.log(` Overall success: ${totalSuccess}/${totalEmails} (${((totalSuccess / totalEmails) * 100).toFixed(1)}%)`);
console.log(` Error resilience: ${phase2Success > 0 ? 'Good' : 'Poor'}`);
console.log(` Concurrent error safety: ${phase1Success === phase1Results.length ? 'Maintained' : 'Some failures'}`);
expect(phase1Success).toBeGreaterThanOrEqual(phase1Results.length - 1); // Most should succeed
expect(phase2Success).toBeGreaterThanOrEqual(1); // Some should succeed despite errors
} finally {
server.close();
}
});
tap.test('CREL-06: Resource Contention Management', async () => {
console.log('\n🏁 Testing resource contention management...');
// Create test server with limited capacity
const server = net.createServer(socket => {
console.log(' [Server] New connection established');
socket.write('220 localhost SMTP Test Server\r\n');
// Add some delay to simulate slow server
socket.on('data', (data) => {
setTimeout(() => {
const lines = data.toString().split('\r\n');
lines.forEach(line => {
if (line.startsWith('EHLO') || line.startsWith('HELO')) {
socket.write('250-localhost\r\n');
socket.write('250 SIZE 10485760\r\n');
} else if (line.startsWith('MAIL FROM:')) {
socket.write('250 OK\r\n');
} else if (line.startsWith('RCPT TO:')) {
socket.write('250 OK\r\n');
} else if (line === 'DATA') {
socket.write('354 Send data\r\n');
} else if (line === '.') {
socket.write('250 OK Message accepted\r\n');
} else if (line === 'QUIT') {
socket.write('221 Bye\r\n');
socket.end();
}
});
}, 20); // Add 20ms delay to responses
});
});
server.maxConnections = 3; // Limit server connections
await new Promise<void>((resolve) => {
server.listen(0, '127.0.0.1', () => {
resolve();
});
});
const port = (server.address() as net.AddressInfo).port;
try {
console.log(' Creating high-contention scenario with limited resources...');
const clients = [];
// Create more clients than server can handle simultaneously
for (let i = 0; i < 8; i++) {
clients.push(createTestSmtpClient({
host: '127.0.0.1',
port: port,
secure: false,
maxConnections: 1, // Force contention
maxMessages: 10,
connectionTimeout: 3000
}));
}
const emails = [];
clients.forEach((client, clientIndex) => {
for (let emailIndex = 0; emailIndex < 4; emailIndex++) {
emails.push({
client,
email: new Email({
from: `sender${clientIndex}@contention.test`,
to: [`recipient${clientIndex}-${emailIndex}@contention.test`],
subject: `Resource Contention Test ${clientIndex + 1}-${emailIndex + 1}`,
text: `Testing resource contention management ${clientIndex + 1}-${emailIndex + 1}`
}),
clientIndex,
emailIndex
});
}
});
console.log(' Launching high-contention operations...');
const startTime = Date.now();
const promises = emails.map(({ client, email, clientIndex, emailIndex }) => {
return client.sendMail(email).then(result => {
console.log(` ✓ Client ${clientIndex + 1} Email ${emailIndex + 1} sent`);
return {
success: true,
clientIndex,
emailIndex,
completionTime: Date.now() - startTime
};
}).catch(error => {
console.log(` ✗ Client ${clientIndex + 1} Email ${emailIndex + 1} failed: ${error.message}`);
return {
success: false,
clientIndex,
emailIndex,
error: error.message,
completionTime: Date.now() - startTime
};
});
});
const results = await Promise.all(promises);
const endTime = Date.now();
// Close all clients
clients.forEach(client => client.close());
const successful = results.filter(r => r.success).length;
const failed = results.filter(r => !r.success).length;
const avgCompletionTime = results
.filter(r => r.success)
.reduce((sum, r) => sum + r.completionTime, 0) / successful || 0;
console.log(`\n Resource contention results:`);
console.log(` Total operations: ${emails.length}`);
console.log(` Successful: ${successful}, Failed: ${failed}`);
console.log(` Success rate: ${((successful / emails.length) * 100).toFixed(1)}%`);
console.log(` Total execution time: ${endTime - startTime}ms`);
console.log(` Average completion time: ${avgCompletionTime.toFixed(0)}ms`);
console.log(` Resource management: ${successful > emails.length * 0.8 ? 'Effective' : 'Needs improvement'}`);
expect(successful).toBeGreaterThanOrEqual(emails.length * 0.7); // At least 70% should succeed
} finally {
server.close();
}
});
tap.test('CREL-06: Test Summary', async () => {
console.log('\n✅ CREL-06: Concurrent Operation Safety Reliability Tests completed');
console.log('⚡ All concurrency safety scenarios tested successfully');
});
tap.start();