dcrouter/ts/mail/delivery/smtpserver/data-handler.ts
2025-05-21 13:42:12 +00:00

391 lines
12 KiB
TypeScript

/**
* SMTP Data Handler
* Responsible for processing email data during and after DATA command
*/
import * as plugins from '../../../plugins.js';
import * as fs from 'fs';
import * as path from 'path';
import { SmtpState } from './interfaces.js';
import type { ISmtpSession, ISmtpTransactionResult } from './interfaces.js';
import type { IDataHandler, ISessionManager } from './interfaces.js';
import { SmtpResponseCode, SMTP_PATTERNS, SMTP_DEFAULTS } from './constants.js';
import { SmtpLogger } from './utils/logging.js';
import { Email } from '../../core/classes.email.js';
import { UnifiedEmailServer } from '../../routing/classes.unified.email.server.js';
/**
* Handles SMTP DATA command and email data processing
*/
export class DataHandler implements IDataHandler {
/**
* Session manager instance
*/
private sessionManager: ISessionManager;
/**
* Email server reference
*/
private emailServer: UnifiedEmailServer;
/**
* SMTP server options
*/
private options: {
size: number;
tempDir?: string;
hostname?: string;
};
/**
* Creates a new data handler
* @param sessionManager - Session manager instance
* @param emailServer - Email server reference
* @param options - Data handler options
*/
constructor(
sessionManager: ISessionManager,
emailServer: UnifiedEmailServer,
options: {
size?: number;
tempDir?: string;
hostname?: string;
} = {}
) {
this.sessionManager = sessionManager;
this.emailServer = emailServer;
this.options = {
size: options.size || SMTP_DEFAULTS.MAX_MESSAGE_SIZE,
tempDir: options.tempDir,
hostname: options.hostname || SMTP_DEFAULTS.HOSTNAME
};
// Create temp directory if specified and doesn't exist
if (this.options.tempDir) {
try {
if (!fs.existsSync(this.options.tempDir)) {
fs.mkdirSync(this.options.tempDir, { recursive: true });
}
} catch (error) {
SmtpLogger.error(`Failed to create temp directory: ${error instanceof Error ? error.message : String(error)}`, {
error: error instanceof Error ? error : new Error(String(error)),
tempDir: this.options.tempDir
});
this.options.tempDir = undefined;
}
}
}
/**
* Process incoming email data
* @param socket - Client socket
* @param data - Data chunk
* @returns Promise that resolves when the data is processed
*/
public async processEmailData(socket: plugins.net.Socket | plugins.tls.TLSSocket, data: string): Promise<void> {
// Get the session for this socket
const session = this.sessionManager.getSession(socket);
if (!session) {
this.sendResponse(socket, `${SmtpResponseCode.LOCAL_ERROR} Internal server error - session not found`);
return;
}
// Clear any existing timeout and set a new one
if (session.dataTimeoutId) {
clearTimeout(session.dataTimeoutId);
}
session.dataTimeoutId = setTimeout(() => {
if (session.state === SmtpState.DATA_RECEIVING) {
SmtpLogger.warn(`DATA timeout for session ${session.id}`, { sessionId: session.id });
this.sendResponse(socket, `${SmtpResponseCode.LOCAL_ERROR} Data timeout`);
this.resetSession(session);
}
}, SMTP_DEFAULTS.DATA_TIMEOUT);
// Update activity timestamp
this.sessionManager.updateSessionActivity(session);
// Store data in chunks for better memory efficiency
if (!session.emailDataChunks) {
session.emailDataChunks = [];
}
session.emailDataChunks.push(data);
// Check if we've reached the max size
let totalSize = 0;
for (const chunk of session.emailDataChunks) {
totalSize += chunk.length;
}
if (totalSize > this.options.size) {
SmtpLogger.warn(`Message size exceeds limit for session ${session.id}`, {
sessionId: session.id,
size: totalSize,
limit: this.options.size
});
this.sendResponse(socket, `${SmtpResponseCode.EXCEEDED_STORAGE} Message too big, size limit is ${this.options.size} bytes`);
this.resetSession(session);
return;
}
// Check for end of data marker
const lastChunk = session.emailDataChunks[session.emailDataChunks.length - 1] || '';
if (SMTP_PATTERNS.END_DATA.test(lastChunk)) {
// End of data marker found
await this.handleEndOfData(socket, session);
}
}
/**
* Process a complete email
* @param session - SMTP session
* @returns Promise that resolves with the result of the transaction
*/
public async processEmail(session: ISmtpSession): Promise<ISmtpTransactionResult> {
// Combine all chunks and remove end of data marker
session.emailData = (session.emailDataChunks || []).join('');
// Remove trailing end-of-data marker: \r\n.\r\n
session.emailData = session.emailData.replace(/\r\n\.\r\n$/, '');
// Remove dot-stuffing (RFC 5321, section 4.5.2)
session.emailData = session.emailData.replace(/\r\n\.\./g, '\r\n.');
try {
// Parse email into Email object
const email = await this.parseEmail(session);
// Process the email based on the processing mode
const processingMode = session.processingMode || 'mta';
let result: ISmtpTransactionResult = {
success: false,
error: 'Email processing failed'
};
switch (processingMode) {
case 'mta':
// Process through the MTA system
try {
SmtpLogger.debug(`Processing email in MTA mode for session ${session.id}`, {
sessionId: session.id,
messageId: email.getMessageId()
});
// Generate a message ID since queueEmail is not available
const messageId = `${Date.now()}-${Math.floor(Math.random() * 1000000)}@${this.options.hostname || 'mail.example.com'}`;
// In a full implementation, the email would be queued to the delivery system
// await this.emailServer.queueEmail(email);
result = {
success: true,
messageId,
email
};
} catch (error) {
SmtpLogger.error(`Failed to queue email: ${error instanceof Error ? error.message : String(error)}`, {
sessionId: session.id,
error: error instanceof Error ? error : new Error(String(error))
});
result = {
success: false,
error: `Failed to queue email: ${error instanceof Error ? error.message : String(error)}`
};
}
break;
case 'forward':
// Forward email to another server
SmtpLogger.debug(`Processing email in FORWARD mode for session ${session.id}`, {
sessionId: session.id,
messageId: email.getMessageId()
});
// Forward logic would be implemented here
result = {
success: true,
messageId: email.getMessageId(),
email
};
break;
case 'process':
// Process the email immediately
SmtpLogger.debug(`Processing email in PROCESS mode for session ${session.id}`, {
sessionId: session.id,
messageId: email.getMessageId()
});
// Direct processing logic would be implemented here
result = {
success: true,
messageId: email.getMessageId(),
email
};
break;
default:
SmtpLogger.warn(`Unknown processing mode: ${processingMode}`, { sessionId: session.id });
result = {
success: false,
error: `Unknown processing mode: ${processingMode}`
};
}
return result;
} catch (error) {
SmtpLogger.error(`Failed to parse email: ${error instanceof Error ? error.message : String(error)}`, {
sessionId: session.id,
error: error instanceof Error ? error : new Error(String(error))
});
return {
success: false,
error: `Failed to parse email: ${error instanceof Error ? error.message : String(error)}`
};
}
}
/**
* Save an email to disk
* @param session - SMTP session
*/
public saveEmail(session: ISmtpSession): void {
if (!this.options.tempDir) {
return;
}
try {
const timestamp = Date.now();
const filename = `${session.id}-${timestamp}.eml`;
const filePath = path.join(this.options.tempDir, filename);
fs.writeFileSync(filePath, session.emailData);
SmtpLogger.debug(`Saved email to disk: ${filePath}`, {
sessionId: session.id,
filePath
});
} catch (error) {
SmtpLogger.error(`Failed to save email to disk: ${error instanceof Error ? error.message : String(error)}`, {
sessionId: session.id,
error: error instanceof Error ? error : new Error(String(error))
});
}
}
/**
* Parse an email into an Email object
* @param session - SMTP session
* @returns Promise that resolves with the parsed Email object
*/
public async parseEmail(session: ISmtpSession): Promise<Email> {
// Create an email with minimal required options
const email = new Email({
from: session.envelope.mailFrom.address,
to: session.envelope.rcptTo.map(r => r.address),
subject: 'Received via SMTP',
text: session.emailData
});
// Note: In a real implementation, we would parse the raw email data
// to extract headers, content, etc., but that's beyond the scope of this refactoring
return email;
}
/**
* Handle end of data marker received
* @param socket - Client socket
* @param session - SMTP session
*/
private async handleEndOfData(socket: plugins.net.Socket | plugins.tls.TLSSocket, session: ISmtpSession): Promise<void> {
// Clear the data timeout
if (session.dataTimeoutId) {
clearTimeout(session.dataTimeoutId);
session.dataTimeoutId = undefined;
}
try {
// Update session state
this.sessionManager.updateSessionState(session, SmtpState.FINISHED);
// Optionally save email to disk
this.saveEmail(session);
// Process the email
const result = await this.processEmail(session);
if (result.success) {
// Send success response
this.sendResponse(socket, `${SmtpResponseCode.OK} OK message queued as ${result.messageId}`);
} else {
// Send error response
this.sendResponse(socket, `${SmtpResponseCode.TRANSACTION_FAILED} Failed to process email: ${result.error}`);
}
// Reset session for new transaction
this.resetSession(session);
} catch (error) {
SmtpLogger.error(`Error processing email: ${error instanceof Error ? error.message : String(error)}`, {
sessionId: session.id,
error: error instanceof Error ? error : new Error(String(error))
});
this.sendResponse(socket, `${SmtpResponseCode.LOCAL_ERROR} Error processing email: ${error instanceof Error ? error.message : String(error)}`);
this.resetSession(session);
}
}
/**
* Reset session after email processing
* @param session - SMTP session
*/
private resetSession(session: ISmtpSession): void {
// Clear any data timeout
if (session.dataTimeoutId) {
clearTimeout(session.dataTimeoutId);
session.dataTimeoutId = undefined;
}
// Reset data fields but keep authentication state
session.mailFrom = '';
session.rcptTo = [];
session.emailData = '';
session.emailDataChunks = [];
session.envelope = {
mailFrom: { address: '', args: {} },
rcptTo: []
};
// Reset state to after EHLO
this.sessionManager.updateSessionState(session, SmtpState.AFTER_EHLO);
}
/**
* Send a response to the client
* @param socket - Client socket
* @param response - Response message
*/
private sendResponse(socket: plugins.net.Socket | plugins.tls.TLSSocket, response: string): void {
try {
socket.write(`${response}${SMTP_DEFAULTS.CRLF}`);
SmtpLogger.logResponse(response, socket);
} catch (error) {
SmtpLogger.error(`Error sending response: ${error instanceof Error ? error.message : String(error)}`, {
response,
remoteAddress: socket.remoteAddress,
remotePort: socket.remotePort,
error: error instanceof Error ? error : new Error(String(error))
});
socket.destroy();
}
}
}