import * as plugins from './plugins.js';
import { LineScanner } from './classes.linescanner.js';
import type { IRustBridgeLogger } from './interfaces/index.js';
import type { IRustTransport } from './interfaces/transport.js';

/**
 * Configuration for {@link StdioTransport}.
 */
export interface IStdioTransportOptions {
  /** Executable handed to `spawn` as the command. */
  binaryPath: string;
  /** Arguments handed to `spawn` for the child process. */
  cliArgs: string[];
  /** Extra environment variables, merged over a copy of `process.env`. */
  env?: Record<string, string>;
  /** Forwarded to the stdout `LineScanner` as its first constructor argument. */
  maxPayloadSize: number;
  /** Forwarded to the stdout `LineScanner` as its second constructor argument. */
  logger: IRustBridgeLogger;
}

/**
 * Transport that spawns a child process and communicates via stdin/stdout.
 * Extracted from the original RustBridge process management logic.
 *
 * Events emitted by this class:
 * - `'line'`   (line)          for each line the LineScanner yields from stdout
 * - `'stderr'` (line: string)  for each non-empty, trimmed stderr line
 * - `'close'`  (code, signal)  when the child process exits
 * - `'error'`  (err: Error)    when the child process reports an error
 */
export class StdioTransport extends plugins.events.EventEmitter implements IRustTransport {
  private readonly options: IStdioTransportOptions;
  private childProcess: plugins.childProcess.ChildProcess | null = null;
  private readonly lineScanner: LineScanner;
  /** Trailing stderr text that has not yet been terminated by a newline. */
  private stderrRemainder = '';
  private _connected = false;

  constructor(options: IStdioTransportOptions) {
    super();
    this.options = options;
    this.lineScanner = new LineScanner(options.maxPayloadSize, options.logger);
  }

  /** Whether a child process has been spawned and has not yet exited, errored, or been disconnected. */
  public get connected(): boolean {
    return this._connected;
  }

  /**
   * Spawn the child process. Resolves when the process is running (not necessarily ready).
   */
  public async connect(): Promise<void> {
    const env = this.options.env
      ? { ...process.env, ...this.options.env }
      : { ...process.env };

    const child = plugins.childProcess.spawn(
      this.options.binaryPath,
      this.options.cliArgs,
      { stdio: ['pipe', 'pipe', 'pipe'], env },
    );
    this.childProcess = child;

    // Let the stream decode UTF-8 itself so a multi-byte character that is
    // split across two chunks is not corrupted by per-chunk Buffer#toString().
    child.stderr?.setEncoding('utf8');

    // Handle stderr with cross-chunk buffering
    child.stderr?.on('data', (data: string) => {
      this.stderrRemainder += data;
      const lines = this.stderrRemainder.split('\n');
      // The last element is the (possibly empty) unterminated tail.
      this.stderrRemainder = lines.pop() ?? '';
      for (const line of lines) {
        const trimmed = line.trim();
        if (trimmed) {
          this.emit('stderr', trimmed);
        }
      }
    });

    // Handle stdout via LineScanner
    child.stdout?.on('data', (chunk: Buffer) => {
      this.lineScanner.push(chunk, (line) => {
        this.emit('line', line);
      });
    });

    // Write failures are reported to the caller through the write() callback.
    // This listener only keeps a stdin 'error' event (e.g. EPIPE after the
    // child has exited) from becoming an unhandled 'error' event.
    child.stdin?.on('error', () => {
      /* surfaced via write() */
    });

    // Handle process exit
    child.on('exit', (code: number | null, signal: string | null) => {
      // Flush remaining stderr
      const tail = this.stderrRemainder.trim();
      if (tail) {
        this.emit('stderr', tail);
      }
      this._connected = false;
      this.lineScanner.clear();
      this.stderrRemainder = '';
      this.emit('close', code, signal);
    });

    child.on('error', (err: Error) => {
      this._connected = false;
      this.emit('error', err);
    });

    this._connected = true;
  }

  /**
   * Write data to stdin with backpressure support.
   *
   * Resolves immediately when the stream accepts the data without asking the
   * caller to wait, otherwise resolves on the next `'drain'` event. Rejects
   * when stdin is not available or when the write callback reports an error.
   */
  public async write(data: string): Promise<void> {
    const stdin = this.childProcess?.stdin;
    if (!stdin) {
      throw new Error('stdin not available');
    }

    return new Promise<void>((resolve, reject) => {
      let settled = false;

      const onDrain = (): void => {
        settled = true;
        resolve();
      };

      const canContinue = stdin.write(data, 'utf8', (err) => {
        if (err) {
          // Do not leave a stale drain listener behind after a failed write.
          stdin.removeListener('drain', onDrain);
          settled = true;
          reject(err);
        }
      });

      if (canContinue) {
        settled = true;
        resolve();
      } else if (!settled) {
        stdin.once('drain', onDrain);
      }
    });
  }

  /**
   * Kill the child process. Sends SIGTERM, then SIGKILL after 5s.
   *
   * All listeners are detached first, so no further `'line'`, `'stderr'`,
   * `'close'` or `'error'` events are emitted for this process afterwards.
   */
  public disconnect(): void {
    const proc = this.childProcess;
    if (!proc) return;

    this.childProcess = null;
    this._connected = false;
    this.lineScanner.clear();
    this.stderrRemainder = '';

    // Remove all listeners
    proc.removeAllListeners();
    proc.stdout?.removeAllListeners();
    proc.stderr?.removeAllListeners();
    proc.stdin?.removeAllListeners();

    // Kill the process
    try { proc.kill('SIGTERM'); } catch { /* already dead */ }

    // Destroy stdio pipes
    try { proc.stdin?.destroy(); } catch { /* ignore */ }
    try { proc.stdout?.destroy(); } catch { /* ignore */ }
    try { proc.stderr?.destroy(); } catch { /* ignore */ }

    // Unref so Node doesn't wait
    try { proc.unref(); } catch { /* ignore */ }

    // Force kill after 5 seconds
    const forceKillTimer = setTimeout(() => {
      try { proc.kill('SIGKILL'); } catch { /* already dead */ }
    }, 5000);
    forceKillTimer.unref();

    // No need to keep the timer pending once the process has actually exited.
    proc.once('exit', () => {
      clearTimeout(forceKillTimer);
    });
  }
}