dcrouter/test/suite/smtpserver_performance/test.perf-02.concurrency.ts
2025-05-25 19:05:43 +00:00

388 lines
12 KiB
TypeScript

import * as plugins from '@git.zone/tstest/tapbundle';
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as net from 'net';
import { startTestServer, stopTestServer } from '../../helpers/server.loader.js';
// TCP port the SMTP test server is started on and every test client in this file connects to.
const TEST_PORT = 2525;
// Value returned by startTestServer(); assigned in 'prepare server' and handed to stopTestServer() in 'cleanup server'.
let testServer;
// Boots the SMTP test server on TEST_PORT and stores the returned handle for the later tests.
tap.test('prepare server', async () => {
  testServer = await startTestServer({ port: TEST_PORT });
  // Short fixed pause after startup (presumably to let the listener settle - confirm whether it is still needed).
  await new Promise((wake) => {
    setTimeout(wake, 100);
  });
});
/**
 * PERF-02 (connections): opens `concurrentCount` SMTP sessions against the test
 * server at (nearly) the same time, drives each one through
 * greeting -> EHLO -> QUIT, and requires that at least 80% of them complete.
 *
 * Assertion failures are allowed to propagate out of the async test function so
 * the runner sees them; nothing is routed through an un-awaited deferred.
 */
tap.test('PERF-02: Concurrency testing - Multiple simultaneous connections', async () => {
  const concurrentCount = 20;
  const connectionTimeoutMs = 10000;

  type ConnectionResult = {
    connectionId: number;
    success: boolean;
    duration: number;
    error?: string;
  };
  type SessionState = 'connecting' | 'connected' | 'ehlo' | 'quit' | 'done';

  const connectionResults: ConnectionResult[] = [];

  // True when `line` is the final line of a reply with the given status code
  // ("250 text" or a bare "250"), as opposed to a "250-text" continuation line.
  const isFinalReply = (line: string, code: string): boolean =>
    line === code || line.startsWith(`${code} `);

  /**
   * Runs one greeting/EHLO/QUIT session. The returned promise always resolves
   * (never rejects); the outcome is appended to `connectionResults` exactly once.
   */
  const createConcurrentConnection = (connectionId: number): Promise<void> => {
    return new Promise<void>((resolve) => {
      const startTime = Date.now();
      const socket = net.createConnection({
        host: 'localhost',
        port: TEST_PORT,
        timeout: connectionTimeoutMs
      });
      let state: SessionState = 'connecting';
      let pending = ''; // bytes received after the last complete CRLF-terminated line
      let settled = false;

      // Single exit point: several events (timeout, error, close, 221) can fire
      // for the same socket, but only the first one may record a result.
      const finish = (success: boolean, error?: string): void => {
        if (settled) return;
        settled = true;
        clearTimeout(timeoutHandle);
        const result: ConnectionResult = {
          connectionId,
          success,
          duration: Date.now() - startTime
        };
        if (error !== undefined) result.error = error;
        connectionResults.push(result);
        resolve();
      };

      const timeoutHandle = setTimeout(() => {
        finish(false, 'Connection timeout');
        socket.destroy();
      }, connectionTimeoutMs);

      socket.on('connect', () => {
        state = 'connected';
      });

      socket.on('data', (chunk) => {
        pending += chunk.toString();
        const lines = pending.split('\r\n');
        // The last element is whatever follows the final CRLF (an incomplete
        // line or ''); keep it for the next chunk so each line is handled once.
        pending = lines.pop() ?? '';

        for (const line of lines) {
          if (!line.trim()) continue;
          if (state === 'connected' && isFinalReply(line, '220')) {
            state = 'ehlo';
            socket.write(`EHLO testhost-${connectionId}\r\n`);
          } else if (state === 'ehlo' && isFinalReply(line, '250')) {
            // Final 250 response received
            state = 'quit';
            socket.write('QUIT\r\n');
          } else if (state === 'quit' && isFinalReply(line, '221')) {
            state = 'done';
            finish(true);
            socket.end();
          }
        }
      });

      socket.on('error', (error) => {
        finish(false, error.message);
      });

      // 'close' also follows 'error', destroy() and a normal end(); finish()
      // ignores it unless no outcome has been recorded yet.
      socket.on('close', () => {
        finish(false, 'Connection closed unexpectedly');
      });
    });
  };

  // Create all concurrent connections
  const promises: Promise<void>[] = [];
  console.log(`Creating ${concurrentCount} concurrent connections...`);
  for (let i = 0; i < concurrentCount; i++) {
    promises.push(createConcurrentConnection(i));
    // Small stagger to avoid overwhelming the system
    if (i % 5 === 0) {
      await new Promise((wake) => setTimeout(wake, 10));
    }
  }

  // Wait for all connections to complete
  await Promise.all(promises);

  // Analyze results
  const succeeded = connectionResults.filter((r) => r.success);
  const successful = succeeded.length;
  const failed = connectionResults.length - successful;
  const successRate = successful / concurrentCount;
  const avgDuration =
    successful > 0 ? succeeded.reduce((sum, r) => sum + r.duration, 0) / successful : 0;

  console.log(`\nConcurrency Test Results:`);
  console.log(`Total connections: ${concurrentCount}`);
  console.log(`Successful: ${successful} (${(successRate * 100).toFixed(1)}%)`);
  console.log(`Failed: ${failed}`);
  console.log(`Average duration: ${avgDuration.toFixed(0)}ms`);

  if (failed > 0) {
    const errors = Array.from(
      new Set(connectionResults.filter((r) => !r.success).map((r) => r.error))
    ); // unique errors
    console.log(`Unique errors: ${errors.join(', ')}`);
  }

  // Success if at least 80% of connections succeed
  expect(successRate).toBeGreaterThanOrEqual(0.8);
});
/**
 * PERF-02 (transactions): runs `transactionCount` complete SMTP mail
 * transactions (EHLO, MAIL FROM, RCPT TO, DATA, message body, QUIT) in
 * overlapping fashion and requires that at least 80% of them succeed.
 *
 * Assertion failures are allowed to propagate out of the async test function so
 * the runner sees them; nothing is routed through an un-awaited deferred.
 */
