dcrouter/test/suite/smtpclient_edge-cases/test.cedge-07.concurrent-operations.ts

634 lines
22 KiB
TypeScript
Raw Normal View History

2025-05-24 17:00:59 +00:00
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as plugins from './plugins.js';
import { createTestServer } from '../../helpers/server.loader.js';
import { createSmtpClient } from '../../helpers/smtp.client.js';
tap.test('CEDGE-07: should handle concurrent operations correctly', async (tools) => {
const testId = 'CEDGE-07-concurrent-operations';
console.log(`\n${testId}: Testing concurrent operation handling...`);
let scenarioCount = 0;
// Scenario 1: Multiple simultaneous connections
await (async () => {
scenarioCount++;
console.log(`\nScenario ${scenarioCount}: Testing multiple simultaneous connections`);
let activeConnections = 0;
let totalConnections = 0;
const testServer = await createTestServer({
onConnection: async (socket) => {
activeConnections++;
totalConnections++;
const connectionId = totalConnections;
console.log(` [Server] Connection ${connectionId} established (active: ${activeConnections})`);
socket.write('220 mail.example.com ESMTP\r\n');
socket.on('close', () => {
activeConnections--;
console.log(` [Server] Connection ${connectionId} closed (active: ${activeConnections})`);
});
socket.on('data', (data) => {
const command = data.toString().trim();
console.log(` [Server] Connection ${connectionId} received: ${command}`);
if (command.startsWith('EHLO')) {
socket.write('250-mail.example.com\r\n');
socket.write('250 OK\r\n');
} else if (command.startsWith('MAIL FROM:')) {
socket.write('250 OK\r\n');
} else if (command.startsWith('RCPT TO:')) {
socket.write('250 OK\r\n');
} else if (command === 'DATA') {
socket.write('354 Start mail input\r\n');
} else if (command === '.') {
socket.write(`250 OK: Message ${connectionId} accepted\r\n`);
} else if (command === 'QUIT') {
socket.write('221 Bye\r\n');
socket.end();
}
});
}
});
// Send multiple emails concurrently
const concurrentCount = 5;
const promises = Array(concurrentCount).fill(null).map(async (_, i) => {
const client = createSmtpClient({
host: testServer.hostname,
port: testServer.port,
secure: false
});
const email = new plugins.smartmail.Email({
from: `sender${i + 1}@example.com`,
to: [`recipient${i + 1}@example.com`],
subject: `Concurrent test ${i + 1}`,
text: `This is concurrent email number ${i + 1}`
});
console.log(` Starting email ${i + 1}...`);
const start = Date.now();
const result = await client.sendMail(email);
const elapsed = Date.now() - start;
console.log(` Email ${i + 1} completed in ${elapsed}ms`);
return { index: i + 1, result, elapsed };
});
const results = await Promise.all(promises);
results.forEach(({ index, result, elapsed }) => {
expect(result).toBeDefined();
expect(result.messageId).toBeDefined();
console.log(` Email ${index}: Success (${elapsed}ms)`);
});
await testServer.server.close();
})();
// Scenario 2: Concurrent operations on pooled connection
await (async () => {
scenarioCount++;
console.log(`\nScenario ${scenarioCount}: Testing concurrent operations on pooled connections`);
let connectionCount = 0;
const connectionMessages = new Map<any, number>();
const testServer = await createTestServer({
onConnection: async (socket) => {
connectionCount++;
const connId = connectionCount;
connectionMessages.set(socket, 0);
console.log(` [Server] Pooled connection ${connId} established`);
socket.write('220 mail.example.com ESMTP\r\n');
socket.on('close', () => {
const msgCount = connectionMessages.get(socket) || 0;
connectionMessages.delete(socket);
console.log(` [Server] Connection ${connId} closed after ${msgCount} messages`);
});
socket.on('data', (data) => {
const command = data.toString().trim();
if (command.startsWith('EHLO')) {
socket.write('250-mail.example.com\r\n');
socket.write('250-PIPELINING\r\n');
socket.write('250 OK\r\n');
} else if (command.startsWith('MAIL FROM:')) {
socket.write('250 OK\r\n');
} else if (command.startsWith('RCPT TO:')) {
socket.write('250 OK\r\n');
} else if (command === 'DATA') {
socket.write('354 Start mail input\r\n');
} else if (command === '.') {
const msgCount = (connectionMessages.get(socket) || 0) + 1;
connectionMessages.set(socket, msgCount);
socket.write(`250 OK: Message ${msgCount} on connection ${connId}\r\n`);
} else if (command === 'RSET') {
socket.write('250 OK\r\n');
} else if (command === 'QUIT') {
socket.write('221 Bye\r\n');
socket.end();
}
});
}
});
// Create pooled client
const pooledClient = createSmtpClient({
host: testServer.hostname,
port: testServer.port,
secure: false,
pool: true,
maxConnections: 3,
maxMessages: 100
});
// Send many emails concurrently through the pool
const emailCount = 10;
const promises = Array(emailCount).fill(null).map(async (_, i) => {
const email = new plugins.smartmail.Email({
from: 'sender@example.com',
to: [`recipient${i + 1}@example.com`],
subject: `Pooled email ${i + 1}`,
text: `Testing connection pooling with email ${i + 1}`
});
const start = Date.now();
const result = await pooledClient.sendMail(email);
const elapsed = Date.now() - start;
return { index: i + 1, result, elapsed };
});
const results = await Promise.all(promises);
let totalTime = 0;
results.forEach(({ index, result, elapsed }) => {
totalTime += elapsed;
expect(result).toBeDefined();
expect(result.messageId).toBeDefined();
});
console.log(` All ${emailCount} emails sent successfully`);
console.log(` Average time per email: ${Math.round(totalTime / emailCount)}ms`);
console.log(` Total connections used: ${connectionCount} (pool size: 3)`);
// Close pooled connections
await pooledClient.close();
await testServer.server.close();
})();
// Scenario 3: Race conditions with rapid commands
await (async () => {
scenarioCount++;
console.log(`\nScenario ${scenarioCount}: Testing race conditions with rapid commands`);
const testServer = await createTestServer({
onConnection: async (socket) => {
console.log(' [Server] Client connected');
socket.write('220 mail.example.com ESMTP\r\n');
let commandBuffer: string[] = [];
let processing = false;
const processCommand = async (command: string) => {
// Simulate async processing with variable delays
const delay = Math.random() * 100;
await new Promise(resolve => setTimeout(resolve, delay));
if (command.startsWith('EHLO')) {
socket.write('250-mail.example.com\r\n');
socket.write('250-PIPELINING\r\n');
socket.write('250 OK\r\n');
} else if (command.startsWith('MAIL FROM:')) {
socket.write('250 OK\r\n');
} else if (command.startsWith('RCPT TO:')) {
socket.write('250 OK\r\n');
} else if (command === 'DATA') {
socket.write('354 Start mail input\r\n');
} else if (command === '.') {
socket.write('250 OK\r\n');
} else if (command === 'QUIT') {
socket.write('221 Bye\r\n');
socket.end();
}
};
const processQueue = async () => {
if (processing || commandBuffer.length === 0) return;
processing = true;
while (commandBuffer.length > 0) {
const cmd = commandBuffer.shift()!;
console.log(` [Server] Processing: ${cmd}`);
await processCommand(cmd);
}
processing = false;
};
socket.on('data', (data) => {
const commands = data.toString().split('\r\n').filter(cmd => cmd.length > 0);
commands.forEach(cmd => {
console.log(` [Server] Queued: ${cmd}`);
commandBuffer.push(cmd);
});
processQueue();
});
}
});
const smtpClient = createSmtpClient({
host: testServer.hostname,
port: testServer.port,
secure: false
});
// Send email with rapid command sequence
const email = new plugins.smartmail.Email({
from: 'sender@example.com',
to: ['recipient1@example.com', 'recipient2@example.com', 'recipient3@example.com'],
subject: 'Testing rapid commands',
text: 'This tests race conditions with pipelined commands'
});
const result = await smtpClient.sendMail(email);
console.log(` Result: ${result.messageId ? 'Success' : 'Failed'}`);
expect(result).toBeDefined();
expect(result.messageId).toBeDefined();
await testServer.server.close();
})();
// Scenario 4: Concurrent authentication attempts
await (async () => {
scenarioCount++;
console.log(`\nScenario ${scenarioCount}: Testing concurrent authentication`);
let authAttempts = 0;
const testServer = await createTestServer({
onConnection: async (socket) => {
console.log(' [Server] Client connected');
socket.write('220 mail.example.com ESMTP\r\n');
socket.on('data', (data) => {
const command = data.toString().trim();
console.log(` [Server] Received: ${command}`);
if (command.startsWith('EHLO')) {
socket.write('250-mail.example.com\r\n');
socket.write('250-AUTH PLAIN LOGIN\r\n');
socket.write('250 OK\r\n');
} else if (command.startsWith('AUTH')) {
authAttempts++;
console.log(` [Server] Auth attempt ${authAttempts}`);
// Simulate auth processing delay
setTimeout(() => {
if (command.includes('PLAIN')) {
socket.write('235 2.7.0 Authentication successful\r\n');
} else {
socket.write('334 VXNlcm5hbWU6\r\n'); // Username:
}
}, 100);
} else if (Buffer.from(command, 'base64').toString().includes('testuser')) {
socket.write('334 UGFzc3dvcmQ6\r\n'); // Password:
} else if (Buffer.from(command, 'base64').toString().includes('testpass')) {
socket.write('235 2.7.0 Authentication successful\r\n');
} else if (command.startsWith('MAIL FROM:')) {
socket.write('250 OK\r\n');
} else if (command.startsWith('RCPT TO:')) {
socket.write('250 OK\r\n');
} else if (command === 'DATA') {
socket.write('354 Start mail input\r\n');
} else if (command === '.') {
socket.write('250 OK\r\n');
} else if (command === 'QUIT') {
socket.write('221 Bye\r\n');
socket.end();
}
});
}
});
// Send multiple authenticated emails concurrently
const authPromises = Array(3).fill(null).map(async (_, i) => {
const client = createSmtpClient({
host: testServer.hostname,
port: testServer.port,
secure: false,
auth: {
user: 'testuser',
pass: 'testpass'
}
});
const email = new plugins.smartmail.Email({
from: 'sender@example.com',
to: [`recipient${i + 1}@example.com`],
subject: `Concurrent auth test ${i + 1}`,
text: `Testing concurrent authentication ${i + 1}`
});
console.log(` Starting authenticated email ${i + 1}...`);
const result = await client.sendMail(email);
console.log(` Authenticated email ${i + 1} completed`);
return result;
});
const authResults = await Promise.all(authPromises);
authResults.forEach((result, i) => {
expect(result).toBeDefined();
expect(result.messageId).toBeDefined();
console.log(` Auth email ${i + 1}: Success`);
});
await testServer.server.close();
})();
// Scenario 5: Concurrent TLS upgrades
await (async () => {
scenarioCount++;
console.log(`\nScenario ${scenarioCount}: Testing concurrent STARTTLS upgrades`);
let tlsUpgrades = 0;
const testServer = await createTestServer({
secure: false,
onConnection: async (socket) => {
console.log(' [Server] Client connected');
socket.write('220 mail.example.com ESMTP\r\n');
socket.on('data', (data) => {
const command = data.toString().trim();
console.log(` [Server] Received: ${command}`);
if (command.startsWith('EHLO')) {
socket.write('250-mail.example.com\r\n');
socket.write('250-STARTTLS\r\n');
socket.write('250 OK\r\n');
} else if (command === 'STARTTLS') {
tlsUpgrades++;
console.log(` [Server] TLS upgrade ${tlsUpgrades}`);
socket.write('220 2.0.0 Ready to start TLS\r\n');
// Note: In real test, would upgrade to TLS here
// For this test, we'll continue in plain text
} else if (command.startsWith('MAIL FROM:')) {
socket.write('250 OK\r\n');
} else if (command.startsWith('RCPT TO:')) {
socket.write('250 OK\r\n');
} else if (command === 'DATA') {
socket.write('354 Start mail input\r\n');
} else if (command === '.') {
socket.write('250 OK\r\n');
} else if (command === 'QUIT') {
socket.write('221 Bye\r\n');
socket.end();
}
});
}
});
// Send multiple emails with STARTTLS concurrently
const tlsPromises = Array(3).fill(null).map(async (_, i) => {
const client = createSmtpClient({
host: testServer.hostname,
port: testServer.port,
secure: false,
requireTLS: false // Would be true in production
});
const email = new plugins.smartmail.Email({
from: 'sender@example.com',
to: [`recipient${i + 1}@example.com`],
subject: `TLS upgrade test ${i + 1}`,
text: `Testing concurrent TLS upgrades ${i + 1}`
});
console.log(` Starting TLS email ${i + 1}...`);
const result = await client.sendMail(email);
console.log(` TLS email ${i + 1} completed`);
return result;
});
const tlsResults = await Promise.all(tlsPromises);
tlsResults.forEach((result, i) => {
expect(result).toBeDefined();
expect(result.messageId).toBeDefined();
});
console.log(` Total TLS upgrades: ${tlsUpgrades}`);
await testServer.server.close();
})();
// Scenario 6: Mixed concurrent operations
await (async () => {
scenarioCount++;
console.log(`\nScenario ${scenarioCount}: Testing mixed concurrent operations`);
const stats = {
connections: 0,
messages: 0,
errors: 0,
timeouts: 0
};
const testServer = await createTestServer({
onConnection: async (socket) => {
stats.connections++;
const connId = stats.connections;
console.log(` [Server] Connection ${connId} established`);
socket.write('220 mail.example.com ESMTP\r\n');
let messageInProgress = false;
socket.on('data', async (data) => {
const command = data.toString().trim();
// Simulate various server behaviors
const behavior = connId % 4;
if (command.startsWith('EHLO')) {
if (behavior === 0) {
// Normal response
socket.write('250-mail.example.com\r\n');
socket.write('250 OK\r\n');
} else if (behavior === 1) {
// Slow response
await new Promise(resolve => setTimeout(resolve, 500));
socket.write('250-mail.example.com\r\n');
socket.write('250 OK\r\n');
} else if (behavior === 2) {
// Temporary error
socket.write('421 4.3.2 Service temporarily unavailable\r\n');
stats.errors++;
socket.end();
} else {
// Normal with extensions
socket.write('250-mail.example.com\r\n');
socket.write('250-PIPELINING\r\n');
socket.write('250-SIZE 10485760\r\n');
socket.write('250 OK\r\n');
}
} else if (command.startsWith('MAIL FROM:')) {
messageInProgress = true;
socket.write('250 OK\r\n');
} else if (command.startsWith('RCPT TO:')) {
socket.write('250 OK\r\n');
} else if (command === 'DATA') {
socket.write('354 Start mail input\r\n');
} else if (command === '.') {
if (messageInProgress) {
stats.messages++;
messageInProgress = false;
}
socket.write('250 OK\r\n');
} else if (command === 'QUIT') {
socket.write('221 Bye\r\n');
socket.end();
}
});
// Simulate connection timeout for some connections
if (behavior === 3) {
setTimeout(() => {
if (!socket.destroyed) {
console.log(` [Server] Connection ${connId} timed out`);
stats.timeouts++;
socket.destroy();
}
}, 2000);
}
}
});
// Send various types of operations concurrently
const operations = [
// Normal emails
...Array(5).fill(null).map((_, i) => ({
type: 'normal',
index: i,
action: async () => {
const client = createSmtpClient({
host: testServer.hostname,
port: testServer.port,
secure: false
});
const email = new plugins.smartmail.Email({
from: 'sender@example.com',
to: [`recipient${i + 1}@example.com`],
subject: `Normal email ${i + 1}`,
text: 'Testing mixed operations'
});
return await client.sendMail(email);
}
})),
// Large emails
...Array(2).fill(null).map((_, i) => ({
type: 'large',
index: i,
action: async () => {
const client = createSmtpClient({
host: testServer.hostname,
port: testServer.port,
secure: false
});
const email = new plugins.smartmail.Email({
from: 'sender@example.com',
to: ['recipient@example.com'],
subject: `Large email ${i + 1}`,
text: 'X'.repeat(100000) // 100KB
});
return await client.sendMail(email);
}
})),
// Multiple recipient emails
...Array(3).fill(null).map((_, i) => ({
type: 'multi',
index: i,
action: async () => {
const client = createSmtpClient({
host: testServer.hostname,
port: testServer.port,
secure: false
});
const email = new plugins.smartmail.Email({
from: 'sender@example.com',
to: Array(10).fill(null).map((_, j) => `recipient${j + 1}@example.com`),
subject: `Multi-recipient email ${i + 1}`,
text: 'Testing multiple recipients'
});
return await client.sendMail(email);
}
}))
];
console.log(` Starting ${operations.length} mixed operations...`);
const results = await Promise.allSettled(
operations.map(async (op) => {
const start = Date.now();
try {
const result = await op.action();
const elapsed = Date.now() - start;
return { ...op, success: true, elapsed, result };
} catch (error) {
const elapsed = Date.now() - start;
return { ...op, success: false, elapsed, error: error.message };
}
})
);
// Analyze results
const summary = {
normal: { success: 0, failed: 0 },
large: { success: 0, failed: 0 },
multi: { success: 0, failed: 0 }
};
results.forEach((result) => {
if (result.status === 'fulfilled') {
const { type, success, elapsed } = result.value;
if (success) {
summary[type].success++;
} else {
summary[type].failed++;
}
console.log(` ${type} operation: ${success ? 'Success' : 'Failed'} (${elapsed}ms)`);
}
});
console.log('\n Summary:');
console.log(` - Normal emails: ${summary.normal.success}/${summary.normal.success + summary.normal.failed} successful`);
console.log(` - Large emails: ${summary.large.success}/${summary.large.success + summary.large.failed} successful`);
console.log(` - Multi-recipient: ${summary.multi.success}/${summary.multi.success + summary.multi.failed} successful`);
console.log(` - Server stats: ${stats.connections} connections, ${stats.messages} messages, ${stats.errors} errors, ${stats.timeouts} timeouts`);
await testServer.server.close();
})();
console.log(`\n${testId}: All ${scenarioCount} concurrent operation scenarios tested ✓`);
});