This commit is contained in:
Philipp Kunz 2025-05-23 01:00:37 +00:00
parent 4905595cbb
commit 7344bf0f70
5 changed files with 119 additions and 33 deletions

View File

@ -121,7 +121,7 @@ export class CommandHandler implements ICommandHandler {
// Process each command separately (recursively call processCommand)
for (const cmd of commands) {
this.processCommand(socket, cmd);
await this.processCommand(socket, cmd);
}
return;
}

View File

@ -53,6 +53,11 @@ export class ConnectionManager implements IConnectionManager {
*/
private resourceCheckInterval: NodeJS.Timeout | null = null;
/**
* Track cleanup timers so we can clear them
*/
private cleanupTimers: Set<NodeJS.Timeout> = new Set();
/**
* SMTP server options with enhanced resource controls
*/
@ -531,7 +536,7 @@ export class ConnectionManager implements IConnectionManager {
let buffer = '';
let totalBytesReceived = 0;
socket.on('data', (data) => {
socket.on('data', async (data) => {
try {
// Get current session and update activity timestamp
const session = this.smtpServer.getSessionManager().getSession(socket);
@ -546,7 +551,8 @@ export class ConnectionManager implements IConnectionManager {
try {
const dataString = data.toString('utf8');
// Use a special prefix to indicate this is raw data, not a command line
this.smtpServer.getCommandHandler().processCommand(socket, `__RAW_DATA__${dataString}`);
// CRITICAL FIX: Must await to prevent async pile-up
await this.smtpServer.getCommandHandler().processCommand(socket, `__RAW_DATA__${dataString}`);
return;
} catch (dataError) {
SmtpLogger.error(`Data handler error during DATA mode: ${dataError instanceof Error ? dataError.message : String(dataError)}`);
@ -599,15 +605,17 @@ export class ConnectionManager implements IConnectionManager {
// Process non-empty lines
if (line.length > 0) {
try {
// In DATA state, the command handler will process the data differently
this.smtpServer.getCommandHandler().processCommand(socket, line);
} catch (cmdError) {
// CRITICAL FIX: Must await processCommand to prevent async pile-up
// This was causing the busy loop with high CPU usage when many empty lines were processed
await this.smtpServer.getCommandHandler().processCommand(socket, line);
} catch (error) {
// Handle any errors in command processing
SmtpLogger.error(`Command handler error: ${cmdError instanceof Error ? cmdError.message : String(cmdError)}`);
SmtpLogger.error(`Command handler error: ${error instanceof Error ? error.message : String(error)}`);
this.sendResponse(socket, `${SmtpResponseCode.LOCAL_ERROR} Internal server error`);
// If there's a severe error, close the connection
if (cmdError instanceof Error &&
(cmdError.message.includes('fatal') || cmdError.message.includes('critical'))) {
if (error instanceof Error &&
(error.message.includes('fatal') || error.message.includes('critical'))) {
socket.destroy();
return;
}
@ -685,10 +693,25 @@ export class ConnectionManager implements IConnectionManager {
// Send service closing notification
this.sendServiceClosing(socket);
// End the socket
// End the socket gracefully
socket.end();
// Force destroy after a short delay if not already destroyed
const destroyTimer = setTimeout(() => {
if (!socket.destroyed) {
socket.destroy();
}
this.cleanupTimers.delete(destroyTimer);
}, 100);
this.cleanupTimers.add(destroyTimer);
} catch (error) {
SmtpLogger.error(`Error closing connection: ${error instanceof Error ? error.message : String(error)}`);
// Force destroy on error
try {
socket.destroy();
} catch (e) {
// Ignore destroy errors
}
}
}
@ -858,12 +881,14 @@ export class ConnectionManager implements IConnectionManager {
socket.end();
// Set a forced close timeout in case socket.end() doesn't close the connection
setTimeout(() => {
const timeoutDestroyTimer = setTimeout(() => {
if (!socket.destroyed) {
SmtpLogger.warn(`Forcing destroy of timed out socket: ${socketId}`);
socket.destroy();
}
this.cleanupTimers.delete(timeoutDestroyTimer);
}, 5000); // 5 second grace period for socket to end properly
this.cleanupTimers.add(timeoutDestroyTimer);
} catch (error) {
SmtpLogger.error(`Error ending timed out socket: ${error instanceof Error ? error.message : String(error)}`);
@ -989,6 +1014,12 @@ export class ConnectionManager implements IConnectionManager {
this.resourceCheckInterval = null;
}
// Clear all cleanup timers
for (const timer of this.cleanupTimers) {
clearTimeout(timer);
}
this.cleanupTimers.clear();
// Close all active connections
this.closeAllConnections();

View File

@ -221,8 +221,12 @@ export class DataHandler implements IDataHandler {
});
// Create a minimal email object on error
const fallbackEmail = new Email();
fallbackEmail.setFromRawData(cleanedData);
const fallbackEmail = new Email({
from: 'unknown@localhost',
to: 'unknown@localhost',
subject: 'Parse Error',
text: cleanedData
});
return fallbackEmail;
}
}
@ -234,22 +238,51 @@ export class DataHandler implements IDataHandler {
* @returns Email object
*/
private async parseEmailFromData(rawData: string, session: ISmtpSession): Promise<Email> {
const email = new Email();
// Parse the raw email data to extract headers and body
const lines = rawData.split('\r\n');
let headerEnd = -1;
// Set raw data
email.setFromRawData(rawData);
// Set envelope information from session
if (session.mailFrom) {
email.setFrom(session.mailFrom);
}
if (session.rcptTo && session.rcptTo.length > 0) {
for (const recipient of session.rcptTo) {
email.addTo(recipient);
// Find where headers end
for (let i = 0; i < lines.length; i++) {
if (lines[i].trim() === '') {
headerEnd = i;
break;
}
}
// Extract headers
let subject = 'No Subject';
const headers: Record<string, string> = {};
if (headerEnd > -1) {
for (let i = 0; i < headerEnd; i++) {
const line = lines[i];
const colonIndex = line.indexOf(':');
if (colonIndex > 0) {
const headerName = line.substring(0, colonIndex).trim().toLowerCase();
const headerValue = line.substring(colonIndex + 1).trim();
if (headerName === 'subject') {
subject = headerValue;
} else {
headers[headerName] = headerValue;
}
}
}
}
// Extract body
const body = headerEnd > -1 ? lines.slice(headerEnd + 1).join('\r\n') : rawData;
// Create email with session information
const email = new Email({
from: session.mailFrom || 'unknown@localhost',
to: session.rcptTo || ['unknown@localhost'],
subject,
text: body,
headers
});
return email;
}
@ -290,7 +323,7 @@ export class DataHandler implements IDataHandler {
// Process the email via the UnifiedEmailServer
// Pass the email object, session data, and specify the mode (mta, forward, or process)
// This connects SMTP reception to the overall email system
const processResult = await this.smtpServer.getEmailServer().processEmailByMode(email, session, 'mta');
const processResult = await this.smtpServer.getEmailServer().processEmailByMode(email, session as any, 'mta');
SmtpLogger.info(`Email processed through UnifiedEmailServer: ${email.getMessageId()}`, {
sessionId: session.id,
@ -340,7 +373,7 @@ export class DataHandler implements IDataHandler {
// Process the email via the UnifiedEmailServer in forward mode
try {
const processResult = await this.smtpServer.getEmailServer().processEmailByMode(email, session, 'forward');
const processResult = await this.smtpServer.getEmailServer().processEmailByMode(email, session as any, 'forward');
SmtpLogger.info(`Email forwarded through UnifiedEmailServer: ${email.getMessageId()}`, {
sessionId: session.id,
@ -379,7 +412,7 @@ export class DataHandler implements IDataHandler {
// Process the email via the UnifiedEmailServer in process mode
try {
const processResult = await this.smtpServer.getEmailServer().processEmailByMode(email, session, 'process');
const processResult = await this.smtpServer.getEmailServer().processEmailByMode(email, session as any, 'process');
SmtpLogger.info(`Email processed directly through UnifiedEmailServer: ${email.getMessageId()}`, {
sessionId: session.id,
@ -1057,8 +1090,8 @@ SmtpLogger.debug(`Parsed email subject: ${subject}`, { subject });
// Optionally save email to disk
this.saveEmail(session);
// Process the email
const result = await this.processEmail(session);
// Process the email using legacy method
const result = await this.processEmailLegacy(session);
if (result.success) {
// Send success response

View File

@ -370,7 +370,16 @@ export class SmtpServer implements ISmtpServer {
);
}
await Promise.all(closePromises);
// Add timeout to prevent hanging on close
await Promise.race([
Promise.all(closePromises),
new Promise<void>((resolve) => {
setTimeout(() => {
SmtpLogger.warn('Server close timed out after 3 seconds, forcing shutdown');
resolve();
}, 3000);
})
]);
this.server = null;
this.secureServer = null;
@ -774,12 +783,20 @@ export class SmtpServer implements ISmtpServer {
await Promise.all(destroyPromises);
// Destroy the adaptive logger singleton to clean up its timer
const { adaptiveLogger } = await import('./utils/adaptive-logging.js');
if (adaptiveLogger && typeof adaptiveLogger.destroy === 'function') {
adaptiveLogger.destroy();
}
// Clear recovery state
this.recoveryState = {
recovering: false,
currentRecoveryAttempt: 0,
connectionFailures: 0,
lastRecoveryAttempt: 0,
recoveryCooldown: 5000,
maxRecoveryAttempts: 3,
lastError: null
currentRecoveryAttempt: 0
};
SmtpLogger.info('All SMTP server components destroyed');

View File

@ -449,6 +449,11 @@ export class AdaptiveSmtpLogger {
this.aggregationTimer = setInterval(() => {
this.flushAggregatedEntries();
}, this.config.aggregationInterval);
// Unref the timer so it doesn't keep the process alive
if (this.aggregationTimer && typeof this.aggregationTimer.unref === 'function') {
this.aggregationTimer.unref();
}
}
/**