import * as plugins from './plugins.js';
import { RustBinaryLocator } from './classes.rustbinarylocator.js';
import { StreamingResponse } from './classes.streamingresponse.js';
import { StdioTransport } from './classes.stdiotransport.js';
import { SocketTransport } from './classes.sockettransport.js';
import type {
  IRustBridgeOptions,
  IRustBridgeLogger,
  ISocketConnectOptions,
  TCommandMap,
  IManagementRequest,
  IManagementResponse,
  IManagementEvent,
  TStreamingCommandKeys,
  TExtractChunk,
  IRustTransport,
} from './interfaces/index.js';

/** No-op logger used when the caller does not provide one. */
const defaultLogger: IRustBridgeLogger = {
  log() {},
};

/** Bookkeeping for one in-flight request, keyed by request id in `pendingRequests`. */
interface IPendingRequest {
  resolve: (value: any) => void;
  reject: (error: Error) => void;
  /** Request timeout; for streaming requests it is re-armed on every chunk. */
  timer: ReturnType<typeof setTimeout>;
  /** Present only for requests started via `sendCommandStreaming`. */
  streaming?: StreamingResponse<any, any>;
}

/**
 * Generic bridge between TypeScript and a Rust binary.
 * Communicates via JSON-over-stdin/stdout IPC protocol (stdio mode)
 * or JSON-over-Unix-socket/named-pipe (socket mode).
 *
 * Events emitted by the bridge itself:
 * - `ready` once the ready handshake completed
 * - `stderr` for every stderr line reported by the transport
 * - `exit` when the transport closes (arguments are forwarded from the transport)
 * - `reconnected` when the transport reports a reconnect
 * - `management:<event>` for every protocol event sent by the Rust side
 *
 * @typeParam TCommands - Map of command names to their param/result types
 */
