import * as plugins from './plugins.js'; import { RustBinaryLocator } from './classes.rustbinarylocator.js'; import type { IRustBridgeOptions, IRustBridgeLogger, TCommandMap, IManagementRequest, IManagementResponse, IManagementEvent, } from './interfaces/index.js'; const defaultLogger: IRustBridgeLogger = { log() {}, }; /** * Generic bridge between TypeScript and a Rust binary. * Communicates via JSON-over-stdin/stdout IPC protocol. * * @typeParam TCommands - Map of command names to their param/result types */ export class RustBridge extends plugins.events.EventEmitter { private locator: RustBinaryLocator; private options: Required> & IRustBridgeOptions; private logger: IRustBridgeLogger; private childProcess: plugins.childProcess.ChildProcess | null = null; private readlineInterface: plugins.readline.Interface | null = null; private pendingRequests = new Map void; reject: (error: Error) => void; timer: ReturnType; }>(); private requestCounter = 0; private isRunning = false; private binaryPath: string | null = null; constructor(options: IRustBridgeOptions) { super(); this.logger = options.logger || defaultLogger; this.options = { cliArgs: ['--management'], requestTimeoutMs: 30000, readyTimeoutMs: 10000, readyEventName: 'ready', ...options, }; this.locator = new RustBinaryLocator(options, this.logger); } /** * Spawn the Rust binary and wait for it to signal readiness. * Returns true if the binary was found and spawned successfully. */ public async spawn(): Promise { this.binaryPath = await this.locator.findBinary(); if (!this.binaryPath) { return false; } return new Promise((resolve) => { try { const env = this.options.env ? { ...process.env, ...this.options.env } : { ...process.env }; this.childProcess = plugins.childProcess.spawn(this.binaryPath!, this.options.cliArgs, { stdio: ['pipe', 'pipe', 'pipe'], env, }); // Handle stderr this.childProcess.stderr?.on('data', (data: Buffer) => { const lines = data.toString().split('\n').filter((l: string) => l.trim()); for (const line of lines) { this.logger.log('debug', `[${this.options.binaryName}] ${line}`); this.emit('stderr', line); } }); // Handle stdout via readline for line-delimited JSON this.readlineInterface = plugins.readline.createInterface({ input: this.childProcess.stdout! }); this.readlineInterface.on('line', (line: string) => { this.handleLine(line.trim()); }); // Handle process exit this.childProcess.on('exit', (code, signal) => { this.logger.log('info', `Process exited (code=${code}, signal=${signal})`); this.cleanup(); this.emit('exit', code, signal); }); this.childProcess.on('error', (err) => { this.logger.log('error', `Process error: ${err.message}`); this.cleanup(); resolve(false); }); // Wait for the ready event const readyTimeout = setTimeout(() => { this.logger.log('error', `Process did not send ready event within ${this.options.readyTimeoutMs}ms`); this.kill(); resolve(false); }, this.options.readyTimeoutMs); this.once(`management:${this.options.readyEventName}`, () => { clearTimeout(readyTimeout); this.isRunning = true; this.logger.log('info', `Bridge connected to ${this.options.binaryName}`); this.emit('ready'); resolve(true); }); } catch (err: any) { this.logger.log('error', `Failed to spawn: ${err.message}`); resolve(false); } }); } /** * Send a typed command to the Rust process and wait for the response. */ public async sendCommand( method: K, params: TCommands[K]['params'], ): Promise { if (!this.childProcess || !this.isRunning) { throw new Error(`${this.options.binaryName} bridge is not running`); } const id = `req_${++this.requestCounter}`; const request: IManagementRequest = { id, method, params }; return new Promise((resolve, reject) => { const timer = setTimeout(() => { this.pendingRequests.delete(id); reject(new Error(`Command '${method}' timed out after ${this.options.requestTimeoutMs}ms`)); }, this.options.requestTimeoutMs); this.pendingRequests.set(id, { resolve, reject, timer }); const json = JSON.stringify(request) + '\n'; this.childProcess!.stdin!.write(json, (err) => { if (err) { clearTimeout(timer); this.pendingRequests.delete(id); reject(new Error(`Failed to write to stdin: ${err.message}`)); } }); }); } /** * Kill the Rust process and clean up all resources. */ public kill(): void { if (this.childProcess) { const proc = this.childProcess; this.childProcess = null; this.isRunning = false; // Close readline if (this.readlineInterface) { this.readlineInterface.close(); this.readlineInterface = null; } // Reject pending requests for (const [, pending] of this.pendingRequests) { clearTimeout(pending.timer); pending.reject(new Error(`${this.options.binaryName} process killed`)); } this.pendingRequests.clear(); // Remove all listeners proc.removeAllListeners(); proc.stdout?.removeAllListeners(); proc.stderr?.removeAllListeners(); proc.stdin?.removeAllListeners(); // Kill the process try { proc.kill('SIGTERM'); } catch { /* already dead */ } // Destroy stdio pipes try { proc.stdin?.destroy(); } catch { /* ignore */ } try { proc.stdout?.destroy(); } catch { /* ignore */ } try { proc.stderr?.destroy(); } catch { /* ignore */ } // Unref so Node doesn't wait try { proc.unref(); } catch { /* ignore */ } // Force kill after 5 seconds setTimeout(() => { try { proc.kill('SIGKILL'); } catch { /* already dead */ } }, 5000).unref(); } } /** * Whether the bridge is currently running. */ public get running(): boolean { return this.isRunning; } private handleLine(line: string): void { if (!line) return; let parsed: any; try { parsed = JSON.parse(line); } catch { this.logger.log('warn', `Non-JSON output: ${line}`); return; } // Check if it's an event (has 'event' field, no 'id') if ('event' in parsed && !('id' in parsed)) { const event = parsed as IManagementEvent; this.emit(`management:${event.event}`, event.data); return; } // Otherwise it's a response (has 'id' field) if ('id' in parsed) { const response = parsed as IManagementResponse; const pending = this.pendingRequests.get(response.id); if (pending) { clearTimeout(pending.timer); this.pendingRequests.delete(response.id); if (response.success) { pending.resolve(response.result); } else { pending.reject(new Error(response.error || 'Unknown error from Rust process')); } } } } private cleanup(): void { this.isRunning = false; this.childProcess = null; if (this.readlineInterface) { this.readlineInterface.close(); this.readlineInterface = null; } // Reject all pending requests for (const [, pending] of this.pendingRequests) { clearTimeout(pending.timer); pending.reject(new Error(`${this.options.binaryName} process exited`)); } this.pendingRequests.clear(); } }