2025-05-21 12:52:24 +00:00
/**
 * SMTP Data Handler
 * Responsible for processing email data during and after the DATA command
 */
import * as plugins from '../../../plugins.js' ;
import * as fs from 'fs' ;
import * as path from 'path' ;
2025-05-21 13:42:12 +00:00
import { SmtpState } from './interfaces.js' ;
import type { ISmtpSession , ISmtpTransactionResult } from './interfaces.js' ;
import type { IDataHandler , ISessionManager } from './interfaces.js' ;
2025-05-21 12:52:24 +00:00
import { SmtpResponseCode , SMTP_PATTERNS , SMTP_DEFAULTS } from './constants.js' ;
import { SmtpLogger } from './utils/logging.js' ;
import { Email } from '../../core/classes.email.js' ;
import { UnifiedEmailServer } from '../../routing/classes.unified.email.server.js' ;
/**
 * Handles the SMTP DATA phase: buffers incoming chunks, detects the
 * end-of-data terminator, assembles and parses the message, and replies to
 * the client.
 */
export class DataHandler implements IDataHandler {
  /**
   * Length in characters of the longest terminator recognised
   * (`\r\n.\r\n`); only this many trailing characters need to be inspected.
   */
  private static readonly MAX_TERMINATOR_LENGTH = 5;

  /**
   * Matches a terminator at the end of the buffered data: a line holding
   * only "." ended by CRLF or LF (also when it is the whole buffer), or a
   * lone trailing "." that arrived as its own chunk.
   */
  private static readonly TERMINATOR_AT_END = /\n\.\r?\n$/;

  /**
   * Matches the end-of-data marker to strip from an assembled message.
   * Applied exactly once so that a period ending the last body line is
   * never mistaken for the marker.
   */
  private static readonly END_OF_DATA_PATTERN = /(?:(?:^|\r?\n)\.\r?\n|\.)$/;

  /**
   * Matches a dot-stuffed line start (RFC 5321, section 4.5.2): ".." at the
   * very beginning of the data or directly after a line feed.
   */
  private static readonly DOT_STUFFING_PATTERN = /(^|\n)\.\./g;

  /**
   * Session manager instance
   */
  private sessionManager: ISessionManager;

  /**
   * Email server reference
   */
  private emailServer: UnifiedEmailServer;

  /**
   * SMTP server options
   */
  private options: {
    size: number;
    tempDir?: string;
    hostname?: string;
  };

  /**
   * Creates a new data handler
   * @param sessionManager - Session manager instance
   * @param emailServer - Email server reference
   * @param options - Data handler options; `size` and `hostname` fall back
   *   to `SMTP_DEFAULTS` when missing or falsy
   */
  constructor(
    sessionManager: ISessionManager,
    emailServer: UnifiedEmailServer,
    options: {
      size?: number;
      tempDir?: string;
      hostname?: string;
    } = {}
  ) {
    this.sessionManager = sessionManager;
    this.emailServer = emailServer;
    this.options = {
      size: options.size || SMTP_DEFAULTS.MAX_MESSAGE_SIZE,
      tempDir: options.tempDir,
      hostname: options.hostname || SMTP_DEFAULTS.HOSTNAME
    };

    // Create temp directory if specified and doesn't exist
    if (this.options.tempDir) {
      try {
        if (!fs.existsSync(this.options.tempDir)) {
          fs.mkdirSync(this.options.tempDir, { recursive: true });
        }
      } catch (error) {
        SmtpLogger.error(`Failed to create temp directory: ${error instanceof Error ? error.message : String(error)}`, {
          error: error instanceof Error ? error : new Error(String(error)),
          tempDir: this.options.tempDir
        });
        // Without a usable directory, saving to disk is disabled.
        this.options.tempDir = undefined;
      }
    }
  }

