feat(rustbridge): add streaming responses and robust large-payload/backpressure handling to RustBridge
This commit is contained in:
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartrust',
|
||||
version: '1.1.2',
|
||||
version: '1.2.0',
|
||||
description: 'a bridge between JS engines and rust'
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import * as plugins from './plugins.js';
|
||||
import { RustBinaryLocator } from './classes.rustbinarylocator.js';
|
||||
import { StreamingResponse } from './classes.streamingresponse.js';
|
||||
import type {
|
||||
IRustBridgeOptions,
|
||||
IRustBridgeLogger,
|
||||
@@ -7,6 +8,8 @@ import type {
|
||||
IManagementRequest,
|
||||
IManagementResponse,
|
||||
IManagementEvent,
|
||||
TStreamingCommandKeys,
|
||||
TExtractChunk,
|
||||
} from './interfaces/index.js';
|
||||
|
||||
const defaultLogger: IRustBridgeLogger = {
|
||||
@@ -21,14 +24,16 @@ const defaultLogger: IRustBridgeLogger = {
|
||||
*/
|
||||
export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plugins.events.EventEmitter {
|
||||
private locator: RustBinaryLocator;
|
||||
private options: Required<Pick<IRustBridgeOptions, 'cliArgs' | 'requestTimeoutMs' | 'readyTimeoutMs' | 'readyEventName'>> & IRustBridgeOptions;
|
||||
private options: Required<Pick<IRustBridgeOptions, 'cliArgs' | 'requestTimeoutMs' | 'readyTimeoutMs' | 'readyEventName' | 'maxPayloadSize'>> & IRustBridgeOptions;
|
||||
private logger: IRustBridgeLogger;
|
||||
private childProcess: plugins.childProcess.ChildProcess | null = null;
|
||||
private readlineInterface: plugins.readline.Interface | null = null;
|
||||
private stdoutBuffer: Buffer = Buffer.alloc(0);
|
||||
private stderrRemainder: string = '';
|
||||
private pendingRequests = new Map<string, {
|
||||
resolve: (value: any) => void;
|
||||
reject: (error: Error) => void;
|
||||
timer: ReturnType<typeof setTimeout>;
|
||||
streaming?: StreamingResponse<any, any>;
|
||||
}>();
|
||||
private requestCounter = 0;
|
||||
private isRunning = false;
|
||||
@@ -42,6 +47,7 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
||||
requestTimeoutMs: 30000,
|
||||
readyTimeoutMs: 10000,
|
||||
readyEventName: 'ready',
|
||||
maxPayloadSize: 50 * 1024 * 1024,
|
||||
...options,
|
||||
};
|
||||
this.locator = new RustBinaryLocator(options, this.logger);
|
||||
@@ -68,24 +74,34 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
||||
env,
|
||||
});
|
||||
|
||||
// Handle stderr
|
||||
// Handle stderr with cross-chunk buffering
|
||||
this.childProcess.stderr?.on('data', (data: Buffer) => {
|
||||
const lines = data.toString().split('\n').filter((l: string) => l.trim());
|
||||
this.stderrRemainder += data.toString();
|
||||
const lines = this.stderrRemainder.split('\n');
|
||||
// Keep the last element (incomplete line) as remainder
|
||||
this.stderrRemainder = lines.pop()!;
|
||||
for (const line of lines) {
|
||||
this.logger.log('debug', `[${this.options.binaryName}] ${line}`);
|
||||
this.emit('stderr', line);
|
||||
const trimmed = line.trim();
|
||||
if (trimmed) {
|
||||
this.logger.log('debug', `[${this.options.binaryName}] ${trimmed}`);
|
||||
this.emit('stderr', trimmed);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Handle stdout via readline for line-delimited JSON
|
||||
this.readlineInterface = plugins.readline.createInterface({ input: this.childProcess.stdout! });
|
||||
this.readlineInterface.on('line', (line: string) => {
|
||||
this.handleLine(line.trim());
|
||||
// Handle stdout via Buffer-based newline scanner
|
||||
this.childProcess.stdout!.on('data', (chunk: Buffer) => {
|
||||
this.handleStdoutChunk(chunk);
|
||||
});
|
||||
|
||||
// Handle process exit
|
||||
this.childProcess.on('exit', (code, signal) => {
|
||||
this.logger.log('info', `Process exited (code=${code}, signal=${signal})`);
|
||||
// Flush any remaining stderr
|
||||
if (this.stderrRemainder.trim()) {
|
||||
this.logger.log('debug', `[${this.options.binaryName}] ${this.stderrRemainder.trim()}`);
|
||||
this.emit('stderr', this.stderrRemainder.trim());
|
||||
}
|
||||
this.cleanup();
|
||||
this.emit('exit', code, signal);
|
||||
});
|
||||
@@ -130,6 +146,15 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
||||
|
||||
const id = `req_${++this.requestCounter}`;
|
||||
const request: IManagementRequest = { id, method, params };
|
||||
const json = JSON.stringify(request);
|
||||
|
||||
// Check outbound payload size
|
||||
const byteLength = Buffer.byteLength(json, 'utf8');
|
||||
if (byteLength > this.options.maxPayloadSize) {
|
||||
throw new Error(
|
||||
`Outbound message exceeds maxPayloadSize (${byteLength} > ${this.options.maxPayloadSize})`
|
||||
);
|
||||
}
|
||||
|
||||
return new Promise<TCommands[K]['result']>((resolve, reject) => {
|
||||
const timer = setTimeout(() => {
|
||||
@@ -139,17 +164,64 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
||||
|
||||
this.pendingRequests.set(id, { resolve, reject, timer });
|
||||
|
||||
const json = JSON.stringify(request) + '\n';
|
||||
this.childProcess!.stdin!.write(json, (err) => {
|
||||
if (err) {
|
||||
clearTimeout(timer);
|
||||
this.pendingRequests.delete(id);
|
||||
reject(new Error(`Failed to write to stdin: ${err.message}`));
|
||||
}
|
||||
this.writeToStdin(json + '\n').catch((err) => {
|
||||
clearTimeout(timer);
|
||||
this.pendingRequests.delete(id);
|
||||
reject(new Error(`Failed to write to stdin: ${err.message}`));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a streaming command to the Rust process.
|
||||
* Returns a StreamingResponse that yields chunks via `for await...of`
|
||||
* and exposes `.result` for the final response.
|
||||
*/
|
||||
public sendCommandStreaming<K extends string & TStreamingCommandKeys<TCommands>>(
|
||||
method: K,
|
||||
params: TCommands[K]['params'],
|
||||
): StreamingResponse<TExtractChunk<TCommands[K]>, TCommands[K]['result']> {
|
||||
const streaming = new StreamingResponse<TExtractChunk<TCommands[K]>, TCommands[K]['result']>();
|
||||
|
||||
if (!this.childProcess || !this.isRunning) {
|
||||
streaming.fail(new Error(`${this.options.binaryName} bridge is not running`));
|
||||
return streaming;
|
||||
}
|
||||
|
||||
const id = `req_${++this.requestCounter}`;
|
||||
const request: IManagementRequest = { id, method, params };
|
||||
const json = JSON.stringify(request);
|
||||
|
||||
const byteLength = Buffer.byteLength(json, 'utf8');
|
||||
if (byteLength > this.options.maxPayloadSize) {
|
||||
streaming.fail(
|
||||
new Error(`Outbound message exceeds maxPayloadSize (${byteLength} > ${this.options.maxPayloadSize})`)
|
||||
);
|
||||
return streaming;
|
||||
}
|
||||
|
||||
const timeoutMs = this.options.streamTimeoutMs ?? this.options.requestTimeoutMs;
|
||||
const timer = setTimeout(() => {
|
||||
this.pendingRequests.delete(id);
|
||||
streaming.fail(new Error(`Streaming command '${method}' timed out after ${timeoutMs}ms`));
|
||||
}, timeoutMs);
|
||||
|
||||
this.pendingRequests.set(id, {
|
||||
resolve: (result: any) => streaming.finish(result),
|
||||
reject: (error: Error) => streaming.fail(error),
|
||||
timer,
|
||||
streaming,
|
||||
});
|
||||
|
||||
this.writeToStdin(json + '\n').catch((err) => {
|
||||
clearTimeout(timer);
|
||||
this.pendingRequests.delete(id);
|
||||
streaming.fail(new Error(`Failed to write to stdin: ${err.message}`));
|
||||
});
|
||||
|
||||
return streaming;
|
||||
}
|
||||
|
||||
/**
|
||||
* Kill the Rust process and clean up all resources.
|
||||
*/
|
||||
@@ -159,11 +231,9 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
||||
this.childProcess = null;
|
||||
this.isRunning = false;
|
||||
|
||||
// Close readline
|
||||
if (this.readlineInterface) {
|
||||
this.readlineInterface.close();
|
||||
this.readlineInterface = null;
|
||||
}
|
||||
// Clear buffers
|
||||
this.stdoutBuffer = Buffer.alloc(0);
|
||||
this.stderrRemainder = '';
|
||||
|
||||
// Reject pending requests
|
||||
for (const [, pending] of this.pendingRequests) {
|
||||
@@ -203,6 +273,62 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
||||
return this.isRunning;
|
||||
}
|
||||
|
||||
/**
|
||||
* Buffer-based newline scanner for stdout chunks.
|
||||
* Replaces readline to handle large payloads without buffering entire lines in a separate abstraction.
|
||||
*/
|
||||
private handleStdoutChunk(chunk: Buffer): void {
|
||||
this.stdoutBuffer = Buffer.concat([this.stdoutBuffer, chunk]);
|
||||
|
||||
let newlineIndex: number;
|
||||
while ((newlineIndex = this.stdoutBuffer.indexOf(0x0A)) !== -1) {
|
||||
const lineBuffer = this.stdoutBuffer.subarray(0, newlineIndex);
|
||||
this.stdoutBuffer = this.stdoutBuffer.subarray(newlineIndex + 1);
|
||||
|
||||
if (lineBuffer.length > this.options.maxPayloadSize) {
|
||||
this.logger.log('error', `Inbound message exceeds maxPayloadSize (${lineBuffer.length} bytes), dropping`);
|
||||
continue;
|
||||
}
|
||||
|
||||
const line = lineBuffer.toString('utf8').trim();
|
||||
this.handleLine(line);
|
||||
}
|
||||
|
||||
// If accumulated buffer exceeds maxPayloadSize (sender never sends newline), clear to prevent OOM
|
||||
if (this.stdoutBuffer.length > this.options.maxPayloadSize) {
|
||||
this.logger.log('error', `Stdout buffer exceeded maxPayloadSize (${this.stdoutBuffer.length} bytes) without newline, clearing`);
|
||||
this.stdoutBuffer = Buffer.alloc(0);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write data to stdin with backpressure support.
|
||||
* Waits for drain if the internal buffer is full.
|
||||
*/
|
||||
private writeToStdin(data: string): Promise<void> {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
if (!this.childProcess?.stdin) {
|
||||
reject(new Error('stdin not available'));
|
||||
return;
|
||||
}
|
||||
|
||||
const canContinue = this.childProcess.stdin.write(data, 'utf8', (err) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
}
|
||||
});
|
||||
|
||||
if (canContinue) {
|
||||
resolve();
|
||||
} else {
|
||||
// Wait for drain before resolving
|
||||
this.childProcess.stdin.once('drain', () => {
|
||||
resolve();
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private handleLine(line: string): void {
|
||||
if (!line) return;
|
||||
|
||||
@@ -221,6 +347,22 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
||||
return;
|
||||
}
|
||||
|
||||
// Stream chunk (has 'id' + stream === true + 'data')
|
||||
if ('id' in parsed && parsed.stream === true && 'data' in parsed) {
|
||||
const pending = this.pendingRequests.get(parsed.id);
|
||||
if (pending?.streaming) {
|
||||
// Reset inactivity timeout
|
||||
clearTimeout(pending.timer);
|
||||
const timeoutMs = this.options.streamTimeoutMs ?? this.options.requestTimeoutMs;
|
||||
pending.timer = setTimeout(() => {
|
||||
this.pendingRequests.delete(parsed.id);
|
||||
pending.reject(new Error(`Streaming command timed out after ${timeoutMs}ms of inactivity`));
|
||||
}, timeoutMs);
|
||||
pending.streaming.pushChunk(parsed.data);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Otherwise it's a response (has 'id' field)
|
||||
if ('id' in parsed) {
|
||||
const response = parsed as IManagementResponse;
|
||||
@@ -240,11 +382,8 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
||||
private cleanup(): void {
|
||||
this.isRunning = false;
|
||||
this.childProcess = null;
|
||||
|
||||
if (this.readlineInterface) {
|
||||
this.readlineInterface.close();
|
||||
this.readlineInterface = null;
|
||||
}
|
||||
this.stdoutBuffer = Buffer.alloc(0);
|
||||
this.stderrRemainder = '';
|
||||
|
||||
// Reject all pending requests
|
||||
for (const [, pending] of this.pendingRequests) {
|
||||
|
||||
110
ts/classes.streamingresponse.ts
Normal file
110
ts/classes.streamingresponse.ts
Normal file
@@ -0,0 +1,110 @@
|
||||
/**
|
||||
* Represents a streaming response from a Rust bridge command.
|
||||
* Implements AsyncIterable to allow `for await...of` consumption of chunks,
|
||||
* and exposes `.result` for the final response once the stream ends.
|
||||
*
|
||||
* @typeParam TChunk - Type of each streamed chunk
|
||||
* @typeParam TResult - Type of the final result
|
||||
*/
|
||||
export class StreamingResponse<TChunk, TResult> implements AsyncIterable<TChunk> {
|
||||
/** Resolves with the final result when the stream ends successfully. */
|
||||
public readonly result: Promise<TResult>;
|
||||
|
||||
private resolveResult!: (value: TResult) => void;
|
||||
private rejectResult!: (error: Error) => void;
|
||||
|
||||
/** Buffered chunks not yet consumed by the iterator. */
|
||||
private buffer: TChunk[] = [];
|
||||
/** Waiting consumer resolve callback (when iterator is ahead of producer). */
|
||||
private waiting: ((value: IteratorResult<TChunk>) => void) | null = null;
|
||||
/** Waiting consumer reject callback. */
|
||||
private waitingReject: ((error: Error) => void) | null = null;
|
||||
|
||||
private done = false;
|
||||
private error: Error | null = null;
|
||||
|
||||
constructor() {
|
||||
this.result = new Promise<TResult>((resolve, reject) => {
|
||||
this.resolveResult = resolve;
|
||||
this.rejectResult = reject;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Push a chunk into the stream. Called internally by RustBridge.
|
||||
*/
|
||||
public pushChunk(chunk: TChunk): void {
|
||||
if (this.done) return;
|
||||
|
||||
if (this.waiting) {
|
||||
// A consumer is waiting — deliver immediately
|
||||
const resolve = this.waiting;
|
||||
this.waiting = null;
|
||||
this.waitingReject = null;
|
||||
resolve({ value: chunk, done: false });
|
||||
} else {
|
||||
// No consumer waiting — buffer the chunk
|
||||
this.buffer.push(chunk);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* End the stream successfully with a final result. Called internally by RustBridge.
|
||||
*/
|
||||
public finish(result: TResult): void {
|
||||
if (this.done) return;
|
||||
this.done = true;
|
||||
this.resolveResult(result);
|
||||
|
||||
// If a consumer is waiting, signal end of iteration
|
||||
if (this.waiting) {
|
||||
const resolve = this.waiting;
|
||||
this.waiting = null;
|
||||
this.waitingReject = null;
|
||||
resolve({ value: undefined as any, done: true });
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* End the stream with an error. Called internally by RustBridge.
|
||||
*/
|
||||
public fail(error: Error): void {
|
||||
if (this.done) return;
|
||||
this.done = true;
|
||||
this.error = error;
|
||||
this.rejectResult(error);
|
||||
|
||||
// If a consumer is waiting, reject it
|
||||
if (this.waitingReject) {
|
||||
const reject = this.waitingReject;
|
||||
this.waiting = null;
|
||||
this.waitingReject = null;
|
||||
reject(error);
|
||||
}
|
||||
}
|
||||
|
||||
[Symbol.asyncIterator](): AsyncIterator<TChunk> {
|
||||
return {
|
||||
next: (): Promise<IteratorResult<TChunk>> => {
|
||||
// If there are buffered chunks, deliver one
|
||||
if (this.buffer.length > 0) {
|
||||
return Promise.resolve({ value: this.buffer.shift()!, done: false });
|
||||
}
|
||||
|
||||
// If the stream is done, signal end
|
||||
if (this.done) {
|
||||
if (this.error) {
|
||||
return Promise.reject(this.error);
|
||||
}
|
||||
return Promise.resolve({ value: undefined as any, done: true });
|
||||
}
|
||||
|
||||
// No buffered chunks and not done — wait for the next push
|
||||
return new Promise<IteratorResult<TChunk>>((resolve, reject) => {
|
||||
this.waiting = resolve;
|
||||
this.waitingReject = reject;
|
||||
});
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -1,3 +1,4 @@
|
||||
export { RustBridge } from './classes.rustbridge.js';
|
||||
export { RustBinaryLocator } from './classes.rustbinarylocator.js';
|
||||
export { StreamingResponse } from './classes.streamingresponse.js';
|
||||
export * from './interfaces/index.js';
|
||||
|
||||
@@ -39,4 +39,9 @@ export interface IRustBridgeOptions extends IBinaryLocatorOptions {
|
||||
readyEventName?: string;
|
||||
/** Optional logger instance */
|
||||
logger?: IRustBridgeLogger;
|
||||
/** Maximum message size in bytes (default: 50MB). Messages exceeding this are rejected. */
|
||||
maxPayloadSize?: number;
|
||||
/** Inactivity timeout for streaming commands in ms (default: same as requestTimeoutMs).
|
||||
* Resets on each chunk received. */
|
||||
streamTimeoutMs?: number;
|
||||
}
|
||||
|
||||
@@ -38,3 +38,25 @@ export interface ICommandDefinition<TParams = any, TResult = any> {
|
||||
* Used to type-safe the bridge's sendCommand method.
|
||||
*/
|
||||
export type TCommandMap = Record<string, ICommandDefinition>;
|
||||
|
||||
/**
|
||||
* Stream chunk message received from the Rust binary during a streaming command.
|
||||
*/
|
||||
export interface IManagementStreamChunk {
|
||||
id: string;
|
||||
stream: true;
|
||||
data: any;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract keys from a command map whose definitions include a `chunk` field,
|
||||
* indicating they support streaming responses.
|
||||
*/
|
||||
export type TStreamingCommandKeys<TCommands extends TCommandMap> = {
|
||||
[K in keyof TCommands]: TCommands[K] extends { chunk: any } ? K : never;
|
||||
}[keyof TCommands];
|
||||
|
||||
/**
|
||||
* Extract the chunk type from a command definition that has a `chunk` field.
|
||||
*/
|
||||
export type TExtractChunk<TDef> = TDef extends { chunk: infer C } ? C : never;
|
||||
|
||||
@@ -2,11 +2,10 @@
|
||||
import * as path from 'path';
|
||||
import * as fs from 'fs';
|
||||
import * as childProcess from 'child_process';
|
||||
import * as readline from 'readline';
|
||||
import * as events from 'events';
|
||||
import * as url from 'url';
|
||||
|
||||
export { path, fs, childProcess, readline, events, url };
|
||||
export { path, fs, childProcess, events, url };
|
||||
|
||||
// @push.rocks scope
|
||||
import * as smartpath from '@push.rocks/smartpath';
|
||||
|
||||
Reference in New Issue
Block a user