feat(transport): introduce transport abstraction and socket-mode support for RustBridge

This commit is contained in:
2026-02-26 08:44:28 +00:00
parent 0c39e157c2
commit deda8cc4ee
14 changed files with 1064 additions and 150 deletions

View File

@@ -1,15 +1,19 @@
import * as plugins from './plugins.js';
import { RustBinaryLocator } from './classes.rustbinarylocator.js';
import { StreamingResponse } from './classes.streamingresponse.js';
import { StdioTransport } from './classes.stdiotransport.js';
import { SocketTransport } from './classes.sockettransport.js';
import type {
IRustBridgeOptions,
IRustBridgeLogger,
ISocketConnectOptions,
TCommandMap,
IManagementRequest,
IManagementResponse,
IManagementEvent,
TStreamingCommandKeys,
TExtractChunk,
IRustTransport,
} from './interfaces/index.js';
const defaultLogger: IRustBridgeLogger = {
@@ -18,7 +22,8 @@ const defaultLogger: IRustBridgeLogger = {
/**
* Generic bridge between TypeScript and a Rust binary.
* Communicates via JSON-over-stdin/stdout IPC protocol.
* Communicates via JSON-over-stdin/stdout IPC protocol (stdio mode)
* or JSON-over-Unix-socket/named-pipe (socket mode).
*
* @typeParam TCommands - Map of command names to their param/result types
*/
@@ -26,9 +31,7 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
private locator: RustBinaryLocator;
private options: Required<Pick<IRustBridgeOptions, 'cliArgs' | 'requestTimeoutMs' | 'readyTimeoutMs' | 'readyEventName' | 'maxPayloadSize'>> & IRustBridgeOptions;
private logger: IRustBridgeLogger;
private childProcess: plugins.childProcess.ChildProcess | null = null;
private stdoutBuffer: Buffer = Buffer.alloc(0);
private stderrRemainder: string = '';
private transport: IRustTransport | null = null;
private pendingRequests = new Map<string, {
resolve: (value: any) => void;
reject: (error: Error) => void;
@@ -63,71 +66,93 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
return false;
}
const transport = new StdioTransport({
binaryPath: this.binaryPath,
cliArgs: this.options.cliArgs,
env: this.options.env,
maxPayloadSize: this.options.maxPayloadSize,
logger: this.logger,
});
return this.connectWithTransport(transport);
}
/**
* Connect to an already-running Rust daemon via Unix socket or named pipe.
* Returns true if the connection was established and the daemon signaled readiness.
*
* @param socketPath - Path to Unix socket or Windows named pipe
* @param socketOptions - Optional socket connection options (reconnect, etc.)
*/
public async connect(socketPath: string, socketOptions?: ISocketConnectOptions): Promise<boolean> {
const transport = new SocketTransport({
socketPath,
maxPayloadSize: this.options.maxPayloadSize,
logger: this.logger,
autoReconnect: socketOptions?.autoReconnect,
reconnectBaseDelayMs: socketOptions?.reconnectBaseDelayMs,
reconnectMaxDelayMs: socketOptions?.reconnectMaxDelayMs,
maxReconnectAttempts: socketOptions?.maxReconnectAttempts,
});
return this.connectWithTransport(transport);
}
/**
* Internal: wire up any transport and wait for the ready handshake.
*/
private connectWithTransport(transport: IRustTransport): Promise<boolean> {
return new Promise<boolean>((resolve) => {
try {
const env = this.options.env
? { ...process.env, ...this.options.env }
: { ...process.env };
this.transport = transport;
this.childProcess = plugins.childProcess.spawn(this.binaryPath!, this.options.cliArgs, {
stdio: ['pipe', 'pipe', 'pipe'],
env,
// Wire transport events
transport.on('line', (line: string) => this.handleLine(line));
transport.on('stderr', (line: string) => {
this.logger.log('debug', `[${this.options.binaryName}] ${line}`);
this.emit('stderr', line);
});
// Handle stderr with cross-chunk buffering
this.childProcess.stderr?.on('data', (data: Buffer) => {
this.stderrRemainder += data.toString();
const lines = this.stderrRemainder.split('\n');
// Keep the last element (incomplete line) as remainder
this.stderrRemainder = lines.pop()!;
for (const line of lines) {
const trimmed = line.trim();
if (trimmed) {
this.logger.log('debug', `[${this.options.binaryName}] ${trimmed}`);
this.emit('stderr', trimmed);
}
}
});
// Handle stdout via Buffer-based newline scanner
this.childProcess.stdout!.on('data', (chunk: Buffer) => {
this.handleStdoutChunk(chunk);
});
// Handle process exit
this.childProcess.on('exit', (code, signal) => {
this.logger.log('info', `Process exited (code=${code}, signal=${signal})`);
// Flush any remaining stderr
if (this.stderrRemainder.trim()) {
this.logger.log('debug', `[${this.options.binaryName}] ${this.stderrRemainder.trim()}`);
this.emit('stderr', this.stderrRemainder.trim());
}
transport.on('close', (...args: any[]) => {
this.logger.log('info', `Transport closed`);
this.cleanup();
this.emit('exit', code, signal);
this.emit('exit', ...args);
});
this.childProcess.on('error', (err) => {
this.logger.log('error', `Process error: ${err.message}`);
transport.on('error', (err: Error) => {
this.logger.log('error', `Transport error: ${err.message}`);
this.cleanup();
resolve(false);
});
// Wait for the ready event
const readyTimeout = setTimeout(() => {
this.logger.log('error', `Process did not send ready event within ${this.options.readyTimeoutMs}ms`);
this.kill();
resolve(false);
}, this.options.readyTimeoutMs);
transport.on('reconnected', () => {
this.logger.log('info', 'Transport reconnected, waiting for ready event');
this.emit('reconnected');
});
this.once(`management:${this.options.readyEventName}`, () => {
clearTimeout(readyTimeout);
this.isRunning = true;
this.logger.log('info', `Bridge connected to ${this.options.binaryName}`);
this.emit('ready');
resolve(true);
// Connect the transport
transport.connect().then(() => {
// Wait for the ready event from the protocol layer
const readyTimeout = setTimeout(() => {
this.logger.log('error', `Process did not send ready event within ${this.options.readyTimeoutMs}ms`);
this.kill();
resolve(false);
}, this.options.readyTimeoutMs);
this.once(`management:${this.options.readyEventName}`, () => {
clearTimeout(readyTimeout);
this.isRunning = true;
this.logger.log('info', `Bridge connected to ${this.options.binaryName}`);
this.emit('ready');
resolve(true);
});
}).catch((err: Error) => {
this.logger.log('error', `Transport connect failed: ${err.message}`);
resolve(false);
});
} catch (err: any) {
this.logger.log('error', `Failed to spawn: ${err.message}`);
this.logger.log('error', `Failed to connect: ${err.message}`);
resolve(false);
}
});
@@ -140,7 +165,7 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
method: K,
params: TCommands[K]['params'],
): Promise<TCommands[K]['result']> {
if (!this.childProcess || !this.isRunning) {
if (!this.transport?.connected || !this.isRunning) {
throw new Error(`${this.options.binaryName} bridge is not running`);
}
@@ -164,10 +189,10 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
this.pendingRequests.set(id, { resolve, reject, timer });
this.writeToStdin(json + '\n').catch((err) => {
this.transport!.write(json + '\n').catch((err) => {
clearTimeout(timer);
this.pendingRequests.delete(id);
reject(new Error(`Failed to write to stdin: ${err.message}`));
reject(new Error(`Failed to write to transport: ${err.message}`));
});
});
}
@@ -183,7 +208,7 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
): StreamingResponse<TExtractChunk<TCommands[K]>, TCommands[K]['result']> {
const streaming = new StreamingResponse<TExtractChunk<TCommands[K]>, TCommands[K]['result']>();
if (!this.childProcess || !this.isRunning) {
if (!this.transport?.connected || !this.isRunning) {
streaming.fail(new Error(`${this.options.binaryName} bridge is not running`));
return streaming;
}
@@ -213,28 +238,26 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
streaming,
});
this.writeToStdin(json + '\n').catch((err) => {
this.transport!.write(json + '\n').catch((err) => {
clearTimeout(timer);
this.pendingRequests.delete(id);
streaming.fail(new Error(`Failed to write to stdin: ${err.message}`));
streaming.fail(new Error(`Failed to write to transport: ${err.message}`));
});
return streaming;
}
/**
* Kill the Rust process and clean up all resources.
* Kill the connection and clean up all resources.
* For stdio: kills the child process (SIGTERM, then SIGKILL).
* For socket: closes the socket connection (does NOT kill the daemon).
*/
public kill(): void {
if (this.childProcess) {
const proc = this.childProcess;
this.childProcess = null;
if (this.transport) {
const transport = this.transport;
this.transport = null;
this.isRunning = false;
// Clear buffers
this.stdoutBuffer = Buffer.alloc(0);
this.stderrRemainder = '';
// Reject pending requests
for (const [, pending] of this.pendingRequests) {
clearTimeout(pending.timer);
@@ -242,27 +265,8 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
}
this.pendingRequests.clear();
// Remove all listeners
proc.removeAllListeners();
proc.stdout?.removeAllListeners();
proc.stderr?.removeAllListeners();
proc.stdin?.removeAllListeners();
// Kill the process
try { proc.kill('SIGTERM'); } catch { /* already dead */ }
// Destroy stdio pipes
try { proc.stdin?.destroy(); } catch { /* ignore */ }
try { proc.stdout?.destroy(); } catch { /* ignore */ }
try { proc.stderr?.destroy(); } catch { /* ignore */ }
// Unref so Node doesn't wait
try { proc.unref(); } catch { /* ignore */ }
// Force kill after 5 seconds
setTimeout(() => {
try { proc.kill('SIGKILL'); } catch { /* already dead */ }
}, 5000).unref();
transport.removeAllListeners();
transport.disconnect();
}
}
@@ -273,62 +277,6 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
return this.isRunning;
}
/**
* Buffer-based newline scanner for stdout chunks.
* Replaces readline to handle large payloads without buffering entire lines in a separate abstraction.
*/
private handleStdoutChunk(chunk: Buffer): void {
this.stdoutBuffer = Buffer.concat([this.stdoutBuffer, chunk]);
let newlineIndex: number;
while ((newlineIndex = this.stdoutBuffer.indexOf(0x0A)) !== -1) {
const lineBuffer = this.stdoutBuffer.subarray(0, newlineIndex);
this.stdoutBuffer = this.stdoutBuffer.subarray(newlineIndex + 1);
if (lineBuffer.length > this.options.maxPayloadSize) {
this.logger.log('error', `Inbound message exceeds maxPayloadSize (${lineBuffer.length} bytes), dropping`);
continue;
}
const line = lineBuffer.toString('utf8').trim();
this.handleLine(line);
}
// If accumulated buffer exceeds maxPayloadSize (sender never sends newline), clear to prevent OOM
if (this.stdoutBuffer.length > this.options.maxPayloadSize) {
this.logger.log('error', `Stdout buffer exceeded maxPayloadSize (${this.stdoutBuffer.length} bytes) without newline, clearing`);
this.stdoutBuffer = Buffer.alloc(0);
}
}
/**
* Write data to stdin with backpressure support.
* Waits for drain if the internal buffer is full.
*/
private writeToStdin(data: string): Promise<void> {
return new Promise<void>((resolve, reject) => {
if (!this.childProcess?.stdin) {
reject(new Error('stdin not available'));
return;
}
const canContinue = this.childProcess.stdin.write(data, 'utf8', (err) => {
if (err) {
reject(err);
}
});
if (canContinue) {
resolve();
} else {
// Wait for drain before resolving
this.childProcess.stdin.once('drain', () => {
resolve();
});
}
});
}
private handleLine(line: string): void {
if (!line) return;
@@ -381,9 +329,7 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
private cleanup(): void {
this.isRunning = false;
this.childProcess = null;
this.stdoutBuffer = Buffer.alloc(0);
this.stderrRemainder = '';
this.transport = null;
// Reject all pending requests
for (const [, pending] of this.pendingRequests) {