  /**
   * Process incoming email data
   * @param socket - Client socket
   * @param data - Data chunk
   * @returns Promise that resolves when the data is processed
   */
  public async processEmailData(socket: plugins.net.Socket | plugins.tls.TLSSocket, data: string): Promise<void> {
    // Get the session for this socket
    const session = this.sessionManager.getSession(socket);
    if (!session) {
      this.sendResponse(socket, `${SmtpResponseCode.LOCAL_ERROR} Internal server error - session not found`);
      return;
    }

    // Restart the inactivity timer on every chunk
    if (session.dataTimeoutId) {
      clearTimeout(session.dataTimeoutId);
    }
    session.dataTimeoutId = setTimeout(() => {
      if (session.state === SmtpState.DATA_RECEIVING) {
        SmtpLogger.warn(`DATA timeout for session ${session.id}`, { sessionId: session.id });
        this.sendResponse(socket, `${SmtpResponseCode.LOCAL_ERROR} Data timeout`);
        this.resetSession(session);
      }
    }, SMTP_DEFAULTS.DATA_TIMEOUT);

    // Update activity timestamp
    this.sessionManager.updateSessionActivity(session);

    // Store data in chunks for better memory efficiency
    if (!session.emailDataChunks) {
      session.emailDataChunks = [];
    }
    session.emailDataChunks.push(data);

    // Check if we've reached the max size (measured in string length)
    const totalSize = session.emailDataChunks.reduce((sum, chunk) => sum + chunk.length, 0);
    if (totalSize > this.options.size) {
      SmtpLogger.warn(`Message size exceeds limit for session ${session.id}`, {
        sessionId: session.id,
        size: totalSize,
        limit: this.options.size
      });
      this.sendResponse(socket, `${SmtpResponseCode.EXCEEDED_STORAGE} Message too big, size limit is ${this.options.size} bytes`);
      this.resetSession(session);
      return;
    }

    // The terminator may be split across chunks, so look at the tail of the
    // buffered data rather than the last chunk alone. Only the final few
    // characters matter, which avoids re-joining the whole message per chunk.
    const tail = DataHandler.tailOf(session.emailDataChunks, DataHandler.MAX_TERMINATOR_LENGTH);

    // Permissive check: \r\n.\r\n, \n.\r\n, \r\n.\n, \n.\n, or a chunk that
    // is just "." or ".\r\n"
    const endOfData =
      DataHandler.TERMINATOR_AT_END.test(tail) ||
      data === '.\r\n' ||
      data === '.';

    if (endOfData) {
      SmtpLogger.debug(`End of data marker found for session ${session.id}`, { sessionId: session.id });
      await this.handleEndOfData(socket, session);
    }
  }

  /**
   * Process a complete email
   *
   * Assembles `session.emailData` from the buffered chunks (stripping the
   * end-of-data marker and undoing dot-stuffing), parses it and handles it
   * according to the session's processing mode.
   *
   * @param session - SMTP session
   * @returns Promise that resolves with the result of the transaction
   */
  public async processEmail(session: ISmtpSession): Promise<ISmtpTransactionResult> {
    // Combine all chunks, then strip the end-of-data marker exactly once.
    // Stripping in several passes would also eat a period that legitimately
    // ends the last line of the body.
    session.emailData = (session.emailDataChunks || [])
      .join('')
      .replace(DataHandler.END_OF_DATA_PATTERN, '');

    // Remove dot-stuffing (RFC 5321, section 4.5.2), including on the first
    // line and on lines terminated by a bare LF.
    session.emailData = session.emailData.replace(DataHandler.DOT_STUFFING_PATTERN, '$1.');

    try {
      // Parse email into Email object
      const email = await this.parseEmail(session);

      // Process the email based on the processing mode
      const processingMode = session.processingMode || 'mta';
      let result: ISmtpTransactionResult = {
        success: false,
        error: 'Email processing failed'
      };

      switch (processingMode) {
        case 'mta':
          // Process through the MTA system
          try {
            SmtpLogger.debug(`Processing email in MTA mode for session ${session.id}`, {
              sessionId: session.id,
              messageId: email.getMessageId()
            });

            // Generate a message ID since queueEmail is not available
            const messageId = `${Date.now()}-${Math.floor(Math.random() * 1000000)}@${this.options.hostname || 'mail.example.com'}`;

            // In a full implementation, the email would be queued to the delivery system
            // await this.emailServer.queueEmail(email);
            result = {
              success: true,
              messageId,
              email
            };
          } catch (error) {
            SmtpLogger.error(`Failed to queue email: ${error instanceof Error ? error.message : String(error)}`, {
              sessionId: session.id,
              error: error instanceof Error ? error : new Error(String(error))
            });
            result = {
              success: false,
              error: `Failed to queue email: ${error instanceof Error ? error.message : String(error)}`
            };
          }
          break;

        case 'forward':
          // Forward email to another server
          SmtpLogger.debug(`Processing email in FORWARD mode for session ${session.id}`, {
            sessionId: session.id,
            messageId: email.getMessageId()
          });
          // Forward logic would be implemented here
          result = {
            success: true,
            messageId: email.getMessageId(),
            email
          };
          break;

        case 'process':
          // Process the email immediately
          SmtpLogger.debug(`Processing email in PROCESS mode for session ${session.id}`, {
            sessionId: session.id,
            messageId: email.getMessageId()
          });
          // Direct processing logic would be implemented here
          result = {
            success: true,
            messageId: email.getMessageId(),
            email
          };
          break;

        default:
          SmtpLogger.warn(`Unknown processing mode: ${processingMode}`, { sessionId: session.id });
          result = {
            success: false,
            error: `Unknown processing mode: ${processingMode}`
          };
      }

      return result;
    } catch (error) {
      SmtpLogger.error(`Failed to parse email: ${error instanceof Error ? error.message : String(error)}`, {
        sessionId: session.id,
        error: error instanceof Error ? error : new Error(String(error))
      });
      return {
        success: false,
        error: `Failed to parse email: ${error instanceof Error ? error.message : String(error)}`
      };
    }
  }

