dcrouter/test/suite/smtpclient_reliability/test.crel-06.concurrency-safety.ts
2025-05-24 18:12:08 +00:00

547 lines
21 KiB
TypeScript

import { test } from '@git.zone/tstest/tapbundle';
import { createTestServer, createSmtpClient } from '../../helpers/utils.js';
import { Email } from '../../../ts/mail/core/classes.email.js';
import * as crypto from 'crypto';
test('CREL-06: Concurrent Operation Safety Reliability Tests', async () => {
console.log('\n⚡ Testing SMTP Client Concurrent Operation Safety');
console.log('=' .repeat(60));
// Scenario 1: Simultaneous Connection Management
await test.test('Scenario 1: Simultaneous Connection Management', async () => {
console.log('\n🔗 Testing simultaneous connection management safety...');
let connectionCount = 0;
let activeConnections = 0;
const connectionLog: string[] = [];
const testServer = await createTestServer({
responseDelay: 30,
onConnect: (socket: any) => {
connectionCount++;
activeConnections++;
const connId = `CONN-${connectionCount}`;
connectionLog.push(`${new Date().toISOString()}: ${connId} OPENED (active: ${activeConnections})`);
console.log(` [Server] ${connId} opened (total: ${connectionCount}, active: ${activeConnections})`);
socket.on('close', () => {
activeConnections--;
connectionLog.push(`${new Date().toISOString()}: ${connId} CLOSED (active: ${activeConnections})`);
console.log(` [Server] ${connId} closed (active: ${activeConnections})`);
});
}
});
try {
console.log(' Creating multiple SMTP clients with shared connection pool...');
const clients = [];
for (let i = 0; i < 5; i++) {
clients.push(createSmtpClient({
host: testServer.hostname,
port: testServer.port,
secure: false,
pool: true,
maxConnections: 3,
maxMessages: 10,
connectionTimeout: 2000,
threadSafe: true
}));
}
console.log(' Launching concurrent email sending operations...');
const emailBatches = clients.map((client, clientIndex) => {
return Array.from({ length: 8 }, (_, emailIndex) => {
return new Email({
from: `sender${clientIndex}@concurrent.test`,
to: [`recipient${clientIndex}-${emailIndex}@concurrent.test`],
subject: `Concurrent Safety Test Client ${clientIndex + 1} Email ${emailIndex + 1}`,
text: `Testing concurrent operation safety from client ${clientIndex + 1}, email ${emailIndex + 1}`,
messageId: `concurrent-${clientIndex}-${emailIndex}@concurrent.test`
});
});
});
const startTime = Date.now();
const allPromises: Promise<any>[] = [];
// Launch all email operations simultaneously
emailBatches.forEach((emails, clientIndex) => {
emails.forEach((email, emailIndex) => {
const promise = clients[clientIndex].sendMail(email).then(result => {
console.log(` ✓ Client ${clientIndex + 1} Email ${emailIndex + 1} sent`);
return { success: true, clientIndex, emailIndex, result };
}).catch(error => {
console.log(` ✗ Client ${clientIndex + 1} Email ${emailIndex + 1} failed: ${error.message}`);
return { success: false, clientIndex, emailIndex, error };
});
allPromises.push(promise);
});
});
const results = await Promise.all(allPromises);
const endTime = Date.now();
// Close all clients
clients.forEach(client => client.close());
// Wait for connections to close
await new Promise(resolve => setTimeout(resolve, 500));
const successful = results.filter(r => r.success).length;
const failed = results.filter(r => !r.success).length;
const totalEmails = emailBatches.flat().length;
console.log(`\n Concurrent operation results:`);
console.log(` Total operations: ${totalEmails}`);
console.log(` Successful: ${successful}, Failed: ${failed}`);
console.log(` Success rate: ${((successful / totalEmails) * 100).toFixed(1)}%`);
console.log(` Execution time: ${endTime - startTime}ms`);
console.log(` Peak connections: ${Math.max(...connectionLog.map(log => {
const match = log.match(/active: (\d+)/);
return match ? parseInt(match[1]) : 0;
}))}`);
console.log(` Connection management: ${activeConnections === 0 ? 'Clean' : 'Connections remaining'}`);
} finally {
testServer.close();
}
});
// Scenario 2: Thread-Safe Queue Operations
await test.test('Scenario 2: Thread-Safe Queue Operations', async () => {
console.log('\n🔒 Testing thread-safe queue operations...');
let messageProcessingOrder: string[] = [];
const testServer = await createTestServer({
responseDelay: 20,
onData: (data: string) => {
const messageIdMatch = data.match(/Message-ID:\s*<([^>]+)>/);
if (messageIdMatch) {
messageProcessingOrder.push(messageIdMatch[1]);
console.log(` [Server] Processing: ${messageIdMatch[1]}`);
}
}
});
try {
console.log(' Creating SMTP client with thread-safe queue...');
const smtpClient = createSmtpClient({
host: testServer.hostname,
port: testServer.port,
secure: false,
pool: true,
maxConnections: 2,
maxMessages: 50,
queueSafety: true,
lockingMode: 'strict'
});
console.log(' Launching concurrent queue operations...');
const operations: Promise<any>[] = [];
const emailGroups = ['A', 'B', 'C', 'D'];
// Create concurrent operations that modify the queue
emailGroups.forEach((group, groupIndex) => {
// Add multiple emails per group concurrently
for (let i = 0; i < 6; i++) {
const email = new Email({
from: `sender${group}@queuetest.example`,
to: [`recipient${group}${i}@queuetest.example`],
subject: `Queue Safety Test Group ${group} Email ${i + 1}`,
text: `Testing queue thread safety for group ${group}, email ${i + 1}`,
messageId: `queue-safety-${group}-${i}@queuetest.example`
});
const operation = smtpClient.sendMail(email).then(result => {
return {
success: true,
group,
index: i,
messageId: email.messageId,
timestamp: Date.now()
};
}).catch(error => {
return {
success: false,
group,
index: i,
messageId: email.messageId,
error: error.message
};
});
operations.push(operation);
}
});
const startTime = Date.now();
const results = await Promise.all(operations);
const endTime = Date.now();
// Wait for all processing to complete
await new Promise(resolve => setTimeout(resolve, 300));
const successful = results.filter(r => r.success).length;
const failed = results.filter(r => !r.success).length;
console.log(`\n Queue safety results:`);
console.log(` Total queue operations: ${operations.length}`);
console.log(` Successful: ${successful}, Failed: ${failed}`);
console.log(` Success rate: ${((successful / operations.length) * 100).toFixed(1)}%`);
console.log(` Processing time: ${endTime - startTime}ms`);
// Analyze processing order for race conditions
const groupCounts = emailGroups.reduce((acc, group) => {
acc[group] = messageProcessingOrder.filter(id => id.includes(`-${group}-`)).length;
return acc;
}, {} as Record<string, number>);
console.log(` Processing distribution:`);
Object.entries(groupCounts).forEach(([group, count]) => {
console.log(` Group ${group}: ${count} emails processed`);
});
const totalProcessed = Object.values(groupCounts).reduce((a, b) => a + b, 0);
console.log(` Queue integrity: ${totalProcessed === successful ? 'Maintained' : 'Potential race condition'}`);
smtpClient.close();
} finally {
testServer.close();
}
});
// Scenario 3: Concurrent Error Handling
await test.test('Scenario 3: Concurrent Error Handling', async () => {
console.log('\n❌ Testing concurrent error handling safety...');
let errorInjectionPhase = false;
let connectionAttempts = 0;
const testServer = await createTestServer({
responseDelay: 25,
onConnect: (socket: any) => {
connectionAttempts++;
console.log(` [Server] Connection attempt ${connectionAttempts}`);
if (errorInjectionPhase && Math.random() < 0.4) {
console.log(` [Server] Injecting connection error ${connectionAttempts}`);
socket.destroy();
return;
}
},
onData: (data: string, socket: any) => {
if (errorInjectionPhase && data.includes('MAIL FROM') && Math.random() < 0.3) {
console.log(' [Server] Injecting SMTP error');
socket.write('450 Temporary failure, please retry\r\n');
return false;
}
return true;
}
});
try {
console.log(' Creating multiple clients for concurrent error testing...');
const clients = [];
for (let i = 0; i < 4; i++) {
clients.push(createSmtpClient({
host: testServer.hostname,
port: testServer.port,
secure: false,
pool: true,
maxConnections: 2,
retryDelay: 100,
retries: 3,
errorHandling: 'concurrent-safe',
failureRecovery: true
}));
}
const emails = [];
for (let clientIndex = 0; clientIndex < clients.length; clientIndex++) {
for (let emailIndex = 0; emailIndex < 5; emailIndex++) {
emails.push({
client: clients[clientIndex],
email: new Email({
from: `sender${clientIndex}@errortest.example`,
to: [`recipient${clientIndex}-${emailIndex}@errortest.example`],
subject: `Concurrent Error Test Client ${clientIndex + 1} Email ${emailIndex + 1}`,
text: `Testing concurrent error handling ${clientIndex + 1}-${emailIndex + 1}`,
messageId: `error-concurrent-${clientIndex}-${emailIndex}@errortest.example`
}),
clientIndex,
emailIndex
});
}
}
console.log(' Phase 1: Normal operation...');
const phase1Results = [];
const phase1Emails = emails.slice(0, 8); // First 8 emails
const phase1Promises = phase1Emails.map(({ client, email, clientIndex, emailIndex }) => {
return client.sendMail(email).then(result => {
console.log(` ✓ Phase 1: Client ${clientIndex + 1} Email ${emailIndex + 1} sent`);
return { success: true, phase: 1, clientIndex, emailIndex };
}).catch(error => {
console.log(` ✗ Phase 1: Client ${clientIndex + 1} Email ${emailIndex + 1} failed`);
return { success: false, phase: 1, clientIndex, emailIndex, error: error.message };
});
});
const phase1Resolved = await Promise.all(phase1Promises);
phase1Results.push(...phase1Resolved);
console.log(' Phase 2: Error injection enabled...');
errorInjectionPhase = true;
const phase2Results = [];
const phase2Emails = emails.slice(8); // Remaining emails
const phase2Promises = phase2Emails.map(({ client, email, clientIndex, emailIndex }) => {
return client.sendMail(email).then(result => {
console.log(` ✓ Phase 2: Client ${clientIndex + 1} Email ${emailIndex + 1} recovered`);
return { success: true, phase: 2, clientIndex, emailIndex };
}).catch(error => {
console.log(` ✗ Phase 2: Client ${clientIndex + 1} Email ${emailIndex + 1} failed permanently`);
return { success: false, phase: 2, clientIndex, emailIndex, error: error.message };
});
});
const phase2Resolved = await Promise.all(phase2Promises);
phase2Results.push(...phase2Resolved);
// Close all clients
clients.forEach(client => client.close());
const phase1Success = phase1Results.filter(r => r.success).length;
const phase2Success = phase2Results.filter(r => r.success).length;
const totalSuccess = phase1Success + phase2Success;
const totalEmails = emails.length;
console.log(`\n Concurrent error handling results:`);
console.log(` Phase 1 (normal): ${phase1Success}/${phase1Results.length} successful`);
console.log(` Phase 2 (errors): ${phase2Success}/${phase2Results.length} successful`);
console.log(` Overall success: ${totalSuccess}/${totalEmails} (${((totalSuccess / totalEmails) * 100).toFixed(1)}%)`);
console.log(` Error resilience: ${phase2Success > 0 ? 'Good' : 'Poor'}`);
console.log(` Concurrent error safety: ${phase1Success === phase1Results.length ? 'Maintained' : 'Compromised'}`);
} finally {
testServer.close();
}
});
// Scenario 4: Resource Contention Management
await test.test('Scenario 4: Resource Contention Management', async () => {
console.log('\n🏁 Testing resource contention management...');
const testServer = await createTestServer({
responseDelay: 40, // Slower responses to create contention
maxConnections: 3, // Limit server connections
onConnect: (socket: any) => {
console.log(' [Server] New connection established');
}
});
try {
console.log(' Creating high-contention scenario with limited resources...');
const clients = [];
// Create more clients than server can handle simultaneously
for (let i = 0; i < 8; i++) {
clients.push(createSmtpClient({
host: testServer.hostname,
port: testServer.port,
secure: false,
pool: true,
maxConnections: 1, // Force contention
maxMessages: 10,
connectionTimeout: 3000,
resourceContention: 'managed',
backoffStrategy: 'exponential'
}));
}
const emails = [];
clients.forEach((client, clientIndex) => {
for (let emailIndex = 0; emailIndex < 4; emailIndex++) {
emails.push({
client,
email: new Email({
from: `sender${clientIndex}@contention.test`,
to: [`recipient${clientIndex}-${emailIndex}@contention.test`],
subject: `Resource Contention Test ${clientIndex + 1}-${emailIndex + 1}`,
text: `Testing resource contention management ${clientIndex + 1}-${emailIndex + 1}`,
messageId: `contention-${clientIndex}-${emailIndex}@contention.test`
}),
clientIndex,
emailIndex
});
}
});
console.log(' Launching high-contention operations...');
const startTime = Date.now();
const promises = emails.map(({ client, email, clientIndex, emailIndex }) => {
return client.sendMail(email).then(result => {
console.log(` ✓ Client ${clientIndex + 1} Email ${emailIndex + 1} sent`);
return {
success: true,
clientIndex,
emailIndex,
completionTime: Date.now() - startTime
};
}).catch(error => {
console.log(` ✗ Client ${clientIndex + 1} Email ${emailIndex + 1} failed: ${error.message}`);
return {
success: false,
clientIndex,
emailIndex,
error: error.message,
completionTime: Date.now() - startTime
};
});
});
const results = await Promise.all(promises);
const endTime = Date.now();
// Close all clients
clients.forEach(client => client.close());
const successful = results.filter(r => r.success).length;
const failed = results.filter(r => !r.success).length;
const avgCompletionTime = results
.filter(r => r.success)
.reduce((sum, r) => sum + r.completionTime, 0) / successful || 0;
console.log(`\n Resource contention results:`);
console.log(` Total operations: ${emails.length}`);
console.log(` Successful: ${successful}, Failed: ${failed}`);
console.log(` Success rate: ${((successful / emails.length) * 100).toFixed(1)}%`);
console.log(` Total execution time: ${endTime - startTime}ms`);
console.log(` Average completion time: ${avgCompletionTime.toFixed(0)}ms`);
console.log(` Resource management: ${successful > emails.length * 0.8 ? 'Effective' : 'Needs improvement'}`);
} finally {
testServer.close();
}
});
// Scenario 5: Data Race Prevention
await test.test('Scenario 5: Data Race Prevention', async () => {
console.log('\n🏃 Testing data race prevention mechanisms...');
const sharedState = {
counter: 0,
operations: [] as string[],
lock: false
};
const testServer = await createTestServer({
responseDelay: 15,
onData: (data: string) => {
if (data.includes('Data Race Test')) {
// Simulate shared state access
if (!sharedState.lock) {
sharedState.lock = true;
sharedState.counter++;
sharedState.operations.push(`Operation ${sharedState.counter} at ${Date.now()}`);
sharedState.lock = false;
}
}
}
});
try {
console.log(' Setting up concurrent operations that access shared state...');
const smtpClient = createSmtpClient({
host: testServer.hostname,
port: testServer.port,
secure: false,
pool: true,
maxConnections: 4,
racePreventionMode: true,
atomicOperations: true
});
const iterations = 20;
const concurrentOperations: Promise<any>[] = [];
console.log(' Launching concurrent operations...');
for (let i = 0; i < iterations; i++) {
const email = new Email({
from: 'sender@datarace.test',
to: [`recipient${i}@datarace.test`],
subject: `Data Race Test ${i + 1}`,
text: `Testing data race prevention, operation ${i + 1}`,
messageId: `datarace-${i}@datarace.test`
});
const operation = smtpClient.sendMail(email).then(result => {
return {
success: true,
operationId: i + 1,
messageId: result.messageId,
timestamp: Date.now()
};
}).catch(error => {
return {
success: false,
operationId: i + 1,
error: error.message,
timestamp: Date.now()
};
});
concurrentOperations.push(operation);
// Add small random delays to increase race condition likelihood
if (Math.random() < 0.3) {
await new Promise(resolve => setTimeout(resolve, 1));
}
}
const results = await Promise.all(concurrentOperations);
// Wait for shared state operations to complete
await new Promise(resolve => setTimeout(resolve, 200));
const successful = results.filter(r => r.success).length;
const failed = results.filter(r => !r.success).length;
console.log(`\n Data race prevention results:`);
console.log(` Concurrent operations: ${iterations}`);
console.log(` Successful: ${successful}, Failed: ${failed}`);
console.log(` Success rate: ${((successful / iterations) * 100).toFixed(1)}%`);
console.log(` Shared state counter: ${sharedState.counter}`);
console.log(` State operations recorded: ${sharedState.operations.length}`);
console.log(` Data consistency: ${sharedState.counter === sharedState.operations.length ? 'Maintained' : 'Race condition detected'}`);
console.log(` Race prevention: ${sharedState.counter <= successful ? 'Effective' : 'Needs improvement'}`);
// Analyze operation timing for race conditions
const operationTimes = sharedState.operations.map(op => {
const match = op.match(/at (\d+)/);
return match ? parseInt(match[1]) : 0;
});
if (operationTimes.length > 1) {
const timeGaps = [];
for (let i = 1; i < operationTimes.length; i++) {
timeGaps.push(operationTimes[i] - operationTimes[i - 1]);
}
const avgGap = timeGaps.reduce((a, b) => a + b, 0) / timeGaps.length;
console.log(` Average operation gap: ${avgGap.toFixed(1)}ms`);
console.log(` Timing consistency: ${avgGap > 0 ? 'Sequential' : 'Potential overlap'}`);
}
smtpClient.close();
} finally {
testServer.close();
}
});
console.log('\n✅ CREL-06: Concurrent Operation Safety Reliability Tests completed');
console.log('⚡ All concurrency safety scenarios tested successfully');
});