platformservice/ts/mta/mta.classes.mta.ts

945 lines
27 KiB
TypeScript
Raw Normal View History

2024-02-16 13:41:04 +01:00
import * as plugins from '../plugins.js';
import * as paths from '../paths.js';
2024-02-16 13:28:40 +01:00
import { Email } from './mta.classes.email.js';
import { EmailSendJob, DeliveryStatus } from './mta.classes.emailsendjob.js';
2024-02-16 13:28:40 +01:00
import { DKIMCreator } from './mta.classes.dkimcreator.js';
import { DKIMVerifier } from './mta.classes.dkimverifier.js';
import { SMTPServer, type ISmtpServerOptions } from './mta.classes.smtpserver.js';
2024-02-16 13:28:40 +01:00
import { DNSManager } from './mta.classes.dnsmanager.js';
import { ApiManager } from './mta.classes.apimanager.js';
2024-02-16 20:42:26 +01:00
import type { SzPlatformService } from '../classes.platformservice.js';
2024-02-16 13:28:40 +01:00
/**
 * Options accepted by the MTA service.
 *
 * Every section and every field is optional; the service constructor layers
 * the supplied values over its built-in defaults.
 */
export interface IMtaConfig {
  /** Inbound SMTP listener settings */
  smtp?: {
    /** Start the SMTP server together with the service */
    enabled?: boolean;
    /** TCP port the SMTP server listens on (default: 25) */
    port?: number;
    /** Hostname handed to the SMTP server */
    hostname?: string;
    /** Largest accepted message, in bytes */
    maxSize?: number;
  };

  /** TLS certificate settings */
  tls?: {
    /** Domain the certificate is requested for */
    domain?: string;
    /** Schedule automatic renewal of a provisioned certificate */
    autoRenew?: boolean;
    /** Path of a private key file (used together with certPath instead of auto-provisioning) */
    keyPath?: string;
    /** Path of a certificate file (used together with keyPath instead of auto-provisioning) */
    certPath?: string;
  };

  /** Outbound delivery settings */
  outbound?: {
    /** Upper bound on send jobs running at the same time */
    concurrency?: number;
    /** Retry policy for deferred messages */
    retries?: {
      /** Number of attempts after which a message is given up on */
      max?: number;
      /** Base wait between attempts, in milliseconds */
      delay?: number;
      /** Double the wait with every further attempt */
      useBackoff?: boolean;
    };
    /** Throttling of outbound messages */
    rateLimit?: {
      /** Messages allowed per period */
      maxPerPeriod?: number;
      /** Length of one period, in milliseconds */
      periodMs?: number;
      /** Keep one limiter per domain instead of a single global one */
      perDomain?: boolean;
    };
  };

  /** Security switches */
  security?: {
    /** Sign outgoing mail with DKIM */
    useDkim?: boolean;
    /** Verify DKIM signatures of inbound mail */
    verifyDkim?: boolean;
    /** Verify SPF for inbound mail */
    verifySpf?: boolean;
    /** Use TLS for outbound delivery when available */
    useTls?: boolean;
    /** Reject peers whose certificates do not validate */
    requireValidCerts?: boolean;
  };

  /** Domain handling */
  domains?: {
    /** Domains this MTA treats as local */
    local?: string[];
    /** Generate DNS records for the local domains on start */
    autoCreateDnsRecords?: boolean;
    /** DKIM selector (default: "mta") */
    dkimSelector?: string;
  };
}
/**
 * One message waiting in (or moving through) the outbound queue.
 */
interface QueueEntry {
  /** Queue identifier; also the key under which the entry is stored */
  id: string;
  /** The message to deliver */
  email: Email;
  /** When the message was queued */
  addedAt: Date;
  /** True while a send attempt for this entry is in flight */
  processing: boolean;
  /** Number of delivery attempts started so far */
  attempts: number;
  /** Start time of the most recent attempt */
  lastAttempt?: Date;
  /** Earliest time at which the next attempt may start */
  nextAttempt?: Date;
  /** Error recorded by the most recent unsuccessful attempt */
  error?: Error;
  /** Current delivery state */
  status: DeliveryStatus;
}
/**
 * TLS material held by the service.
 */
interface Certificate {
  /** Private key; handed to the SMTP server as its `key` option */
  privateKey: string;
  /** Certificate text; handed to the SMTP server as its `cert` option */
  publicKey: string;
  /** Point in time after which the certificate is treated as expired */
  expiresAt: Date;
}
/**
 * Counters and gauges exposed for monitoring the MTA.
 */