  /**
   * Save an email to disk
   *
   * Writes `session.emailData` to `<tempDir>/<sessionId>-<timestamp>.eml`.
   * Does nothing when no temp directory is configured; write failures are
   * logged and not rethrown.
   *
   * @param session - SMTP session
   */
  public saveEmail(session: ISmtpSession): void {
    if (!this.options.tempDir) {
      return;
    }

    try {
      const timestamp = Date.now();
      const filename = `${session.id}-${timestamp}.eml`;
      const filePath = path.join(this.options.tempDir, filename);
      fs.writeFileSync(filePath, session.emailData);
      SmtpLogger.debug(`Saved email to disk: ${filePath}`, {
        sessionId: session.id,
        filePath
      });
    } catch (error) {
      SmtpLogger.error(`Failed to save email to disk: ${error instanceof Error ? error.message : String(error)}`, {
        sessionId: session.id,
        error: error instanceof Error ? error : new Error(String(error))
      });
    }
  }

  /**
   * Parse an email into an Email object
   *
   * Splits `session.emailData` at the first blank line into headers and
   * body (CRLF or bare LF line endings). When a header name repeats, only
   * the last occurrence is kept.
   *
   * @param session - SMTP session
   * @returns Promise that resolves with the parsed Email object
   */
  public async parseEmail(session: ISmtpSession): Promise<Email> {
    const rawData = session.emailData;
    const envelopeSender = session.envelope.mailFrom.address;
    const envelopeRecipients = session.envelope.rcptTo.map((recipient) => recipient.address);

    // Locate the blank line separating headers from body
    const separator = /\r?\n\r?\n/.exec(rawData);
    if (!separator) {
      // No headers/body separation, create basic email
      return new Email({
        from: envelopeSender,
        to: envelopeRecipients,
        subject: 'Received via SMTP',
        text: rawData
      });
    }

    // Extract headers and body
    const headersText = rawData.substring(0, separator.index);
    const bodyText = rawData.substring(separator.index + separator[0].length);

    // Parse headers into a prototype-less map so that header names such as
    // "__proto__" are stored like any other key.
    const headers: Record<string, string> = Object.create(null);
    let currentHeader = '';
    for (const line of headersText.split(/\r?\n/)) {
      // Folded continuation of the previous header
      if (line.startsWith(' ') || line.startsWith('\t')) {
        if (currentHeader) {
          headers[currentHeader] += ' ' + line.trim();
        }
        continue;
      }

      // This is a new header; lines without a colon are ignored
      const separatorIndex = line.indexOf(':');
      if (separatorIndex !== -1) {
        const name = line.substring(0, separatorIndex).trim().toLowerCase();
        const value = line.substring(separatorIndex + 1).trim();
        headers[name] = value;
        currentHeader = name;
      }
    }

    // Extract common headers, falling back to the envelope where missing
    const subject = headers['subject'] || 'No Subject';
    const from = headers['from'] || envelopeSender;
    const headerRecipients = (headers['to'] || '')
      .split(',')
      .map((addr) => addr.trim())
      .filter((addr) => addr.length > 0);
    const to = headerRecipients.length > 0 ? headerRecipients : envelopeRecipients;
    const messageId = headers['message-id'] || `<${Date.now()}.${Math.random().toString(36).substring(2)}@${this.options.hostname}>`;

    // Create email object
    const email = new Email({
      from: from,
      to: to,
      subject: subject,
      text: bodyText,
      messageId: messageId,
      // Add original session envelope data for accurate routing
      originalMailFrom: envelopeSender,
      originalRcptTo: envelopeRecipients
    });

    // Add received header
    const timestamp = new Date().toUTCString();
    const receivedHeader = `from ${session.clientHostname || 'unknown'} (${session.remoteAddress}) by ${this.options.hostname} with ESMTP id ${session.id}; ${timestamp}`;
    email.addHeader('Received', receivedHeader);

    // Add all original headers that were not already passed to the constructor
    for (const [name, value] of Object.entries(headers)) {
      if (!['from', 'to', 'subject', 'message-id'].includes(name)) {
        email.addHeader(name, value);
      }
    }

    return email;
  }

