feat(mta): Enhance MTA service and SMTP server with robust session management, advanced email handling, and integrated API routes
This commit is contained in:
@ -7,133 +7,345 @@ export interface ISmtpServerOptions {
|
||||
port: number;
|
||||
key: string;
|
||||
cert: string;
|
||||
hostname?: string;
|
||||
}
|
||||
|
||||
// SMTP Session States
|
||||
enum SmtpState {
|
||||
GREETING,
|
||||
AFTER_EHLO,
|
||||
MAIL_FROM,
|
||||
RCPT_TO,
|
||||
DATA,
|
||||
DATA_RECEIVING,
|
||||
FINISHED
|
||||
}
|
||||
|
||||
// Structure to store session information
|
||||
interface SmtpSession {
|
||||
state: SmtpState;
|
||||
clientHostname: string;
|
||||
mailFrom: string;
|
||||
rcptTo: string[];
|
||||
emailData: string;
|
||||
useTLS: boolean;
|
||||
connectionEnded: boolean;
|
||||
}
|
||||
|
||||
export class SMTPServer {
|
||||
public mtaRef: MtaService;
|
||||
private smtpServerOptions: ISmtpServerOptions;
|
||||
private server: plugins.net.Server;
|
||||
private emailBufferStringMap: Map<plugins.net.Socket, string>;
|
||||
private sessions: Map<plugins.net.Socket | plugins.tls.TLSSocket, SmtpSession>;
|
||||
private hostname: string;
|
||||
|
||||
constructor(mtaRefArg: MtaService, optionsArg: ISmtpServerOptions) {
|
||||
console.log('SMTPServer instance is being created...');
|
||||
|
||||
this.mtaRef = mtaRefArg;
|
||||
this.smtpServerOptions = optionsArg;
|
||||
this.emailBufferStringMap = new Map();
|
||||
this.sessions = new Map();
|
||||
this.hostname = optionsArg.hostname || 'mta.lossless.one';
|
||||
|
||||
this.server = plugins.net.createServer((socket) => {
|
||||
console.log('New connection established...');
|
||||
|
||||
socket.write('220 mta.lossless.one ESMTP Postfix\r\n');
|
||||
|
||||
socket.on('data', (data) => {
|
||||
this.processData(socket, data);
|
||||
});
|
||||
|
||||
socket.on('end', () => {
|
||||
console.log('Socket closed. Deleting related emailBuffer...');
|
||||
socket.destroy();
|
||||
this.emailBufferStringMap.delete(socket);
|
||||
});
|
||||
|
||||
socket.on('error', () => {
|
||||
console.error('Socket error occurred. Deleting related emailBuffer...');
|
||||
socket.destroy();
|
||||
this.emailBufferStringMap.delete(socket);
|
||||
});
|
||||
|
||||
socket.on('close', () => {
|
||||
console.log('Connection was closed by the client');
|
||||
socket.destroy();
|
||||
this.emailBufferStringMap.delete(socket);
|
||||
});
|
||||
this.handleNewConnection(socket);
|
||||
});
|
||||
}
|
||||
|
||||
private startTLS(socket: plugins.net.Socket) {
|
||||
const secureContext = plugins.tls.createSecureContext({
|
||||
key: this.smtpServerOptions.key,
|
||||
cert: this.smtpServerOptions.cert,
|
||||
private handleNewConnection(socket: plugins.net.Socket): void {
|
||||
console.log(`New connection from ${socket.remoteAddress}:${socket.remotePort}`);
|
||||
|
||||
// Initialize a new session
|
||||
this.sessions.set(socket, {
|
||||
state: SmtpState.GREETING,
|
||||
clientHostname: '',
|
||||
mailFrom: '',
|
||||
rcptTo: [],
|
||||
emailData: '',
|
||||
useTLS: false,
|
||||
connectionEnded: false
|
||||
});
|
||||
|
||||
const tlsSocket = new plugins.tls.TLSSocket(socket, {
|
||||
secureContext: secureContext,
|
||||
isServer: true,
|
||||
// Send greeting
|
||||
this.sendResponse(socket, `220 ${this.hostname} ESMTP Service Ready`);
|
||||
|
||||
socket.on('data', (data) => {
|
||||
this.processData(socket, data);
|
||||
});
|
||||
|
||||
tlsSocket.on('secure', () => {
|
||||
console.log('Connection secured.');
|
||||
this.emailBufferStringMap.set(tlsSocket, this.emailBufferStringMap.get(socket) || '');
|
||||
this.emailBufferStringMap.delete(socket);
|
||||
});
|
||||
|
||||
// Use the same handler for the 'data' event as for the unsecured socket.
|
||||
tlsSocket.on('data', (data: Buffer) => {
|
||||
this.processData(tlsSocket, Buffer.from(data));
|
||||
});
|
||||
|
||||
tlsSocket.on('end', () => {
|
||||
console.log('TLS socket closed. Deleting related emailBuffer...');
|
||||
this.emailBufferStringMap.delete(tlsSocket);
|
||||
});
|
||||
|
||||
tlsSocket.on('error', (err) => {
|
||||
console.error('TLS socket error occurred. Deleting related emailBuffer...');
|
||||
this.emailBufferStringMap.delete(tlsSocket);
|
||||
});
|
||||
}
|
||||
|
||||
private processData(socket: plugins.net.Socket | plugins.tls.TLSSocket, data: Buffer) {
|
||||
const dataString = data.toString();
|
||||
console.log(`Received data:`);
|
||||
console.log(`${dataString}`)
|
||||
|
||||
if (dataString.startsWith('EHLO')) {
|
||||
socket.write('250-mta.lossless.one Hello\r\n250 STARTTLS\r\n');
|
||||
} else if (dataString.startsWith('MAIL FROM')) {
|
||||
socket.write('250 Ok\r\n');
|
||||
} else if (dataString.startsWith('RCPT TO')) {
|
||||
socket.write('250 Ok\r\n');
|
||||
} else if (dataString.startsWith('STARTTLS')) {
|
||||
socket.write('220 Ready to start TLS\r\n');
|
||||
this.startTLS(socket);
|
||||
} else if (dataString.startsWith('DATA')) {
|
||||
socket.write('354 End data with <CR><LF>.<CR><LF>\r\n');
|
||||
let emailBuffer = this.emailBufferStringMap.get(socket);
|
||||
if (!emailBuffer) {
|
||||
this.emailBufferStringMap.set(socket, '');
|
||||
socket.on('end', () => {
|
||||
console.log(`Connection ended from ${socket.remoteAddress}:${socket.remotePort}`);
|
||||
const session = this.sessions.get(socket);
|
||||
if (session) {
|
||||
session.connectionEnded = true;
|
||||
}
|
||||
} else if (dataString.startsWith('QUIT')) {
|
||||
socket.write('221 Bye\r\n');
|
||||
console.log('Received QUIT command, closing the socket...');
|
||||
});
|
||||
|
||||
socket.on('error', (err) => {
|
||||
console.error(`Socket error: ${err.message}`);
|
||||
this.sessions.delete(socket);
|
||||
socket.destroy();
|
||||
this.parseEmail(socket);
|
||||
} else {
|
||||
let emailBuffer = this.emailBufferStringMap.get(socket);
|
||||
if (typeof emailBuffer === 'string') {
|
||||
emailBuffer += dataString;
|
||||
this.emailBufferStringMap.set(socket, emailBuffer);
|
||||
}
|
||||
socket.write('250 Ok\r\n');
|
||||
});
|
||||
|
||||
socket.on('close', () => {
|
||||
console.log(`Connection closed from ${socket.remoteAddress}:${socket.remotePort}`);
|
||||
this.sessions.delete(socket);
|
||||
});
|
||||
}
|
||||
|
||||
private sendResponse(socket: plugins.net.Socket | plugins.tls.TLSSocket, response: string): void {
|
||||
try {
|
||||
socket.write(`${response}\r\n`);
|
||||
console.log(`→ ${response}`);
|
||||
} catch (error) {
|
||||
console.error(`Error sending response: ${error.message}`);
|
||||
socket.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
private processData(socket: plugins.net.Socket | plugins.tls.TLSSocket, data: Buffer): void {
|
||||
const session = this.sessions.get(socket);
|
||||
if (!session) {
|
||||
console.error('No session found for socket. Closing connection.');
|
||||
socket.destroy();
|
||||
return;
|
||||
}
|
||||
|
||||
// If we're in DATA_RECEIVING state, handle differently
|
||||
if (session.state === SmtpState.DATA_RECEIVING) {
|
||||
return this.processEmailData(socket, data.toString());
|
||||
}
|
||||
|
||||
// Process normal SMTP commands
|
||||
const lines = data.toString().split('\r\n').filter(line => line.length > 0);
|
||||
for (const line of lines) {
|
||||
console.log(`← ${line}`);
|
||||
this.processCommand(socket, line);
|
||||
}
|
||||
}
|
||||
|
||||
private processCommand(socket: plugins.net.Socket | plugins.tls.TLSSocket, commandLine: string): void {
|
||||
const session = this.sessions.get(socket);
|
||||
if (!session || session.connectionEnded) return;
|
||||
|
||||
const [command, ...args] = commandLine.split(' ');
|
||||
const upperCommand = command.toUpperCase();
|
||||
|
||||
switch (upperCommand) {
|
||||
case 'EHLO':
|
||||
case 'HELO':
|
||||
this.handleEhlo(socket, args.join(' '));
|
||||
break;
|
||||
case 'STARTTLS':
|
||||
this.handleStartTls(socket);
|
||||
break;
|
||||
case 'MAIL':
|
||||
this.handleMailFrom(socket, args.join(' '));
|
||||
break;
|
||||
case 'RCPT':
|
||||
this.handleRcptTo(socket, args.join(' '));
|
||||
break;
|
||||
case 'DATA':
|
||||
this.handleData(socket);
|
||||
break;
|
||||
case 'RSET':
|
||||
this.handleRset(socket);
|
||||
break;
|
||||
case 'QUIT':
|
||||
this.handleQuit(socket);
|
||||
break;
|
||||
case 'NOOP':
|
||||
this.sendResponse(socket, '250 OK');
|
||||
break;
|
||||
default:
|
||||
this.sendResponse(socket, '502 Command not implemented');
|
||||
}
|
||||
}
|
||||
|
||||
private handleEhlo(socket: plugins.net.Socket | plugins.tls.TLSSocket, clientHostname: string): void {
|
||||
const session = this.sessions.get(socket);
|
||||
if (!session) return;
|
||||
|
||||
if (!clientHostname) {
|
||||
this.sendResponse(socket, '501 Syntax error in parameters or arguments');
|
||||
return;
|
||||
}
|
||||
|
||||
session.clientHostname = clientHostname;
|
||||
session.state = SmtpState.AFTER_EHLO;
|
||||
|
||||
// List available extensions
|
||||
this.sendResponse(socket, `250-${this.hostname} Hello ${clientHostname}`);
|
||||
this.sendResponse(socket, '250-SIZE 10485760'); // 10MB max
|
||||
this.sendResponse(socket, '250-8BITMIME');
|
||||
|
||||
// Only offer STARTTLS if we haven't already established it
|
||||
if (!session.useTLS) {
|
||||
this.sendResponse(socket, '250-STARTTLS');
|
||||
}
|
||||
|
||||
if (dataString.endsWith('\r\n.\r\n') ) { // End of data
|
||||
console.log('Received end of data.');
|
||||
this.sendResponse(socket, '250 HELP');
|
||||
}
|
||||
|
||||
private handleStartTls(socket: plugins.net.Socket | plugins.tls.TLSSocket): void {
|
||||
const session = this.sessions.get(socket);
|
||||
if (!session) return;
|
||||
|
||||
if (session.state !== SmtpState.AFTER_EHLO) {
|
||||
this.sendResponse(socket, '503 Bad sequence of commands');
|
||||
return;
|
||||
}
|
||||
|
||||
if (session.useTLS) {
|
||||
this.sendResponse(socket, '503 TLS already active');
|
||||
return;
|
||||
}
|
||||
|
||||
this.sendResponse(socket, '220 Ready to start TLS');
|
||||
this.startTLS(socket);
|
||||
}
|
||||
|
||||
private handleMailFrom(socket: plugins.net.Socket | plugins.tls.TLSSocket, args: string): void {
|
||||
const session = this.sessions.get(socket);
|
||||
if (!session) return;
|
||||
|
||||
if (session.state !== SmtpState.AFTER_EHLO) {
|
||||
this.sendResponse(socket, '503 Bad sequence of commands');
|
||||
return;
|
||||
}
|
||||
|
||||
// Extract email from MAIL FROM:<user@example.com>
|
||||
const emailMatch = args.match(/FROM:<([^>]*)>/i);
|
||||
if (!emailMatch) {
|
||||
this.sendResponse(socket, '501 Syntax error in parameters or arguments');
|
||||
return;
|
||||
}
|
||||
|
||||
const email = emailMatch[1];
|
||||
if (!this.isValidEmail(email)) {
|
||||
this.sendResponse(socket, '501 Invalid email address');
|
||||
return;
|
||||
}
|
||||
|
||||
session.mailFrom = email;
|
||||
session.state = SmtpState.MAIL_FROM;
|
||||
this.sendResponse(socket, '250 OK');
|
||||
}
|
||||
|
||||
private handleRcptTo(socket: plugins.net.Socket | plugins.tls.TLSSocket, args: string): void {
|
||||
const session = this.sessions.get(socket);
|
||||
if (!session) return;
|
||||
|
||||
if (session.state !== SmtpState.MAIL_FROM && session.state !== SmtpState.RCPT_TO) {
|
||||
this.sendResponse(socket, '503 Bad sequence of commands');
|
||||
return;
|
||||
}
|
||||
|
||||
// Extract email from RCPT TO:<user@example.com>
|
||||
const emailMatch = args.match(/TO:<([^>]*)>/i);
|
||||
if (!emailMatch) {
|
||||
this.sendResponse(socket, '501 Syntax error in parameters or arguments');
|
||||
return;
|
||||
}
|
||||
|
||||
const email = emailMatch[1];
|
||||
if (!this.isValidEmail(email)) {
|
||||
this.sendResponse(socket, '501 Invalid email address');
|
||||
return;
|
||||
}
|
||||
|
||||
session.rcptTo.push(email);
|
||||
session.state = SmtpState.RCPT_TO;
|
||||
this.sendResponse(socket, '250 OK');
|
||||
}
|
||||
|
||||
private handleData(socket: plugins.net.Socket | plugins.tls.TLSSocket): void {
|
||||
const session = this.sessions.get(socket);
|
||||
if (!session) return;
|
||||
|
||||
if (session.state !== SmtpState.RCPT_TO) {
|
||||
this.sendResponse(socket, '503 Bad sequence of commands');
|
||||
return;
|
||||
}
|
||||
|
||||
session.state = SmtpState.DATA_RECEIVING;
|
||||
session.emailData = '';
|
||||
this.sendResponse(socket, '354 End data with <CR><LF>.<CR><LF>');
|
||||
}
|
||||
|
||||
private handleRset(socket: plugins.net.Socket | plugins.tls.TLSSocket): void {
|
||||
const session = this.sessions.get(socket);
|
||||
if (!session) return;
|
||||
|
||||
// Reset the session data but keep connection information
|
||||
session.state = SmtpState.AFTER_EHLO;
|
||||
session.mailFrom = '';
|
||||
session.rcptTo = [];
|
||||
session.emailData = '';
|
||||
|
||||
this.sendResponse(socket, '250 OK');
|
||||
}
|
||||
|
||||
private handleQuit(socket: plugins.net.Socket | plugins.tls.TLSSocket): void {
|
||||
const session = this.sessions.get(socket);
|
||||
if (!session) return;
|
||||
|
||||
this.sendResponse(socket, '221 Goodbye');
|
||||
|
||||
// If we have collected email data, try to parse it before closing
|
||||
if (session.state === SmtpState.FINISHED && session.emailData.length > 0) {
|
||||
this.parseEmail(socket);
|
||||
}
|
||||
|
||||
socket.end();
|
||||
this.sessions.delete(socket);
|
||||
}
|
||||
|
||||
private processEmailData(socket: plugins.net.Socket | plugins.tls.TLSSocket, data: string): void {
|
||||
const session = this.sessions.get(socket);
|
||||
if (!session) return;
|
||||
|
||||
// Check for end of data marker
|
||||
if (data.endsWith('\r\n.\r\n')) {
|
||||
// Remove the end of data marker
|
||||
const emailData = data.slice(0, -5);
|
||||
session.emailData += emailData;
|
||||
session.state = SmtpState.FINISHED;
|
||||
|
||||
// Save and process the email
|
||||
this.saveEmail(socket);
|
||||
this.sendResponse(socket, '250 OK: Message accepted for delivery');
|
||||
} else {
|
||||
// Accumulate the data
|
||||
session.emailData += data;
|
||||
}
|
||||
}
|
||||
|
||||
private async parseEmail(socket: plugins.net.Socket | plugins.tls.TLSSocket) {
|
||||
let emailData = this.emailBufferStringMap.get(socket);
|
||||
// lets strip the end sequence
|
||||
emailData = emailData?.replace(/\r\n\.\r\n$/, '');
|
||||
private saveEmail(socket: plugins.net.Socket | plugins.tls.TLSSocket): void {
|
||||
const session = this.sessions.get(socket);
|
||||
if (!session) return;
|
||||
|
||||
plugins.smartfile.fs.ensureDirSync(paths.receivedEmailsDir);
|
||||
plugins.smartfile.memory.toFsSync(emailData, plugins.path.join(paths.receivedEmailsDir, `${Date.now()}.eml`));
|
||||
try {
|
||||
// Ensure the directory exists
|
||||
plugins.smartfile.fs.ensureDirSync(paths.receivedEmailsDir);
|
||||
|
||||
// Write the email to disk
|
||||
plugins.smartfile.memory.toFsSync(
|
||||
session.emailData,
|
||||
plugins.path.join(paths.receivedEmailsDir, `${Date.now()}.eml`)
|
||||
);
|
||||
|
||||
// Parse the email
|
||||
this.parseEmail(socket);
|
||||
} catch (error) {
|
||||
console.error('Error saving email:', error);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (!emailData) {
|
||||
console.error('No email data found for socket.');
|
||||
private async parseEmail(socket: plugins.net.Socket | plugins.tls.TLSSocket): Promise<void> {
|
||||
const session = this.sessions.get(socket);
|
||||
if (!session || !session.emailData) {
|
||||
console.error('No email data found for session.');
|
||||
return;
|
||||
}
|
||||
|
||||
@ -141,51 +353,124 @@ export class SMTPServer {
|
||||
|
||||
// Verifying the email with DKIM
|
||||
try {
|
||||
const isVerified = await this.mtaRef.dkimVerifier.verify(emailData);
|
||||
const isVerified = await this.mtaRef.dkimVerifier.verify(session.emailData);
|
||||
mightBeSpam = !isVerified;
|
||||
} catch (error) {
|
||||
console.error('Failed to verify DKIM signature:', error);
|
||||
mightBeSpam = true;
|
||||
}
|
||||
|
||||
const parsedEmail = await plugins.mailparser.simpleParser(emailData);
|
||||
console.log(parsedEmail)
|
||||
const email = new Email({
|
||||
from: parsedEmail.from?.value[0].address || '',
|
||||
to:
|
||||
parsedEmail.to instanceof Array
|
||||
? parsedEmail.to[0].value[0].address
|
||||
: parsedEmail.to?.value[0].address,
|
||||
subject: parsedEmail.subject || '',
|
||||
text: parsedEmail.html || parsedEmail.text,
|
||||
attachments:
|
||||
parsedEmail.attachments?.map((attachment) => ({
|
||||
try {
|
||||
const parsedEmail = await plugins.mailparser.simpleParser(session.emailData);
|
||||
|
||||
const email = new Email({
|
||||
from: parsedEmail.from?.value[0].address || session.mailFrom,
|
||||
to: session.rcptTo[0], // Use the first recipient
|
||||
subject: parsedEmail.subject || '',
|
||||
text: parsedEmail.html || parsedEmail.text || '',
|
||||
attachments: parsedEmail.attachments?.map((attachment) => ({
|
||||
filename: attachment.filename || '',
|
||||
content: attachment.content,
|
||||
contentType: attachment.contentType,
|
||||
})) || [],
|
||||
mightBeSpam: mightBeSpam,
|
||||
});
|
||||
mightBeSpam: mightBeSpam,
|
||||
});
|
||||
|
||||
console.log('mail received!');
|
||||
console.log(email);
|
||||
console.log('Email received and parsed:', {
|
||||
from: email.from,
|
||||
to: email.to,
|
||||
subject: email.subject,
|
||||
attachments: email.attachments.length,
|
||||
mightBeSpam: email.mightBeSpam
|
||||
});
|
||||
|
||||
this.emailBufferStringMap.delete(socket);
|
||||
// Process or forward the email as needed
|
||||
// this.mtaRef.processIncomingEmail(email); // You could add this method to your MTA service
|
||||
} catch (error) {
|
||||
console.error('Error parsing email:', error);
|
||||
}
|
||||
}
|
||||
|
||||
public start() {
|
||||
private startTLS(socket: plugins.net.Socket): void {
|
||||
try {
|
||||
const secureContext = plugins.tls.createSecureContext({
|
||||
key: this.smtpServerOptions.key,
|
||||
cert: this.smtpServerOptions.cert,
|
||||
});
|
||||
|
||||
const tlsSocket = new plugins.tls.TLSSocket(socket, {
|
||||
secureContext: secureContext,
|
||||
isServer: true,
|
||||
server: this.server
|
||||
});
|
||||
|
||||
const originalSession = this.sessions.get(socket);
|
||||
if (!originalSession) {
|
||||
console.error('No session found when upgrading to TLS');
|
||||
return;
|
||||
}
|
||||
|
||||
// Transfer the session data to the new TLS socket
|
||||
this.sessions.set(tlsSocket, {
|
||||
...originalSession,
|
||||
useTLS: true,
|
||||
state: SmtpState.GREETING // Reset state to require a new EHLO
|
||||
});
|
||||
|
||||
this.sessions.delete(socket);
|
||||
|
||||
tlsSocket.on('secure', () => {
|
||||
console.log('TLS negotiation successful');
|
||||
});
|
||||
|
||||
tlsSocket.on('data', (data: Buffer) => {
|
||||
this.processData(tlsSocket, data);
|
||||
});
|
||||
|
||||
tlsSocket.on('end', () => {
|
||||
console.log('TLS socket ended');
|
||||
const session = this.sessions.get(tlsSocket);
|
||||
if (session) {
|
||||
session.connectionEnded = true;
|
||||
}
|
||||
});
|
||||
|
||||
tlsSocket.on('error', (err) => {
|
||||
console.error('TLS socket error:', err);
|
||||
this.sessions.delete(tlsSocket);
|
||||
tlsSocket.destroy();
|
||||
});
|
||||
|
||||
tlsSocket.on('close', () => {
|
||||
console.log('TLS socket closed');
|
||||
this.sessions.delete(tlsSocket);
|
||||
});
|
||||
} catch (error) {
|
||||
console.error('Error upgrading connection to TLS:', error);
|
||||
socket.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
private isValidEmail(email: string): boolean {
|
||||
// Basic email validation - more comprehensive validation could be implemented
|
||||
const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
|
||||
return emailRegex.test(email);
|
||||
}
|
||||
|
||||
public start(): void {
|
||||
this.server.listen(this.smtpServerOptions.port, () => {
|
||||
console.log(`SMTP Server is now running on port ${this.smtpServerOptions.port}`);
|
||||
});
|
||||
}
|
||||
|
||||
public stop() {
|
||||
public stop(): void {
|
||||
this.server.getConnections((err, count) => {
|
||||
if (err) throw err;
|
||||
console.log('Number of active connections: ', count);
|
||||
});
|
||||
|
||||
this.server.close(() => {
|
||||
console.log('SMTP Server is now stopped');
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user