Files
smartrust/ts/classes.rustbridge.ts

396 lines
13 KiB
TypeScript
Raw Permalink Normal View History

import * as plugins from './plugins.js';
import { RustBinaryLocator } from './classes.rustbinarylocator.js';
import { StreamingResponse } from './classes.streamingresponse.js';
import type {
IRustBridgeOptions,
IRustBridgeLogger,
TCommandMap,
IManagementRequest,
IManagementResponse,
IManagementEvent,
TStreamingCommandKeys,
TExtractChunk,
} from './interfaces/index.js';
const defaultLogger: IRustBridgeLogger = {
log() {},
};
/**
* Generic bridge between TypeScript and a Rust binary.
* Communicates via JSON-over-stdin/stdout IPC protocol.
*
* @typeParam TCommands - Map of command names to their param/result types
*/
export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plugins.events.EventEmitter {
private locator: RustBinaryLocator;
private options: Required<Pick<IRustBridgeOptions, 'cliArgs' | 'requestTimeoutMs' | 'readyTimeoutMs' | 'readyEventName' | 'maxPayloadSize'>> & IRustBridgeOptions;
private logger: IRustBridgeLogger;
private childProcess: plugins.childProcess.ChildProcess | null = null;
private stdoutBuffer: Buffer = Buffer.alloc(0);
private stderrRemainder: string = '';
private pendingRequests = new Map<string, {
resolve: (value: any) => void;
reject: (error: Error) => void;
timer: ReturnType<typeof setTimeout>;
streaming?: StreamingResponse<any, any>;
}>();
private requestCounter = 0;
private isRunning = false;
private binaryPath: string | null = null;
constructor(options: IRustBridgeOptions) {
super();
this.logger = options.logger || defaultLogger;
this.options = {
cliArgs: ['--management'],
requestTimeoutMs: 30000,
readyTimeoutMs: 10000,
readyEventName: 'ready',
maxPayloadSize: 50 * 1024 * 1024,
...options,
};
this.locator = new RustBinaryLocator(options, this.logger);
}
/**
* Spawn the Rust binary and wait for it to signal readiness.
* Returns true if the binary was found and spawned successfully.
*/
public async spawn(): Promise<boolean> {
this.binaryPath = await this.locator.findBinary();
if (!this.binaryPath) {
return false;
}
return new Promise<boolean>((resolve) => {
try {
const env = this.options.env
? { ...process.env, ...this.options.env }
: { ...process.env };
this.childProcess = plugins.childProcess.spawn(this.binaryPath!, this.options.cliArgs, {
stdio: ['pipe', 'pipe', 'pipe'],
env,
});
// Handle stderr with cross-chunk buffering
this.childProcess.stderr?.on('data', (data: Buffer) => {
this.stderrRemainder += data.toString();
const lines = this.stderrRemainder.split('\n');
// Keep the last element (incomplete line) as remainder
this.stderrRemainder = lines.pop()!;
for (const line of lines) {
const trimmed = line.trim();
if (trimmed) {
this.logger.log('debug', `[${this.options.binaryName}] ${trimmed}`);
this.emit('stderr', trimmed);
}
}
});
// Handle stdout via Buffer-based newline scanner
this.childProcess.stdout!.on('data', (chunk: Buffer) => {
this.handleStdoutChunk(chunk);
});
// Handle process exit
this.childProcess.on('exit', (code, signal) => {
this.logger.log('info', `Process exited (code=${code}, signal=${signal})`);
// Flush any remaining stderr
if (this.stderrRemainder.trim()) {
this.logger.log('debug', `[${this.options.binaryName}] ${this.stderrRemainder.trim()}`);
this.emit('stderr', this.stderrRemainder.trim());
}
this.cleanup();
this.emit('exit', code, signal);
});
this.childProcess.on('error', (err) => {
this.logger.log('error', `Process error: ${err.message}`);
this.cleanup();
resolve(false);
});
// Wait for the ready event
const readyTimeout = setTimeout(() => {
this.logger.log('error', `Process did not send ready event within ${this.options.readyTimeoutMs}ms`);
this.kill();
resolve(false);
}, this.options.readyTimeoutMs);
this.once(`management:${this.options.readyEventName}`, () => {
clearTimeout(readyTimeout);
this.isRunning = true;
this.logger.log('info', `Bridge connected to ${this.options.binaryName}`);
this.emit('ready');
resolve(true);
});
} catch (err: any) {
this.logger.log('error', `Failed to spawn: ${err.message}`);
resolve(false);
}
});
}
/**
* Send a typed command to the Rust process and wait for the response.
*/
public async sendCommand<K extends string & keyof TCommands>(
method: K,
params: TCommands[K]['params'],
): Promise<TCommands[K]['result']> {
if (!this.childProcess || !this.isRunning) {
throw new Error(`${this.options.binaryName} bridge is not running`);
}
const id = `req_${++this.requestCounter}`;
const request: IManagementRequest = { id, method, params };
const json = JSON.stringify(request);
// Check outbound payload size
const byteLength = Buffer.byteLength(json, 'utf8');
if (byteLength > this.options.maxPayloadSize) {
throw new Error(
`Outbound message exceeds maxPayloadSize (${byteLength} > ${this.options.maxPayloadSize})`
);
}
return new Promise<TCommands[K]['result']>((resolve, reject) => {
const timer = setTimeout(() => {
this.pendingRequests.delete(id);
reject(new Error(`Command '${method}' timed out after ${this.options.requestTimeoutMs}ms`));
}, this.options.requestTimeoutMs);
this.pendingRequests.set(id, { resolve, reject, timer });
this.writeToStdin(json + '\n').catch((err) => {
clearTimeout(timer);
this.pendingRequests.delete(id);
reject(new Error(`Failed to write to stdin: ${err.message}`));
});
});
}
/**
* Send a streaming command to the Rust process.
* Returns a StreamingResponse that yields chunks via `for await...of`
* and exposes `.result` for the final response.
*/
public sendCommandStreaming<K extends string & TStreamingCommandKeys<TCommands>>(
method: K,
params: TCommands[K]['params'],
): StreamingResponse<TExtractChunk<TCommands[K]>, TCommands[K]['result']> {
const streaming = new StreamingResponse<TExtractChunk<TCommands[K]>, TCommands[K]['result']>();
if (!this.childProcess || !this.isRunning) {
streaming.fail(new Error(`${this.options.binaryName} bridge is not running`));
return streaming;
}
const id = `req_${++this.requestCounter}`;
const request: IManagementRequest = { id, method, params };
const json = JSON.stringify(request);
const byteLength = Buffer.byteLength(json, 'utf8');
if (byteLength > this.options.maxPayloadSize) {
streaming.fail(
new Error(`Outbound message exceeds maxPayloadSize (${byteLength} > ${this.options.maxPayloadSize})`)
);
return streaming;
}
const timeoutMs = this.options.streamTimeoutMs ?? this.options.requestTimeoutMs;
const timer = setTimeout(() => {
this.pendingRequests.delete(id);
streaming.fail(new Error(`Streaming command '${method}' timed out after ${timeoutMs}ms`));
}, timeoutMs);
this.pendingRequests.set(id, {
resolve: (result: any) => streaming.finish(result),
reject: (error: Error) => streaming.fail(error),
timer,
streaming,
});
this.writeToStdin(json + '\n').catch((err) => {
clearTimeout(timer);
this.pendingRequests.delete(id);
streaming.fail(new Error(`Failed to write to stdin: ${err.message}`));
});
return streaming;
}
/**
* Kill the Rust process and clean up all resources.
*/
public kill(): void {
if (this.childProcess) {
const proc = this.childProcess;
this.childProcess = null;
this.isRunning = false;
// Clear buffers
this.stdoutBuffer = Buffer.alloc(0);
this.stderrRemainder = '';
// Reject pending requests
for (const [, pending] of this.pendingRequests) {
clearTimeout(pending.timer);
pending.reject(new Error(`${this.options.binaryName} process killed`));
}
this.pendingRequests.clear();
// Remove all listeners
proc.removeAllListeners();
proc.stdout?.removeAllListeners();
proc.stderr?.removeAllListeners();
proc.stdin?.removeAllListeners();
// Kill the process
try { proc.kill('SIGTERM'); } catch { /* already dead */ }
// Destroy stdio pipes
try { proc.stdin?.destroy(); } catch { /* ignore */ }
try { proc.stdout?.destroy(); } catch { /* ignore */ }
try { proc.stderr?.destroy(); } catch { /* ignore */ }
// Unref so Node doesn't wait
try { proc.unref(); } catch { /* ignore */ }
// Force kill after 5 seconds
setTimeout(() => {
try { proc.kill('SIGKILL'); } catch { /* already dead */ }
}, 5000).unref();
}
}
/**
* Whether the bridge is currently running.
*/
public get running(): boolean {
return this.isRunning;
}
/**
* Buffer-based newline scanner for stdout chunks.
* Replaces readline to handle large payloads without buffering entire lines in a separate abstraction.
*/
private handleStdoutChunk(chunk: Buffer): void {
this.stdoutBuffer = Buffer.concat([this.stdoutBuffer, chunk]);
let newlineIndex: number;
while ((newlineIndex = this.stdoutBuffer.indexOf(0x0A)) !== -1) {
const lineBuffer = this.stdoutBuffer.subarray(0, newlineIndex);
this.stdoutBuffer = this.stdoutBuffer.subarray(newlineIndex + 1);
if (lineBuffer.length > this.options.maxPayloadSize) {
this.logger.log('error', `Inbound message exceeds maxPayloadSize (${lineBuffer.length} bytes), dropping`);
continue;
}
const line = lineBuffer.toString('utf8').trim();
this.handleLine(line);
}
// If accumulated buffer exceeds maxPayloadSize (sender never sends newline), clear to prevent OOM
if (this.stdoutBuffer.length > this.options.maxPayloadSize) {
this.logger.log('error', `Stdout buffer exceeded maxPayloadSize (${this.stdoutBuffer.length} bytes) without newline, clearing`);
this.stdoutBuffer = Buffer.alloc(0);
}
}
/**
* Write data to stdin with backpressure support.
* Waits for drain if the internal buffer is full.
*/
private writeToStdin(data: string): Promise<void> {
return new Promise<void>((resolve, reject) => {
if (!this.childProcess?.stdin) {
reject(new Error('stdin not available'));
return;
}
const canContinue = this.childProcess.stdin.write(data, 'utf8', (err) => {
if (err) {
reject(err);
}
});
if (canContinue) {
resolve();
} else {
// Wait for drain before resolving
this.childProcess.stdin.once('drain', () => {
resolve();
});
}
});
}
private handleLine(line: string): void {
if (!line) return;
let parsed: any;
try {
parsed = JSON.parse(line);
} catch {
this.logger.log('warn', `Non-JSON output: ${line}`);
return;
}
// Check if it's an event (has 'event' field, no 'id')
if ('event' in parsed && !('id' in parsed)) {
const event = parsed as IManagementEvent;
this.emit(`management:${event.event}`, event.data);
return;
}
// Stream chunk (has 'id' + stream === true + 'data')
if ('id' in parsed && parsed.stream === true && 'data' in parsed) {
const pending = this.pendingRequests.get(parsed.id);
if (pending?.streaming) {
// Reset inactivity timeout
clearTimeout(pending.timer);
const timeoutMs = this.options.streamTimeoutMs ?? this.options.requestTimeoutMs;
pending.timer = setTimeout(() => {
this.pendingRequests.delete(parsed.id);
pending.reject(new Error(`Streaming command timed out after ${timeoutMs}ms of inactivity`));
}, timeoutMs);
pending.streaming.pushChunk(parsed.data);
}
return;
}
// Otherwise it's a response (has 'id' field)
if ('id' in parsed) {
const response = parsed as IManagementResponse;
const pending = this.pendingRequests.get(response.id);
if (pending) {
clearTimeout(pending.timer);
this.pendingRequests.delete(response.id);
if (response.success) {
pending.resolve(response.result);
} else {
pending.reject(new Error(response.error || 'Unknown error from Rust process'));
}
}
}
}
private cleanup(): void {
this.isRunning = false;
this.childProcess = null;
this.stdoutBuffer = Buffer.alloc(0);
this.stderrRemainder = '';
// Reject all pending requests
for (const [, pending] of this.pendingRequests) {
clearTimeout(pending.timer);
pending.reject(new Error(`${this.options.binaryName} process exited`));
}
this.pendingRequests.clear();
}
}