import * as plugins from './plugins.js'; import { RustBinaryLocator } from './classes.rustbinarylocator.js'; import { StreamingResponse } from './classes.streamingresponse.js'; import type { IRustBridgeOptions, IRustBridgeLogger, TCommandMap, IManagementRequest, IManagementResponse, IManagementEvent, TStreamingCommandKeys, TExtractChunk, } from './interfaces/index.js'; const defaultLogger: IRustBridgeLogger = { log() {}, }; /** * Generic bridge between TypeScript and a Rust binary. * Communicates via JSON-over-stdin/stdout IPC protocol. * * @typeParam TCommands - Map of command names to their param/result types */ export class RustBridge extends plugins.events.EventEmitter { private locator: RustBinaryLocator; private options: Required> & IRustBridgeOptions; private logger: IRustBridgeLogger; private childProcess: plugins.childProcess.ChildProcess | null = null; private stdoutBuffer: Buffer = Buffer.alloc(0); private stderrRemainder: string = ''; private pendingRequests = new Map void; reject: (error: Error) => void; timer: ReturnType; streaming?: StreamingResponse; }>(); private requestCounter = 0; private isRunning = false; private binaryPath: string | null = null; constructor(options: IRustBridgeOptions) { super(); this.logger = options.logger || defaultLogger; this.options = { cliArgs: ['--management'], requestTimeoutMs: 30000, readyTimeoutMs: 10000, readyEventName: 'ready', maxPayloadSize: 50 * 1024 * 1024, ...options, }; this.locator = new RustBinaryLocator(options, this.logger); } /** * Spawn the Rust binary and wait for it to signal readiness. * Returns true if the binary was found and spawned successfully. */ public async spawn(): Promise { this.binaryPath = await this.locator.findBinary(); if (!this.binaryPath) { return false; } return new Promise((resolve) => { try { const env = this.options.env ? { ...process.env, ...this.options.env } : { ...process.env }; this.childProcess = plugins.childProcess.spawn(this.binaryPath!, this.options.cliArgs, { stdio: ['pipe', 'pipe', 'pipe'], env, }); // Handle stderr with cross-chunk buffering this.childProcess.stderr?.on('data', (data: Buffer) => { this.stderrRemainder += data.toString(); const lines = this.stderrRemainder.split('\n'); // Keep the last element (incomplete line) as remainder this.stderrRemainder = lines.pop()!; for (const line of lines) { const trimmed = line.trim(); if (trimmed) { this.logger.log('debug', `[${this.options.binaryName}] ${trimmed}`); this.emit('stderr', trimmed); } } }); // Handle stdout via Buffer-based newline scanner this.childProcess.stdout!.on('data', (chunk: Buffer) => { this.handleStdoutChunk(chunk); }); // Handle process exit this.childProcess.on('exit', (code, signal) => { this.logger.log('info', `Process exited (code=${code}, signal=${signal})`); // Flush any remaining stderr if (this.stderrRemainder.trim()) { this.logger.log('debug', `[${this.options.binaryName}] ${this.stderrRemainder.trim()}`); this.emit('stderr', this.stderrRemainder.trim()); } this.cleanup(); this.emit('exit', code, signal); }); this.childProcess.on('error', (err) => { this.logger.log('error', `Process error: ${err.message}`); this.cleanup(); resolve(false); }); // Wait for the ready event const readyTimeout = setTimeout(() => { this.logger.log('error', `Process did not send ready event within ${this.options.readyTimeoutMs}ms`); this.kill(); resolve(false); }, this.options.readyTimeoutMs); this.once(`management:${this.options.readyEventName}`, () => { clearTimeout(readyTimeout); this.isRunning = true; this.logger.log('info', `Bridge connected to ${this.options.binaryName}`); this.emit('ready'); resolve(true); }); } catch (err: any) { this.logger.log('error', `Failed to spawn: ${err.message}`); resolve(false); } }); } /** * Send a typed command to the Rust process and wait for the response. */ public async sendCommand( method: K, params: TCommands[K]['params'], ): Promise { if (!this.childProcess || !this.isRunning) { throw new Error(`${this.options.binaryName} bridge is not running`); } const id = `req_${++this.requestCounter}`; const request: IManagementRequest = { id, method, params }; const json = JSON.stringify(request); // Check outbound payload size const byteLength = Buffer.byteLength(json, 'utf8'); if (byteLength > this.options.maxPayloadSize) { throw new Error( `Outbound message exceeds maxPayloadSize (${byteLength} > ${this.options.maxPayloadSize})` ); } return new Promise((resolve, reject) => { const timer = setTimeout(() => { this.pendingRequests.delete(id); reject(new Error(`Command '${method}' timed out after ${this.options.requestTimeoutMs}ms`)); }, this.options.requestTimeoutMs); this.pendingRequests.set(id, { resolve, reject, timer }); this.writeToStdin(json + '\n').catch((err) => { clearTimeout(timer); this.pendingRequests.delete(id); reject(new Error(`Failed to write to stdin: ${err.message}`)); }); }); } /** * Send a streaming command to the Rust process. * Returns a StreamingResponse that yields chunks via `for await...of` * and exposes `.result` for the final response. */ public sendCommandStreaming>( method: K, params: TCommands[K]['params'], ): StreamingResponse, TCommands[K]['result']> { const streaming = new StreamingResponse, TCommands[K]['result']>(); if (!this.childProcess || !this.isRunning) { streaming.fail(new Error(`${this.options.binaryName} bridge is not running`)); return streaming; } const id = `req_${++this.requestCounter}`; const request: IManagementRequest = { id, method, params }; const json = JSON.stringify(request); const byteLength = Buffer.byteLength(json, 'utf8'); if (byteLength > this.options.maxPayloadSize) { streaming.fail( new Error(`Outbound message exceeds maxPayloadSize (${byteLength} > ${this.options.maxPayloadSize})`) ); return streaming; } const timeoutMs = this.options.streamTimeoutMs ?? this.options.requestTimeoutMs; const timer = setTimeout(() => { this.pendingRequests.delete(id); streaming.fail(new Error(`Streaming command '${method}' timed out after ${timeoutMs}ms`)); }, timeoutMs); this.pendingRequests.set(id, { resolve: (result: any) => streaming.finish(result), reject: (error: Error) => streaming.fail(error), timer, streaming, }); this.writeToStdin(json + '\n').catch((err) => { clearTimeout(timer); this.pendingRequests.delete(id); streaming.fail(new Error(`Failed to write to stdin: ${err.message}`)); }); return streaming; } /** * Kill the Rust process and clean up all resources. */ public kill(): void { if (this.childProcess) { const proc = this.childProcess; this.childProcess = null; this.isRunning = false; // Clear buffers this.stdoutBuffer = Buffer.alloc(0); this.stderrRemainder = ''; // Reject pending requests for (const [, pending] of this.pendingRequests) { clearTimeout(pending.timer); pending.reject(new Error(`${this.options.binaryName} process killed`)); } this.pendingRequests.clear(); // Remove all listeners proc.removeAllListeners(); proc.stdout?.removeAllListeners(); proc.stderr?.removeAllListeners(); proc.stdin?.removeAllListeners(); // Kill the process try { proc.kill('SIGTERM'); } catch { /* already dead */ } // Destroy stdio pipes try { proc.stdin?.destroy(); } catch { /* ignore */ } try { proc.stdout?.destroy(); } catch { /* ignore */ } try { proc.stderr?.destroy(); } catch { /* ignore */ } // Unref so Node doesn't wait try { proc.unref(); } catch { /* ignore */ } // Force kill after 5 seconds setTimeout(() => { try { proc.kill('SIGKILL'); } catch { /* already dead */ } }, 5000).unref(); } } /** * Whether the bridge is currently running. */ public get running(): boolean { return this.isRunning; } /** * Buffer-based newline scanner for stdout chunks. * Replaces readline to handle large payloads without buffering entire lines in a separate abstraction. */ private handleStdoutChunk(chunk: Buffer): void { this.stdoutBuffer = Buffer.concat([this.stdoutBuffer, chunk]); let newlineIndex: number; while ((newlineIndex = this.stdoutBuffer.indexOf(0x0A)) !== -1) { const lineBuffer = this.stdoutBuffer.subarray(0, newlineIndex); this.stdoutBuffer = this.stdoutBuffer.subarray(newlineIndex + 1); if (lineBuffer.length > this.options.maxPayloadSize) { this.logger.log('error', `Inbound message exceeds maxPayloadSize (${lineBuffer.length} bytes), dropping`); continue; } const line = lineBuffer.toString('utf8').trim(); this.handleLine(line); } // If accumulated buffer exceeds maxPayloadSize (sender never sends newline), clear to prevent OOM if (this.stdoutBuffer.length > this.options.maxPayloadSize) { this.logger.log('error', `Stdout buffer exceeded maxPayloadSize (${this.stdoutBuffer.length} bytes) without newline, clearing`); this.stdoutBuffer = Buffer.alloc(0); } } /** * Write data to stdin with backpressure support. * Waits for drain if the internal buffer is full. */ private writeToStdin(data: string): Promise { return new Promise((resolve, reject) => { if (!this.childProcess?.stdin) { reject(new Error('stdin not available')); return; } const canContinue = this.childProcess.stdin.write(data, 'utf8', (err) => { if (err) { reject(err); } }); if (canContinue) { resolve(); } else { // Wait for drain before resolving this.childProcess.stdin.once('drain', () => { resolve(); }); } }); } private handleLine(line: string): void { if (!line) return; let parsed: any; try { parsed = JSON.parse(line); } catch { this.logger.log('warn', `Non-JSON output: ${line}`); return; } // Check if it's an event (has 'event' field, no 'id') if ('event' in parsed && !('id' in parsed)) { const event = parsed as IManagementEvent; this.emit(`management:${event.event}`, event.data); return; } // Stream chunk (has 'id' + stream === true + 'data') if ('id' in parsed && parsed.stream === true && 'data' in parsed) { const pending = this.pendingRequests.get(parsed.id); if (pending?.streaming) { // Reset inactivity timeout clearTimeout(pending.timer); const timeoutMs = this.options.streamTimeoutMs ?? this.options.requestTimeoutMs; pending.timer = setTimeout(() => { this.pendingRequests.delete(parsed.id); pending.reject(new Error(`Streaming command timed out after ${timeoutMs}ms of inactivity`)); }, timeoutMs); pending.streaming.pushChunk(parsed.data); } return; } // Otherwise it's a response (has 'id' field) if ('id' in parsed) { const response = parsed as IManagementResponse; const pending = this.pendingRequests.get(response.id); if (pending) { clearTimeout(pending.timer); this.pendingRequests.delete(response.id); if (response.success) { pending.resolve(response.result); } else { pending.reject(new Error(response.error || 'Unknown error from Rust process')); } } } } private cleanup(): void { this.isRunning = false; this.childProcess = null; this.stdoutBuffer = Buffer.alloc(0); this.stderrRemainder = ''; // Reject all pending requests for (const [, pending] of this.pendingRequests) { clearTimeout(pending.timer); pending.reject(new Error(`${this.options.binaryName} process exited`)); } this.pendingRequests.clear(); } }