  /**
   * Return the last `length` characters of the concatenation of `chunks`
   * without building the full concatenation.
   * @param chunks - Buffered data chunks in arrival order
   * @param length - Maximum number of trailing characters to return
   * @returns The trailing characters (fewer when less data is buffered)
   */
  private static tailOf(chunks: readonly string[], length: number): string {
    let tail = '';
    for (let i = chunks.length - 1; i >= 0 && tail.length < length; i--) {
      const chunk = chunks[i] ?? '';
      const missing = length - tail.length;
      tail = (chunk.length > missing ? chunk.slice(-missing) : chunk) + tail;
    }
    return tail;
  }

  /**
   * Handle end of data marker received
   * @param socket - Client socket
   * @param session - SMTP session
   */
  private async handleEndOfData(socket: plugins.net.Socket | plugins.tls.TLSSocket, session: ISmtpSession): Promise<void> {
    // Clear the data timeout
    if (session.dataTimeoutId) {
      clearTimeout(session.dataTimeoutId);
      session.dataTimeoutId = undefined;
    }

    try {
      // Update session state
      this.sessionManager.updateSessionState(session, SmtpState.FINISHED);

      // Process the email
      const result = await this.processEmail(session);

      // Optionally save email to disk. This must follow processEmail, which
      // is what assembles session.emailData from the buffered chunks.
      this.saveEmail(session);

      if (result.success) {
        // Send success response
        this.sendResponse(socket, `${SmtpResponseCode.OK} OK message queued as ${result.messageId}`);
      } else {
        // Send error response
        this.sendResponse(socket, `${SmtpResponseCode.TRANSACTION_FAILED} Failed to process email: ${result.error}`);
      }

      // Reset session for new transaction
      this.resetSession(session);
    } catch (error) {
      SmtpLogger.error(`Error processing email: ${error instanceof Error ? error.message : String(error)}`, {
        sessionId: session.id,
        error: error instanceof Error ? error : new Error(String(error))
      });
      this.sendResponse(socket, `${SmtpResponseCode.LOCAL_ERROR} Error processing email: ${error instanceof Error ? error.message : String(error)}`);
      this.resetSession(session);
    }
  }

  /**
   * Reset session after email processing
   * @param session - SMTP session
   */
  private resetSession(session: ISmtpSession): void {
    // Clear any data timeout
    if (session.dataTimeoutId) {
      clearTimeout(session.dataTimeoutId);
      session.dataTimeoutId = undefined;
    }

    // Reset the transaction fields; other session fields are left untouched
    session.mailFrom = '';
    session.rcptTo = [];
    session.emailData = '';
    session.emailDataChunks = [];
    session.envelope = {
      mailFrom: { address: '', args: {} },
      rcptTo: []
    };

    // Reset state to after EHLO
    this.sessionManager.updateSessionState(session, SmtpState.AFTER_EHLO);
  }

  /**
   * Send a response to the client
   *
   * Appends CRLF to the response. If writing fails, the error is logged and
   * the socket is destroyed.
   *
   * @param socket - Client socket
   * @param response - Response message
   */
  private sendResponse(socket: plugins.net.Socket | plugins.tls.TLSSocket, response: string): void {
    try {
      socket.write(`${response}${SMTP_DEFAULTS.CRLF}`);
      SmtpLogger.logResponse(response, socket);
    } catch (error) {
      SmtpLogger.error(`Error sending response: ${error instanceof Error ? error.message : String(error)}`, {
        response,
        remoteAddress: socket.remoteAddress,
        remotePort: socket.remotePort,
        error: error instanceof Error ? error : new Error(String(error))
      });
      socket.destroy();
    }
  }
}