interface MtaStats {
  /** When the stats object was created */
  startTime: Date;
  /** Inbound messages handled */
  emailsReceived: number;
  /** Outbound messages delivered */
  emailsSent: number;
  /** Outbound messages given up on */
  emailsFailed: number;
  /** Currently open connections */
  activeConnections: number;
  /** Number of entries in the outbound queue */
  queueSize: number;
  /** Details of the loaded certificate, when there is one */
  certificateInfo?: {
    /** Domain the certificate belongs to */
    domain: string;
    /** Expiry timestamp of the certificate */
    expiresAt: Date;
    /** Whole days left until expiry */
    daysUntilExpiry: number;
  };
}
/**
 * Main MTA service class that coordinates all email functionality:
 * the SMTP server, DKIM signing and verification, DNS record generation,
 * the outbound queue and TLS certificate handling.
 */
export class MtaService {
/** Reference to the platform service that owns this MTA */
public platformServiceRef: SzPlatformService;

/** SMTP server instance; only set while an SMTP server has been started */
public server: SMTPServer;

/** DKIM creator used for signing outgoing emails */
public dkimCreator: DKIMCreator;

/** DKIM verifier used for validating incoming emails */
public dkimVerifier: DKIMVerifier;

/** DNS manager used for generating DNS records */
public dnsManager: DNSManager;

/** API manager for external integrations */
public apiManager: ApiManager;

/** Outbound queue, keyed by queue entry id */
private readonly emailQueue: Map<string, QueueEntry> = new Map();

/** True while the queue polling loop is supposed to keep running */
private queueProcessing = false;

/** Token buckets for outbound rate limiting, keyed by sender domain or 'global' */
private readonly rateLimiters: Map<string, {
  tokens: number;
  lastRefill: number;
}> = new Map();

/** Currently loaded certificate; null until one has been loaded or provisioned */
private certificate: Certificate | null = null;

/** Effective configuration (defaults merged with caller-supplied options) */
private config: IMtaConfig;

/** Counters reported by getStats() */
private stats: MtaStats;

/** Whether the service is currently running */
private running = false;
/**
 * Create the MTA service and its helper components.
 *
 * Nothing is started here; the constructor only builds the effective
 * configuration, instantiates the collaborators, resets the statistics and
 * makes sure the working directories exist.
 *
 * @param platformServiceRefArg Reference to the platform service
 * @param config Configuration options; merged over the built-in defaults
 */
constructor(platformServiceRefArg: SzPlatformService, config: IMtaConfig = {}) {
  this.platformServiceRef = platformServiceRefArg;

  // Caller-supplied options are layered on top of the defaults.
  this.config = this.mergeConfig(this.getDefaultConfig(), config);

  // Collaborators receive a back-reference to this service (ApiManager takes none).
  this.dkimCreator = new DKIMCreator(this);
  this.dkimVerifier = new DKIMVerifier(this);
  this.dnsManager = new DNSManager(this);
  this.apiManager = new ApiManager();

  // Fresh counters; startTime records when this instance was constructed.
  this.stats = {
    startTime: new Date(),
    emailsReceived: 0,
    emailsSent: 0,
    emailsFailed: 0,
    activeConnections: 0,
    queueSize: 0
  };

  // Synchronous: the directories exist once the constructor returns.
  this.ensureDirectories();
}
/**
 * Build the default configuration.
 *
 * A new object is created on every call, so callers may modify the result
 * without affecting later calls.
 *
 * @returns The built-in defaults for every configuration section
 */
private getDefaultConfig(): IMtaConfig {
  return {
    smtp: {
      enabled: true,
      port: 25,
      hostname: 'mta.lossless.one',
      maxSize: 10 * 1024 * 1024 // 10MB
    },
    tls: {
      domain: 'mta.lossless.one',
      autoRenew: true
    },
    outbound: {
      concurrency: 5,
      retries: {
        max: 3,
        delay: 300000, // 5 minutes
        useBackoff: true
      },
      rateLimit: {
        maxPerPeriod: 100,
        periodMs: 60000, // 1 minute
        perDomain: true
      }
    },
    security: {
      useDkim: true,
      verifyDkim: true,
      verifySpf: true,
      useTls: true,
      requireValidCerts: false
    },
    domains: {
      local: ['lossless.one'],
      autoCreateDnsRecords: true,
      dkimSelector: 'mta'
    }
  };
}
/**
 * Recursively merge a custom configuration over the defaults.
 *
 * Plain objects are merged key by key at every nesting level, so a partial
 * override such as `{ outbound: { retries: { max: 5 } } }` keeps the default
 * `delay`, `useBackoff`, `concurrency` and `rateLimit` values. Arrays and
 * primitives replace the default value wholesale. `null` and `undefined`
 * overrides are ignored at every level, which means a default can only be
 * replaced by a concrete value (e.g. use `0`, not `undefined`, to switch
 * `rateLimit.maxPerPeriod` off).
 *
 * Neither input object is modified.
 *
 * @param defaultConfig Base configuration
 * @param customConfig Overrides supplied by the caller
 * @returns A new configuration object containing the merged values
 */
private mergeConfig(defaultConfig: IMtaConfig, customConfig: IMtaConfig): IMtaConfig {
  type PlainObject = Record<string, unknown>;

  const isPlainObject = (candidate: unknown): candidate is PlainObject =>
    typeof candidate === 'object' && candidate !== null && !Array.isArray(candidate);

  const deepMerge = (base: PlainObject, overrides: PlainObject): PlainObject => {
    const result: PlainObject = { ...base };
    for (const [key, value] of Object.entries(overrides)) {
      if (value === null || value === undefined) continue;
      if (isPlainObject(value)) {
        // Descend so that sibling defaults of a partially overridden section survive.
        const existing = result[key];
        result[key] = deepMerge(isPlainObject(existing) ? existing : {}, value);
      } else if (Array.isArray(value)) {
        // Copy so later mutation of the caller's array does not leak into the config.
        result[key] = [...value];
      } else {
        result[key] = value;
      }
    }
    return result;
  };

  return deepMerge({ ...defaultConfig }, { ...customConfig }) as IMtaConfig;
}
/**
 * Create the working directories used by the MTA if they are missing.
 *
 * Runs synchronously, in the order: keys, sent, received, failed,
 * DNS records, logs.
 */
private ensureDirectories(): void {
  const requiredDirs = [
    paths.keysDir,
    paths.sentEmailsDir,
    paths.receivedEmailsDir,
    paths.failedEmailsDir,
    paths.dnsRecordsDir,
    paths.logsDir
  ];
  for (const dir of requiredDirs) {
    plugins.smartfile.fs.ensureDirSync(dir);
  }
}
/**
 * Start the MTA service.
 *
 * Steps, in order: load or provision the TLS certificate, start the SMTP
 * server (when `smtp.enabled`), start queue polling, and generate DNS
 * records for the local domains (when `domains.autoCreateDnsRecords`).
 * The service is only flagged as running after all steps succeeded.
 * Calling it while already running logs a warning and does nothing.
 *
 * @throws Re-throws any error raised by one of the startup steps
 */
public async start(): Promise<void> {
  if (this.running) {
    console.warn('MTA service is already running');
    return;
  }

  try {
    console.log('Starting MTA service...');

    // The SMTP server below needs the key/cert, so this has to come first.
    await this.loadOrProvisionCertificate();

    const smtpConfig = this.config.smtp;
    if (smtpConfig.enabled) {
      const smtpOptions: ISmtpServerOptions = {
        port: smtpConfig.port,
        key: this.certificate.privateKey,
        cert: this.certificate.publicKey,
        hostname: smtpConfig.hostname
      };
      this.server = new SMTPServer(this, smtpOptions);
      // Not awaited, exactly as before.
      this.server.start();
      console.log(`SMTP server started on port ${smtpOptions.port}`);
    }

    this.startQueueProcessing();

    if (this.config.domains.autoCreateDnsRecords) {
      await this.updateDnsRecordsForLocalDomains();
    }

    this.running = true;
    console.log('MTA service started successfully');
  } catch (startError) {
    console.error('Failed to start MTA service:', startError);
    throw startError;
  }
}
/**
 * Stop the MTA service.
 *
 * Stops the SMTP server if one exists, switches the queue polling flag off
 * and marks the service as not running. Calling it while the service is not
 * running logs a warning and does nothing.
 *
 * @throws Re-throws any error raised while stopping the SMTP server
 */
public async stop(): Promise<void> {
  if (!this.running) {
    console.warn('MTA service is not running');
    return;
  }

  try {
    console.log('Stopping MTA service...');

    const activeServer = this.server;
    if (activeServer) {
      await activeServer.stop();
      this.server = null;
      console.log('SMTP server stopped');
    }

    // The polling loop checks this flag at the start of each cycle.
    this.queueProcessing = false;
    console.log('Email queue processing stopped');

    this.running = false;
    console.log('MTA service stopped successfully');
  } catch (stopError) {
    console.error('Error stopping MTA service:', stopError);
    throw stopError;
  }
}
/**
 * Queue an email for outbound delivery.
 *
 * The message is validated, DKIM keys are prepared when `security.useDkim`
 * is set, and a new PENDING entry is added to the queue. Actual delivery
 * happens later in the queue polling loop.
 *
 * @param email The message to send
 * @returns The queue id, usable with getEmailStatus()
 * @throws Error when the service is not running or the email fails validation
 */
public async send(email: Email): Promise<string> {
  if (!this.running) {
    throw new Error('MTA service is not running');
  }

  // Reject invalid messages before doing any further work for them.
  this.validateEmail(email);

  const id = plugins.uuid.v4();

  if (this.config.security.useDkim) {
    await this.dkimCreator.handleDKIMKeysForEmail(email);
  }

  this.emailQueue.set(id, {
    id,
    email,
    addedAt: new Date(),
    processing: false,
    attempts: 0,
    status: DeliveryStatus.PENDING
  });

  this.stats.queueSize = this.emailQueue.size;
  console.log(`Email added to queue: ${id}`);
  return id;
}
/**
 * Look up a queue entry by its id.
 *
 * @param id Queue id as returned by send()
 * @returns The entry, or null when the queue holds no entry with that id
 */
public getEmailStatus(id: string): QueueEntry | null {
  const entry = this.emailQueue.get(id);
  return entry ? entry : null;
}
/**
 * Handle an incoming email.
 *
 * The routing decision is based on the FIRST recipient only: when its domain
 * is local the message is stored in the local mailbox, otherwise it is
 * queued for forwarding via send(). The domain is everything after the last
 * '@' of the address, so addresses whose local part itself contains an '@'
 * are routed by their real domain.
 *
 * @param email The received message
 * @returns true when the message was stored or queued, false when handling failed
 * @throws Error when the service is not running
 */
public async processIncomingEmail(email: Email): Promise<boolean> {
  if (!this.running) {
    throw new Error('MTA service is not running');
  }

  try {
    console.log(`Processing incoming email from ${email.from} to ${email.to}`);
    this.stats.emailsReceived++;

    const primaryRecipient = email.to?.[0];
    if (!primaryRecipient) {
      // Handled by the catch below: logged and reported as `false`.
      throw new Error('Incoming email has no recipient');
    }

    const atIndex = primaryRecipient.lastIndexOf('@');
    // An address without '@' yields an empty domain, which is never local.
    const recipientDomain = atIndex >= 0 ? primaryRecipient.slice(atIndex + 1) : '';

    if (this.isLocalDomain(recipientDomain)) {
      await this.saveToLocalMailbox(email);
      return true;
    }

    const forwardId = await this.send(email);
    console.log(`Forwarding email to ${email.to} with queue ID ${forwardId}`);
    return true;
  } catch (error) {
    console.error('Error processing incoming email:', error);
    return false;
  }
}
/**
 * Check whether a domain is one of the configured local domains.
 *
 * The comparison ignores letter case, because domain names are
 * case-insensitive. A missing or empty domain, or a missing local-domain
 * list, yields false.
 *
 * @param domain Domain part of an email address
 * @returns true when the domain is listed in `domains.local`
 */
private isLocalDomain(domain: string): boolean {
  if (!domain) {
    return false;
  }
  const wanted = domain.toLowerCase();
  const localDomains = this.config.domains?.local ?? [];
  return localDomains.some(localDomain => localDomain.toLowerCase() === wanted);
}
/**
 * Store an email in the local mailbox directory.
 *
 * Simplified implementation: every message is written as an `.eml` file into
 * `<receivedEmailsDir>/local`, named `<timestamp>_<recipient>.eml` after the
 * first recipient.
 *
 * The recipient address comes from the network, so it is sanitised before it
 * becomes part of the file name: the first '@' becomes `_at_` and every
 * character outside `A-Z a-z 0-9 . _ + = -` becomes `_`. This keeps path
 * separators out of the name and therefore keeps the file inside the mailbox
 * directory. For ordinary addresses the resulting name is unchanged.
 *
 * NOTE(review): two messages for the same recipient within the same
 * millisecond map to the same file name — confirm whether that matters.
 *
 * @param email The message to store
 */
private async saveToLocalMailbox(email: Email): Promise<void> {
  const mailboxPath = plugins.path.join(paths.receivedEmailsDir, 'local');
  plugins.smartfile.fs.ensureDirSync(mailboxPath);

  const emailContent = email.toRFC822String();

  const safeRecipient = email.to[0]
    .replace('@', '_at_')
    .replace(/[^A-Za-z0-9._+=-]/g, '_');
  const filename = `${Date.now()}_${safeRecipient}.eml`;

  plugins.smartfile.memory.toFsSync(
    emailContent,
    plugins.path.join(mailboxPath, filename)
  );
  console.log(`Email saved to local mailbox: ${filename}`);
}
/**
 * Switch queue polling on and kick off the first cycle.
 *
 * Does nothing when polling is already switched on.
 */
private startQueueProcessing(): void {
  if (!this.queueProcessing) {
    this.queueProcessing = true;
    // Fire-and-forget: processQueue handles its own errors and re-schedules itself.
    this.processQueue();
    console.log('Email queue processing started');
  }
}
/**
 * Run one polling cycle over the outbound queue and schedule the next one.
 *
 * Entries that are PENDING or DEFERRED, not already in flight and due are
 * ordered (first attempts by queue time, then retries by due time) and
 * started until the concurrency limit is reached. An entry held back by the
 * rate limiter does not use up a slot: the scan continues with the next
 * entry, so a throttled sender at the head of the queue cannot starve the
 * others. Once a limiter has refused an entry, further entries for the same
 * limiter are skipped for the rest of the cycle without asking it again.
 *
 * The cycle re-arms itself after one second; it stops re-arming once
 * `queueProcessing` is false at the start of a cycle.
 */
private async processQueue(): Promise<void> {
  if (!this.queueProcessing) return;

  try {
    const now = new Date();
    const allEntries = Array.from(this.emailQueue.values());

    const pendingEmails = allEntries
      .filter(entry =>
        (entry.status === DeliveryStatus.PENDING || entry.status === DeliveryStatus.DEFERRED) &&
        !entry.processing &&
        (!entry.nextAttempt || entry.nextAttempt <= now)
      )
      .sort((a, b) => {
        // Entries without a retry time (never attempted) go first.
        if (a.nextAttempt && b.nextAttempt) {
          return a.nextAttempt.getTime() - b.nextAttempt.getTime();
        } else if (a.nextAttempt) {
          return 1;
        } else if (b.nextAttempt) {
          return -1;
        } else {
          return a.addedAt.getTime() - b.addedAt.getTime();
        }
      });

    const inFlight = allEntries.filter(entry => entry.processing).length;
    let availableSlots = Math.max(0, this.config.outbound.concurrency - inFlight);

    // Mirrors the key selection in checkRateLimit so exhausted limiters are asked only once per cycle.
    const rateLimit = this.config.outbound.rateLimit;
    const exhaustedLimiters = new Set<string>();

    for (const entry of pendingEmails) {
      if (availableSlots <= 0) break;

      const limiterKey = rateLimit && rateLimit.perDomain ? entry.email.getFromDomain() : 'global';
      if (exhaustedLimiters.has(limiterKey)) continue;

      if (!this.checkRateLimit(entry.email)) {
        exhaustedLimiters.add(limiterKey);
        continue;
      }

      entry.processing = true;
      availableSlots--;

      // Runs in the background; the entry clears its own `processing` flag when done.
      this.processQueueEntry(entry).catch(error => {
        console.error(`Error processing queue entry ${entry.id}:`, error);
      });
    }
  } catch (error) {
    console.error('Error in queue processing:', error);
  } finally {
    setTimeout(() => this.processQueue(), 1000);
  }
}
/**
 * Perform one delivery attempt for a queue entry and record the outcome.
 *
 * - DELIVERED: the entry is removed and `emailsSent` is incremented.
 * - FAILED: the entry is removed and `emailsFailed` is incremented.
 * - DEFERRED or an unexpected exception: a retry is scheduled, unless the
 *   attempt count has reached `outbound.retries.max`, in which case the
 *   entry is treated as failed.
 *
 * Error details are read defensively: a send job that reports FAILED or
 * DEFERRED without an error object no longer raises a TypeError that would
 * push a permanent failure back into the retry path.
 *
 * @param entry The queue entry to process; modified in place
 */
private async processQueueEntry(entry: QueueEntry): Promise<void> {
  const errorText = (): string => entry.error?.message ?? 'unknown error';

  // Final failure: count it once and drop the entry from the queue.
  const giveUp = (): void => {
    entry.status = DeliveryStatus.FAILED;
    this.stats.emailsFailed++;
    this.emailQueue.delete(entry.id);
  };

  // Returns true when a retry was scheduled, false when the entry was given up on.
  const scheduleRetryOrGiveUp = (): boolean => {
    if (entry.attempts >= this.config.outbound.retries.max) {
      giveUp();
      return false;
    }
    entry.status = DeliveryStatus.DEFERRED;
    entry.nextAttempt = new Date(Date.now() + this.calculateRetryDelay(entry.attempts));
    return true;
  };

  try {
    console.log(`Processing queue entry ${entry.id}`);
    entry.attempts++;
    entry.lastAttempt = new Date();

    const sendJob = new EmailSendJob(this, entry.email, {
      maxRetries: 1, // We handle retries at the queue level
      tlsOptions: {
        rejectUnauthorized: this.config.security.requireValidCerts
      }
    });

    const status = await sendJob.send();
    entry.status = status;

    if (status === DeliveryStatus.DELIVERED) {
      this.emailQueue.delete(entry.id);
      this.stats.emailsSent++;
      console.log(`Email ${entry.id} delivered successfully`);
    } else if (status === DeliveryStatus.FAILED) {
      entry.error = sendJob.deliveryInfo?.error;
      giveUp();
      console.log(`Email ${entry.id} failed permanently: ${errorText()}`);
    } else if (status === DeliveryStatus.DEFERRED) {
      entry.error = sendJob.deliveryInfo?.error;
      if (scheduleRetryOrGiveUp()) {
        console.log(`Email ${entry.id} deferred, next attempt at ${entry.nextAttempt}`);
      } else {
        console.log(`Email ${entry.id} failed after ${entry.attempts} attempts: ${errorText()}`);
      }
    }
  } catch (error) {
    console.error(`Unexpected error processing queue entry ${entry.id}:`, error);
    // Unexpected errors are handled like a deferral.
    entry.error = error instanceof Error ? error : new Error(String(error));
    scheduleRetryOrGiveUp();
  } finally {
    entry.processing = false;
    this.stats.queueSize = this.emailQueue.size;
  }
}
/**
 * Compute how long to wait before the next delivery attempt.
 *
 * With `useBackoff` the wait is `delay * 2^(attemptNumber - 1)`; without it
 * the wait is always `delay`.
 *
 * @param attemptNumber Number of attempts made so far
 * @returns Wait time in milliseconds
 */
private calculateRetryDelay(attemptNumber: number): number {
  const { delay, useBackoff } = this.config.outbound.retries;
  return useBackoff ? delay * 2 ** (attemptNumber - 1) : delay;
}
/**
 * Decide whether an email may be sent now under the outbound rate limit,
 * consuming one token when it may.
 *
 * One token bucket is kept per sender domain (`perDomain`) or a single
 * 'global' bucket otherwise. A bucket starts full and is topped up by
 * `maxPerPeriod` for every whole `periodMs` that has elapsed, capped at
 * `maxPerPeriod`.
 *
 * Rate limiting is treated as not configured when `maxPerPeriod` is missing
 * or zero, and also when `periodMs` is missing or not a positive number.
 * Without the latter check the refill arithmetic produces NaN and the bucket
 * never refills once it is empty.
 *
 * @param email The message about to be sent
 * @returns true when sending is allowed (a token was consumed or no limit applies)
 */
private checkRateLimit(email: Email): boolean {
  const config = this.config.outbound.rateLimit;
  if (!config || !config.maxPerPeriod) {
    return true; // No rate limit configured
  }

  const { maxPerPeriod, periodMs } = config;
  if (typeof periodMs !== 'number' || !(periodMs > 0)) {
    return true; // A limit without a usable period cannot be enforced
  }

  const key = config.perDomain ? email.getFromDomain() : 'global';
  const now = Date.now();

  let limiter = this.rateLimiters.get(key);
  if (!limiter) {
    limiter = { tokens: maxPerPeriod, lastRefill: now };
    this.rateLimiters.set(key, limiter);
  }

  const elapsedMs = now - limiter.lastRefill;
  const tokensToAdd = Math.floor(elapsedMs / periodMs) * maxPerPeriod;
  if (tokensToAdd > 0) {
    limiter.tokens = Math.min(maxPerPeriod, limiter.tokens + tokensToAdd);
    // Keep the remainder so partial periods still count towards the next refill.
    limiter.lastRefill = now - (elapsedMs % periodMs);
  }

  if (limiter.tokens > 0) {
    limiter.tokens--;
    return true;
  }

  console.log(`Rate limit exceeded for ${key}`);
  return false;
}
/**
 * Make a TLS certificate available in `this.certificate`.
 *
 * When both `tls.keyPath` and `tls.certPath` are configured the two files
 * are read from disk and no renewal is scheduled. Otherwise a certificate
 * for `tls.domain` is provisioned and, with `tls.autoRenew`, renewal is
 * scheduled.
 *
 * @throws Re-throws any file or provisioning error after logging it
 */
private async loadOrProvisionCertificate(): Promise<void> {
  try {
    const tlsConfig = this.config.tls;
    const hasManualFiles = Boolean(tlsConfig.keyPath && tlsConfig.certPath);

    if (hasManualFiles) {
      console.log('Using manually specified certificate files');
      const readText = (filePath: string) => plugins.fs.promises.readFile(filePath, 'utf-8');
      const [privateKey, publicKey] = await Promise.all([
        readText(tlsConfig.keyPath),
        readText(tlsConfig.certPath)
      ]);
      const expiresAt = this.getCertificateExpiry(publicKey);
      this.certificate = { privateKey, publicKey, expiresAt };
      console.log(`Certificate loaded, expires: ${this.certificate.expiresAt}`);
      return;
    }

    console.log(`Provisioning certificate for ${this.config.tls.domain}`);
    this.certificate = await this.provisionCertificate(this.config.tls.domain);
    console.log(`Certificate provisioned, expires: ${this.certificate.expiresAt}`);

    if (this.config.tls.autoRenew) {
      this.setupCertificateRenewal();
    }
  } catch (certError) {
    console.error('Error loading or provisioning certificate:', certError);
    throw certError;
  }
}
/**
 * Request a certificate for a domain from the certificate service.
 *
 * Opens a typed-socket client to `https://cloudly.lossless.one:443`, fires a
 * `getSslCertificate` request with the auth token and the domain, and always
 * stops the client again, whether the request succeeded or not.
 *
 * @param domain Domain the certificate is requested for
 * @returns The key, the certificate text and the expiry date
 * @throws Error when no auth token is available or the response carries no certificate;
 *         re-throws connection and request errors after logging them
 */
private async provisionCertificate(domain: string): Promise<Certificate> {
  try {
    const authToken = await this.getAuthToken();
    if (!authToken) {
      throw new Error('Failed to obtain authentication token for certificate provisioning');
    }

    const router = new plugins.typedrequest.TypedRouter();
    const socketClient = await plugins.typedsocket.TypedSocket.createClient(
      router,
      'https://cloudly.lossless.one:443'
    );

    try {
      const certificateRequest = socketClient.createTypedRequest<any>('getSslCertificate');
      const response = await certificateRequest.fire({
        authToken,
        requiredCertName: domain,
      });

      const received = response ? response.certificate : undefined;
      if (!received) {
        throw new Error('Invalid response from certificate service');
      }

      const expiresAt = this.getCertificateExpiry(received.publicKey);
      return {
        privateKey: received.privateKey,
        publicKey: received.publicKey,
        expiresAt
      };
    } finally {
      // Runs on success and on failure so the connection is never left open.
      await socketClient.stop();
    }
  } catch (provisionError) {
    console.error('Certificate provisioning failed:', provisionError);
    throw provisionError;
  }
}
/**
 * Obtain the authentication token for the certificate service.
 *
 * Placeholder implementation: it returns a fixed string and performs no
 * authentication. NOTE(review): a hard-coded token must be replaced by a
 * real authentication flow before production use.
 *
 * @returns The token, or null when obtaining it failed
 */
private async getAuthToken(): Promise<string> {
  try {
    const placeholderToken = 'mta-service-auth-token';
    return placeholderToken;
  } catch (tokenError) {
    console.error('Failed to obtain auth token:', tokenError);
    return null;
  }
}
/**
 * Determine the expiry date of a certificate.
 *
 * Simplified implementation: the argument is not inspected at all; the
 * result is always 90 days after the moment of the call. The 30-day fallback
 * only applies if computing that date throws.
 *
 * @param publicKey Certificate text (currently unused)
 * @returns The assumed expiry date
 */
private getCertificateExpiry(publicKey: string): Date {
  const daysFromNow = (days: number): Date => {
    const date = new Date();
    date.setDate(date.getDate() + days);
    return date;
  };

  try {
    return daysFromNow(90);
  } catch (expiryError) {
    console.error('Failed to extract certificate expiry:', expiryError);
    return daysFromNow(30);
  }
}
/**
 * Schedule renewal of the current certificate for 30 days before it expires.
 *
 * Node.js timers cannot wait longer than 2^31-1 ms (about 24.8 days); a
 * larger delay is replaced by 1 ms. A certificate valid for 90 days is due
 * for renewal in roughly 60 days, so passing that delay to a single
 * setTimeout would fire almost immediately and, because a successful renewal
 * schedules the next one, renew in a tight loop. The wait is therefore
 * split into chunks that each stay within the limit.
 *
 * Does nothing (besides a warning) when no certificate with an expiry date
 * is loaded.
 */
private setupCertificateRenewal(): void {
  if (!this.certificate || !this.certificate.expiresAt) {
    console.warn('Cannot setup certificate renewal: no valid certificate');
    return;
  }

  // Largest delay setTimeout honours.
  const MAX_TIMER_DELAY_MS = 2 ** 31 - 1;

  const renewalDate = new Date(this.certificate.expiresAt);
  renewalDate.setDate(renewalDate.getDate() - 30);
  console.log(`Certificate renewal scheduled for ${renewalDate}`);

  const armTimer = (): void => {
    // A renewal date in the past means "renew right away".
    const remainingMs = Math.max(0, renewalDate.getTime() - Date.now());

    if (remainingMs > MAX_TIMER_DELAY_MS) {
      // Too far away for one timer: wait the maximum, then look again.
      setTimeout(armTimer, MAX_TIMER_DELAY_MS);
      return;
    }

    setTimeout(() => {
      this.renewCertificate().catch(error => {
        console.error('Certificate renewal failed:', error);
      });
    }, remainingMs);
  };

  armTimer();
}
/**
 * Renew the TLS certificate and put it into use.
 *
 * Provisions a new certificate for `tls.domain`, replaces the cached one,
 * and, if an SMTP server instance exists, stops it and starts a new one
 * with the new key and certificate. Afterwards the next renewal is
 * scheduled. Failures are logged, not thrown; a retry is scheduled 24 hours
 * later.
 */
private async renewCertificate(): Promise<void> {
  try {
    console.log('Renewing certificate...');

    const newCertificate = await this.provisionCertificate(this.config.tls.domain);
    this.certificate = newCertificate;
    console.log(`Certificate renewed, new expiry: ${newCertificate.expiresAt}`);

    if (this.server) {
      // Swap servers: the old one is stopped and a new instance is built with the new certificate.
      await this.server.stop();
      const smtpOptions: ISmtpServerOptions = {
        port: this.config.smtp.port,
        key: this.certificate.privateKey,
        cert: this.certificate.publicKey,
        hostname: this.config.smtp.hostname
      };
      this.server = new SMTPServer(this, smtpOptions);
      this.server.start();
      console.log('SMTP server restarted with new certificate');
    }

    this.setupCertificateRenewal();
  } catch (error) {
    console.error('Certificate renewal failed:', error);
    // 24 hours is well below the maximum timer delay, so one timer suffices.
    setTimeout(() => {
      this.renewCertificate().catch(err => {
        console.error('Certificate renewal retry failed:', err);
      });
    }, 24 * 60 * 60 * 1000);
  }
}
/**
 * Prepare DKIM keys and generate the recommended DNS records for every
 * configured local domain.
 *
 * Domains are handled one after another. A failure for one domain is logged
 * and does not stop the remaining domains. Returns immediately when no
 * local domains are configured.
 */
private async updateDnsRecordsForLocalDomains(): Promise<void> {
  const localDomains = this.config.domains.local;
  if (!localDomains || localDomains.length === 0) {
    return;
  }

  console.log('Updating DNS records for local domains...');

  for (const domain of localDomains) {
    try {
      console.log(`Updating DNS records for ${domain}`);
      await this.dkimCreator.handleDKIMKeysForDomain(domain);
      const generated = await this.dnsManager.generateAllRecommendedRecords(domain);
      console.log(`Generated ${generated.length} DNS records for ${domain}`);
    } catch (domainError) {
      console.error(`Error updating DNS records for ${domain}:`, domainError);
    }
  }
}
/**
 * Validate an email before it is queued for sending.
 *
 * Checks that a sender is set, that there is at least one recipient and that
 * a sender domain can be derived. DKIM key handling is not part of
 * validation; send() takes care of it after this check has passed.
 *
 * @param email The message to validate
 * @throws Error describing the first failed check
 */
private validateEmail(email: Email): void {
  if (!email.from) {
    throw new Error('Email must have a sender address');
  }

  if (!email.to || email.to.length === 0) {
    throw new Error('Email must have at least one recipient');
  }

  if (!email.getFromDomain()) {
    throw new Error('Invalid sender domain');
  }
}
/**
 * Return a snapshot of the service statistics.
 *
 * The queue size and, when a certificate is loaded, the certificate details
 * are refreshed first. The returned object is a shallow copy of the
 * internal stats object.
 *
 * @returns Current statistics
 */
public getStats(): MtaStats {
  this.stats.queueSize = this.emailQueue.size;

  const currentCertificate = this.certificate;
  if (currentCertificate) {
    const msPerDay = 24 * 60 * 60 * 1000;
    const msUntilExpiry = currentCertificate.expiresAt.getTime() - new Date().getTime();
    this.stats.certificateInfo = {
      domain: this.config.tls.domain,
      expiresAt: currentCertificate.expiresAt,
      daysUntilExpiry: Math.floor(msUntilExpiry / msPerDay)
    };
  }

  return { ...this.stats };
}
}