This commit is contained in:
Philipp Kunz 2025-05-21 10:00:06 +00:00
parent 162795802f
commit ecb913843c
3 changed files with 442 additions and 68 deletions

View File

@ -407,8 +407,8 @@ export class SmtpClient {
// Clear previous extensions
this.supportedExtensions.clear();
// Send EHLO
const response = await this.sendCommand(`EHLO ${this.options.domain}`);
// Send EHLO - don't allow pipelining for this command
const response = await this.sendCommand(`EHLO ${this.options.domain}`, false);
// Parse supported extensions
const lines = response.split('\r\n');
@ -420,7 +420,13 @@ export class SmtpClient {
}
}
// Check if server supports pipelining
this.supportsPipelining = this.supportedExtensions.has('PIPELINING');
logger.log('debug', `Server supports extensions: ${Array.from(this.supportedExtensions).join(', ')}`);
if (this.supportsPipelining) {
logger.log('info', 'Server supports PIPELINING - will use for improved performance');
}
}
/**
@ -661,20 +667,75 @@ export class SmtpClient {
result.dkimSigned = true;
}
// Send MAIL FROM
// Get envelope and recipients
const envelope_from = email.getEnvelopeFrom() || email.from;
await this.sendCommand(`MAIL FROM:<${envelope_from}> SIZE=${this.getEmailSize(email)}`);
// Send RCPT TO for each recipient
const recipients = email.getAllRecipients();
for (const recipient of recipients) {
// Check if we can use pipelining for MAIL FROM and RCPT TO commands
if (this.supportsPipelining && recipients.length > 0) {
logger.log('debug', 'Using SMTP pipelining for sending');
// Send MAIL FROM command first (always needed)
const mailFromCmd = `MAIL FROM:<${envelope_from}> SIZE=${this.getEmailSize(email)}`;
let mailFromResponse: string;
try {
await this.sendCommand(`RCPT TO:<${recipient}>`);
result.acceptedRecipients.push(recipient);
mailFromResponse = await this.sendCommand(mailFromCmd);
if (!mailFromResponse.startsWith('250')) {
throw new MtaDeliveryError(
`MAIL FROM command failed: ${mailFromResponse}`,
{
data: {
command: mailFromCmd,
response: mailFromResponse
}
}
);
}
} catch (error) {
logger.log('warn', `Recipient ${recipient} rejected: ${error.message}`);
result.rejectedRecipients.push(recipient);
logger.log('error', `MAIL FROM failed: ${error.message}`);
throw error;
}
// Pipeline all RCPT TO commands
const rcptPromises = recipients.map(recipient => {
return this.sendCommand(`RCPT TO:<${recipient}>`)
.then(response => {
if (response.startsWith('250')) {
result.acceptedRecipients.push(recipient);
return { recipient, accepted: true, response };
} else {
result.rejectedRecipients.push(recipient);
logger.log('warn', `Recipient ${recipient} rejected: ${response}`);
return { recipient, accepted: false, response };
}
})
.catch(error => {
result.rejectedRecipients.push(recipient);
logger.log('warn', `Recipient ${recipient} rejected with error: ${error.message}`);
return { recipient, accepted: false, error: error.message };
});
});
// Wait for all RCPT TO commands to complete
await Promise.all(rcptPromises);
} else {
// Fall back to sequential commands if pipelining not supported
logger.log('debug', 'Using sequential SMTP commands for sending');
// Send MAIL FROM
await this.sendCommand(`MAIL FROM:<${envelope_from}> SIZE=${this.getEmailSize(email)}`);
// Send RCPT TO for each recipient
for (const recipient of recipients) {
try {
await this.sendCommand(`RCPT TO:<${recipient}>`);
result.acceptedRecipients.push(recipient);
} catch (error) {
logger.log('warn', `Recipient ${recipient} rejected: ${error.message}`);
result.rejectedRecipients.push(recipient);
}
}
}
@ -705,7 +766,7 @@ export class SmtpClient {
);
}
// Format email content (simplified for now)
// Format email content efficiently
const emailContent = await this.getFormattedEmail(email);
// Send email content
@ -923,7 +984,26 @@ export class SmtpClient {
* Send SMTP command and wait for response
* @param command SMTP command to send
*/
private async sendCommand(command: string): Promise<string> {
// Queue for command pipelining
private commandQueue: Array<{
command: string;
resolve: (response: string) => void;
reject: (error: any) => void;
timeout: NodeJS.Timeout;
}> = [];
// Flag to indicate if we're currently processing commands
private processingCommands = false;
// Flag to indicate if server supports pipelining
private supportsPipelining = false;
/**
* Send an SMTP command and wait for response
* @param command SMTP command to send
* @param allowPipelining Whether this command can be pipelined
*/
private async sendCommand(command: string, allowPipelining = true): Promise<string> {
if (!this.socket) {
throw new MtaConnectionError(
'Not connected to server',
@ -946,6 +1026,12 @@ export class SmtpClient {
return new Promise<string>((resolve, reject) => {
// Set up timeout for command
const timeout = setTimeout(() => {
// Remove this command from the queue if it times out
const index = this.commandQueue.findIndex(item => item.command === command);
if (index !== -1) {
this.commandQueue.splice(index, 1);
}
reject(MtaTimeoutError.commandTimeout(
command.split(' ')[0],
this.options.host,
@ -953,35 +1039,168 @@ export class SmtpClient {
));
}, this.options.commandTimeout);
// Send command
this.socket.write(command + '\r\n', (err) => {
if (err) {
clearTimeout(timeout);
reject(new MtaConnectionError(
`Failed to send command: ${err.message}`,
{
data: {
command: command.split(' ')[0],
error: err.message
}
}
));
}
// Add command to the queue
this.commandQueue.push({
command,
resolve,
reject,
timeout
});
// Process command queue if we can pipeline or if not currently processing commands
if ((this.supportsPipelining && allowPipelining) || !this.processingCommands) {
this.processCommandQueue();
}
});
}
/**
* Process the command queue - either one by one or pipelined if supported
*/
private processCommandQueue(): void {
if (this.processingCommands || this.commandQueue.length === 0 || !this.socket) {
return;
}
this.processingCommands = true;
try {
// If pipelining is supported, send all commands at once
if (this.supportsPipelining) {
// Send all commands in queue at once
const commands = this.commandQueue.map(item => item.command).join('\r\n') + '\r\n';
this.socket.write(commands, (err) => {
if (err) {
// Handle write error for all commands
const error = new MtaConnectionError(
`Failed to send commands: ${err.message}`,
{
data: {
error: err.message
}
}
);
// Fail all pending commands
while (this.commandQueue.length > 0) {
const item = this.commandQueue.shift();
clearTimeout(item.timeout);
item.reject(error);
}
this.processingCommands = false;
}
});
// Process responses one by one in order
this.processResponses();
} else {
// Process commands one by one if pipelining not supported
this.processNextCommand();
}
} catch (error) {
logger.log('error', `Error processing command queue: ${error.message}`);
this.processingCommands = false;
}
}
/**
* Process the next command in the queue (non-pipelined mode)
*/
private processNextCommand(): void {
if (this.commandQueue.length === 0 || !this.socket) {
this.processingCommands = false;
return;
}
const currentCommand = this.commandQueue[0];
this.socket.write(currentCommand.command + '\r\n', (err) => {
if (err) {
// Handle write error
const error = new MtaConnectionError(
`Failed to send command: ${err.message}`,
{
data: {
command: currentCommand.command.split(' ')[0],
error: err.message
}
}
);
// Remove from queue
this.commandQueue.shift();
clearTimeout(currentCommand.timeout);
currentCommand.reject(error);
// Continue with next command
this.processNextCommand();
return;
}
// Read response
this.readResponse()
.then((response) => {
clearTimeout(timeout);
resolve(response);
// Remove from queue and resolve
this.commandQueue.shift();
clearTimeout(currentCommand.timeout);
currentCommand.resolve(response);
// Process next command
this.processNextCommand();
})
.catch((err) => {
clearTimeout(timeout);
reject(err);
// Remove from queue and reject
this.commandQueue.shift();
clearTimeout(currentCommand.timeout);
currentCommand.reject(err);
// Process next command
this.processNextCommand();
});
});
}
/**
* Process responses for pipelined commands
*/
private async processResponses(): Promise<void> {
try {
// Process responses for each command in order
while (this.commandQueue.length > 0) {
const currentCommand = this.commandQueue[0];
try {
// Wait for response
const response = await this.readResponse();
// Remove from queue and resolve
this.commandQueue.shift();
clearTimeout(currentCommand.timeout);
currentCommand.resolve(response);
} catch (error) {
// Remove from queue and reject
this.commandQueue.shift();
clearTimeout(currentCommand.timeout);
currentCommand.reject(error);
// Stop processing if this is a critical error
if (
error instanceof MtaConnectionError &&
(error.message.includes('Connection closed') || error.message.includes('Not connected'))
) {
break;
}
}
}
} catch (error) {
logger.log('error', `Error processing responses: ${error.message}`);
} finally {
this.processingCommands = false;
}
}
/**
* Read response from the server
*/
@ -999,37 +1218,45 @@ export class SmtpClient {
}
return new Promise<string>((resolve, reject) => {
let responseData = '';
// Use an array to collect response chunks instead of string concatenation
const responseChunks: Buffer[] = [];
// Single function to clean up all listeners
const cleanupListeners = () => {
if (!this.socket) return;
this.socket.removeListener('data', onData);
this.socket.removeListener('error', onError);
this.socket.removeListener('close', onClose);
this.socket.removeListener('end', onEnd);
};
const onData = (data: Buffer) => {
responseData += data.toString();
// Store buffer directly, avoiding unnecessary string conversion
responseChunks.push(data);
// Convert to string only for response checking
const responseData = Buffer.concat(responseChunks).toString();
// Check if this is a complete response
if (this.isCompleteResponse(responseData)) {
// Clean up listeners
this.socket.removeListener('data', onData);
this.socket.removeListener('error', onError);
this.socket.removeListener('close', onClose);
this.socket.removeListener('end', onEnd);
cleanupListeners();
logger.log('debug', `< ${responseData.trim()}`);
const trimmedResponse = responseData.trim();
logger.log('debug', `< ${trimmedResponse}`);
// Check if this is an error response
if (this.isErrorResponse(responseData)) {
const code = responseData.substring(0, 3);
reject(this.createErrorFromResponse(responseData, code));
reject(this.createErrorFromResponse(trimmedResponse, code));
} else {
resolve(responseData.trim());
resolve(trimmedResponse);
}
}
};
const onError = (err: Error) => {
// Clean up listeners
this.socket.removeListener('data', onData);
this.socket.removeListener('error', onError);
this.socket.removeListener('close', onClose);
this.socket.removeListener('end', onEnd);
cleanupListeners();
reject(new MtaConnectionError(
`Socket error while waiting for response: ${err.message}`,
@ -1042,12 +1269,9 @@ export class SmtpClient {
};
const onClose = () => {
// Clean up listeners
this.socket.removeListener('data', onData);
this.socket.removeListener('error', onError);
this.socket.removeListener('close', onClose);
this.socket.removeListener('end', onEnd);
cleanupListeners();
const responseData = Buffer.concat(responseChunks).toString();
reject(new MtaConnectionError(
'Connection closed while waiting for response',
{
@ -1059,12 +1283,9 @@ export class SmtpClient {
};
const onEnd = () => {
// Clean up listeners
this.socket.removeListener('data', onData);
this.socket.removeListener('error', onError);
this.socket.removeListener('close', onClose);
this.socket.removeListener('end', onEnd);
cleanupListeners();
const responseData = Buffer.concat(responseChunks).toString();
reject(new MtaConnectionError(
'Connection ended while waiting for response',
{

View File

@ -23,7 +23,11 @@ export class SMTPServer {
private smtpServerOptions: ISmtpServerOptions;
private server: plugins.net.Server;
private sessions: Map<plugins.net.Socket | plugins.tls.TLSSocket, ISmtpSession>;
private sessionTimeouts: Map<string, NodeJS.Timeout>;
private hostname: string;
private sessionIdCounter: number = 0;
private connectionCount: number = 0;
private maxConnections: number = 100; // Default max connections
constructor(emailServerRefArg: UnifiedEmailServer, optionsArg: ISmtpServerOptions) {
console.log('SMTPServer instance is being created...');
@ -31,21 +35,113 @@ export class SMTPServer {
this.emailServerRef = emailServerRefArg;
this.smtpServerOptions = optionsArg;
this.sessions = new Map();
this.sessionTimeouts = new Map();
this.hostname = optionsArg.hostname || 'mail.lossless.one';
this.maxConnections = optionsArg.maxSize || 100;
// Start session cleanup interval
setInterval(() => this.cleanupIdleSessions(), 30000); // Run every 30 seconds
this.server = plugins.net.createServer((socket) => {
// Check if we've exceeded maximum connections
if (this.connectionCount >= this.maxConnections) {
logger.log('warn', `Connection limit reached (${this.maxConnections}), rejecting new connection`);
socket.write('421 Too many connections, try again later\r\n');
socket.destroy();
return;
}
this.handleNewConnection(socket);
});
}
/**
* Clean up idle sessions
* @private
*/
private cleanupIdleSessions(): void {
const now = Date.now();
const sessionTimeout = this.smtpServerOptions.socketTimeout || 300000; // Default 5 minutes
// Check all sessions for timeout
for (const [socket, session] of this.sessions.entries()) {
if (!session.lastActivity) continue;
const idleTime = now - session.lastActivity;
if (idleTime > sessionTimeout) {
logger.log('info', `Session ${session.id} timed out after ${idleTime}ms of inactivity`);
try {
// Send timeout message and end connection
this.sendResponse(socket, '421 Timeout - closing connection');
socket.destroy();
} catch (error) {
logger.log('error', `Error closing timed out session: ${error.message}`);
}
// Clean up session
this.removeSession(socket);
}
}
}
/**
* Create a new session ID
* @private
*/
private generateSessionId(): string {
return `${Date.now()}-${++this.sessionIdCounter}`;
}
/**
* Properly remove a session and clean up resources
* @private
*/
private removeSession(socket: plugins.net.Socket | plugins.tls.TLSSocket): void {
const session = this.sessions.get(socket);
if (!session) return;
// Clear session timeout if exists
const timeoutId = this.sessionTimeouts.get(session.id);
if (timeoutId) {
clearTimeout(timeoutId);
this.sessionTimeouts.delete(session.id);
}
// Remove session from map
this.sessions.delete(socket);
// Decrement connection count
this.connectionCount--;
logger.log('debug', `Session ${session.id} removed, active connections: ${this.connectionCount}`);
}
/**
* Update last activity timestamp for a session
* @private
*/
private updateSessionActivity(socket: plugins.net.Socket | plugins.tls.TLSSocket): void {
const session = this.sessions.get(socket);
if (!session) return;
session.lastActivity = Date.now();
}
private async handleNewConnection(socket: plugins.net.Socket): Promise<void> {
const clientIp = socket.remoteAddress;
const clientPort = socket.remotePort;
console.log(`New connection from ${clientIp}:${clientPort}`);
// Increment connection count
this.connectionCount++;
// Generate unique session ID
const sessionId = this.generateSessionId();
// Initialize a new session
this.sessions.set(socket, {
id: `${socket.remoteAddress}:${socket.remotePort}`,
id: sessionId,
state: SmtpState.GREETING,
clientHostname: '',
mailFrom: '',
@ -56,6 +152,7 @@ export class SMTPServer {
remoteAddress: socket.remoteAddress || '',
secure: false,
authenticated: false,
lastActivity: Date.now(),
envelope: {
mailFrom: {
address: '',
@ -129,7 +226,29 @@ export class SMTPServer {
// Send greeting
this.sendResponse(socket, `220 ${this.hostname} ESMTP Service Ready`);
// Set session timeout
const sessionTimeout = setTimeout(() => {
logger.log('info', `Initial connection timeout for session ${sessionId}`);
this.sendResponse(socket, '421 Connection timeout');
socket.destroy();
this.removeSession(socket);
}, this.smtpServerOptions.connectionTimeout || 30000);
// Store timeout reference
this.sessionTimeouts.set(sessionId, sessionTimeout);
socket.on('data', (data) => {
// Clear initial connection timeout on first data
const timeoutId = this.sessionTimeouts.get(sessionId);
if (timeoutId) {
clearTimeout(timeoutId);
this.sessionTimeouts.delete(sessionId);
}
// Update last activity timestamp
this.updateSessionActivity(socket);
// Process the data
this.processData(socket, data);
});
@ -151,16 +270,21 @@ export class SMTPServer {
details: {
clientPort,
state: SmtpState[session.state],
from: session.mailFrom || 'not set'
from: session.mailFrom || 'not set',
sessionId: session.id
}
});
}
// Clean up session
this.removeSession(socket);
});
socket.on('error', (err) => {
const clientIp = socket.remoteAddress;
const clientPort = socket.remotePort;
console.error(`Socket error: ${err.message}`);
const session = this.sessions.get(socket);
console.error(`Socket error for session ${session?.id}: ${err.message}`);
// Log connection error as security event
SecurityLogger.getInstance().logEvent({
@ -172,18 +296,21 @@ export class SMTPServer {
clientPort,
error: err.message,
errorCode: (err as any).code,
from: this.sessions.get(socket)?.mailFrom || 'not set'
from: session?.mailFrom || 'not set',
sessionId: session?.id
}
});
this.sessions.delete(socket);
// Clean up session resources
this.removeSession(socket);
socket.destroy();
});
socket.on('close', () => {
const clientIp = socket.remoteAddress;
const clientPort = socket.remotePort;
console.log(`Connection closed from ${clientIp}:${clientPort}`);
const session = this.sessions.get(socket);
console.log(`Connection closed for session ${session?.id} from ${clientIp}:${clientPort}`);
// Log connection closure as security event
SecurityLogger.getInstance().logEvent({
@ -193,11 +320,13 @@ export class SMTPServer {
ipAddress: clientIp,
details: {
clientPort,
sessionEnded: this.sessions.get(socket)?.connectionEnded || false
sessionId: session?.id,
sessionEnded: session?.connectionEnded || false
}
});
this.sessions.delete(socket);
// Clean up session resources
this.removeSession(socket);
});
}
@ -239,6 +368,9 @@ export class SMTPServer {
private processCommand(socket: plugins.net.Socket | plugins.tls.TLSSocket, commandLine: string): void {
const session = this.sessions.get(socket);
if (!session || session.connectionEnded) return;
// Update session activity timestamp
this.updateSessionActivity(socket);
const [command, ...args] = commandLine.split(' ');
const upperCommand = command.toUpperCase();
@ -431,19 +563,32 @@ export class SMTPServer {
const session = this.sessions.get(socket);
if (!session) return;
// Initialize email data buffer if it doesn't exist
if (!session.emailDataChunks) {
session.emailDataChunks = [];
}
// Check for end of data marker
if (data.endsWith('\r\n.\r\n')) {
// Remove the end of data marker
const emailData = data.slice(0, -5);
session.emailData += emailData;
// Add final chunk
session.emailDataChunks.push(emailData);
// Join chunks efficiently
session.emailData = session.emailDataChunks.join('');
// Free memory
session.emailDataChunks = undefined;
session.state = SmtpState.FINISHED;
// Save and process the email
this.saveEmail(socket);
this.sendResponse(socket, '250 OK: Message accepted for delivery');
} else {
// Accumulate the data
session.emailData += data;
// Accumulate the data as chunks
session.emailDataChunks.push(data);
}
}
@ -801,6 +946,4 @@ export class SMTPServer {
const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
return emailRegex.test(email);
}
// These methods are defined elsewhere in the class, duplicates removed
}

View File

@ -96,6 +96,11 @@ export interface ISmtpSession {
*/
emailData: string;
/**
* Chunks of email data for more efficient buffer management
*/
emailDataChunks?: string[];
/**
* Whether the connection is using TLS
*/
@ -130,6 +135,11 @@ export interface ISmtpSession {
* Email processing mode to use for this session
*/
processingMode?: EmailProcessingMode;
/**
* Timestamp of last activity for session timeout tracking
*/
lastActivity?: number;
}
/**