dcrouter/test/suite/smtpclient_error-handling/test.cerr-09.connection-pool-errors.ts
2025-05-24 17:00:59 +00:00

616 lines
17 KiB
TypeScript

import { tap, expect } from '@git.zone/tstest/tapbundle';
import { startTestSmtpServer } from '../../helpers/server.loader.js';
import { createSmtpClient } from '../../helpers/smtp.client.js';
import { Email } from '../../../ts/mail/core/classes.email.js';
import * as net from 'net';
let testServer: any;
tap.test('setup test SMTP server', async () => {
testServer = await startTestSmtpServer();
expect(testServer).toBeTruthy();
expect(testServer.port).toBeGreaterThan(0);
});
tap.test('CERR-09: Pool exhaustion', async () => {
const pooledClient = createSmtpClient({
host: testServer.hostname,
port: testServer.port,
secure: false,
pool: true,
maxConnections: 3,
maxMessages: 100,
connectionTimeout: 5000,
debug: true
});
console.log('Testing connection pool exhaustion...');
console.log('Pool configuration: maxConnections=3');
// Track pool state
const poolStats = {
active: 0,
idle: 0,
pending: 0,
created: 0,
destroyed: 0
};
pooledClient.on('pool-connection-create', () => {
poolStats.created++;
console.log(` Pool: Connection created (total: ${poolStats.created})`);
});
pooledClient.on('pool-connection-close', () => {
poolStats.destroyed++;
console.log(` Pool: Connection closed (total: ${poolStats.destroyed})`);
});
// Send more concurrent messages than pool size
const messageCount = 10;
const emails = Array.from({ length: messageCount }, (_, i) => new Email({
from: 'sender@example.com',
to: [`recipient${i}@example.com`],
subject: `Pool test ${i}`,
text: 'Testing connection pool exhaustion'
}));
console.log(`\nSending ${messageCount} concurrent messages...`);
const startTime = Date.now();
const results = await Promise.allSettled(
emails.map((email, i) => {
return pooledClient.sendMail(email).then(() => {
console.log(` Message ${i}: Sent`);
return { index: i, status: 'sent' };
}).catch(error => {
console.log(` Message ${i}: Failed - ${error.message}`);
return { index: i, status: 'failed', error: error.message };
});
})
);
const elapsed = Date.now() - startTime;
const successful = results.filter(r => r.status === 'fulfilled').length;
const failed = results.filter(r => r.status === 'rejected').length;
console.log(`\nResults after ${elapsed}ms:`);
console.log(` Successful: ${successful}/${messageCount}`);
console.log(` Failed: ${failed}/${messageCount}`);
console.log(` Connections created: ${poolStats.created}`);
console.log(` Connections destroyed: ${poolStats.destroyed}`);
// Pool should limit concurrent connections
expect(poolStats.created).toBeLessThanOrEqual(3);
await pooledClient.close();
});
tap.test('CERR-09: Connection pool timeouts', async () => {
// Create slow server
const slowServer = net.createServer((socket) => {
socket.write('220 Slow Server\r\n');
socket.on('data', (data) => {
const command = data.toString().trim();
// Add delays to simulate slow responses
setTimeout(() => {
if (command.startsWith('EHLO')) {
socket.write('250 OK\r\n');
} else if (command === 'QUIT') {
socket.write('221 Bye\r\n');
socket.end();
} else {
// Slow response for other commands
setTimeout(() => {
socket.write('250 OK\r\n');
}, 3000); // 3 second delay
}
}, 1000);
});
});
await new Promise<void>((resolve) => {
slowServer.listen(0, '127.0.0.1', () => resolve());
});
const slowPort = (slowServer.address() as net.AddressInfo).port;
const pooledClient = createSmtpClient({
host: '127.0.0.1',
port: slowPort,
secure: false,
pool: true,
maxConnections: 2,
poolTimeout: 2000, // 2 second timeout for getting connection from pool
commandTimeout: 4000,
debug: true
});
console.log('\nTesting connection pool timeouts...');
console.log('Pool timeout: 2 seconds');
// Send multiple messages to trigger pool timeout
const emails = Array.from({ length: 5 }, (_, i) => new Email({
from: 'sender@example.com',
to: [`recipient${i}@example.com`],
subject: `Timeout test ${i}`,
text: 'Testing pool timeout'
}));
const timeoutErrors = [];
await Promise.allSettled(
emails.map(async (email, i) => {
try {
console.log(` Message ${i}: Attempting to send...`);
await pooledClient.sendMail(email);
console.log(` Message ${i}: Sent successfully`);
} catch (error) {
console.log(` Message ${i}: ${error.message}`);
if (error.message.includes('timeout')) {
timeoutErrors.push(error);
}
}
})
);
console.log(`\nTimeout errors: ${timeoutErrors.length}`);
expect(timeoutErrors.length).toBeGreaterThan(0);
await pooledClient.close();
slowServer.close();
});
tap.test('CERR-09: Dead connection detection', async () => {
const pooledClient = createSmtpClient({
host: testServer.hostname,
port: testServer.port,
secure: false,
pool: true,
maxConnections: 3,
poolIdleTimeout: 5000, // Connections idle for 5s are closed
poolPingInterval: 2000, // Ping idle connections every 2s
debug: true
});
console.log('\nTesting dead connection detection...');
// Track connection health checks
let pingCount = 0;
let deadConnections = 0;
pooledClient.on('pool-connection-ping', (result) => {
pingCount++;
console.log(` Ping ${pingCount}: ${result.alive ? 'Connection alive' : 'Connection dead'}`);
if (!result.alive) {
deadConnections++;
}
});
// Send initial message to create connection
await pooledClient.sendMail(new Email({
from: 'sender@example.com',
to: ['recipient@example.com'],
subject: 'Initial message',
text: 'Creating connection'
}));
console.log('Connection created, waiting for health checks...');
// Wait for health checks
await new Promise(resolve => setTimeout(resolve, 6000));
console.log(`\nHealth check results:`);
console.log(` Total pings: ${pingCount}`);
console.log(` Dead connections detected: ${deadConnections}`);
// Send another message to test connection recovery
try {
await pooledClient.sendMail(new Email({
from: 'sender@example.com',
to: ['recipient@example.com'],
subject: 'After idle',
text: 'Testing after idle period'
}));
console.log('Message sent successfully after idle period');
} catch (error) {
console.log('Error after idle:', error.message);
}
await pooledClient.close();
});
tap.test('CERR-09: Pool connection limit per host', async () => {
// Create multiple servers
const servers = [];
for (let i = 0; i < 3; i++) {
const server = net.createServer((socket) => {
socket.write(`220 Server ${i + 1}\r\n`);
socket.on('data', (data) => {
const command = data.toString().trim();
if (command.startsWith('EHLO')) {
socket.write(`250 server${i + 1}.example.com\r\n`);
} else if (command === 'QUIT') {
socket.write('221 Bye\r\n');
socket.end();
} else {
socket.write('250 OK\r\n');
}
});
});
await new Promise<void>((resolve) => {
server.listen(0, '127.0.0.1', () => resolve());
});
servers.push({
server,
port: (server.address() as net.AddressInfo).port
});
}
console.log('\nTesting per-host connection limits...');
// Create pooled client with per-host limits
const pooledClient = createSmtpClient({
pool: true,
maxConnections: 10, // Total pool size
maxConnectionsPerHost: 2, // Per-host limit
debug: true
});
// Track connections per host
const hostConnections: { [key: string]: number } = {};
pooledClient.on('pool-connection-create', (info) => {
const host = info.host || 'unknown';
hostConnections[host] = (hostConnections[host] || 0) + 1;
console.log(` Created connection to ${host} (total: ${hostConnections[host]})`);
});
// Send messages to different servers
const messages = [];
for (let i = 0; i < 3; i++) {
for (let j = 0; j < 4; j++) {
messages.push({
server: i,
email: new Email({
from: 'sender@example.com',
to: [`recipient${j}@server${i}.com`],
subject: `Test ${j} to server ${i}`,
text: 'Testing per-host limits'
})
});
}
}
// Override host/port for each message
await Promise.allSettled(
messages.map(async ({ server, email }) => {
const client = createSmtpClient({
host: '127.0.0.1',
port: servers[server].port,
secure: false,
pool: true,
maxConnections: 10,
maxConnectionsPerHost: 2,
debug: false
});
try {
await client.sendMail(email);
console.log(` Sent to server ${server + 1}`);
} catch (error) {
console.log(` Failed to server ${server + 1}: ${error.message}`);
}
await client.close();
})
);
console.log('\nConnections per host:');
Object.entries(hostConnections).forEach(([host, count]) => {
console.log(` ${host}: ${count} connections`);
expect(count).toBeLessThanOrEqual(2); // Should respect per-host limit
});
// Clean up servers
servers.forEach(s => s.server.close());
});
tap.test('CERR-09: Connection pool recovery', async () => {
// Create unstable server
let shouldFail = true;
let requestCount = 0;
const unstableServer = net.createServer((socket) => {
requestCount++;
if (shouldFail && requestCount <= 3) {
// Abruptly close connection for first 3 requests
socket.destroy();
return;
}
socket.write('220 Unstable Server\r\n');
socket.on('data', (data) => {
const command = data.toString().trim();
if (command.startsWith('EHLO')) {
socket.write('250 OK\r\n');
} else if (command === 'QUIT') {
socket.write('221 Bye\r\n');
socket.end();
} else {
socket.write('250 OK\r\n');
}
});
});
await new Promise<void>((resolve) => {
unstableServer.listen(0, '127.0.0.1', () => resolve());
});
const unstablePort = (unstableServer.address() as net.AddressInfo).port;
const pooledClient = createSmtpClient({
host: '127.0.0.1',
port: unstablePort,
secure: false,
pool: true,
maxConnections: 2,
retryFailedConnections: true,
connectionRetryDelay: 1000,
debug: true
});
console.log('\nTesting connection pool recovery...');
console.log('Server will fail first 3 connection attempts');
// Track recovery attempts
let recoveryAttempts = 0;
pooledClient.on('pool-connection-retry', () => {
recoveryAttempts++;
console.log(` Recovery attempt ${recoveryAttempts}`);
});
// Try to send messages
const results = [];
for (let i = 0; i < 5; i++) {
const email = new Email({
from: 'sender@example.com',
to: [`recipient${i}@example.com`],
subject: `Recovery test ${i}`,
text: 'Testing connection recovery'
});
try {
console.log(`\nMessage ${i}: Attempting...`);
await pooledClient.sendMail(email);
console.log(`Message ${i}: Success`);
results.push('success');
} catch (error) {
console.log(`Message ${i}: Failed - ${error.message}`);
results.push('failed');
// After some failures, allow connections
if (i === 2) {
shouldFail = false;
console.log(' Server stabilized, connections should succeed now');
}
}
// Small delay between attempts
await new Promise(resolve => setTimeout(resolve, 500));
}
console.log('\nFinal results:', results);
const successCount = results.filter(r => r === 'success').length;
expect(successCount).toBeGreaterThan(0); // Should recover eventually
await pooledClient.close();
unstableServer.close();
});
tap.test('CERR-09: Pool metrics and monitoring', async () => {
const pooledClient = createSmtpClient({
host: testServer.hostname,
port: testServer.port,
secure: false,
pool: true,
maxConnections: 5,
poolMetrics: true,
debug: true
});
console.log('\nTesting pool metrics collection...');
// Collect metrics
const metrics = {
connectionsCreated: 0,
connectionsDestroyed: 0,
messagesQueued: 0,
messagesSent: 0,
errors: 0,
avgWaitTime: 0,
waitTimes: [] as number[]
};
pooledClient.on('pool-metrics', (data) => {
Object.assign(metrics, data);
});
pooledClient.on('message-queued', () => {
metrics.messagesQueued++;
});
pooledClient.on('message-sent', (info) => {
metrics.messagesSent++;
if (info.waitTime) {
metrics.waitTimes.push(info.waitTime);
}
});
// Send batch of messages
const messageCount = 20;
const startTime = Date.now();
await Promise.allSettled(
Array.from({ length: messageCount }, (_, i) =>
pooledClient.sendMail(new Email({
from: 'sender@example.com',
to: [`recipient${i}@example.com`],
subject: `Metrics test ${i}`,
text: 'Testing pool metrics'
}))
)
);
const totalTime = Date.now() - startTime;
// Calculate average wait time
if (metrics.waitTimes.length > 0) {
metrics.avgWaitTime = metrics.waitTimes.reduce((a, b) => a + b, 0) / metrics.waitTimes.length;
}
// Get final pool status
const poolStatus = pooledClient.getPoolStatus();
console.log('\nPool Metrics:');
console.log(` Messages queued: ${metrics.messagesQueued}`);
console.log(` Messages sent: ${metrics.messagesSent}`);
console.log(` Average wait time: ${metrics.avgWaitTime.toFixed(2)}ms`);
console.log(` Total time: ${totalTime}ms`);
console.log(` Throughput: ${(messageCount / totalTime * 1000).toFixed(2)} msg/sec`);
console.log('\nPool Status:');
console.log(` Active connections: ${poolStatus.active}`);
console.log(` Idle connections: ${poolStatus.idle}`);
console.log(` Total connections: ${poolStatus.total}`);
await pooledClient.close();
});
tap.test('CERR-09: Connection affinity', async () => {
const pooledClient = createSmtpClient({
host: testServer.hostname,
port: testServer.port,
secure: false,
pool: true,
maxConnections: 3,
connectionAffinity: 'sender', // Reuse same connection for same sender
debug: true
});
console.log('\nTesting connection affinity...');
// Track which connection handles which sender
const senderConnections: { [sender: string]: string } = {};
pooledClient.on('connection-assigned', (info) => {
senderConnections[info.sender] = info.connectionId;
console.log(` Sender ${info.sender} assigned to connection ${info.connectionId}`);
});
// Send messages from different senders
const senders = ['alice@example.com', 'bob@example.com', 'alice@example.com', 'charlie@example.com', 'bob@example.com'];
for (const sender of senders) {
const email = new Email({
from: sender,
to: ['recipient@example.com'],
subject: `From ${sender}`,
text: 'Testing connection affinity'
});
await pooledClient.sendMail(email);
const connectionId = senderConnections[sender];
console.log(` Message from ${sender} sent via connection ${connectionId}`);
}
// Verify affinity
console.log('\nConnection affinity results:');
const uniqueSenders = [...new Set(senders)];
uniqueSenders.forEach(sender => {
const messages = senders.filter(s => s === sender).length;
console.log(` ${sender}: ${messages} messages, connection ${senderConnections[sender]}`);
});
await pooledClient.close();
});
tap.test('CERR-09: Pool resource cleanup', async () => {
const pooledClient = createSmtpClient({
host: testServer.hostname,
port: testServer.port,
secure: false,
pool: true,
maxConnections: 3,
poolCleanupInterval: 1000, // Clean up every second
debug: true
});
console.log('\nTesting pool resource cleanup...');
// Track cleanup events
const cleanupStats = {
idleClosed: 0,
staleClosed: 0,
errorClosed: 0
};
pooledClient.on('pool-connection-cleanup', (reason) => {
switch (reason.type) {
case 'idle':
cleanupStats.idleClosed++;
console.log(` Closed idle connection: ${reason.connectionId}`);
break;
case 'stale':
cleanupStats.staleClosed++;
console.log(` Closed stale connection: ${reason.connectionId}`);
break;
case 'error':
cleanupStats.errorClosed++;
console.log(` Closed errored connection: ${reason.connectionId}`);
break;
}
});
// Send some messages
for (let i = 0; i < 3; i++) {
await pooledClient.sendMail(new Email({
from: 'sender@example.com',
to: [`recipient${i}@example.com`],
subject: `Cleanup test ${i}`,
text: 'Testing cleanup'
}));
}
console.log('Messages sent, waiting for cleanup...');
// Wait for cleanup cycles
await new Promise(resolve => setTimeout(resolve, 5000));
console.log('\nCleanup statistics:');
console.log(` Idle connections closed: ${cleanupStats.idleClosed}`);
console.log(` Stale connections closed: ${cleanupStats.staleClosed}`);
console.log(` Errored connections closed: ${cleanupStats.errorClosed}`);
const finalStatus = pooledClient.getPoolStatus();
console.log(`\nFinal pool status:`);
console.log(` Active: ${finalStatus.active}`);
console.log(` Idle: ${finalStatus.idle}`);
console.log(` Total: ${finalStatus.total}`);
await pooledClient.close();
});
tap.test('cleanup test SMTP server', async () => {
if (testServer) {
await testServer.stop();
}
});
export default tap.start();