tap.test('PERF-02: Concurrency testing - Concurrent transactions', async () => {
  const transactionCount = 10;
  const transactionTimeoutMs = 15000;

  type TransactionResult = {
    transactionId: number;
    success: boolean;
    duration: number;
    error?: string;
  };

  const transactionResults: TransactionResult[] = [];

  /**
   * Runs one mail transaction on its own socket. The returned promise always
   * resolves (never rejects); the outcome is appended to `transactionResults`
   * exactly once.
   */
  const performConcurrentTransaction = (transactionId: number): Promise<void> => {
    return new Promise<void>((resolve) => {
      const startTime = Date.now();
      const socket = net.createConnection({
        host: 'localhost',
        port: TEST_PORT,
        timeout: transactionTimeoutMs
      });
      let settled = false;

      // Single exit point: timeout, socket error and the transaction itself can
      // all try to report, but only the first report is recorded.
      const finish = (success: boolean, error?: string): void => {
        if (settled) return;
        settled = true;
        clearTimeout(timeoutHandle);
        const result: TransactionResult = {
          transactionId,
          success,
          duration: Date.now() - startTime
        };
        if (error !== undefined) result.error = error;
        transactionResults.push(result);
        resolve();
      };

      const timeoutHandle = setTimeout(() => {
        finish(false, 'Transaction timeout');
        socket.destroy();
      }, transactionTimeoutMs);

      /**
       * Waits for one complete SMTP reply and resolves with its 3-digit status
       * code. A reply is complete once the last CRLF-terminated line starts
       * with "<code><space>" (or is a bare code); "<code>-" lines are
       * continuations. Rejects if the socket closes before that happens.
       */
      const readReply = (): Promise<string> =>
        new Promise<string>((res, rej) => {
          let buffer = '';
          const detach = (): void => {
            socket.removeListener('data', onData);
            socket.removeListener('close', onClose);
          };
          const onData = (chunk: Buffer): void => {
            buffer += chunk.toString();
            if (!buffer.endsWith('\r\n')) return; // last line still incomplete
            const lines = buffer.split('\r\n').filter((line) => line.length > 0);
            const lastLine = lines[lines.length - 1];
            if (lastLine !== undefined && /^\d{3}(?: |$)/.test(lastLine)) {
              detach();
              res(lastLine.slice(0, 3));
            }
          };
          const onClose = (): void => {
            detach();
            rej(new Error('Connection closed unexpectedly'));
          };
          socket.on('data', onData);
          socket.on('close', onClose);
        });

      // Writes `payload`, then requires the reply's status code to equal
      // `expectedCode`; otherwise throws an Error carrying `failureMessage`.
      // readReply() is invoked synchronously after write() so no data is missed.
      const exchange = async (
        payload: string,
        expectedCode: string,
        failureMessage: string
      ): Promise<void> => {
        socket.write(payload);
        const code = await readReply();
        if (code !== expectedCode) {
          throw new Error(failureMessage);
        }
      };

      const processResponse = async (): Promise<void> => {
        try {
          // Read greeting
          if ((await readReply()) !== '220') {
            throw new Error('Greeting failed');
          }

          // Send EHLO
          await exchange(`EHLO testhost-tx-${transactionId}\r\n`, '250', 'EHLO failed');

          // Complete email transaction
          await exchange(
            `MAIL FROM:<sender${transactionId}@example.com>\r\n`,
            '250',
            'MAIL FROM failed'
          );
          await exchange(
            `RCPT TO:<recipient${transactionId}@example.com>\r\n`,
            '250',
            'RCPT TO failed'
          );
          await exchange('DATA\r\n', '354', 'DATA command failed');

          // Send email content (terminated by the "." line)
          const emailContent = [
            `From: sender${transactionId}@example.com`,
            `To: recipient${transactionId}@example.com`,
            `Subject: Concurrent test ${transactionId}`,
            '',
            `This is concurrent test message ${transactionId}`,
            '.',
            ''
          ].join('\r\n');
          await exchange(emailContent, '250', 'Message submission failed');

          // Any complete reply to QUIT is accepted; its code is not checked.
          socket.write('QUIT\r\n');
          await readReply();

          finish(true);
          socket.end();
        } catch (error: unknown) {
          // Timeout or socket error already reported this transaction.
          if (settled) return;
          const errorMsg = error instanceof Error ? error.message : 'Unknown error';
          console.log(`Transaction ${transactionId} failed: ${errorMsg}`);
          finish(false, errorMsg);
          socket.end();
        }
      };

      socket.on('connect', () => {
        // processResponse handles its own errors and never rejects.
        void processResponse();
      });

      socket.on('error', (error) => {
        finish(false, error.message);
      });
    });
  };

  // Create concurrent transactions
  const promises: Promise<void>[] = [];
  console.log(`\nStarting ${transactionCount} concurrent email transactions...`);
  for (let i = 0; i < transactionCount; i++) {
    promises.push(performConcurrentTransaction(i));
    // Small stagger
    await new Promise((wake) => setTimeout(wake, 50));
  }

  // Wait for all transactions
  await Promise.all(promises);

  // Analyze results
  const succeeded = transactionResults.filter((r) => r.success);
  const successful = succeeded.length;
  const failed = transactionResults.length - successful;
  const successRate = successful / transactionCount;
  const avgDuration =
    successful > 0 ? succeeded.reduce((sum, r) => sum + r.duration, 0) / successful : 0;

  console.log(`\nConcurrent Transaction Results:`);
  console.log(`Total transactions: ${transactionCount}`);
  console.log(`Successful: ${successful} (${(successRate * 100).toFixed(1)}%)`);
  console.log(`Failed: ${failed}`);
  console.log(`Average duration: ${avgDuration.toFixed(0)}ms`);

  // Success if at least 80% of transactions complete
  expect(successRate).toBeGreaterThanOrEqual(0.8);
});
// Final test step: hands the handle stored by 'prepare server' to stopTestServer().
tap.test('cleanup server', async () => {
await stopTestServer(testServer);
});
// Starts the tap run for the tests registered above and exports whatever tap.start() returns as the module default.
export default tap.start();