export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plugins.events.EventEmitter {
  private locator: RustBinaryLocator;
  // The keys listed here are the ones the constructor always fills with a default.
  private options: Required<
    Pick<
      IRustBridgeOptions,
      'cliArgs' | 'requestTimeoutMs' | 'readyTimeoutMs' | 'readyEventName' | 'maxPayloadSize'
    >
  > &
    IRustBridgeOptions;
  private logger: IRustBridgeLogger;
  private transport: IRustTransport | null = null;
  private pendingRequests = new Map<string, IPendingRequest>();
  private requestCounter = 0;
  private isRunning = false;
  private binaryPath: string | null = null;

  /**
   * @param options - Bridge configuration. Caller-supplied values override the
   *   defaults set here (`--management` CLI arg, 30s request timeout, 10s ready
   *   timeout, `ready` event name, 50 MiB max payload).
   */
  constructor(options: IRustBridgeOptions) {
    super();
    this.logger = options.logger ?? defaultLogger;
    this.options = {
      cliArgs: ['--management'],
      requestTimeoutMs: 30000,
      readyTimeoutMs: 10000,
      readyEventName: 'ready',
      maxPayloadSize: 50 * 1024 * 1024,
      ...options,
    };
    this.locator = new RustBinaryLocator(options, this.logger);
  }

  /**
   * Spawn the Rust binary and wait for it to signal readiness.
   * Returns true if the binary was found and spawned successfully.
   * Returns false when the locator finds no binary or the ready handshake fails.
   */
  public async spawn(): Promise<boolean> {
    this.binaryPath = await this.locator.findBinary();
    if (!this.binaryPath) {
      return false;
    }

    const transport = new StdioTransport({
      binaryPath: this.binaryPath,
      cliArgs: this.options.cliArgs,
      env: this.options.env,
      maxPayloadSize: this.options.maxPayloadSize,
      logger: this.logger,
    });

    return this.connectWithTransport(transport);
  }

  /**
   * Connect to an already-running Rust daemon via Unix socket or named pipe.
   * Returns true if the connection was established and the daemon signaled readiness.
   *
   * @param socketPath - Path to Unix socket or Windows named pipe
   * @param socketOptions - Optional socket connection options (reconnect, etc.)
   */
  public async connect(socketPath: string, socketOptions?: ISocketConnectOptions): Promise<boolean> {
    const transport = new SocketTransport({
      socketPath,
      maxPayloadSize: this.options.maxPayloadSize,
      logger: this.logger,
      autoReconnect: socketOptions?.autoReconnect,
      reconnectBaseDelayMs: socketOptions?.reconnectBaseDelayMs,
      reconnectMaxDelayMs: socketOptions?.reconnectMaxDelayMs,
      maxReconnectAttempts: socketOptions?.maxReconnectAttempts,
    });

    return this.connectWithTransport(transport);
  }

  /**
   * Internal: wire up any transport and wait for the ready handshake.
   *
   * The returned promise never rejects. It resolves to true once the
   * `management:<readyEventName>` event arrives, and to false when the transport
   * fails to connect, reports an error first, or the ready timeout elapses.
   */
  private connectWithTransport(transport: IRustTransport): Promise<boolean> {
    return new Promise<boolean>((resolve) => {
      const readyEvent = `management:${this.options.readyEventName}`;
      let readyTimer: ReturnType<typeof setTimeout> | undefined;
      let settled = false;

      // Disarm the handshake so neither the timer nor the listener outlives this attempt.
      const stopWaiting = (): void => {
        if (readyTimer !== undefined) {
          clearTimeout(readyTimer);
          readyTimer = undefined;
        }
        this.removeListener(readyEvent, onReady);
      };

      // Resolve at most once; later calls only make sure the handshake is disarmed.
      const settle = (connected: boolean): void => {
        stopWaiting();
        if (!settled) {
          settled = true;
          resolve(connected);
        }
      };

      const onReady = (): void => {
        stopWaiting();
        this.isRunning = true;
        this.logger.log('info', `Bridge connected to ${this.options.binaryName}`);
        this.emit('ready');
        settle(true);
      };

      try {
        this.transport = transport;

        // Wire transport events
        transport.on('line', (line: string) => this.handleLine(line));

        transport.on('stderr', (line: string) => {
          this.logger.log('debug', `[${this.options.binaryName}] ${line}`);
          this.emit('stderr', line);
        });

        transport.on('close', (...args: any[]) => {
          this.logger.log('info', `Transport closed`);
          this.cleanup();
          this.emit('exit', ...args);
        });

        transport.on('error', (err: Error) => {
          this.logger.log('error', `Transport error: ${err.message}`);
          this.cleanup();
          settle(false);
        });

        transport.on('reconnected', () => {
          this.logger.log('info', 'Transport reconnected, waiting for ready event');
          this.emit('reconnected');
        });

        // Connect the transport
        transport
          .connect()
          .then(() => {
            // A transport error may already have failed this attempt; do not arm the handshake then.
            if (settled) {
              return;
            }

            // Wait for the ready event from the protocol layer
            readyTimer = setTimeout(() => {
              this.logger.log(
                'error',
                `Process did not send ready event within ${this.options.readyTimeoutMs}ms`,
              );
              this.kill();
              settle(false);
            }, this.options.readyTimeoutMs);

            this.once(readyEvent, onReady);
          })
          .catch((err: Error) => {
            this.logger.log('error', `Transport connect failed: ${err.message}`);
            settle(false);
          });
      } catch (err: any) {
        this.logger.log('error', `Failed to connect: ${err.message}`);
        settle(false);
      }
    });
  }

  /**
   * Send a typed command to the Rust process and wait for the response.
   *
   * @param method - Command name (a key of `TCommands`)
   * @param params - Parameters for that command
   * @returns The `result` field of a successful response
   * @throws Error (as a rejection) when the bridge is not running, the serialized
   *   request exceeds `maxPayloadSize`, the write fails, the request times out
   *   after `requestTimeoutMs`, or the Rust side answers with `success: false`.
   */
  public async sendCommand<K extends string & keyof TCommands>(
    method: K,
    params: TCommands[K]['params'],
  ): Promise<TCommands[K]['result']> {
    if (!this.transport?.connected || !this.isRunning) {
      throw new Error(`${this.options.binaryName} bridge is not running`);
    }
    const transport = this.transport;

    const id = `req_${++this.requestCounter}`;
    const request: IManagementRequest = { id, method, params };
    const json = JSON.stringify(request);

    // Check outbound payload size
    const byteLength = Buffer.byteLength(json, 'utf8');
    if (byteLength > this.options.maxPayloadSize) {
      throw new Error(
        `Outbound message exceeds maxPayloadSize (${byteLength} > ${this.options.maxPayloadSize})`
      );
    }

    return new Promise<TCommands[K]['result']>((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pendingRequests.delete(id);
        reject(new Error(`Command '${method}' timed out after ${this.options.requestTimeoutMs}ms`));
      }, this.options.requestTimeoutMs);

      // Register before writing so a fast response always finds its pending entry.
      this.pendingRequests.set(id, { resolve, reject, timer });

      transport.write(json + '\n').catch((err) => {
        clearTimeout(timer);
        this.pendingRequests.delete(id);
        reject(new Error(`Failed to write to transport: ${err.message}`));
      });
    });
  }

  /**
   * Send a streaming command to the Rust process.
   * Returns a StreamingResponse that yields chunks via `for await...of`
   * and exposes `.result` for the final response.
   *
   * This method never throws: precondition failures (bridge not running,
   * oversized payload), write failures and timeouts are all reported by
   * failing the returned StreamingResponse. The timeout is `streamTimeoutMs`
   * when configured, otherwise `requestTimeoutMs`, and it is re-armed by
   * `handleLine` each time a chunk arrives.
   */
  public sendCommandStreaming<K extends string & TStreamingCommandKeys<TCommands>>(
    method: K,
    params: TCommands[K]['params'],
  ): StreamingResponse<TExtractChunk<TCommands[K]>, TCommands[K]['result']> {
    const streaming = new StreamingResponse<TExtractChunk<TCommands[K]>, TCommands[K]['result']>();

    if (!this.transport?.connected || !this.isRunning) {
      streaming.fail(new Error(`${this.options.binaryName} bridge is not running`));
      return streaming;
    }
    const transport = this.transport;

    const id = `req_${++this.requestCounter}`;
    const request: IManagementRequest = { id, method, params };
    const json = JSON.stringify(request);

    const byteLength = Buffer.byteLength(json, 'utf8');
    if (byteLength > this.options.maxPayloadSize) {
      streaming.fail(
        new Error(`Outbound message exceeds maxPayloadSize (${byteLength} > ${this.options.maxPayloadSize})`)
      );
      return streaming;
    }

    const timeoutMs = this.options.streamTimeoutMs ?? this.options.requestTimeoutMs;
    const timer = setTimeout(() => {
      this.pendingRequests.delete(id);
      streaming.fail(new Error(`Streaming command '${method}' timed out after ${timeoutMs}ms`));
    }, timeoutMs);

    this.pendingRequests.set(id, {
      resolve: (result: any) => streaming.finish(result),
      reject: (error: Error) => streaming.fail(error),
      timer,
      streaming,
    });

    transport.write(json + '\n').catch((err) => {
      clearTimeout(timer);
      this.pendingRequests.delete(id);
      streaming.fail(new Error(`Failed to write to transport: ${err.message}`));
    });

    return streaming;
  }

  /**
   * Kill the connection and clean up all resources.
   * For stdio: kills the child process (SIGTERM, then SIGKILL).
   * For socket: closes the socket connection (does NOT kill the daemon).
   *
   * All pending requests are rejected, the transport's listeners are removed
   * (so no `exit` event is emitted for this shutdown) and the transport is
   * disconnected. Calling this without an active transport is a no-op.
   */
  public kill(): void {
    if (!this.transport) {
      return;
    }

    const transport = this.transport;
    this.transport = null;
    this.isRunning = false;

    // Reject pending requests
    this.rejectAllPending(`${this.options.binaryName} process killed`);

    transport.removeAllListeners();
    transport.disconnect();
  }

  /**
   * Whether the bridge is currently running.
   */
  public get running(): boolean {
    return this.isRunning;
  }

  /**
   * Dispatch one line received from the transport.
   *
   * A line is classified as, in order:
   * 1. an event (`event` field, no `id`), re-emitted as `management:<event>`
   * 2. a stream chunk (`id`, `stream === true`, `data`), pushed to the matching
   *    streaming request after re-arming its inactivity timeout
   * 3. a response (`id`), which settles and removes the matching pending request
   *
   * Lines that are empty, not JSON, or JSON that is not an object are ignored
   * (the latter two with a warning).
   */
  private handleLine(line: string): void {
    if (!line) return;

    let parsed: any;
    try {
      parsed = JSON.parse(line);
    } catch {
      this.logger.log('warn', `Non-JSON output: ${line}`);
      return;
    }

    // JSON.parse also accepts bare scalars and null; the `in` checks below
    // would throw a TypeError on those, so reject them up front.
    if (parsed === null || typeof parsed !== 'object') {
      this.logger.log('warn', `Ignoring non-object JSON output: ${line}`);
      return;
    }

    // Check if it's an event (has 'event' field, no 'id')
    if ('event' in parsed && !('id' in parsed)) {
      const event = parsed as IManagementEvent;
      this.emit(`management:${event.event}`, event.data);
      return;
    }

    // Stream chunk (has 'id' + stream === true + 'data')
    if ('id' in parsed && parsed.stream === true && 'data' in parsed) {
      const pending = this.pendingRequests.get(parsed.id);
      if (pending?.streaming) {
        // Reset inactivity timeout
        clearTimeout(pending.timer);
        const timeoutMs = this.options.streamTimeoutMs ?? this.options.requestTimeoutMs;
        pending.timer = setTimeout(() => {
          this.pendingRequests.delete(parsed.id);
          pending.reject(new Error(`Streaming command timed out after ${timeoutMs}ms of inactivity`));
        }, timeoutMs);

        pending.streaming.pushChunk(parsed.data);
      }
      return;
    }

    // Otherwise it's a response (has 'id' field)
    if ('id' in parsed) {
      const response = parsed as IManagementResponse;
      const pending = this.pendingRequests.get(response.id);
      if (pending) {
        clearTimeout(pending.timer);
        this.pendingRequests.delete(response.id);
        if (response.success) {
          pending.resolve(response.result);
        } else {
          pending.reject(new Error(response.error || 'Unknown error from Rust process'));
        }
      }
    }
  }

  /**
   * Reset bridge state after the transport closed or errored on its own.
   * Unlike `kill()`, this does not touch the transport object itself.
   */
  private cleanup(): void {
    this.isRunning = false;
    this.transport = null;

    // Reject all pending requests
    this.rejectAllPending(`${this.options.binaryName} process exited`);
  }

  /** Cancel every pending request's timer, reject it with `message`, and empty the map. */
  private rejectAllPending(message: string): void {
    for (const [, pending] of this.pendingRequests) {
      clearTimeout(pending.timer);
      pending.reject(new Error(message));
    }
    this.pendingRequests.clear();
  }
}