BREAKING CHANGE(security): implement resilience and lifecycle management for RustSecurityBridge (auto-restart, health checks, state machine and eventing); remove legacy TS SMTP test helper and DNSManager; remove deliverability IP-warmup/sender-reputation integrations and related types; drop unused dependencies
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
import * as plugins from '../plugins.js';
|
||||
import * as paths from '../paths.js';
|
||||
import { logger } from '../logger.js';
|
||||
import { EventEmitter } from 'events';
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// IPC command type map — mirrors the methods in mailer-bin's management mode
|
||||
@@ -213,6 +214,35 @@ type TMailerCommands = {
|
||||
};
|
||||
};
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Bridge state machine
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export enum BridgeState {
|
||||
Idle = 'idle',
|
||||
Starting = 'starting',
|
||||
Running = 'running',
|
||||
Restarting = 'restarting',
|
||||
Failed = 'failed',
|
||||
Stopped = 'stopped',
|
||||
}
|
||||
|
||||
export interface IBridgeResilienceConfig {
|
||||
maxRestartAttempts: number;
|
||||
healthCheckIntervalMs: number;
|
||||
restartBackoffBaseMs: number;
|
||||
restartBackoffMaxMs: number;
|
||||
healthCheckTimeoutMs: number;
|
||||
}
|
||||
|
||||
const DEFAULT_RESILIENCE_CONFIG: IBridgeResilienceConfig = {
|
||||
maxRestartAttempts: 5,
|
||||
healthCheckIntervalMs: 30_000,
|
||||
restartBackoffBaseMs: 1_000,
|
||||
restartBackoffMaxMs: 30_000,
|
||||
healthCheckTimeoutMs: 5_000,
|
||||
};
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// RustSecurityBridge — singleton wrapper around smartrust.RustBridge
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -222,14 +252,26 @@ type TMailerCommands = {
|
||||
*
|
||||
* Uses `@push.rocks/smartrust` for JSON-over-stdin/stdout IPC.
|
||||
* Singleton — access via `RustSecurityBridge.getInstance()`.
|
||||
*
|
||||
* Features resilience via auto-restart with exponential backoff,
|
||||
* periodic health checks, and a state machine that tracks the
|
||||
* bridge lifecycle.
|
||||
*/
|
||||
export class RustSecurityBridge {
|
||||
export class RustSecurityBridge extends EventEmitter {
|
||||
private static instance: RustSecurityBridge | null = null;
|
||||
private static _resilienceConfig: IBridgeResilienceConfig = { ...DEFAULT_RESILIENCE_CONFIG };
|
||||
|
||||
private bridge: InstanceType<typeof plugins.smartrust.RustBridge<TMailerCommands>>;
|
||||
private _running = false;
|
||||
private _state: BridgeState = BridgeState.Idle;
|
||||
private _restartAttempts = 0;
|
||||
private _restartTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
private _healthCheckTimer: ReturnType<typeof setInterval> | null = null;
|
||||
private _deliberateStop = false;
|
||||
private _smtpServerConfig: ISmtpServerConfig | null = null;
|
||||
|
||||
private constructor() {
|
||||
super();
|
||||
this.bridge = new plugins.smartrust.RustBridge<TMailerCommands>({
|
||||
binaryName: 'mailer-bin',
|
||||
cliArgs: ['--management'],
|
||||
@@ -252,6 +294,13 @@ export class RustSecurityBridge {
|
||||
this.bridge.on('exit', (code: number | null, signal: string | null) => {
|
||||
this._running = false;
|
||||
logger.log('warn', `Rust security bridge exited (code=${code}, signal=${signal})`);
|
||||
|
||||
if (this._deliberateStop) {
|
||||
this.setState(BridgeState.Stopped);
|
||||
} else if (this._state === BridgeState.Running) {
|
||||
// Unexpected exit — attempt restart
|
||||
this.attemptRestart();
|
||||
}
|
||||
});
|
||||
|
||||
this.bridge.on('stderr', (line: string) => {
|
||||
@@ -259,6 +308,10 @@ export class RustSecurityBridge {
|
||||
});
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// Static configuration & singleton
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
/** Get or create the singleton instance. */
|
||||
public static getInstance(): RustSecurityBridge {
|
||||
if (!RustSecurityBridge.instance) {
|
||||
@@ -267,11 +320,73 @@ export class RustSecurityBridge {
|
||||
return RustSecurityBridge.instance;
|
||||
}
|
||||
|
||||
/** Reset the singleton instance (for testing). */
|
||||
public static resetInstance(): void {
|
||||
if (RustSecurityBridge.instance) {
|
||||
RustSecurityBridge.instance.stopHealthCheck();
|
||||
if (RustSecurityBridge.instance._restartTimer) {
|
||||
clearTimeout(RustSecurityBridge.instance._restartTimer);
|
||||
RustSecurityBridge.instance._restartTimer = null;
|
||||
}
|
||||
RustSecurityBridge.instance.removeAllListeners();
|
||||
}
|
||||
RustSecurityBridge.instance = null;
|
||||
}
|
||||
|
||||
/** Configure resilience parameters. Can be called before or after getInstance(). */
|
||||
public static configure(config: Partial<IBridgeResilienceConfig>): void {
|
||||
RustSecurityBridge._resilienceConfig = {
|
||||
...RustSecurityBridge._resilienceConfig,
|
||||
...config,
|
||||
};
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// State management
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
/** Current bridge state. */
|
||||
public get state(): BridgeState {
|
||||
return this._state;
|
||||
}
|
||||
|
||||
/** Whether the Rust process is currently running and accepting commands. */
|
||||
public get running(): boolean {
|
||||
return this._running;
|
||||
}
|
||||
|
||||
private setState(newState: BridgeState): void {
|
||||
const oldState = this._state;
|
||||
if (oldState === newState) return;
|
||||
this._state = newState;
|
||||
logger.log('info', `Rust bridge state: ${oldState} -> ${newState}`);
|
||||
this.emit('stateChange', { oldState, newState });
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws a descriptive error if the bridge is not in Running state.
|
||||
* Called at the top of every command method.
|
||||
*/
|
||||
private ensureRunning(): void {
|
||||
if (this._state === BridgeState.Running && this._running) {
|
||||
return;
|
||||
}
|
||||
switch (this._state) {
|
||||
case BridgeState.Idle:
|
||||
throw new Error('Rust bridge has not been started yet. Call start() first.');
|
||||
case BridgeState.Starting:
|
||||
throw new Error('Rust bridge is still starting. Wait for start() to resolve.');
|
||||
case BridgeState.Restarting:
|
||||
throw new Error('Rust bridge is restarting after a crash. Commands will resume once it recovers.');
|
||||
case BridgeState.Failed:
|
||||
throw new Error('Rust bridge has failed after exhausting all restart attempts.');
|
||||
case BridgeState.Stopped:
|
||||
throw new Error('Rust bridge has been stopped. Call start() to restart it.');
|
||||
default:
|
||||
throw new Error(`Rust bridge is not running (state=${this._state}).`);
|
||||
}
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// Lifecycle
|
||||
// -----------------------------------------------------------------------
|
||||
@@ -281,55 +396,195 @@ export class RustSecurityBridge {
|
||||
* @returns `true` if the binary started successfully, `false` otherwise.
|
||||
*/
|
||||
public async start(): Promise<boolean> {
|
||||
if (this._running) {
|
||||
if (this._running && this._state === BridgeState.Running) {
|
||||
return true;
|
||||
}
|
||||
|
||||
this._deliberateStop = false;
|
||||
this._restartAttempts = 0;
|
||||
this.setState(BridgeState.Starting);
|
||||
|
||||
try {
|
||||
const ok = await this.bridge.spawn();
|
||||
this._running = ok;
|
||||
if (ok) {
|
||||
this.setState(BridgeState.Running);
|
||||
this.startHealthCheck();
|
||||
logger.log('info', 'Rust security bridge started');
|
||||
} else {
|
||||
this.setState(BridgeState.Failed);
|
||||
logger.log('warn', 'Rust security bridge failed to start (binary not found or timeout)');
|
||||
}
|
||||
return ok;
|
||||
} catch (err) {
|
||||
this.setState(BridgeState.Failed);
|
||||
logger.log('error', `Failed to start Rust security bridge: ${(err as Error).message}`);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/** Kill the Rust process. */
|
||||
/** Kill the Rust process deliberately. */
|
||||
public async stop(): Promise<void> {
|
||||
this._deliberateStop = true;
|
||||
|
||||
// Cancel any pending restart
|
||||
if (this._restartTimer) {
|
||||
clearTimeout(this._restartTimer);
|
||||
this._restartTimer = null;
|
||||
}
|
||||
|
||||
this.stopHealthCheck();
|
||||
this._smtpServerConfig = null;
|
||||
|
||||
if (!this._running) {
|
||||
this.setState(BridgeState.Stopped);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
this.bridge.kill();
|
||||
this._running = false;
|
||||
this.setState(BridgeState.Stopped);
|
||||
logger.log('info', 'Rust security bridge stopped');
|
||||
} catch (err) {
|
||||
logger.log('error', `Error stopping Rust security bridge: ${(err as Error).message}`);
|
||||
}
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// Auto-restart with exponential backoff
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
private attemptRestart(): void {
|
||||
const config = RustSecurityBridge._resilienceConfig;
|
||||
this._restartAttempts++;
|
||||
|
||||
if (this._restartAttempts > config.maxRestartAttempts) {
|
||||
logger.log('error', `Rust bridge exceeded max restart attempts (${config.maxRestartAttempts}). Giving up.`);
|
||||
this.setState(BridgeState.Failed);
|
||||
return;
|
||||
}
|
||||
|
||||
this.setState(BridgeState.Restarting);
|
||||
this.stopHealthCheck();
|
||||
|
||||
const delay = Math.min(
|
||||
config.restartBackoffBaseMs * Math.pow(2, this._restartAttempts - 1),
|
||||
config.restartBackoffMaxMs,
|
||||
);
|
||||
|
||||
logger.log('info', `Rust bridge restart attempt ${this._restartAttempts}/${config.maxRestartAttempts} in ${delay}ms`);
|
||||
|
||||
this._restartTimer = setTimeout(async () => {
|
||||
this._restartTimer = null;
|
||||
|
||||
// Guard: if stop() was called while we were waiting, don't restart
|
||||
if (this._deliberateStop) {
|
||||
this.setState(BridgeState.Stopped);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const ok = await this.bridge.spawn();
|
||||
this._running = ok;
|
||||
|
||||
if (ok) {
|
||||
logger.log('info', 'Rust bridge restarted successfully');
|
||||
this._restartAttempts = 0;
|
||||
this.setState(BridgeState.Running);
|
||||
this.startHealthCheck();
|
||||
await this.restoreAfterRestart();
|
||||
} else {
|
||||
logger.log('warn', 'Rust bridge restart failed (spawn returned false)');
|
||||
this.attemptRestart();
|
||||
}
|
||||
} catch (err) {
|
||||
logger.log('error', `Rust bridge restart failed: ${(err as Error).message}`);
|
||||
this.attemptRestart();
|
||||
}
|
||||
}, delay);
|
||||
}
|
||||
|
||||
/**
|
||||
* Restore state after a successful restart:
|
||||
* - Re-send startSmtpServer command if the SMTP server was running
|
||||
*/
|
||||
private async restoreAfterRestart(): Promise<void> {
|
||||
if (this._smtpServerConfig) {
|
||||
try {
|
||||
logger.log('info', 'Restoring SMTP server after bridge restart');
|
||||
const result = await this.bridge.sendCommand('startSmtpServer', this._smtpServerConfig);
|
||||
if (result?.started) {
|
||||
logger.log('info', 'SMTP server restored after bridge restart');
|
||||
} else {
|
||||
logger.log('warn', 'SMTP server failed to restore after bridge restart');
|
||||
}
|
||||
} catch (err) {
|
||||
logger.log('error', `Failed to restore SMTP server after restart: ${(err as Error).message}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// Health check
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
private startHealthCheck(): void {
|
||||
this.stopHealthCheck();
|
||||
const config = RustSecurityBridge._resilienceConfig;
|
||||
|
||||
this._healthCheckTimer = setInterval(async () => {
|
||||
if (this._state !== BridgeState.Running || !this._running) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const pongPromise = this.bridge.sendCommand('ping', {} as any);
|
||||
const timeoutPromise = new Promise<never>((_, reject) =>
|
||||
setTimeout(() => reject(new Error('Health check timeout')), config.healthCheckTimeoutMs),
|
||||
);
|
||||
const res = await Promise.race([pongPromise, timeoutPromise]);
|
||||
if (!(res as any)?.pong) {
|
||||
throw new Error('Health check: unexpected ping response');
|
||||
}
|
||||
} catch (err) {
|
||||
logger.log('warn', `Rust bridge health check failed: ${(err as Error).message}. Killing process to trigger restart.`);
|
||||
try {
|
||||
this.bridge.kill();
|
||||
} catch {
|
||||
// Already dead
|
||||
}
|
||||
// The exit handler will trigger attemptRestart()
|
||||
}
|
||||
}, config.healthCheckIntervalMs);
|
||||
}
|
||||
|
||||
private stopHealthCheck(): void {
|
||||
if (this._healthCheckTimer) {
|
||||
clearInterval(this._healthCheckTimer);
|
||||
this._healthCheckTimer = null;
|
||||
}
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// Commands — thin typed wrappers over sendCommand
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
/** Ping the Rust process. */
|
||||
public async ping(): Promise<boolean> {
|
||||
this.ensureRunning();
|
||||
const res = await this.bridge.sendCommand('ping', {} as any);
|
||||
return res?.pong === true;
|
||||
}
|
||||
|
||||
/** Get version information for all Rust crates. */
|
||||
public async getVersion(): Promise<IVersionInfo> {
|
||||
this.ensureRunning();
|
||||
return this.bridge.sendCommand('version', {} as any);
|
||||
}
|
||||
|
||||
/** Validate an email address. */
|
||||
public async validateEmail(email: string): Promise<IValidationResult> {
|
||||
this.ensureRunning();
|
||||
return this.bridge.sendCommand('validateEmail', { email });
|
||||
}
|
||||
|
||||
@@ -339,6 +594,7 @@ export class RustSecurityBridge {
|
||||
diagnosticCode?: string;
|
||||
statusCode?: string;
|
||||
}): Promise<IBounceDetection> {
|
||||
this.ensureRunning();
|
||||
return this.bridge.sendCommand('detectBounce', opts);
|
||||
}
|
||||
|
||||
@@ -349,16 +605,19 @@ export class RustSecurityBridge {
|
||||
htmlBody?: string;
|
||||
attachmentNames?: string[];
|
||||
}): Promise<IContentScanResult> {
|
||||
this.ensureRunning();
|
||||
return this.bridge.sendCommand('scanContent', opts);
|
||||
}
|
||||
|
||||
/** Check IP reputation via DNSBL. */
|
||||
public async checkIpReputation(ip: string): Promise<IReputationResult> {
|
||||
this.ensureRunning();
|
||||
return this.bridge.sendCommand('checkIpReputation', { ip });
|
||||
}
|
||||
|
||||
/** Verify DKIM signatures on a raw email message. */
|
||||
public async verifyDkim(rawMessage: string): Promise<IDkimVerificationResult[]> {
|
||||
this.ensureRunning();
|
||||
return this.bridge.sendCommand('verifyDkim', { rawMessage });
|
||||
}
|
||||
|
||||
@@ -369,6 +628,7 @@ export class RustSecurityBridge {
|
||||
selector?: string;
|
||||
privateKey: string;
|
||||
}): Promise<{ header: string; signedMessage: string }> {
|
||||
this.ensureRunning();
|
||||
return this.bridge.sendCommand('signDkim', opts);
|
||||
}
|
||||
|
||||
@@ -379,6 +639,7 @@ export class RustSecurityBridge {
|
||||
hostname?: string;
|
||||
mailFrom: string;
|
||||
}): Promise<ISpfResult> {
|
||||
this.ensureRunning();
|
||||
return this.bridge.sendCommand('checkSpf', opts);
|
||||
}
|
||||
|
||||
@@ -395,6 +656,7 @@ export class RustSecurityBridge {
|
||||
hostname?: string;
|
||||
mailFrom: string;
|
||||
}): Promise<IEmailSecurityResult> {
|
||||
this.ensureRunning();
|
||||
return this.bridge.sendCommand('verifyEmail', opts);
|
||||
}
|
||||
|
||||
@@ -408,12 +670,16 @@ export class RustSecurityBridge {
|
||||
* emailReceived and authRequest that must be handled by the caller.
|
||||
*/
|
||||
public async startSmtpServer(config: ISmtpServerConfig): Promise<boolean> {
|
||||
this.ensureRunning();
|
||||
this._smtpServerConfig = config;
|
||||
const result = await this.bridge.sendCommand('startSmtpServer', config);
|
||||
return result?.started === true;
|
||||
}
|
||||
|
||||
/** Stop the Rust SMTP server. */
|
||||
public async stopSmtpServer(): Promise<void> {
|
||||
this.ensureRunning();
|
||||
this._smtpServerConfig = null;
|
||||
await this.bridge.sendCommand('stopSmtpServer', {} as any);
|
||||
}
|
||||
|
||||
@@ -428,6 +694,7 @@ export class RustSecurityBridge {
|
||||
smtpCode?: number;
|
||||
smtpMessage?: string;
|
||||
}): Promise<void> {
|
||||
this.ensureRunning();
|
||||
await this.bridge.sendCommand('emailProcessingResult', opts);
|
||||
}
|
||||
|
||||
@@ -439,11 +706,13 @@ export class RustSecurityBridge {
|
||||
success: boolean;
|
||||
message?: string;
|
||||
}): Promise<void> {
|
||||
this.ensureRunning();
|
||||
await this.bridge.sendCommand('authResult', opts);
|
||||
}
|
||||
|
||||
/** Update rate limit configuration at runtime. */
|
||||
public async configureRateLimits(config: IRateLimitConfig): Promise<void> {
|
||||
this.ensureRunning();
|
||||
await this.bridge.sendCommand('configureRateLimits', config);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user