Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 0c39e157c2 | |||
| b7e3e30ce5 | |||
| 35971a395f | |||
| 5fb991ff51 | |||
| dcb88ef4b5 | |||
| 6e4947ef7d |
27
changelog.md
27
changelog.md
@@ -1,5 +1,32 @@
|
||||
# Changelog
|
||||
|
||||
## 2026-02-12 - 1.2.1 - fix(rust-binary-locator)
|
||||
auto-fix missing execute permission for located Rust binaries
|
||||
|
||||
- If a located binary exists but lacks the execute bit, attempt to chmod it to 0o755 and treat it as executable.
|
||||
- Logs an info message when the auto-fix is applied: 'Auto-fixed missing execute permission on: <filePath>'.
|
||||
- Addresses cases where npm/pnpm installs remove the execute permission from bundled binaries.
|
||||
|
||||
## 2026-02-11 - 1.2.0 - feat(rustbridge)
|
||||
add streaming responses and robust large-payload/backpressure handling to RustBridge
|
||||
|
||||
- Introduce StreamingResponse type and export it (for-await-of iterator + .result promise)
|
||||
- Add sendCommandStreaming API to send streaming commands and receive chunks + final result
|
||||
- Implement buffer-based stdout newline scanner to handle large messages and avoid readline limits
|
||||
- Add backpressure-aware writeToStdin to wait for drain when writing large outbound payloads
|
||||
- Add maxPayloadSize option and enforce outbound/inbound size checks to prevent OOMs
|
||||
- Add streamTimeoutMs (inactivity timeout) and reset timeout on each received chunk
|
||||
- Improve stderr handling (cross-chunk buffering and trimmed emits)
|
||||
- Update mock test binary and extensive tests for streaming, large payloads, concurrency, and error cases
|
||||
- Add TypeScript types for streaming commands (TStreamingCommandKeys, TExtractChunk, IManagementStreamChunk)
|
||||
|
||||
## 2026-02-10 - 1.1.2 - fix(rust-binary-locator)
|
||||
use import.meta.resolve and url.fileURLToPath to locate bundled Rust binary in ESM environments
|
||||
|
||||
- Replace require.resolve with import.meta.resolve to support ESM module resolution
|
||||
- Convert resolved file URL to a filesystem path using url.fileURLToPath
|
||||
- Export the url module from ts/plugins to provide fileURLToPath
|
||||
|
||||
## 2026-02-10 - 1.1.1 - fix(readme)
|
||||
update README with comprehensive documentation, usage examples, API reference, installation instructions, and legal/company information
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@push.rocks/smartrust",
|
||||
"version": "1.1.1",
|
||||
"version": "1.2.1",
|
||||
"private": false,
|
||||
"description": "a bridge between JS engines and rust",
|
||||
"main": "dist_ts/index.js",
|
||||
|
||||
180
readme.md
180
readme.md
@@ -1,6 +1,6 @@
|
||||
# @push.rocks/smartrust
|
||||
|
||||
A type-safe, standardized bridge between TypeScript and Rust binaries via JSON-over-stdin/stdout IPC.
|
||||
A type-safe, production-ready bridge between TypeScript and Rust binaries via JSON-over-stdin/stdout IPC — with support for request/response, streaming, and event patterns.
|
||||
|
||||
## Issue Reporting and Security
|
||||
|
||||
@@ -16,18 +16,19 @@ pnpm install @push.rocks/smartrust
|
||||
|
||||
## Overview 🔭
|
||||
|
||||
`@push.rocks/smartrust` provides a production-ready bridge for TypeScript applications that need to communicate with Rust binaries. It handles the entire lifecycle — binary discovery, process spawning, request/response correlation with timeouts, event streaming, and graceful shutdown — so you can focus on your command definitions instead of IPC plumbing.
|
||||
`@push.rocks/smartrust` provides a complete bridge for TypeScript applications that need to communicate with Rust binaries. It handles the entire lifecycle — binary discovery, process spawning, request/response correlation, **streaming responses**, event pub/sub, and graceful shutdown — so you can focus on your command definitions instead of IPC plumbing.
|
||||
|
||||
### Why?
|
||||
### Why? 🤔
|
||||
|
||||
If you're integrating Rust into a Node.js project, you'll inevitably need:
|
||||
- A way to **find** the compiled Rust binary across different environments (dev, CI, production, platform packages)
|
||||
- A way to **spawn** it and establish reliable two-way communication
|
||||
- **Type-safe** request/response patterns with proper error handling
|
||||
- **Streaming responses** for progressive data processing, log tailing, or chunked transfers
|
||||
- **Event streaming** from Rust to TypeScript
|
||||
- **Graceful lifecycle management** (ready detection, clean shutdown, force kill)
|
||||
|
||||
`smartrust` wraps all of this into two classes: `RustBridge` and `RustBinaryLocator`.
|
||||
`smartrust` wraps all of this into three classes: `RustBridge`, `RustBinaryLocator`, and `StreamingResponse`.
|
||||
|
||||
## Usage 🚀
|
||||
|
||||
@@ -38,8 +39,9 @@ If you're integrating Rust into a Node.js project, you'll inevitably need:
|
||||
| Direction | Format | Description |
|
||||
|-----------|--------|-------------|
|
||||
| **TS → Rust** (Request) | `{"id": "req_1", "method": "start", "params": {...}}` | Command with unique ID |
|
||||
| **Rust → TS** (Response) | `{"id": "req_1", "success": true, "result": {...}}` | Response correlated by ID |
|
||||
| **Rust → TS** (Response) | `{"id": "req_1", "success": true, "result": {...}}` | Final response correlated by ID |
|
||||
| **Rust → TS** (Error) | `{"id": "req_1", "success": false, "error": "msg"}` | Error correlated by ID |
|
||||
| **Rust → TS** (Stream Chunk) | `{"id": "req_1", "stream": true, "data": {...}}` | Intermediate chunk (zero or more) |
|
||||
| **Rust → TS** (Event) | `{"event": "ready", "data": {...}}` | Unsolicited event (no ID) |
|
||||
|
||||
Your Rust binary reads JSON lines from stdin and writes JSON lines to stdout. That's it. Stderr is free for logging.
|
||||
@@ -49,7 +51,7 @@ Your Rust binary reads JSON lines from stdin and writes JSON lines to stdout. Th
|
||||
Start by defining a type map of commands your Rust binary supports:
|
||||
|
||||
```typescript
|
||||
import { RustBridge, type ICommandDefinition } from '@push.rocks/smartrust';
|
||||
import { RustBridge } from '@push.rocks/smartrust';
|
||||
|
||||
// Define your command types
|
||||
type TMyCommands = {
|
||||
@@ -92,7 +94,91 @@ bridge.on('management:configChanged', (data) => {
|
||||
bridge.kill();
|
||||
```
|
||||
|
||||
### Binary Locator
|
||||
### Streaming Commands 🌊
|
||||
|
||||
For commands where the Rust binary sends a series of chunks before a final result, use `sendCommandStreaming`. This is perfect for progressive data processing, log tailing, search results, or any scenario where you want incremental output.
|
||||
|
||||
#### Defining Streaming Commands
|
||||
|
||||
Add a `chunk` field to your command type definition to mark it as streamable:
|
||||
|
||||
```typescript
|
||||
type TMyCommands = {
|
||||
// Regular command (request → response)
|
||||
ping: { params: {}; result: { pong: boolean } };
|
||||
|
||||
// Streaming command (request → chunks... → final result)
|
||||
processData: { params: { count: number }; chunk: { index: number; progress: number }; result: { totalProcessed: number } };
|
||||
tailLogs: { params: { lines: number }; chunk: string; result: { linesRead: number } };
|
||||
};
|
||||
```
|
||||
|
||||
#### Consuming Streams
|
||||
|
||||
```typescript
|
||||
// Returns a StreamingResponse immediately (does NOT block)
|
||||
const stream = bridge.sendCommandStreaming('processData', { count: 1000 });
|
||||
|
||||
// Consume chunks with for-await-of
|
||||
for await (const chunk of stream) {
|
||||
console.log(`Processing item ${chunk.index}, progress: ${chunk.progress}%`);
|
||||
}
|
||||
|
||||
// Get the final result after all chunks are consumed
|
||||
const result = await stream.result;
|
||||
console.log(`Done! Processed ${result.totalProcessed} items`);
|
||||
```
|
||||
|
||||
#### Error Handling in Streams
|
||||
|
||||
Errors propagate to both the iterator and the `.result` promise:
|
||||
|
||||
```typescript
|
||||
const stream = bridge.sendCommandStreaming('processData', { count: 100 });
|
||||
|
||||
try {
|
||||
for await (const chunk of stream) {
|
||||
console.log(chunk);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error('Stream failed:', err.message);
|
||||
}
|
||||
|
||||
// .result also rejects on error
|
||||
try {
|
||||
await stream.result;
|
||||
} catch (err) {
|
||||
console.error('Same error here:', err.message);
|
||||
}
|
||||
```
|
||||
|
||||
#### Stream Timeout
|
||||
|
||||
By default, streaming commands use the same timeout as regular commands (`requestTimeoutMs`). The timeout **resets on each chunk received**, so it acts as an inactivity timeout rather than an absolute timeout. You can configure it separately:
|
||||
|
||||
```typescript
|
||||
const bridge = new RustBridge<TMyCommands>({
|
||||
binaryName: 'my-server',
|
||||
requestTimeoutMs: 30000, // regular command timeout: 30s
|
||||
streamTimeoutMs: 60000, // streaming inactivity timeout: 60s
|
||||
});
|
||||
```
|
||||
|
||||
#### Implementing Streaming on the Rust Side
|
||||
|
||||
Your Rust binary sends stream chunks by writing lines with `"stream": true` before the final response:
|
||||
|
||||
```rust
|
||||
// For each chunk:
|
||||
println!(r#"{{"id":"{}","stream":true,"data":{{"index":{},"progress":{}}}}}"#, req.id, i, pct);
|
||||
io::stdout().flush().unwrap();
|
||||
|
||||
// When done, send the final response (same as non-streaming):
|
||||
println!(r#"{{"id":"{}","success":true,"result":{{"totalProcessed":{}}}}}"#, req.id, total);
|
||||
io::stdout().flush().unwrap();
|
||||
```
|
||||
|
||||
### Binary Locator 🔍
|
||||
|
||||
The `RustBinaryLocator` searches for your binary using a priority-ordered strategy:
|
||||
|
||||
@@ -119,7 +205,7 @@ const binaryPath = await locator.findBinary();
|
||||
// Result is cached — call clearCache() to force re-search
|
||||
```
|
||||
|
||||
### Configuration Reference
|
||||
### Configuration Reference ⚙️
|
||||
|
||||
The `RustBridge` constructor accepts an `IRustBridgeOptions` object:
|
||||
|
||||
@@ -136,14 +222,16 @@ const bridge = new RustBridge<TMyCommands>({
|
||||
// --- Bridge Options ---
|
||||
cliArgs: ['--management'], // optional: args passed to binary (default: ['--management'])
|
||||
requestTimeoutMs: 30000, // optional: per-request timeout (default: 30000)
|
||||
streamTimeoutMs: 30000, // optional: streaming inactivity timeout (default: requestTimeoutMs)
|
||||
readyTimeoutMs: 10000, // optional: ready event timeout (default: 10000)
|
||||
maxPayloadSize: 50 * 1024 * 1024, // optional: max message size in bytes (default: 50MB)
|
||||
env: { RUST_LOG: 'debug' }, // optional: extra env vars for the child process
|
||||
readyEventName: 'ready', // optional: name of the ready event (default: 'ready')
|
||||
logger: myLogger, // optional: logger implementing IRustBridgeLogger
|
||||
});
|
||||
```
|
||||
|
||||
### Events
|
||||
### Events 📡
|
||||
|
||||
`RustBridge` extends `EventEmitter` and emits the following events:
|
||||
|
||||
@@ -154,7 +242,7 @@ const bridge = new RustBridge<TMyCommands>({
|
||||
| `stderr` | `string` | A line from the binary's stderr |
|
||||
| `management:<name>` | `any` | Custom event from Rust (e.g. `management:configChanged`) |
|
||||
|
||||
### Custom Logger
|
||||
### Custom Logger 📝
|
||||
|
||||
Plug in your own logger by implementing the `IRustBridgeLogger` interface:
|
||||
|
||||
@@ -173,7 +261,7 @@ const bridge = new RustBridge<TMyCommands>({
|
||||
});
|
||||
```
|
||||
|
||||
### Writing the Rust Side
|
||||
### Writing the Rust Side 🦀
|
||||
|
||||
Your Rust binary needs to implement a simple protocol:
|
||||
|
||||
@@ -186,9 +274,11 @@ Your Rust binary needs to implement a simple protocol:
|
||||
|
||||
3. **Write JSON responses to stdout**, each as `{"id": "...", "success": true, "result": {...}}\n`
|
||||
|
||||
4. **Emit events** anytime by writing `{"event": "name", "data": {...}}\n` to stdout
|
||||
4. **For streaming commands**, write zero or more `{"id": "...", "stream": true, "data": {...}}\n` chunks before the final response
|
||||
|
||||
5. **Use stderr** for logging — it won't interfere with the IPC protocol
|
||||
5. **Emit events** anytime by writing `{"event": "name", "data": {...}}\n` to stdout
|
||||
|
||||
6. **Use stderr** for logging — it won't interfere with the IPC protocol
|
||||
|
||||
Here's a minimal Rust skeleton:
|
||||
|
||||
@@ -213,6 +303,13 @@ struct Response {
|
||||
error: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct StreamChunk {
|
||||
id: String,
|
||||
stream: bool,
|
||||
data: serde_json::Value,
|
||||
}
|
||||
|
||||
fn main() {
|
||||
// Signal ready
|
||||
println!(r#"{{"event":"ready","data":{{"version":"1.0.0"}}}}"#);
|
||||
@@ -223,26 +320,52 @@ fn main() {
|
||||
let line = line.unwrap();
|
||||
let req: Request = serde_json::from_str(&line).unwrap();
|
||||
|
||||
let response = match req.method.as_str() {
|
||||
"ping" => Response {
|
||||
match req.method.as_str() {
|
||||
"ping" => {
|
||||
let resp = Response {
|
||||
id: req.id,
|
||||
success: true,
|
||||
result: Some(serde_json::json!({"pong": true})),
|
||||
error: None,
|
||||
},
|
||||
_ => Response {
|
||||
};
|
||||
println!("{}", serde_json::to_string(&resp).unwrap());
|
||||
io::stdout().flush().unwrap();
|
||||
}
|
||||
"processData" => {
|
||||
let count = req.params["count"].as_u64().unwrap_or(0);
|
||||
// Send stream chunks
|
||||
for i in 0..count {
|
||||
let chunk = StreamChunk {
|
||||
id: req.id.clone(),
|
||||
stream: true,
|
||||
data: serde_json::json!({"index": i, "progress": ((i+1) * 100 / count)}),
|
||||
};
|
||||
println!("{}", serde_json::to_string(&chunk).unwrap());
|
||||
io::stdout().flush().unwrap();
|
||||
}
|
||||
// Send final response
|
||||
let resp = Response {
|
||||
id: req.id,
|
||||
success: true,
|
||||
result: Some(serde_json::json!({"totalProcessed": count})),
|
||||
error: None,
|
||||
};
|
||||
println!("{}", serde_json::to_string(&resp).unwrap());
|
||||
io::stdout().flush().unwrap();
|
||||
}
|
||||
_ => {
|
||||
let resp = Response {
|
||||
id: req.id,
|
||||
success: false,
|
||||
result: None,
|
||||
error: Some(format!("Unknown method: {}", req.method)),
|
||||
},
|
||||
};
|
||||
|
||||
let json = serde_json::to_string(&response).unwrap();
|
||||
println!("{json}");
|
||||
println!("{}", serde_json::to_string(&resp).unwrap());
|
||||
io::stdout().flush().unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## API Reference 📖
|
||||
@@ -254,9 +377,17 @@ fn main() {
|
||||
| `constructor` | `new RustBridge<T>(options: IRustBridgeOptions)` | Create a new bridge instance |
|
||||
| `spawn()` | `Promise<boolean>` | Spawn the binary and wait for ready; returns `false` on failure |
|
||||
| `sendCommand(method, params)` | `Promise<TCommands[K]['result']>` | Send a typed command and await the response |
|
||||
| `sendCommandStreaming(method, params)` | `StreamingResponse<TChunk, TResult>` | Send a streaming command; returns immediately |
|
||||
| `kill()` | `void` | SIGTERM the process, reject pending requests, force SIGKILL after 5s |
|
||||
| `running` | `boolean` | Whether the bridge is currently connected |
|
||||
|
||||
### `StreamingResponse<TChunk, TResult>`
|
||||
|
||||
| Method / Property | Type | Description |
|
||||
|---|---|---|
|
||||
| `[Symbol.asyncIterator]()` | `AsyncIterator<TChunk>` | Enables `for await...of` consumption of chunks |
|
||||
| `result` | `Promise<TResult>` | Resolves with the final result after stream ends |
|
||||
|
||||
### `RustBinaryLocator`
|
||||
|
||||
| Method / Property | Signature | Description |
|
||||
@@ -265,9 +396,9 @@ fn main() {
|
||||
| `findBinary()` | `Promise<string \| null>` | Find the binary using the priority search; result is cached |
|
||||
| `clearCache()` | `void` | Clear the cached path to force a fresh search |
|
||||
|
||||
### Exported Interfaces
|
||||
### Exported Interfaces & Types
|
||||
|
||||
| Interface | Description |
|
||||
| Interface / Type | Description |
|
||||
|---|---|
|
||||
| `IRustBridgeOptions` | Full configuration for `RustBridge` |
|
||||
| `IBinaryLocatorOptions` | Configuration for `RustBinaryLocator` |
|
||||
@@ -275,8 +406,11 @@ fn main() {
|
||||
| `IManagementRequest` | IPC request shape: `{ id, method, params }` |
|
||||
| `IManagementResponse` | IPC response shape: `{ id, success, result?, error? }` |
|
||||
| `IManagementEvent` | IPC event shape: `{ event, data }` |
|
||||
| `IManagementStreamChunk` | IPC stream chunk shape: `{ id, stream: true, data }` |
|
||||
| `ICommandDefinition` | Single command definition: `{ params, result }` |
|
||||
| `TCommandMap` | `Record<string, ICommandDefinition>` |
|
||||
| `TStreamingCommandKeys<T>` | Extracts keys from a command map that have a `chunk` field |
|
||||
| `TExtractChunk<T>` | Extracts the chunk type from a streaming command definition |
|
||||
|
||||
## License and Legal Information
|
||||
|
||||
|
||||
@@ -2,22 +2,46 @@
|
||||
|
||||
/**
|
||||
* Mock "Rust binary" for testing the RustBridge IPC protocol.
|
||||
* Reads JSON lines from stdin, writes JSON lines to stdout.
|
||||
* Reads JSON lines from stdin via Buffer-based scanner, writes JSON lines to stdout.
|
||||
* Emits a ready event on startup.
|
||||
*/
|
||||
|
||||
import { createInterface } from 'readline';
|
||||
|
||||
// Emit ready event
|
||||
const readyEvent = JSON.stringify({ event: 'ready', data: { version: '1.0.0' } });
|
||||
process.stdout.write(readyEvent + '\n');
|
||||
|
||||
const rl = createInterface({ input: process.stdin });
|
||||
// Buffer-based newline scanner for stdin (mirrors the RustBridge approach)
|
||||
let stdinBuffer = Buffer.alloc(0);
|
||||
|
||||
rl.on('line', (line) => {
|
||||
process.stdin.on('data', (chunk) => {
|
||||
stdinBuffer = Buffer.concat([stdinBuffer, chunk]);
|
||||
|
||||
let newlineIndex;
|
||||
while ((newlineIndex = stdinBuffer.indexOf(0x0A)) !== -1) {
|
||||
const lineBuffer = stdinBuffer.subarray(0, newlineIndex);
|
||||
stdinBuffer = stdinBuffer.subarray(newlineIndex + 1);
|
||||
const line = lineBuffer.toString('utf8').trim();
|
||||
if (line) {
|
||||
handleLine(line);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* Backpressure-aware write to stdout.
|
||||
*/
|
||||
function writeResponse(data) {
|
||||
const json = JSON.stringify(data) + '\n';
|
||||
if (!process.stdout.write(json)) {
|
||||
// Wait for drain before continuing
|
||||
process.stdout.once('drain', () => {});
|
||||
}
|
||||
}
|
||||
|
||||
function handleLine(line) {
|
||||
let request;
|
||||
try {
|
||||
request = JSON.parse(line.trim());
|
||||
request = JSON.parse(line);
|
||||
} catch {
|
||||
return;
|
||||
}
|
||||
@@ -26,35 +50,53 @@ rl.on('line', (line) => {
|
||||
|
||||
if (method === 'echo') {
|
||||
// Echo back the params as result
|
||||
const response = JSON.stringify({ id, success: true, result: params });
|
||||
process.stdout.write(response + '\n');
|
||||
writeResponse({ id, success: true, result: params });
|
||||
} else if (method === 'largeEcho') {
|
||||
// Echo back params (same as echo, named distinctly for large payload tests)
|
||||
writeResponse({ id, success: true, result: params });
|
||||
} else if (method === 'error') {
|
||||
// Return an error
|
||||
const response = JSON.stringify({ id, success: false, error: 'Test error message' });
|
||||
process.stdout.write(response + '\n');
|
||||
writeResponse({ id, success: false, error: 'Test error message' });
|
||||
} else if (method === 'emitEvent') {
|
||||
// Emit a custom event, then respond with success
|
||||
const event = JSON.stringify({ event: params.eventName, data: params.eventData });
|
||||
process.stdout.write(event + '\n');
|
||||
const response = JSON.stringify({ id, success: true, result: null });
|
||||
process.stdout.write(response + '\n');
|
||||
writeResponse({ event: params.eventName, data: params.eventData });
|
||||
writeResponse({ id, success: true, result: null });
|
||||
} else if (method === 'slow') {
|
||||
// Respond after a delay
|
||||
setTimeout(() => {
|
||||
const response = JSON.stringify({ id, success: true, result: { delayed: true } });
|
||||
process.stdout.write(response + '\n');
|
||||
writeResponse({ id, success: true, result: { delayed: true } });
|
||||
}, 100);
|
||||
} else if (method === 'streamEcho') {
|
||||
// Send params.count stream chunks, then final response
|
||||
const count = params.count || 0;
|
||||
let sent = 0;
|
||||
const interval = setInterval(() => {
|
||||
if (sent < count) {
|
||||
writeResponse({ id, stream: true, data: { index: sent, value: `chunk_${sent}` } });
|
||||
sent++;
|
||||
} else {
|
||||
clearInterval(interval);
|
||||
writeResponse({ id, success: true, result: { totalChunks: count } });
|
||||
}
|
||||
}, 10);
|
||||
} else if (method === 'streamError') {
|
||||
// Send 1 chunk, then error
|
||||
writeResponse({ id, stream: true, data: { index: 0, value: 'before_error' } });
|
||||
setTimeout(() => {
|
||||
writeResponse({ id, success: false, error: 'Stream error after chunk' });
|
||||
}, 20);
|
||||
} else if (method === 'streamEmpty') {
|
||||
// Zero chunks, immediate final response
|
||||
writeResponse({ id, success: true, result: { totalChunks: 0 } });
|
||||
} else if (method === 'exit') {
|
||||
// Graceful exit
|
||||
const response = JSON.stringify({ id, success: true, result: null });
|
||||
process.stdout.write(response + '\n');
|
||||
writeResponse({ id, success: true, result: null });
|
||||
process.exit(0);
|
||||
} else {
|
||||
// Unknown command
|
||||
const response = JSON.stringify({ id, success: false, error: `Unknown method: ${method}` });
|
||||
process.stdout.write(response + '\n');
|
||||
writeResponse({ id, success: false, error: `Unknown method: ${method}` });
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Handle SIGTERM gracefully
|
||||
process.on('SIGTERM', () => {
|
||||
|
||||
@@ -9,10 +9,14 @@ const mockBinaryPath = path.join(testDir, 'helpers/mock-rust-binary.mjs');
|
||||
// Define the command types for our mock binary
|
||||
type TMockCommands = {
|
||||
echo: { params: Record<string, any>; result: Record<string, any> };
|
||||
largeEcho: { params: Record<string, any>; result: Record<string, any> };
|
||||
error: { params: {}; result: never };
|
||||
emitEvent: { params: { eventName: string; eventData: any }; result: null };
|
||||
slow: { params: {}; result: { delayed: boolean } };
|
||||
exit: { params: {}; result: null };
|
||||
streamEcho: { params: { count: number }; chunk: { index: number; value: string }; result: { totalChunks: number } };
|
||||
streamError: { params: {}; chunk: { index: number; value: string }; result: never };
|
||||
streamEmpty: { params: {}; chunk: never; result: { totalChunks: number } };
|
||||
};
|
||||
|
||||
tap.test('should spawn and receive ready event', async () => {
|
||||
@@ -188,4 +192,249 @@ tap.test('should emit exit event when process exits', async () => {
|
||||
expect(bridge.running).toBeFalse();
|
||||
});
|
||||
|
||||
tap.test('should handle 1MB payload round-trip', async () => {
|
||||
const bridge = new RustBridge<TMockCommands>({
|
||||
binaryName: 'node',
|
||||
binaryPath: 'node',
|
||||
cliArgs: [mockBinaryPath],
|
||||
readyTimeoutMs: 5000,
|
||||
requestTimeoutMs: 30000,
|
||||
});
|
||||
|
||||
await bridge.spawn();
|
||||
|
||||
// Create a ~1MB payload
|
||||
const largeString = 'x'.repeat(1024 * 1024);
|
||||
const result = await bridge.sendCommand('largeEcho', { data: largeString });
|
||||
expect(result.data).toEqual(largeString);
|
||||
expect(result.data.length).toEqual(1024 * 1024);
|
||||
|
||||
bridge.kill();
|
||||
});
|
||||
|
||||
tap.test('should handle 10MB payload round-trip', async () => {
|
||||
const bridge = new RustBridge<TMockCommands>({
|
||||
binaryName: 'node',
|
||||
binaryPath: 'node',
|
||||
cliArgs: [mockBinaryPath],
|
||||
readyTimeoutMs: 5000,
|
||||
requestTimeoutMs: 60000,
|
||||
});
|
||||
|
||||
await bridge.spawn();
|
||||
|
||||
// Create a ~10MB payload
|
||||
const largeString = 'y'.repeat(10 * 1024 * 1024);
|
||||
const result = await bridge.sendCommand('largeEcho', { data: largeString });
|
||||
expect(result.data).toEqual(largeString);
|
||||
expect(result.data.length).toEqual(10 * 1024 * 1024);
|
||||
|
||||
bridge.kill();
|
||||
});
|
||||
|
||||
tap.test('should reject outbound messages exceeding maxPayloadSize', async () => {
|
||||
const bridge = new RustBridge<TMockCommands>({
|
||||
binaryName: 'node',
|
||||
binaryPath: 'node',
|
||||
cliArgs: [mockBinaryPath],
|
||||
readyTimeoutMs: 5000,
|
||||
maxPayloadSize: 1000,
|
||||
});
|
||||
|
||||
await bridge.spawn();
|
||||
|
||||
let threw = false;
|
||||
try {
|
||||
await bridge.sendCommand('largeEcho', { data: 'z'.repeat(2000) });
|
||||
} catch (err: any) {
|
||||
threw = true;
|
||||
expect(err.message).toInclude('maxPayloadSize');
|
||||
}
|
||||
expect(threw).toBeTrue();
|
||||
|
||||
bridge.kill();
|
||||
});
|
||||
|
||||
tap.test('should handle multiple large concurrent commands', async () => {
|
||||
const bridge = new RustBridge<TMockCommands>({
|
||||
binaryName: 'node',
|
||||
binaryPath: 'node',
|
||||
cliArgs: [mockBinaryPath],
|
||||
readyTimeoutMs: 5000,
|
||||
requestTimeoutMs: 30000,
|
||||
});
|
||||
|
||||
await bridge.spawn();
|
||||
|
||||
const size = 500 * 1024; // 500KB each
|
||||
const results = await Promise.all([
|
||||
bridge.sendCommand('largeEcho', { data: 'a'.repeat(size), id: 1 }),
|
||||
bridge.sendCommand('largeEcho', { data: 'b'.repeat(size), id: 2 }),
|
||||
bridge.sendCommand('largeEcho', { data: 'c'.repeat(size), id: 3 }),
|
||||
]);
|
||||
|
||||
expect(results[0].data.length).toEqual(size);
|
||||
expect(results[0].data[0]).toEqual('a');
|
||||
expect(results[1].data.length).toEqual(size);
|
||||
expect(results[1].data[0]).toEqual('b');
|
||||
expect(results[2].data.length).toEqual(size);
|
||||
expect(results[2].data[0]).toEqual('c');
|
||||
|
||||
bridge.kill();
|
||||
});
|
||||
|
||||
// === Streaming tests ===
|
||||
|
||||
tap.test('streaming: should receive chunks via for-await-of and final result', async () => {
|
||||
const bridge = new RustBridge<TMockCommands>({
|
||||
binaryName: 'node',
|
||||
binaryPath: 'node',
|
||||
cliArgs: [mockBinaryPath],
|
||||
readyTimeoutMs: 5000,
|
||||
requestTimeoutMs: 10000,
|
||||
});
|
||||
|
||||
await bridge.spawn();
|
||||
|
||||
const stream = bridge.sendCommandStreaming('streamEcho', { count: 5 });
|
||||
const chunks: Array<{ index: number; value: string }> = [];
|
||||
|
||||
for await (const chunk of stream) {
|
||||
chunks.push(chunk);
|
||||
}
|
||||
|
||||
expect(chunks.length).toEqual(5);
|
||||
for (let i = 0; i < 5; i++) {
|
||||
expect(chunks[i].index).toEqual(i);
|
||||
expect(chunks[i].value).toEqual(`chunk_${i}`);
|
||||
}
|
||||
|
||||
const result = await stream.result;
|
||||
expect(result.totalChunks).toEqual(5);
|
||||
|
||||
bridge.kill();
|
||||
});
|
||||
|
||||
tap.test('streaming: should handle zero chunks (immediate result)', async () => {
|
||||
const bridge = new RustBridge<TMockCommands>({
|
||||
binaryName: 'node',
|
||||
binaryPath: 'node',
|
||||
cliArgs: [mockBinaryPath],
|
||||
readyTimeoutMs: 5000,
|
||||
});
|
||||
|
||||
await bridge.spawn();
|
||||
|
||||
const stream = bridge.sendCommandStreaming('streamEmpty', {});
|
||||
const chunks: any[] = [];
|
||||
|
||||
for await (const chunk of stream) {
|
||||
chunks.push(chunk);
|
||||
}
|
||||
|
||||
expect(chunks.length).toEqual(0);
|
||||
|
||||
const result = await stream.result;
|
||||
expect(result.totalChunks).toEqual(0);
|
||||
|
||||
bridge.kill();
|
||||
});
|
||||
|
||||
tap.test('streaming: should propagate error to iterator and .result', async () => {
|
||||
const bridge = new RustBridge<TMockCommands>({
|
||||
binaryName: 'node',
|
||||
binaryPath: 'node',
|
||||
cliArgs: [mockBinaryPath],
|
||||
readyTimeoutMs: 5000,
|
||||
requestTimeoutMs: 10000,
|
||||
});
|
||||
|
||||
await bridge.spawn();
|
||||
|
||||
const stream = bridge.sendCommandStreaming('streamError', {});
|
||||
const chunks: any[] = [];
|
||||
let iteratorError: Error | null = null;
|
||||
|
||||
try {
|
||||
for await (const chunk of stream) {
|
||||
chunks.push(chunk);
|
||||
}
|
||||
} catch (err: any) {
|
||||
iteratorError = err;
|
||||
}
|
||||
|
||||
// Should have received at least one chunk before error
|
||||
expect(chunks.length).toEqual(1);
|
||||
expect(chunks[0].value).toEqual('before_error');
|
||||
|
||||
// Iterator should have thrown
|
||||
expect(iteratorError).toBeTruthy();
|
||||
expect(iteratorError!.message).toInclude('Stream error after chunk');
|
||||
|
||||
// .result should also reject
|
||||
let resultError: Error | null = null;
|
||||
try {
|
||||
await stream.result;
|
||||
} catch (err: any) {
|
||||
resultError = err;
|
||||
}
|
||||
expect(resultError).toBeTruthy();
|
||||
expect(resultError!.message).toInclude('Stream error after chunk');
|
||||
|
||||
bridge.kill();
|
||||
});
|
||||
|
||||
tap.test('streaming: should fail when bridge is not running', async () => {
|
||||
const bridge = new RustBridge<TMockCommands>({
|
||||
binaryName: 'node',
|
||||
binaryPath: 'node',
|
||||
cliArgs: [mockBinaryPath],
|
||||
});
|
||||
|
||||
const stream = bridge.sendCommandStreaming('streamEcho', { count: 3 });
|
||||
|
||||
let resultError: Error | null = null;
|
||||
try {
|
||||
await stream.result;
|
||||
} catch (err: any) {
|
||||
resultError = err;
|
||||
}
|
||||
expect(resultError).toBeTruthy();
|
||||
expect(resultError!.message).toInclude('not running');
|
||||
});
|
||||
|
||||
tap.test('streaming: should fail when killed mid-stream', async () => {
|
||||
const bridge = new RustBridge<TMockCommands>({
|
||||
binaryName: 'node',
|
||||
binaryPath: 'node',
|
||||
cliArgs: [mockBinaryPath],
|
||||
readyTimeoutMs: 5000,
|
||||
requestTimeoutMs: 30000,
|
||||
});
|
||||
|
||||
await bridge.spawn();
|
||||
|
||||
// Request many chunks so we can kill mid-stream
|
||||
const stream = bridge.sendCommandStreaming('streamEcho', { count: 100 });
|
||||
const chunks: any[] = [];
|
||||
let iteratorError: Error | null = null;
|
||||
|
||||
// Kill after a short delay
|
||||
setTimeout(() => {
|
||||
bridge.kill();
|
||||
}, 50);
|
||||
|
||||
try {
|
||||
for await (const chunk of stream) {
|
||||
chunks.push(chunk);
|
||||
}
|
||||
} catch (err: any) {
|
||||
iteratorError = err;
|
||||
}
|
||||
|
||||
// Should have gotten some chunks but not all
|
||||
expect(iteratorError).toBeTruthy();
|
||||
expect(iteratorError!.message).toInclude('killed');
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartrust',
|
||||
version: '1.1.1',
|
||||
version: '1.2.1',
|
||||
description: 'a bridge between JS engines and rust'
|
||||
}
|
||||
|
||||
@@ -108,7 +108,8 @@ export class RustBinaryLocator {
|
||||
const packageName = `${platformPackagePrefix}-${platform}-${arch}`;
|
||||
|
||||
try {
|
||||
const packagePath = require.resolve(`${packageName}/${binaryName}`);
|
||||
const resolved = import.meta.resolve(`${packageName}/${binaryName}`);
|
||||
const packagePath = plugins.url.fileURLToPath(resolved);
|
||||
if (await this.isExecutable(packagePath)) {
|
||||
return packagePath;
|
||||
}
|
||||
@@ -122,10 +123,19 @@ export class RustBinaryLocator {
|
||||
try {
|
||||
await plugins.fs.promises.access(filePath, plugins.fs.constants.X_OK);
|
||||
return true;
|
||||
} catch {
|
||||
// File may exist but lack execute bit (common after npm/pnpm install).
|
||||
// Try to make it executable.
|
||||
try {
|
||||
await plugins.fs.promises.access(filePath, plugins.fs.constants.F_OK);
|
||||
await plugins.fs.promises.chmod(filePath, 0o755);
|
||||
this.logger.log('info', `Auto-fixed missing execute permission on: ${filePath}`);
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async findInPath(binaryName: string): Promise<string | null> {
|
||||
const pathDirs = (process.env.PATH || '').split(plugins.path.delimiter);
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import * as plugins from './plugins.js';
|
||||
import { RustBinaryLocator } from './classes.rustbinarylocator.js';
|
||||
import { StreamingResponse } from './classes.streamingresponse.js';
|
||||
import type {
|
||||
IRustBridgeOptions,
|
||||
IRustBridgeLogger,
|
||||
@@ -7,6 +8,8 @@ import type {
|
||||
IManagementRequest,
|
||||
IManagementResponse,
|
||||
IManagementEvent,
|
||||
TStreamingCommandKeys,
|
||||
TExtractChunk,
|
||||
} from './interfaces/index.js';
|
||||
|
||||
const defaultLogger: IRustBridgeLogger = {
|
||||
@@ -21,14 +24,16 @@ const defaultLogger: IRustBridgeLogger = {
|
||||
*/
|
||||
export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plugins.events.EventEmitter {
|
||||
private locator: RustBinaryLocator;
|
||||
private options: Required<Pick<IRustBridgeOptions, 'cliArgs' | 'requestTimeoutMs' | 'readyTimeoutMs' | 'readyEventName'>> & IRustBridgeOptions;
|
||||
private options: Required<Pick<IRustBridgeOptions, 'cliArgs' | 'requestTimeoutMs' | 'readyTimeoutMs' | 'readyEventName' | 'maxPayloadSize'>> & IRustBridgeOptions;
|
||||
private logger: IRustBridgeLogger;
|
||||
private childProcess: plugins.childProcess.ChildProcess | null = null;
|
||||
private readlineInterface: plugins.readline.Interface | null = null;
|
||||
private stdoutBuffer: Buffer = Buffer.alloc(0);
|
||||
private stderrRemainder: string = '';
|
||||
private pendingRequests = new Map<string, {
|
||||
resolve: (value: any) => void;
|
||||
reject: (error: Error) => void;
|
||||
timer: ReturnType<typeof setTimeout>;
|
||||
streaming?: StreamingResponse<any, any>;
|
||||
}>();
|
||||
private requestCounter = 0;
|
||||
private isRunning = false;
|
||||
@@ -42,6 +47,7 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
||||
requestTimeoutMs: 30000,
|
||||
readyTimeoutMs: 10000,
|
||||
readyEventName: 'ready',
|
||||
maxPayloadSize: 50 * 1024 * 1024,
|
||||
...options,
|
||||
};
|
||||
this.locator = new RustBinaryLocator(options, this.logger);
|
||||
@@ -68,24 +74,34 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
||||
env,
|
||||
});
|
||||
|
||||
// Handle stderr
|
||||
// Handle stderr with cross-chunk buffering
|
||||
this.childProcess.stderr?.on('data', (data: Buffer) => {
|
||||
const lines = data.toString().split('\n').filter((l: string) => l.trim());
|
||||
this.stderrRemainder += data.toString();
|
||||
const lines = this.stderrRemainder.split('\n');
|
||||
// Keep the last element (incomplete line) as remainder
|
||||
this.stderrRemainder = lines.pop()!;
|
||||
for (const line of lines) {
|
||||
this.logger.log('debug', `[${this.options.binaryName}] ${line}`);
|
||||
this.emit('stderr', line);
|
||||
const trimmed = line.trim();
|
||||
if (trimmed) {
|
||||
this.logger.log('debug', `[${this.options.binaryName}] ${trimmed}`);
|
||||
this.emit('stderr', trimmed);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Handle stdout via readline for line-delimited JSON
|
||||
this.readlineInterface = plugins.readline.createInterface({ input: this.childProcess.stdout! });
|
||||
this.readlineInterface.on('line', (line: string) => {
|
||||
this.handleLine(line.trim());
|
||||
// Handle stdout via Buffer-based newline scanner
|
||||
this.childProcess.stdout!.on('data', (chunk: Buffer) => {
|
||||
this.handleStdoutChunk(chunk);
|
||||
});
|
||||
|
||||
// Handle process exit
|
||||
this.childProcess.on('exit', (code, signal) => {
|
||||
this.logger.log('info', `Process exited (code=${code}, signal=${signal})`);
|
||||
// Flush any remaining stderr
|
||||
if (this.stderrRemainder.trim()) {
|
||||
this.logger.log('debug', `[${this.options.binaryName}] ${this.stderrRemainder.trim()}`);
|
||||
this.emit('stderr', this.stderrRemainder.trim());
|
||||
}
|
||||
this.cleanup();
|
||||
this.emit('exit', code, signal);
|
||||
});
|
||||
@@ -130,6 +146,15 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
||||
|
||||
const id = `req_${++this.requestCounter}`;
|
||||
const request: IManagementRequest = { id, method, params };
|
||||
const json = JSON.stringify(request);
|
||||
|
||||
// Check outbound payload size
|
||||
const byteLength = Buffer.byteLength(json, 'utf8');
|
||||
if (byteLength > this.options.maxPayloadSize) {
|
||||
throw new Error(
|
||||
`Outbound message exceeds maxPayloadSize (${byteLength} > ${this.options.maxPayloadSize})`
|
||||
);
|
||||
}
|
||||
|
||||
return new Promise<TCommands[K]['result']>((resolve, reject) => {
|
||||
const timer = setTimeout(() => {
|
||||
@@ -139,15 +164,62 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
||||
|
||||
this.pendingRequests.set(id, { resolve, reject, timer });
|
||||
|
||||
const json = JSON.stringify(request) + '\n';
|
||||
this.childProcess!.stdin!.write(json, (err) => {
|
||||
if (err) {
|
||||
this.writeToStdin(json + '\n').catch((err) => {
|
||||
clearTimeout(timer);
|
||||
this.pendingRequests.delete(id);
|
||||
reject(new Error(`Failed to write to stdin: ${err.message}`));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a streaming command to the Rust process.
|
||||
* Returns a StreamingResponse that yields chunks via `for await...of`
|
||||
* and exposes `.result` for the final response.
|
||||
*/
|
||||
public sendCommandStreaming<K extends string & TStreamingCommandKeys<TCommands>>(
|
||||
method: K,
|
||||
params: TCommands[K]['params'],
|
||||
): StreamingResponse<TExtractChunk<TCommands[K]>, TCommands[K]['result']> {
|
||||
const streaming = new StreamingResponse<TExtractChunk<TCommands[K]>, TCommands[K]['result']>();
|
||||
|
||||
if (!this.childProcess || !this.isRunning) {
|
||||
streaming.fail(new Error(`${this.options.binaryName} bridge is not running`));
|
||||
return streaming;
|
||||
}
|
||||
|
||||
const id = `req_${++this.requestCounter}`;
|
||||
const request: IManagementRequest = { id, method, params };
|
||||
const json = JSON.stringify(request);
|
||||
|
||||
const byteLength = Buffer.byteLength(json, 'utf8');
|
||||
if (byteLength > this.options.maxPayloadSize) {
|
||||
streaming.fail(
|
||||
new Error(`Outbound message exceeds maxPayloadSize (${byteLength} > ${this.options.maxPayloadSize})`)
|
||||
);
|
||||
return streaming;
|
||||
}
|
||||
|
||||
const timeoutMs = this.options.streamTimeoutMs ?? this.options.requestTimeoutMs;
|
||||
const timer = setTimeout(() => {
|
||||
this.pendingRequests.delete(id);
|
||||
streaming.fail(new Error(`Streaming command '${method}' timed out after ${timeoutMs}ms`));
|
||||
}, timeoutMs);
|
||||
|
||||
this.pendingRequests.set(id, {
|
||||
resolve: (result: any) => streaming.finish(result),
|
||||
reject: (error: Error) => streaming.fail(error),
|
||||
timer,
|
||||
streaming,
|
||||
});
|
||||
|
||||
this.writeToStdin(json + '\n').catch((err) => {
|
||||
clearTimeout(timer);
|
||||
this.pendingRequests.delete(id);
|
||||
streaming.fail(new Error(`Failed to write to stdin: ${err.message}`));
|
||||
});
|
||||
|
||||
return streaming;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -159,11 +231,9 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
||||
this.childProcess = null;
|
||||
this.isRunning = false;
|
||||
|
||||
// Close readline
|
||||
if (this.readlineInterface) {
|
||||
this.readlineInterface.close();
|
||||
this.readlineInterface = null;
|
||||
}
|
||||
// Clear buffers
|
||||
this.stdoutBuffer = Buffer.alloc(0);
|
||||
this.stderrRemainder = '';
|
||||
|
||||
// Reject pending requests
|
||||
for (const [, pending] of this.pendingRequests) {
|
||||
@@ -203,6 +273,62 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
||||
return this.isRunning;
|
||||
}
|
||||
|
||||
/**
|
||||
* Buffer-based newline scanner for stdout chunks.
|
||||
* Replaces readline to handle large payloads without buffering entire lines in a separate abstraction.
|
||||
*/
|
||||
private handleStdoutChunk(chunk: Buffer): void {
|
||||
this.stdoutBuffer = Buffer.concat([this.stdoutBuffer, chunk]);
|
||||
|
||||
let newlineIndex: number;
|
||||
while ((newlineIndex = this.stdoutBuffer.indexOf(0x0A)) !== -1) {
|
||||
const lineBuffer = this.stdoutBuffer.subarray(0, newlineIndex);
|
||||
this.stdoutBuffer = this.stdoutBuffer.subarray(newlineIndex + 1);
|
||||
|
||||
if (lineBuffer.length > this.options.maxPayloadSize) {
|
||||
this.logger.log('error', `Inbound message exceeds maxPayloadSize (${lineBuffer.length} bytes), dropping`);
|
||||
continue;
|
||||
}
|
||||
|
||||
const line = lineBuffer.toString('utf8').trim();
|
||||
this.handleLine(line);
|
||||
}
|
||||
|
||||
// If accumulated buffer exceeds maxPayloadSize (sender never sends newline), clear to prevent OOM
|
||||
if (this.stdoutBuffer.length > this.options.maxPayloadSize) {
|
||||
this.logger.log('error', `Stdout buffer exceeded maxPayloadSize (${this.stdoutBuffer.length} bytes) without newline, clearing`);
|
||||
this.stdoutBuffer = Buffer.alloc(0);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write data to stdin with backpressure support.
|
||||
* Waits for drain if the internal buffer is full.
|
||||
*/
|
||||
private writeToStdin(data: string): Promise<void> {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
if (!this.childProcess?.stdin) {
|
||||
reject(new Error('stdin not available'));
|
||||
return;
|
||||
}
|
||||
|
||||
const canContinue = this.childProcess.stdin.write(data, 'utf8', (err) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
}
|
||||
});
|
||||
|
||||
if (canContinue) {
|
||||
resolve();
|
||||
} else {
|
||||
// Wait for drain before resolving
|
||||
this.childProcess.stdin.once('drain', () => {
|
||||
resolve();
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private handleLine(line: string): void {
|
||||
if (!line) return;
|
||||
|
||||
@@ -221,6 +347,22 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
||||
return;
|
||||
}
|
||||
|
||||
// Stream chunk (has 'id' + stream === true + 'data')
|
||||
if ('id' in parsed && parsed.stream === true && 'data' in parsed) {
|
||||
const pending = this.pendingRequests.get(parsed.id);
|
||||
if (pending?.streaming) {
|
||||
// Reset inactivity timeout
|
||||
clearTimeout(pending.timer);
|
||||
const timeoutMs = this.options.streamTimeoutMs ?? this.options.requestTimeoutMs;
|
||||
pending.timer = setTimeout(() => {
|
||||
this.pendingRequests.delete(parsed.id);
|
||||
pending.reject(new Error(`Streaming command timed out after ${timeoutMs}ms of inactivity`));
|
||||
}, timeoutMs);
|
||||
pending.streaming.pushChunk(parsed.data);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Otherwise it's a response (has 'id' field)
|
||||
if ('id' in parsed) {
|
||||
const response = parsed as IManagementResponse;
|
||||
@@ -240,11 +382,8 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
||||
private cleanup(): void {
|
||||
this.isRunning = false;
|
||||
this.childProcess = null;
|
||||
|
||||
if (this.readlineInterface) {
|
||||
this.readlineInterface.close();
|
||||
this.readlineInterface = null;
|
||||
}
|
||||
this.stdoutBuffer = Buffer.alloc(0);
|
||||
this.stderrRemainder = '';
|
||||
|
||||
// Reject all pending requests
|
||||
for (const [, pending] of this.pendingRequests) {
|
||||
|
||||
110
ts/classes.streamingresponse.ts
Normal file
110
ts/classes.streamingresponse.ts
Normal file
@@ -0,0 +1,110 @@
|
||||
/**
|
||||
* Represents a streaming response from a Rust bridge command.
|
||||
* Implements AsyncIterable to allow `for await...of` consumption of chunks,
|
||||
* and exposes `.result` for the final response once the stream ends.
|
||||
*
|
||||
* @typeParam TChunk - Type of each streamed chunk
|
||||
* @typeParam TResult - Type of the final result
|
||||
*/
|
||||
export class StreamingResponse<TChunk, TResult> implements AsyncIterable<TChunk> {
|
||||
/** Resolves with the final result when the stream ends successfully. */
|
||||
public readonly result: Promise<TResult>;
|
||||
|
||||
private resolveResult!: (value: TResult) => void;
|
||||
private rejectResult!: (error: Error) => void;
|
||||
|
||||
/** Buffered chunks not yet consumed by the iterator. */
|
||||
private buffer: TChunk[] = [];
|
||||
/** Waiting consumer resolve callback (when iterator is ahead of producer). */
|
||||
private waiting: ((value: IteratorResult<TChunk>) => void) | null = null;
|
||||
/** Waiting consumer reject callback. */
|
||||
private waitingReject: ((error: Error) => void) | null = null;
|
||||
|
||||
private done = false;
|
||||
private error: Error | null = null;
|
||||
|
||||
constructor() {
|
||||
this.result = new Promise<TResult>((resolve, reject) => {
|
||||
this.resolveResult = resolve;
|
||||
this.rejectResult = reject;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Push a chunk into the stream. Called internally by RustBridge.
|
||||
*/
|
||||
public pushChunk(chunk: TChunk): void {
|
||||
if (this.done) return;
|
||||
|
||||
if (this.waiting) {
|
||||
// A consumer is waiting — deliver immediately
|
||||
const resolve = this.waiting;
|
||||
this.waiting = null;
|
||||
this.waitingReject = null;
|
||||
resolve({ value: chunk, done: false });
|
||||
} else {
|
||||
// No consumer waiting — buffer the chunk
|
||||
this.buffer.push(chunk);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* End the stream successfully with a final result. Called internally by RustBridge.
|
||||
*/
|
||||
public finish(result: TResult): void {
|
||||
if (this.done) return;
|
||||
this.done = true;
|
||||
this.resolveResult(result);
|
||||
|
||||
// If a consumer is waiting, signal end of iteration
|
||||
if (this.waiting) {
|
||||
const resolve = this.waiting;
|
||||
this.waiting = null;
|
||||
this.waitingReject = null;
|
||||
resolve({ value: undefined as any, done: true });
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* End the stream with an error. Called internally by RustBridge.
|
||||
*/
|
||||
public fail(error: Error): void {
|
||||
if (this.done) return;
|
||||
this.done = true;
|
||||
this.error = error;
|
||||
this.rejectResult(error);
|
||||
|
||||
// If a consumer is waiting, reject it
|
||||
if (this.waitingReject) {
|
||||
const reject = this.waitingReject;
|
||||
this.waiting = null;
|
||||
this.waitingReject = null;
|
||||
reject(error);
|
||||
}
|
||||
}
|
||||
|
||||
[Symbol.asyncIterator](): AsyncIterator<TChunk> {
|
||||
return {
|
||||
next: (): Promise<IteratorResult<TChunk>> => {
|
||||
// If there are buffered chunks, deliver one
|
||||
if (this.buffer.length > 0) {
|
||||
return Promise.resolve({ value: this.buffer.shift()!, done: false });
|
||||
}
|
||||
|
||||
// If the stream is done, signal end
|
||||
if (this.done) {
|
||||
if (this.error) {
|
||||
return Promise.reject(this.error);
|
||||
}
|
||||
return Promise.resolve({ value: undefined as any, done: true });
|
||||
}
|
||||
|
||||
// No buffered chunks and not done — wait for the next push
|
||||
return new Promise<IteratorResult<TChunk>>((resolve, reject) => {
|
||||
this.waiting = resolve;
|
||||
this.waitingReject = reject;
|
||||
});
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -1,3 +1,4 @@
|
||||
export { RustBridge } from './classes.rustbridge.js';
|
||||
export { RustBinaryLocator } from './classes.rustbinarylocator.js';
|
||||
export { StreamingResponse } from './classes.streamingresponse.js';
|
||||
export * from './interfaces/index.js';
|
||||
|
||||
@@ -39,4 +39,9 @@ export interface IRustBridgeOptions extends IBinaryLocatorOptions {
|
||||
readyEventName?: string;
|
||||
/** Optional logger instance */
|
||||
logger?: IRustBridgeLogger;
|
||||
/** Maximum message size in bytes (default: 50MB). Messages exceeding this are rejected. */
|
||||
maxPayloadSize?: number;
|
||||
/** Inactivity timeout for streaming commands in ms (default: same as requestTimeoutMs).
|
||||
* Resets on each chunk received. */
|
||||
streamTimeoutMs?: number;
|
||||
}
|
||||
|
||||
@@ -38,3 +38,25 @@ export interface ICommandDefinition<TParams = any, TResult = any> {
|
||||
* Used to type-safe the bridge's sendCommand method.
|
||||
*/
|
||||
export type TCommandMap = Record<string, ICommandDefinition>;
|
||||
|
||||
/**
|
||||
* Stream chunk message received from the Rust binary during a streaming command.
|
||||
*/
|
||||
export interface IManagementStreamChunk {
|
||||
id: string;
|
||||
stream: true;
|
||||
data: any;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract keys from a command map whose definitions include a `chunk` field,
|
||||
* indicating they support streaming responses.
|
||||
*/
|
||||
export type TStreamingCommandKeys<TCommands extends TCommandMap> = {
|
||||
[K in keyof TCommands]: TCommands[K] extends { chunk: any } ? K : never;
|
||||
}[keyof TCommands];
|
||||
|
||||
/**
|
||||
* Extract the chunk type from a command definition that has a `chunk` field.
|
||||
*/
|
||||
export type TExtractChunk<TDef> = TDef extends { chunk: infer C } ? C : never;
|
||||
|
||||
@@ -2,10 +2,10 @@
|
||||
import * as path from 'path';
|
||||
import * as fs from 'fs';
|
||||
import * as childProcess from 'child_process';
|
||||
import * as readline from 'readline';
|
||||
import * as events from 'events';
|
||||
import * as url from 'url';
|
||||
|
||||
export { path, fs, childProcess, readline, events };
|
||||
export { path, fs, childProcess, events, url };
|
||||
|
||||
// @push.rocks scope
|
||||
import * as smartpath from '@push.rocks/smartpath';
|
||||
|
||||
Reference in New Issue
Block a user