diff --git a/changelog.md b/changelog.md index 384719b..aadbc07 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,18 @@ # Changelog +## 2026-02-11 - 1.2.0 - feat(rustbridge) +add streaming responses and robust large-payload/backpressure handling to RustBridge + +- Introduce StreamingResponse type and export it (for-await-of iterator + .result promise) +- Add sendCommandStreaming API to send streaming commands and receive chunks + final result +- Implement buffer-based stdout newline scanner to handle large messages and avoid readline limits +- Add backpressure-aware writeToStdin to wait for drain when writing large outbound payloads +- Add maxPayloadSize option and enforce outbound/inbound size checks to prevent OOMs +- Add streamTimeoutMs (inactivity timeout) and reset timeout on each received chunk +- Improve stderr handling (cross-chunk buffering and trimmed emits) +- Update mock test binary and extensive tests for streaming, large payloads, concurrency, and error cases +- Add TypeScript types for streaming commands (TStreamingCommandKeys, TExtractChunk, IManagementStreamChunk) + ## 2026-02-10 - 1.1.2 - fix(rust-binary-locator) use import.meta.resolve and url.fileURLToPath to locate bundled Rust binary in ESM environments diff --git a/readme.md b/readme.md index c1bb913..251511b 100644 --- a/readme.md +++ b/readme.md @@ -1,6 +1,6 @@ # @push.rocks/smartrust -A type-safe, standardized bridge between TypeScript and Rust binaries via JSON-over-stdin/stdout IPC. +A type-safe, production-ready bridge between TypeScript and Rust binaries via JSON-over-stdin/stdout IPC — with support for request/response, streaming, and event patterns. ## Issue Reporting and Security @@ -16,18 +16,19 @@ pnpm install @push.rocks/smartrust ## Overview 🔭 -`@push.rocks/smartrust` provides a production-ready bridge for TypeScript applications that need to communicate with Rust binaries. It handles the entire lifecycle — binary discovery, process spawning, request/response correlation with timeouts, event streaming, and graceful shutdown — so you can focus on your command definitions instead of IPC plumbing. +`@push.rocks/smartrust` provides a complete bridge for TypeScript applications that need to communicate with Rust binaries. It handles the entire lifecycle — binary discovery, process spawning, request/response correlation, **streaming responses**, event pub/sub, and graceful shutdown — so you can focus on your command definitions instead of IPC plumbing. -### Why? +### Why? 🤔 If you're integrating Rust into a Node.js project, you'll inevitably need: - A way to **find** the compiled Rust binary across different environments (dev, CI, production, platform packages) - A way to **spawn** it and establish reliable two-way communication - **Type-safe** request/response patterns with proper error handling +- **Streaming responses** for progressive data processing, log tailing, or chunked transfers - **Event streaming** from Rust to TypeScript - **Graceful lifecycle management** (ready detection, clean shutdown, force kill) -`smartrust` wraps all of this into two classes: `RustBridge` and `RustBinaryLocator`. +`smartrust` wraps all of this into three classes: `RustBridge`, `RustBinaryLocator`, and `StreamingResponse`. ## Usage 🚀 @@ -38,8 +39,9 @@ If you're integrating Rust into a Node.js project, you'll inevitably need: | Direction | Format | Description | |-----------|--------|-------------| | **TS → Rust** (Request) | `{"id": "req_1", "method": "start", "params": {...}}` | Command with unique ID | -| **Rust → TS** (Response) | `{"id": "req_1", "success": true, "result": {...}}` | Response correlated by ID | +| **Rust → TS** (Response) | `{"id": "req_1", "success": true, "result": {...}}` | Final response correlated by ID | | **Rust → TS** (Error) | `{"id": "req_1", "success": false, "error": "msg"}` | Error correlated by ID | +| **Rust → TS** (Stream Chunk) | `{"id": "req_1", "stream": true, "data": {...}}` | Intermediate chunk (zero or more) | | **Rust → TS** (Event) | `{"event": "ready", "data": {...}}` | Unsolicited event (no ID) | Your Rust binary reads JSON lines from stdin and writes JSON lines to stdout. That's it. Stderr is free for logging. @@ -49,7 +51,7 @@ Your Rust binary reads JSON lines from stdin and writes JSON lines to stdout. Th Start by defining a type map of commands your Rust binary supports: ```typescript -import { RustBridge, type ICommandDefinition } from '@push.rocks/smartrust'; +import { RustBridge } from '@push.rocks/smartrust'; // Define your command types type TMyCommands = { @@ -92,7 +94,91 @@ bridge.on('management:configChanged', (data) => { bridge.kill(); ``` -### Binary Locator +### Streaming Commands 🌊 + +For commands where the Rust binary sends a series of chunks before a final result, use `sendCommandStreaming`. This is perfect for progressive data processing, log tailing, search results, or any scenario where you want incremental output. + +#### Defining Streaming Commands + +Add a `chunk` field to your command type definition to mark it as streamable: + +```typescript +type TMyCommands = { + // Regular command (request → response) + ping: { params: {}; result: { pong: boolean } }; + + // Streaming command (request → chunks... → final result) + processData: { params: { count: number }; chunk: { index: number; progress: number }; result: { totalProcessed: number } }; + tailLogs: { params: { lines: number }; chunk: string; result: { linesRead: number } }; +}; +``` + +#### Consuming Streams + +```typescript +// Returns a StreamingResponse immediately (does NOT block) +const stream = bridge.sendCommandStreaming('processData', { count: 1000 }); + +// Consume chunks with for-await-of +for await (const chunk of stream) { + console.log(`Processing item ${chunk.index}, progress: ${chunk.progress}%`); +} + +// Get the final result after all chunks are consumed +const result = await stream.result; +console.log(`Done! Processed ${result.totalProcessed} items`); +``` + +#### Error Handling in Streams + +Errors propagate to both the iterator and the `.result` promise: + +```typescript +const stream = bridge.sendCommandStreaming('processData', { count: 100 }); + +try { + for await (const chunk of stream) { + console.log(chunk); + } +} catch (err) { + console.error('Stream failed:', err.message); +} + +// .result also rejects on error +try { + await stream.result; +} catch (err) { + console.error('Same error here:', err.message); +} +``` + +#### Stream Timeout + +By default, streaming commands use the same timeout as regular commands (`requestTimeoutMs`). The timeout **resets on each chunk received**, so it acts as an inactivity timeout rather than an absolute timeout. You can configure it separately: + +```typescript +const bridge = new RustBridge({ + binaryName: 'my-server', + requestTimeoutMs: 30000, // regular command timeout: 30s + streamTimeoutMs: 60000, // streaming inactivity timeout: 60s +}); +``` + +#### Implementing Streaming on the Rust Side + +Your Rust binary sends stream chunks by writing lines with `"stream": true` before the final response: + +```rust +// For each chunk: +println!(r#"{{"id":"{}","stream":true,"data":{{"index":{},"progress":{}}}}}"#, req.id, i, pct); +io::stdout().flush().unwrap(); + +// When done, send the final response (same as non-streaming): +println!(r#"{{"id":"{}","success":true,"result":{{"totalProcessed":{}}}}}"#, req.id, total); +io::stdout().flush().unwrap(); +``` + +### Binary Locator 🔍 The `RustBinaryLocator` searches for your binary using a priority-ordered strategy: @@ -119,7 +205,7 @@ const binaryPath = await locator.findBinary(); // Result is cached — call clearCache() to force re-search ``` -### Configuration Reference +### Configuration Reference ⚙️ The `RustBridge` constructor accepts an `IRustBridgeOptions` object: @@ -136,14 +222,16 @@ const bridge = new RustBridge({ // --- Bridge Options --- cliArgs: ['--management'], // optional: args passed to binary (default: ['--management']) requestTimeoutMs: 30000, // optional: per-request timeout (default: 30000) + streamTimeoutMs: 30000, // optional: streaming inactivity timeout (default: requestTimeoutMs) readyTimeoutMs: 10000, // optional: ready event timeout (default: 10000) + maxPayloadSize: 50 * 1024 * 1024, // optional: max message size in bytes (default: 50MB) env: { RUST_LOG: 'debug' }, // optional: extra env vars for the child process readyEventName: 'ready', // optional: name of the ready event (default: 'ready') logger: myLogger, // optional: logger implementing IRustBridgeLogger }); ``` -### Events +### Events 📡 `RustBridge` extends `EventEmitter` and emits the following events: @@ -154,7 +242,7 @@ const bridge = new RustBridge({ | `stderr` | `string` | A line from the binary's stderr | | `management:` | `any` | Custom event from Rust (e.g. `management:configChanged`) | -### Custom Logger +### Custom Logger 📝 Plug in your own logger by implementing the `IRustBridgeLogger` interface: @@ -173,7 +261,7 @@ const bridge = new RustBridge({ }); ``` -### Writing the Rust Side +### Writing the Rust Side 🦀 Your Rust binary needs to implement a simple protocol: @@ -186,9 +274,11 @@ Your Rust binary needs to implement a simple protocol: 3. **Write JSON responses to stdout**, each as `{"id": "...", "success": true, "result": {...}}\n` -4. **Emit events** anytime by writing `{"event": "name", "data": {...}}\n` to stdout +4. **For streaming commands**, write zero or more `{"id": "...", "stream": true, "data": {...}}\n` chunks before the final response -5. **Use stderr** for logging — it won't interfere with the IPC protocol +5. **Emit events** anytime by writing `{"event": "name", "data": {...}}\n` to stdout + +6. **Use stderr** for logging — it won't interfere with the IPC protocol Here's a minimal Rust skeleton: @@ -213,6 +303,13 @@ struct Response { error: Option, } +#[derive(Serialize)] +struct StreamChunk { + id: String, + stream: bool, + data: serde_json::Value, +} + fn main() { // Signal ready println!(r#"{{"event":"ready","data":{{"version":"1.0.0"}}}}"#); @@ -223,24 +320,50 @@ fn main() { let line = line.unwrap(); let req: Request = serde_json::from_str(&line).unwrap(); - let response = match req.method.as_str() { - "ping" => Response { - id: req.id, - success: true, - result: Some(serde_json::json!({"pong": true})), - error: None, - }, - _ => Response { - id: req.id, - success: false, - result: None, - error: Some(format!("Unknown method: {}", req.method)), - }, - }; - - let json = serde_json::to_string(&response).unwrap(); - println!("{json}"); - io::stdout().flush().unwrap(); + match req.method.as_str() { + "ping" => { + let resp = Response { + id: req.id, + success: true, + result: Some(serde_json::json!({"pong": true})), + error: None, + }; + println!("{}", serde_json::to_string(&resp).unwrap()); + io::stdout().flush().unwrap(); + } + "processData" => { + let count = req.params["count"].as_u64().unwrap_or(0); + // Send stream chunks + for i in 0..count { + let chunk = StreamChunk { + id: req.id.clone(), + stream: true, + data: serde_json::json!({"index": i, "progress": ((i+1) * 100 / count)}), + }; + println!("{}", serde_json::to_string(&chunk).unwrap()); + io::stdout().flush().unwrap(); + } + // Send final response + let resp = Response { + id: req.id, + success: true, + result: Some(serde_json::json!({"totalProcessed": count})), + error: None, + }; + println!("{}", serde_json::to_string(&resp).unwrap()); + io::stdout().flush().unwrap(); + } + _ => { + let resp = Response { + id: req.id, + success: false, + result: None, + error: Some(format!("Unknown method: {}", req.method)), + }; + println!("{}", serde_json::to_string(&resp).unwrap()); + io::stdout().flush().unwrap(); + } + } } } ``` @@ -254,9 +377,17 @@ fn main() { | `constructor` | `new RustBridge(options: IRustBridgeOptions)` | Create a new bridge instance | | `spawn()` | `Promise` | Spawn the binary and wait for ready; returns `false` on failure | | `sendCommand(method, params)` | `Promise` | Send a typed command and await the response | +| `sendCommandStreaming(method, params)` | `StreamingResponse` | Send a streaming command; returns immediately | | `kill()` | `void` | SIGTERM the process, reject pending requests, force SIGKILL after 5s | | `running` | `boolean` | Whether the bridge is currently connected | +### `StreamingResponse` + +| Method / Property | Type | Description | +|---|---|---| +| `[Symbol.asyncIterator]()` | `AsyncIterator` | Enables `for await...of` consumption of chunks | +| `result` | `Promise` | Resolves with the final result after stream ends | + ### `RustBinaryLocator` | Method / Property | Signature | Description | @@ -265,9 +396,9 @@ fn main() { | `findBinary()` | `Promise` | Find the binary using the priority search; result is cached | | `clearCache()` | `void` | Clear the cached path to force a fresh search | -### Exported Interfaces +### Exported Interfaces & Types -| Interface | Description | +| Interface / Type | Description | |---|---| | `IRustBridgeOptions` | Full configuration for `RustBridge` | | `IBinaryLocatorOptions` | Configuration for `RustBinaryLocator` | @@ -275,8 +406,11 @@ fn main() { | `IManagementRequest` | IPC request shape: `{ id, method, params }` | | `IManagementResponse` | IPC response shape: `{ id, success, result?, error? }` | | `IManagementEvent` | IPC event shape: `{ event, data }` | +| `IManagementStreamChunk` | IPC stream chunk shape: `{ id, stream: true, data }` | | `ICommandDefinition` | Single command definition: `{ params, result }` | | `TCommandMap` | `Record` | +| `TStreamingCommandKeys` | Extracts keys from a command map that have a `chunk` field | +| `TExtractChunk` | Extracts the chunk type from a streaming command definition | ## License and Legal Information diff --git a/test/helpers/mock-rust-binary.mjs b/test/helpers/mock-rust-binary.mjs index caa67bf..588a78a 100755 --- a/test/helpers/mock-rust-binary.mjs +++ b/test/helpers/mock-rust-binary.mjs @@ -2,22 +2,46 @@ /** * Mock "Rust binary" for testing the RustBridge IPC protocol. - * Reads JSON lines from stdin, writes JSON lines to stdout. + * Reads JSON lines from stdin via Buffer-based scanner, writes JSON lines to stdout. * Emits a ready event on startup. */ -import { createInterface } from 'readline'; - // Emit ready event const readyEvent = JSON.stringify({ event: 'ready', data: { version: '1.0.0' } }); process.stdout.write(readyEvent + '\n'); -const rl = createInterface({ input: process.stdin }); +// Buffer-based newline scanner for stdin (mirrors the RustBridge approach) +let stdinBuffer = Buffer.alloc(0); -rl.on('line', (line) => { +process.stdin.on('data', (chunk) => { + stdinBuffer = Buffer.concat([stdinBuffer, chunk]); + + let newlineIndex; + while ((newlineIndex = stdinBuffer.indexOf(0x0A)) !== -1) { + const lineBuffer = stdinBuffer.subarray(0, newlineIndex); + stdinBuffer = stdinBuffer.subarray(newlineIndex + 1); + const line = lineBuffer.toString('utf8').trim(); + if (line) { + handleLine(line); + } + } +}); + +/** + * Backpressure-aware write to stdout. + */ +function writeResponse(data) { + const json = JSON.stringify(data) + '\n'; + if (!process.stdout.write(json)) { + // Wait for drain before continuing + process.stdout.once('drain', () => {}); + } +} + +function handleLine(line) { let request; try { - request = JSON.parse(line.trim()); + request = JSON.parse(line); } catch { return; } @@ -26,35 +50,53 @@ rl.on('line', (line) => { if (method === 'echo') { // Echo back the params as result - const response = JSON.stringify({ id, success: true, result: params }); - process.stdout.write(response + '\n'); + writeResponse({ id, success: true, result: params }); + } else if (method === 'largeEcho') { + // Echo back params (same as echo, named distinctly for large payload tests) + writeResponse({ id, success: true, result: params }); } else if (method === 'error') { // Return an error - const response = JSON.stringify({ id, success: false, error: 'Test error message' }); - process.stdout.write(response + '\n'); + writeResponse({ id, success: false, error: 'Test error message' }); } else if (method === 'emitEvent') { // Emit a custom event, then respond with success - const event = JSON.stringify({ event: params.eventName, data: params.eventData }); - process.stdout.write(event + '\n'); - const response = JSON.stringify({ id, success: true, result: null }); - process.stdout.write(response + '\n'); + writeResponse({ event: params.eventName, data: params.eventData }); + writeResponse({ id, success: true, result: null }); } else if (method === 'slow') { // Respond after a delay setTimeout(() => { - const response = JSON.stringify({ id, success: true, result: { delayed: true } }); - process.stdout.write(response + '\n'); + writeResponse({ id, success: true, result: { delayed: true } }); }, 100); + } else if (method === 'streamEcho') { + // Send params.count stream chunks, then final response + const count = params.count || 0; + let sent = 0; + const interval = setInterval(() => { + if (sent < count) { + writeResponse({ id, stream: true, data: { index: sent, value: `chunk_${sent}` } }); + sent++; + } else { + clearInterval(interval); + writeResponse({ id, success: true, result: { totalChunks: count } }); + } + }, 10); + } else if (method === 'streamError') { + // Send 1 chunk, then error + writeResponse({ id, stream: true, data: { index: 0, value: 'before_error' } }); + setTimeout(() => { + writeResponse({ id, success: false, error: 'Stream error after chunk' }); + }, 20); + } else if (method === 'streamEmpty') { + // Zero chunks, immediate final response + writeResponse({ id, success: true, result: { totalChunks: 0 } }); } else if (method === 'exit') { // Graceful exit - const response = JSON.stringify({ id, success: true, result: null }); - process.stdout.write(response + '\n'); + writeResponse({ id, success: true, result: null }); process.exit(0); } else { // Unknown command - const response = JSON.stringify({ id, success: false, error: `Unknown method: ${method}` }); - process.stdout.write(response + '\n'); + writeResponse({ id, success: false, error: `Unknown method: ${method}` }); } -}); +} // Handle SIGTERM gracefully process.on('SIGTERM', () => { diff --git a/test/test.rustbridge.node.ts b/test/test.rustbridge.node.ts index c527a68..c502710 100644 --- a/test/test.rustbridge.node.ts +++ b/test/test.rustbridge.node.ts @@ -9,10 +9,14 @@ const mockBinaryPath = path.join(testDir, 'helpers/mock-rust-binary.mjs'); // Define the command types for our mock binary type TMockCommands = { echo: { params: Record; result: Record }; + largeEcho: { params: Record; result: Record }; error: { params: {}; result: never }; emitEvent: { params: { eventName: string; eventData: any }; result: null }; slow: { params: {}; result: { delayed: boolean } }; exit: { params: {}; result: null }; + streamEcho: { params: { count: number }; chunk: { index: number; value: string }; result: { totalChunks: number } }; + streamError: { params: {}; chunk: { index: number; value: string }; result: never }; + streamEmpty: { params: {}; chunk: never; result: { totalChunks: number } }; }; tap.test('should spawn and receive ready event', async () => { @@ -188,4 +192,249 @@ tap.test('should emit exit event when process exits', async () => { expect(bridge.running).toBeFalse(); }); +tap.test('should handle 1MB payload round-trip', async () => { + const bridge = new RustBridge({ + binaryName: 'node', + binaryPath: 'node', + cliArgs: [mockBinaryPath], + readyTimeoutMs: 5000, + requestTimeoutMs: 30000, + }); + + await bridge.spawn(); + + // Create a ~1MB payload + const largeString = 'x'.repeat(1024 * 1024); + const result = await bridge.sendCommand('largeEcho', { data: largeString }); + expect(result.data).toEqual(largeString); + expect(result.data.length).toEqual(1024 * 1024); + + bridge.kill(); +}); + +tap.test('should handle 10MB payload round-trip', async () => { + const bridge = new RustBridge({ + binaryName: 'node', + binaryPath: 'node', + cliArgs: [mockBinaryPath], + readyTimeoutMs: 5000, + requestTimeoutMs: 60000, + }); + + await bridge.spawn(); + + // Create a ~10MB payload + const largeString = 'y'.repeat(10 * 1024 * 1024); + const result = await bridge.sendCommand('largeEcho', { data: largeString }); + expect(result.data).toEqual(largeString); + expect(result.data.length).toEqual(10 * 1024 * 1024); + + bridge.kill(); +}); + +tap.test('should reject outbound messages exceeding maxPayloadSize', async () => { + const bridge = new RustBridge({ + binaryName: 'node', + binaryPath: 'node', + cliArgs: [mockBinaryPath], + readyTimeoutMs: 5000, + maxPayloadSize: 1000, + }); + + await bridge.spawn(); + + let threw = false; + try { + await bridge.sendCommand('largeEcho', { data: 'z'.repeat(2000) }); + } catch (err: any) { + threw = true; + expect(err.message).toInclude('maxPayloadSize'); + } + expect(threw).toBeTrue(); + + bridge.kill(); +}); + +tap.test('should handle multiple large concurrent commands', async () => { + const bridge = new RustBridge({ + binaryName: 'node', + binaryPath: 'node', + cliArgs: [mockBinaryPath], + readyTimeoutMs: 5000, + requestTimeoutMs: 30000, + }); + + await bridge.spawn(); + + const size = 500 * 1024; // 500KB each + const results = await Promise.all([ + bridge.sendCommand('largeEcho', { data: 'a'.repeat(size), id: 1 }), + bridge.sendCommand('largeEcho', { data: 'b'.repeat(size), id: 2 }), + bridge.sendCommand('largeEcho', { data: 'c'.repeat(size), id: 3 }), + ]); + + expect(results[0].data.length).toEqual(size); + expect(results[0].data[0]).toEqual('a'); + expect(results[1].data.length).toEqual(size); + expect(results[1].data[0]).toEqual('b'); + expect(results[2].data.length).toEqual(size); + expect(results[2].data[0]).toEqual('c'); + + bridge.kill(); +}); + +// === Streaming tests === + +tap.test('streaming: should receive chunks via for-await-of and final result', async () => { + const bridge = new RustBridge({ + binaryName: 'node', + binaryPath: 'node', + cliArgs: [mockBinaryPath], + readyTimeoutMs: 5000, + requestTimeoutMs: 10000, + }); + + await bridge.spawn(); + + const stream = bridge.sendCommandStreaming('streamEcho', { count: 5 }); + const chunks: Array<{ index: number; value: string }> = []; + + for await (const chunk of stream) { + chunks.push(chunk); + } + + expect(chunks.length).toEqual(5); + for (let i = 0; i < 5; i++) { + expect(chunks[i].index).toEqual(i); + expect(chunks[i].value).toEqual(`chunk_${i}`); + } + + const result = await stream.result; + expect(result.totalChunks).toEqual(5); + + bridge.kill(); +}); + +tap.test('streaming: should handle zero chunks (immediate result)', async () => { + const bridge = new RustBridge({ + binaryName: 'node', + binaryPath: 'node', + cliArgs: [mockBinaryPath], + readyTimeoutMs: 5000, + }); + + await bridge.spawn(); + + const stream = bridge.sendCommandStreaming('streamEmpty', {}); + const chunks: any[] = []; + + for await (const chunk of stream) { + chunks.push(chunk); + } + + expect(chunks.length).toEqual(0); + + const result = await stream.result; + expect(result.totalChunks).toEqual(0); + + bridge.kill(); +}); + +tap.test('streaming: should propagate error to iterator and .result', async () => { + const bridge = new RustBridge({ + binaryName: 'node', + binaryPath: 'node', + cliArgs: [mockBinaryPath], + readyTimeoutMs: 5000, + requestTimeoutMs: 10000, + }); + + await bridge.spawn(); + + const stream = bridge.sendCommandStreaming('streamError', {}); + const chunks: any[] = []; + let iteratorError: Error | null = null; + + try { + for await (const chunk of stream) { + chunks.push(chunk); + } + } catch (err: any) { + iteratorError = err; + } + + // Should have received at least one chunk before error + expect(chunks.length).toEqual(1); + expect(chunks[0].value).toEqual('before_error'); + + // Iterator should have thrown + expect(iteratorError).toBeTruthy(); + expect(iteratorError!.message).toInclude('Stream error after chunk'); + + // .result should also reject + let resultError: Error | null = null; + try { + await stream.result; + } catch (err: any) { + resultError = err; + } + expect(resultError).toBeTruthy(); + expect(resultError!.message).toInclude('Stream error after chunk'); + + bridge.kill(); +}); + +tap.test('streaming: should fail when bridge is not running', async () => { + const bridge = new RustBridge({ + binaryName: 'node', + binaryPath: 'node', + cliArgs: [mockBinaryPath], + }); + + const stream = bridge.sendCommandStreaming('streamEcho', { count: 3 }); + + let resultError: Error | null = null; + try { + await stream.result; + } catch (err: any) { + resultError = err; + } + expect(resultError).toBeTruthy(); + expect(resultError!.message).toInclude('not running'); +}); + +tap.test('streaming: should fail when killed mid-stream', async () => { + const bridge = new RustBridge({ + binaryName: 'node', + binaryPath: 'node', + cliArgs: [mockBinaryPath], + readyTimeoutMs: 5000, + requestTimeoutMs: 30000, + }); + + await bridge.spawn(); + + // Request many chunks so we can kill mid-stream + const stream = bridge.sendCommandStreaming('streamEcho', { count: 100 }); + const chunks: any[] = []; + let iteratorError: Error | null = null; + + // Kill after a short delay + setTimeout(() => { + bridge.kill(); + }, 50); + + try { + for await (const chunk of stream) { + chunks.push(chunk); + } + } catch (err: any) { + iteratorError = err; + } + + // Should have gotten some chunks but not all + expect(iteratorError).toBeTruthy(); + expect(iteratorError!.message).toInclude('killed'); +}); + export default tap.start(); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 9340a86..48dcc14 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartrust', - version: '1.1.2', + version: '1.2.0', description: 'a bridge between JS engines and rust' } diff --git a/ts/classes.rustbridge.ts b/ts/classes.rustbridge.ts index 5b79c65..1537898 100644 --- a/ts/classes.rustbridge.ts +++ b/ts/classes.rustbridge.ts @@ -1,5 +1,6 @@ import * as plugins from './plugins.js'; import { RustBinaryLocator } from './classes.rustbinarylocator.js'; +import { StreamingResponse } from './classes.streamingresponse.js'; import type { IRustBridgeOptions, IRustBridgeLogger, @@ -7,6 +8,8 @@ import type { IManagementRequest, IManagementResponse, IManagementEvent, + TStreamingCommandKeys, + TExtractChunk, } from './interfaces/index.js'; const defaultLogger: IRustBridgeLogger = { @@ -21,14 +24,16 @@ const defaultLogger: IRustBridgeLogger = { */ export class RustBridge extends plugins.events.EventEmitter { private locator: RustBinaryLocator; - private options: Required> & IRustBridgeOptions; + private options: Required> & IRustBridgeOptions; private logger: IRustBridgeLogger; private childProcess: plugins.childProcess.ChildProcess | null = null; - private readlineInterface: plugins.readline.Interface | null = null; + private stdoutBuffer: Buffer = Buffer.alloc(0); + private stderrRemainder: string = ''; private pendingRequests = new Map void; reject: (error: Error) => void; timer: ReturnType; + streaming?: StreamingResponse; }>(); private requestCounter = 0; private isRunning = false; @@ -42,6 +47,7 @@ export class RustBridge extends plu requestTimeoutMs: 30000, readyTimeoutMs: 10000, readyEventName: 'ready', + maxPayloadSize: 50 * 1024 * 1024, ...options, }; this.locator = new RustBinaryLocator(options, this.logger); @@ -68,24 +74,34 @@ export class RustBridge extends plu env, }); - // Handle stderr + // Handle stderr with cross-chunk buffering this.childProcess.stderr?.on('data', (data: Buffer) => { - const lines = data.toString().split('\n').filter((l: string) => l.trim()); + this.stderrRemainder += data.toString(); + const lines = this.stderrRemainder.split('\n'); + // Keep the last element (incomplete line) as remainder + this.stderrRemainder = lines.pop()!; for (const line of lines) { - this.logger.log('debug', `[${this.options.binaryName}] ${line}`); - this.emit('stderr', line); + const trimmed = line.trim(); + if (trimmed) { + this.logger.log('debug', `[${this.options.binaryName}] ${trimmed}`); + this.emit('stderr', trimmed); + } } }); - // Handle stdout via readline for line-delimited JSON - this.readlineInterface = plugins.readline.createInterface({ input: this.childProcess.stdout! }); - this.readlineInterface.on('line', (line: string) => { - this.handleLine(line.trim()); + // Handle stdout via Buffer-based newline scanner + this.childProcess.stdout!.on('data', (chunk: Buffer) => { + this.handleStdoutChunk(chunk); }); // Handle process exit this.childProcess.on('exit', (code, signal) => { this.logger.log('info', `Process exited (code=${code}, signal=${signal})`); + // Flush any remaining stderr + if (this.stderrRemainder.trim()) { + this.logger.log('debug', `[${this.options.binaryName}] ${this.stderrRemainder.trim()}`); + this.emit('stderr', this.stderrRemainder.trim()); + } this.cleanup(); this.emit('exit', code, signal); }); @@ -130,6 +146,15 @@ export class RustBridge extends plu const id = `req_${++this.requestCounter}`; const request: IManagementRequest = { id, method, params }; + const json = JSON.stringify(request); + + // Check outbound payload size + const byteLength = Buffer.byteLength(json, 'utf8'); + if (byteLength > this.options.maxPayloadSize) { + throw new Error( + `Outbound message exceeds maxPayloadSize (${byteLength} > ${this.options.maxPayloadSize})` + ); + } return new Promise((resolve, reject) => { const timer = setTimeout(() => { @@ -139,17 +164,64 @@ export class RustBridge extends plu this.pendingRequests.set(id, { resolve, reject, timer }); - const json = JSON.stringify(request) + '\n'; - this.childProcess!.stdin!.write(json, (err) => { - if (err) { - clearTimeout(timer); - this.pendingRequests.delete(id); - reject(new Error(`Failed to write to stdin: ${err.message}`)); - } + this.writeToStdin(json + '\n').catch((err) => { + clearTimeout(timer); + this.pendingRequests.delete(id); + reject(new Error(`Failed to write to stdin: ${err.message}`)); }); }); } + /** + * Send a streaming command to the Rust process. + * Returns a StreamingResponse that yields chunks via `for await...of` + * and exposes `.result` for the final response. + */ + public sendCommandStreaming>( + method: K, + params: TCommands[K]['params'], + ): StreamingResponse, TCommands[K]['result']> { + const streaming = new StreamingResponse, TCommands[K]['result']>(); + + if (!this.childProcess || !this.isRunning) { + streaming.fail(new Error(`${this.options.binaryName} bridge is not running`)); + return streaming; + } + + const id = `req_${++this.requestCounter}`; + const request: IManagementRequest = { id, method, params }; + const json = JSON.stringify(request); + + const byteLength = Buffer.byteLength(json, 'utf8'); + if (byteLength > this.options.maxPayloadSize) { + streaming.fail( + new Error(`Outbound message exceeds maxPayloadSize (${byteLength} > ${this.options.maxPayloadSize})`) + ); + return streaming; + } + + const timeoutMs = this.options.streamTimeoutMs ?? this.options.requestTimeoutMs; + const timer = setTimeout(() => { + this.pendingRequests.delete(id); + streaming.fail(new Error(`Streaming command '${method}' timed out after ${timeoutMs}ms`)); + }, timeoutMs); + + this.pendingRequests.set(id, { + resolve: (result: any) => streaming.finish(result), + reject: (error: Error) => streaming.fail(error), + timer, + streaming, + }); + + this.writeToStdin(json + '\n').catch((err) => { + clearTimeout(timer); + this.pendingRequests.delete(id); + streaming.fail(new Error(`Failed to write to stdin: ${err.message}`)); + }); + + return streaming; + } + /** * Kill the Rust process and clean up all resources. */ @@ -159,11 +231,9 @@ export class RustBridge extends plu this.childProcess = null; this.isRunning = false; - // Close readline - if (this.readlineInterface) { - this.readlineInterface.close(); - this.readlineInterface = null; - } + // Clear buffers + this.stdoutBuffer = Buffer.alloc(0); + this.stderrRemainder = ''; // Reject pending requests for (const [, pending] of this.pendingRequests) { @@ -203,6 +273,62 @@ export class RustBridge extends plu return this.isRunning; } + /** + * Buffer-based newline scanner for stdout chunks. + * Replaces readline to handle large payloads without buffering entire lines in a separate abstraction. + */ + private handleStdoutChunk(chunk: Buffer): void { + this.stdoutBuffer = Buffer.concat([this.stdoutBuffer, chunk]); + + let newlineIndex: number; + while ((newlineIndex = this.stdoutBuffer.indexOf(0x0A)) !== -1) { + const lineBuffer = this.stdoutBuffer.subarray(0, newlineIndex); + this.stdoutBuffer = this.stdoutBuffer.subarray(newlineIndex + 1); + + if (lineBuffer.length > this.options.maxPayloadSize) { + this.logger.log('error', `Inbound message exceeds maxPayloadSize (${lineBuffer.length} bytes), dropping`); + continue; + } + + const line = lineBuffer.toString('utf8').trim(); + this.handleLine(line); + } + + // If accumulated buffer exceeds maxPayloadSize (sender never sends newline), clear to prevent OOM + if (this.stdoutBuffer.length > this.options.maxPayloadSize) { + this.logger.log('error', `Stdout buffer exceeded maxPayloadSize (${this.stdoutBuffer.length} bytes) without newline, clearing`); + this.stdoutBuffer = Buffer.alloc(0); + } + } + + /** + * Write data to stdin with backpressure support. + * Waits for drain if the internal buffer is full. + */ + private writeToStdin(data: string): Promise { + return new Promise((resolve, reject) => { + if (!this.childProcess?.stdin) { + reject(new Error('stdin not available')); + return; + } + + const canContinue = this.childProcess.stdin.write(data, 'utf8', (err) => { + if (err) { + reject(err); + } + }); + + if (canContinue) { + resolve(); + } else { + // Wait for drain before resolving + this.childProcess.stdin.once('drain', () => { + resolve(); + }); + } + }); + } + private handleLine(line: string): void { if (!line) return; @@ -221,6 +347,22 @@ export class RustBridge extends plu return; } + // Stream chunk (has 'id' + stream === true + 'data') + if ('id' in parsed && parsed.stream === true && 'data' in parsed) { + const pending = this.pendingRequests.get(parsed.id); + if (pending?.streaming) { + // Reset inactivity timeout + clearTimeout(pending.timer); + const timeoutMs = this.options.streamTimeoutMs ?? this.options.requestTimeoutMs; + pending.timer = setTimeout(() => { + this.pendingRequests.delete(parsed.id); + pending.reject(new Error(`Streaming command timed out after ${timeoutMs}ms of inactivity`)); + }, timeoutMs); + pending.streaming.pushChunk(parsed.data); + } + return; + } + // Otherwise it's a response (has 'id' field) if ('id' in parsed) { const response = parsed as IManagementResponse; @@ -240,11 +382,8 @@ export class RustBridge extends plu private cleanup(): void { this.isRunning = false; this.childProcess = null; - - if (this.readlineInterface) { - this.readlineInterface.close(); - this.readlineInterface = null; - } + this.stdoutBuffer = Buffer.alloc(0); + this.stderrRemainder = ''; // Reject all pending requests for (const [, pending] of this.pendingRequests) { diff --git a/ts/classes.streamingresponse.ts b/ts/classes.streamingresponse.ts new file mode 100644 index 0000000..ca57618 --- /dev/null +++ b/ts/classes.streamingresponse.ts @@ -0,0 +1,110 @@ +/** + * Represents a streaming response from a Rust bridge command. + * Implements AsyncIterable to allow `for await...of` consumption of chunks, + * and exposes `.result` for the final response once the stream ends. + * + * @typeParam TChunk - Type of each streamed chunk + * @typeParam TResult - Type of the final result + */ +export class StreamingResponse implements AsyncIterable { + /** Resolves with the final result when the stream ends successfully. */ + public readonly result: Promise; + + private resolveResult!: (value: TResult) => void; + private rejectResult!: (error: Error) => void; + + /** Buffered chunks not yet consumed by the iterator. */ + private buffer: TChunk[] = []; + /** Waiting consumer resolve callback (when iterator is ahead of producer). */ + private waiting: ((value: IteratorResult) => void) | null = null; + /** Waiting consumer reject callback. */ + private waitingReject: ((error: Error) => void) | null = null; + + private done = false; + private error: Error | null = null; + + constructor() { + this.result = new Promise((resolve, reject) => { + this.resolveResult = resolve; + this.rejectResult = reject; + }); + } + + /** + * Push a chunk into the stream. Called internally by RustBridge. + */ + public pushChunk(chunk: TChunk): void { + if (this.done) return; + + if (this.waiting) { + // A consumer is waiting — deliver immediately + const resolve = this.waiting; + this.waiting = null; + this.waitingReject = null; + resolve({ value: chunk, done: false }); + } else { + // No consumer waiting — buffer the chunk + this.buffer.push(chunk); + } + } + + /** + * End the stream successfully with a final result. Called internally by RustBridge. + */ + public finish(result: TResult): void { + if (this.done) return; + this.done = true; + this.resolveResult(result); + + // If a consumer is waiting, signal end of iteration + if (this.waiting) { + const resolve = this.waiting; + this.waiting = null; + this.waitingReject = null; + resolve({ value: undefined as any, done: true }); + } + } + + /** + * End the stream with an error. Called internally by RustBridge. + */ + public fail(error: Error): void { + if (this.done) return; + this.done = true; + this.error = error; + this.rejectResult(error); + + // If a consumer is waiting, reject it + if (this.waitingReject) { + const reject = this.waitingReject; + this.waiting = null; + this.waitingReject = null; + reject(error); + } + } + + [Symbol.asyncIterator](): AsyncIterator { + return { + next: (): Promise> => { + // If there are buffered chunks, deliver one + if (this.buffer.length > 0) { + return Promise.resolve({ value: this.buffer.shift()!, done: false }); + } + + // If the stream is done, signal end + if (this.done) { + if (this.error) { + return Promise.reject(this.error); + } + return Promise.resolve({ value: undefined as any, done: true }); + } + + // No buffered chunks and not done — wait for the next push + return new Promise>((resolve, reject) => { + this.waiting = resolve; + this.waitingReject = reject; + }); + }, + }; + } +} diff --git a/ts/index.ts b/ts/index.ts index d661bad..385e4b6 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -1,3 +1,4 @@ export { RustBridge } from './classes.rustbridge.js'; export { RustBinaryLocator } from './classes.rustbinarylocator.js'; +export { StreamingResponse } from './classes.streamingresponse.js'; export * from './interfaces/index.js'; diff --git a/ts/interfaces/config.ts b/ts/interfaces/config.ts index 4d13714..6e6ba5d 100644 --- a/ts/interfaces/config.ts +++ b/ts/interfaces/config.ts @@ -39,4 +39,9 @@ export interface IRustBridgeOptions extends IBinaryLocatorOptions { readyEventName?: string; /** Optional logger instance */ logger?: IRustBridgeLogger; + /** Maximum message size in bytes (default: 50MB). Messages exceeding this are rejected. */ + maxPayloadSize?: number; + /** Inactivity timeout for streaming commands in ms (default: same as requestTimeoutMs). + * Resets on each chunk received. */ + streamTimeoutMs?: number; } diff --git a/ts/interfaces/ipc.ts b/ts/interfaces/ipc.ts index a36ad11..06f538f 100644 --- a/ts/interfaces/ipc.ts +++ b/ts/interfaces/ipc.ts @@ -38,3 +38,25 @@ export interface ICommandDefinition { * Used to type-safe the bridge's sendCommand method. */ export type TCommandMap = Record; + +/** + * Stream chunk message received from the Rust binary during a streaming command. + */ +export interface IManagementStreamChunk { + id: string; + stream: true; + data: any; +} + +/** + * Extract keys from a command map whose definitions include a `chunk` field, + * indicating they support streaming responses. + */ +export type TStreamingCommandKeys = { + [K in keyof TCommands]: TCommands[K] extends { chunk: any } ? K : never; +}[keyof TCommands]; + +/** + * Extract the chunk type from a command definition that has a `chunk` field. + */ +export type TExtractChunk = TDef extends { chunk: infer C } ? C : never; diff --git a/ts/plugins.ts b/ts/plugins.ts index 8f7a422..722734b 100644 --- a/ts/plugins.ts +++ b/ts/plugins.ts @@ -2,11 +2,10 @@ import * as path from 'path'; import * as fs from 'fs'; import * as childProcess from 'child_process'; -import * as readline from 'readline'; import * as events from 'events'; import * as url from 'url'; -export { path, fs, childProcess, readline, events, url }; +export { path, fs, childProcess, events, url }; // @push.rocks scope import * as smartpath from '@push.rocks/smartpath';