386 lines
12 KiB
TypeScript
386 lines
12 KiB
TypeScript
|
/**
|
||
|
* SMTP Data Handler
|
||
|
* Responsible for processing email data during and after DATA command
|
||
|
*/
|
||
|
|
||
|
import * as plugins from '../../../plugins.js';
|
||
|
import * as fs from 'fs';
|
||
|
import * as path from 'path';
|
||
|
import { SmtpState, ISmtpSession, ISmtpTransactionResult } from '../interfaces.js';
|
||
|
import { IDataHandler, ISessionManager } from './interfaces.js';
|
||
|
import { SmtpResponseCode, SMTP_PATTERNS, SMTP_DEFAULTS } from './constants.js';
|
||
|
import { SmtpLogger } from './utils/logging.js';
|
||
|
import { Email } from '../../core/classes.email.js';
|
||
|
import { UnifiedEmailServer } from '../../routing/classes.unified.email.server.js';
|
||
|
|
||
|
/**
|
||
|
* Handles SMTP DATA command and email data processing
|
||
|
*/
|
||
|
export class DataHandler implements IDataHandler {
|
||
|
/**
|
||
|
* Session manager instance
|
||
|
*/
|
||
|
private sessionManager: ISessionManager;
|
||
|
|
||
|
/**
|
||
|
* Email server reference
|
||
|
*/
|
||
|
private emailServer: UnifiedEmailServer;
|
||
|
|
||
|
/**
|
||
|
* SMTP server options
|
||
|
*/
|
||
|
private options: {
|
||
|
size: number;
|
||
|
tempDir?: string;
|
||
|
};
|
||
|
|
||
|
/**
|
||
|
* Creates a new data handler
|
||
|
* @param sessionManager - Session manager instance
|
||
|
* @param emailServer - Email server reference
|
||
|
* @param options - Data handler options
|
||
|
*/
|
||
|
constructor(
|
||
|
sessionManager: ISessionManager,
|
||
|
emailServer: UnifiedEmailServer,
|
||
|
options: {
|
||
|
size?: number;
|
||
|
tempDir?: string;
|
||
|
} = {}
|
||
|
) {
|
||
|
this.sessionManager = sessionManager;
|
||
|
this.emailServer = emailServer;
|
||
|
|
||
|
this.options = {
|
||
|
size: options.size || SMTP_DEFAULTS.MAX_MESSAGE_SIZE,
|
||
|
tempDir: options.tempDir
|
||
|
};
|
||
|
|
||
|
// Create temp directory if specified and doesn't exist
|
||
|
if (this.options.tempDir) {
|
||
|
try {
|
||
|
if (!fs.existsSync(this.options.tempDir)) {
|
||
|
fs.mkdirSync(this.options.tempDir, { recursive: true });
|
||
|
}
|
||
|
} catch (error) {
|
||
|
SmtpLogger.error(`Failed to create temp directory: ${error instanceof Error ? error.message : String(error)}`, {
|
||
|
error: error instanceof Error ? error : new Error(String(error)),
|
||
|
tempDir: this.options.tempDir
|
||
|
});
|
||
|
this.options.tempDir = undefined;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Process incoming email data
|
||
|
* @param socket - Client socket
|
||
|
* @param data - Data chunk
|
||
|
* @returns Promise that resolves when the data is processed
|
||
|
*/
|
||
|
public async processEmailData(socket: plugins.net.Socket | plugins.tls.TLSSocket, data: string): Promise<void> {
|
||
|
// Get the session for this socket
|
||
|
const session = this.sessionManager.getSession(socket);
|
||
|
if (!session) {
|
||
|
this.sendResponse(socket, `${SmtpResponseCode.LOCAL_ERROR} Internal server error - session not found`);
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
// Clear any existing timeout and set a new one
|
||
|
if (session.dataTimeoutId) {
|
||
|
clearTimeout(session.dataTimeoutId);
|
||
|
}
|
||
|
|
||
|
session.dataTimeoutId = setTimeout(() => {
|
||
|
if (session.state === SmtpState.DATA_RECEIVING) {
|
||
|
SmtpLogger.warn(`DATA timeout for session ${session.id}`, { sessionId: session.id });
|
||
|
this.sendResponse(socket, `${SmtpResponseCode.LOCAL_ERROR} Data timeout`);
|
||
|
this.resetSession(session);
|
||
|
}
|
||
|
}, SMTP_DEFAULTS.DATA_TIMEOUT);
|
||
|
|
||
|
// Update activity timestamp
|
||
|
this.sessionManager.updateSessionActivity(session);
|
||
|
|
||
|
// Store data in chunks for better memory efficiency
|
||
|
if (!session.emailDataChunks) {
|
||
|
session.emailDataChunks = [];
|
||
|
}
|
||
|
|
||
|
session.emailDataChunks.push(data);
|
||
|
|
||
|
// Check if we've reached the max size
|
||
|
let totalSize = 0;
|
||
|
for (const chunk of session.emailDataChunks) {
|
||
|
totalSize += chunk.length;
|
||
|
}
|
||
|
|
||
|
if (totalSize > this.options.size) {
|
||
|
SmtpLogger.warn(`Message size exceeds limit for session ${session.id}`, {
|
||
|
sessionId: session.id,
|
||
|
size: totalSize,
|
||
|
limit: this.options.size
|
||
|
});
|
||
|
|
||
|
this.sendResponse(socket, `${SmtpResponseCode.EXCEEDED_STORAGE} Message too big, size limit is ${this.options.size} bytes`);
|
||
|
this.resetSession(session);
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
// Check for end of data marker
|
||
|
const lastChunk = session.emailDataChunks[session.emailDataChunks.length - 1] || '';
|
||
|
if (SMTP_PATTERNS.END_DATA.test(lastChunk)) {
|
||
|
// End of data marker found
|
||
|
await this.handleEndOfData(socket, session);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Process a complete email
|
||
|
* @param session - SMTP session
|
||
|
* @returns Promise that resolves with the result of the transaction
|
||
|
*/
|
||
|
public async processEmail(session: ISmtpSession): Promise<ISmtpTransactionResult> {
|
||
|
// Combine all chunks and remove end of data marker
|
||
|
session.emailData = (session.emailDataChunks || []).join('');
|
||
|
|
||
|
// Remove trailing end-of-data marker: \r\n.\r\n
|
||
|
session.emailData = session.emailData.replace(/\r\n\.\r\n$/, '');
|
||
|
|
||
|
// Remove dot-stuffing (RFC 5321, section 4.5.2)
|
||
|
session.emailData = session.emailData.replace(/\r\n\.\./g, '\r\n.');
|
||
|
|
||
|
try {
|
||
|
// Parse email into Email object
|
||
|
const email = await this.parseEmail(session);
|
||
|
|
||
|
// Process the email based on the processing mode
|
||
|
const processingMode = session.processingMode || 'mta';
|
||
|
|
||
|
let result: ISmtpTransactionResult = {
|
||
|
success: false,
|
||
|
error: 'Email processing failed'
|
||
|
};
|
||
|
|
||
|
switch (processingMode) {
|
||
|
case 'mta':
|
||
|
// Process through the MTA system
|
||
|
try {
|
||
|
SmtpLogger.debug(`Processing email in MTA mode for session ${session.id}`, {
|
||
|
sessionId: session.id,
|
||
|
messageId: email.getMessageId()
|
||
|
});
|
||
|
|
||
|
// Queue the email for further processing by the email server
|
||
|
const messageId = await this.emailServer.queueEmail(email);
|
||
|
|
||
|
result = {
|
||
|
success: true,
|
||
|
messageId,
|
||
|
email
|
||
|
};
|
||
|
} catch (error) {
|
||
|
SmtpLogger.error(`Failed to queue email: ${error instanceof Error ? error.message : String(error)}`, {
|
||
|
sessionId: session.id,
|
||
|
error: error instanceof Error ? error : new Error(String(error))
|
||
|
});
|
||
|
|
||
|
result = {
|
||
|
success: false,
|
||
|
error: `Failed to queue email: ${error instanceof Error ? error.message : String(error)}`
|
||
|
};
|
||
|
}
|
||
|
break;
|
||
|
|
||
|
case 'forward':
|
||
|
// Forward email to another server
|
||
|
SmtpLogger.debug(`Processing email in FORWARD mode for session ${session.id}`, {
|
||
|
sessionId: session.id,
|
||
|
messageId: email.getMessageId()
|
||
|
});
|
||
|
|
||
|
// Forward logic would be implemented here
|
||
|
result = {
|
||
|
success: true,
|
||
|
messageId: email.getMessageId(),
|
||
|
email
|
||
|
};
|
||
|
break;
|
||
|
|
||
|
case 'process':
|
||
|
// Process the email immediately
|
||
|
SmtpLogger.debug(`Processing email in PROCESS mode for session ${session.id}`, {
|
||
|
sessionId: session.id,
|
||
|
messageId: email.getMessageId()
|
||
|
});
|
||
|
|
||
|
// Direct processing logic would be implemented here
|
||
|
result = {
|
||
|
success: true,
|
||
|
messageId: email.getMessageId(),
|
||
|
email
|
||
|
};
|
||
|
break;
|
||
|
|
||
|
default:
|
||
|
SmtpLogger.warn(`Unknown processing mode: ${processingMode}`, { sessionId: session.id });
|
||
|
result = {
|
||
|
success: false,
|
||
|
error: `Unknown processing mode: ${processingMode}`
|
||
|
};
|
||
|
}
|
||
|
|
||
|
return result;
|
||
|
} catch (error) {
|
||
|
SmtpLogger.error(`Failed to parse email: ${error instanceof Error ? error.message : String(error)}`, {
|
||
|
sessionId: session.id,
|
||
|
error: error instanceof Error ? error : new Error(String(error))
|
||
|
});
|
||
|
|
||
|
return {
|
||
|
success: false,
|
||
|
error: `Failed to parse email: ${error instanceof Error ? error.message : String(error)}`
|
||
|
};
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Save an email to disk
|
||
|
* @param session - SMTP session
|
||
|
*/
|
||
|
public saveEmail(session: ISmtpSession): void {
|
||
|
if (!this.options.tempDir) {
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
try {
|
||
|
const timestamp = Date.now();
|
||
|
const filename = `${session.id}-${timestamp}.eml`;
|
||
|
const filePath = path.join(this.options.tempDir, filename);
|
||
|
|
||
|
fs.writeFileSync(filePath, session.emailData);
|
||
|
|
||
|
SmtpLogger.debug(`Saved email to disk: ${filePath}`, {
|
||
|
sessionId: session.id,
|
||
|
filePath
|
||
|
});
|
||
|
} catch (error) {
|
||
|
SmtpLogger.error(`Failed to save email to disk: ${error instanceof Error ? error.message : String(error)}`, {
|
||
|
sessionId: session.id,
|
||
|
error: error instanceof Error ? error : new Error(String(error))
|
||
|
});
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Parse an email into an Email object
|
||
|
* @param session - SMTP session
|
||
|
* @returns Promise that resolves with the parsed Email object
|
||
|
*/
|
||
|
public async parseEmail(session: ISmtpSession): Promise<Email> {
|
||
|
// Create a new Email object
|
||
|
const email = new Email();
|
||
|
|
||
|
// Set envelope information from SMTP session
|
||
|
email.setFrom(session.envelope.mailFrom.address);
|
||
|
|
||
|
for (const recipient of session.envelope.rcptTo) {
|
||
|
email.addTo(recipient.address);
|
||
|
}
|
||
|
|
||
|
// Parse the raw email data
|
||
|
await email.parseFromRaw(session.emailData);
|
||
|
|
||
|
return email;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Handle end of data marker received
|
||
|
* @param socket - Client socket
|
||
|
* @param session - SMTP session
|
||
|
*/
|
||
|
private async handleEndOfData(socket: plugins.net.Socket | plugins.tls.TLSSocket, session: ISmtpSession): Promise<void> {
|
||
|
// Clear the data timeout
|
||
|
if (session.dataTimeoutId) {
|
||
|
clearTimeout(session.dataTimeoutId);
|
||
|
session.dataTimeoutId = undefined;
|
||
|
}
|
||
|
|
||
|
try {
|
||
|
// Update session state
|
||
|
this.sessionManager.updateSessionState(session, SmtpState.FINISHED);
|
||
|
|
||
|
// Optionally save email to disk
|
||
|
this.saveEmail(session);
|
||
|
|
||
|
// Process the email
|
||
|
const result = await this.processEmail(session);
|
||
|
|
||
|
if (result.success) {
|
||
|
// Send success response
|
||
|
this.sendResponse(socket, `${SmtpResponseCode.OK} OK message queued as ${result.messageId}`);
|
||
|
} else {
|
||
|
// Send error response
|
||
|
this.sendResponse(socket, `${SmtpResponseCode.TRANSACTION_FAILED} Failed to process email: ${result.error}`);
|
||
|
}
|
||
|
|
||
|
// Reset session for new transaction
|
||
|
this.resetSession(session);
|
||
|
} catch (error) {
|
||
|
SmtpLogger.error(`Error processing email: ${error instanceof Error ? error.message : String(error)}`, {
|
||
|
sessionId: session.id,
|
||
|
error: error instanceof Error ? error : new Error(String(error))
|
||
|
});
|
||
|
|
||
|
this.sendResponse(socket, `${SmtpResponseCode.LOCAL_ERROR} Error processing email: ${error instanceof Error ? error.message : String(error)}`);
|
||
|
this.resetSession(session);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Reset session after email processing
|
||
|
* @param session - SMTP session
|
||
|
*/
|
||
|
private resetSession(session: ISmtpSession): void {
|
||
|
// Clear any data timeout
|
||
|
if (session.dataTimeoutId) {
|
||
|
clearTimeout(session.dataTimeoutId);
|
||
|
session.dataTimeoutId = undefined;
|
||
|
}
|
||
|
|
||
|
// Reset data fields but keep authentication state
|
||
|
session.mailFrom = '';
|
||
|
session.rcptTo = [];
|
||
|
session.emailData = '';
|
||
|
session.emailDataChunks = [];
|
||
|
session.envelope = {
|
||
|
mailFrom: { address: '', args: {} },
|
||
|
rcptTo: []
|
||
|
};
|
||
|
|
||
|
// Reset state to after EHLO
|
||
|
this.sessionManager.updateSessionState(session, SmtpState.AFTER_EHLO);
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Send a response to the client
|
||
|
* @param socket - Client socket
|
||
|
* @param response - Response message
|
||
|
*/
|
||
|
private sendResponse(socket: plugins.net.Socket | plugins.tls.TLSSocket, response: string): void {
|
||
|
try {
|
||
|
socket.write(`${response}${SMTP_DEFAULTS.CRLF}`);
|
||
|
SmtpLogger.logResponse(response, socket);
|
||
|
} catch (error) {
|
||
|
SmtpLogger.error(`Error sending response: ${error instanceof Error ? error.message : String(error)}`, {
|
||
|
response,
|
||
|
remoteAddress: socket.remoteAddress,
|
||
|
remotePort: socket.remotePort,
|
||
|
error: error instanceof Error ? error : new Error(String(error))
|
||
|
});
|
||
|
|
||
|
socket.destroy();
|
||
|
}
|
||
|
}
|
||
|
}
|