Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 35971a395f | |||
| 5fb991ff51 | |||
| dcb88ef4b5 | |||
| 6e4947ef7d | |||
| 38249ea20f | |||
| 9d30a283b5 |
27
changelog.md
27
changelog.md
@@ -1,5 +1,32 @@
|
||||
# Changelog
|
||||
|
||||
## 2026-02-11 - 1.2.0 - feat(rustbridge)
|
||||
add streaming responses and robust large-payload/backpressure handling to RustBridge
|
||||
|
||||
- Introduce StreamingResponse type and export it (for-await-of iterator + .result promise)
|
||||
- Add sendCommandStreaming API to send streaming commands and receive chunks + final result
|
||||
- Implement buffer-based stdout newline scanner to handle large messages and avoid readline limits
|
||||
- Add backpressure-aware writeToStdin to wait for drain when writing large outbound payloads
|
||||
- Add maxPayloadSize option and enforce outbound/inbound size checks to prevent OOMs
|
||||
- Add streamTimeoutMs (inactivity timeout) and reset timeout on each received chunk
|
||||
- Improve stderr handling (cross-chunk buffering and trimmed emits)
|
||||
- Update mock test binary and extensive tests for streaming, large payloads, concurrency, and error cases
|
||||
- Add TypeScript types for streaming commands (TStreamingCommandKeys, TExtractChunk, IManagementStreamChunk)
|
||||
|
||||
## 2026-02-10 - 1.1.2 - fix(rust-binary-locator)
|
||||
use import.meta.resolve and url.fileURLToPath to locate bundled Rust binary in ESM environments
|
||||
|
||||
- Replace require.resolve with import.meta.resolve to support ESM module resolution
|
||||
- Convert resolved file URL to a filesystem path using url.fileURLToPath
|
||||
- Export the url module from ts/plugins to provide fileURLToPath
|
||||
|
||||
## 2026-02-10 - 1.1.1 - fix(readme)
|
||||
update README with comprehensive documentation, usage examples, API reference, installation instructions, and legal/company information
|
||||
|
||||
- Rewrote readme.md (≈ +298 −3 lines) to add detailed install, overview, IPC protocol, command definition examples, usage, API reference, issue reporting & security guidance, and legal/trademark/company information.
|
||||
- Documentation-only change — no source code modified.
|
||||
- Current package version is 1.1.0; recommend a patch release
|
||||
|
||||
## 2026-02-10 - 1.1.0 - feat(rustbridge)
|
||||
add RustBridge and RustBinaryLocator with typed IPC interfaces, plugins, tests and mock runner; export from index; add npm registries
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@push.rocks/smartrust",
|
||||
"version": "1.1.0",
|
||||
"version": "1.2.0",
|
||||
"private": false,
|
||||
"description": "a bridge between JS engines and rust",
|
||||
"main": "dist_ts/index.js",
|
||||
|
||||
435
readme.md
435
readme.md
@@ -1,5 +1,434 @@
|
||||
# @push.rocks/smartrust
|
||||
a bridge between JS engines and rust
|
||||
|
||||
## How to create the docs
|
||||
To create docs run gitzone aidoc.
|
||||
A type-safe, production-ready bridge between TypeScript and Rust binaries via JSON-over-stdin/stdout IPC — with support for request/response, streaming, and event patterns.
|
||||
|
||||
## Issue Reporting and Security
|
||||
|
||||
For reporting bugs, issues, or security vulnerabilities, please visit [community.foss.global/](https://community.foss.global/). This is the central community hub for all issue reporting. Developers who sign and comply with our contribution agreement and go through identification can also get a [code.foss.global/](https://code.foss.global/) account to submit Pull Requests directly.
|
||||
|
||||
## Install 📦
|
||||
|
||||
```bash
|
||||
npm install @push.rocks/smartrust
|
||||
# or
|
||||
pnpm install @push.rocks/smartrust
|
||||
```
|
||||
|
||||
## Overview 🔭
|
||||
|
||||
`@push.rocks/smartrust` provides a complete bridge for TypeScript applications that need to communicate with Rust binaries. It handles the entire lifecycle — binary discovery, process spawning, request/response correlation, **streaming responses**, event pub/sub, and graceful shutdown — so you can focus on your command definitions instead of IPC plumbing.
|
||||
|
||||
### Why? 🤔
|
||||
|
||||
If you're integrating Rust into a Node.js project, you'll inevitably need:
|
||||
- A way to **find** the compiled Rust binary across different environments (dev, CI, production, platform packages)
|
||||
- A way to **spawn** it and establish reliable two-way communication
|
||||
- **Type-safe** request/response patterns with proper error handling
|
||||
- **Streaming responses** for progressive data processing, log tailing, or chunked transfers
|
||||
- **Event streaming** from Rust to TypeScript
|
||||
- **Graceful lifecycle management** (ready detection, clean shutdown, force kill)
|
||||
|
||||
`smartrust` wraps all of this into three classes: `RustBridge`, `RustBinaryLocator`, and `StreamingResponse`.
|
||||
|
||||
## Usage 🚀
|
||||
|
||||
### The IPC Protocol
|
||||
|
||||
`smartrust` uses a simple, newline-delimited JSON protocol over stdin/stdout:
|
||||
|
||||
| Direction | Format | Description |
|
||||
|-----------|--------|-------------|
|
||||
| **TS → Rust** (Request) | `{"id": "req_1", "method": "start", "params": {...}}` | Command with unique ID |
|
||||
| **Rust → TS** (Response) | `{"id": "req_1", "success": true, "result": {...}}` | Final response correlated by ID |
|
||||
| **Rust → TS** (Error) | `{"id": "req_1", "success": false, "error": "msg"}` | Error correlated by ID |
|
||||
| **Rust → TS** (Stream Chunk) | `{"id": "req_1", "stream": true, "data": {...}}` | Intermediate chunk (zero or more) |
|
||||
| **Rust → TS** (Event) | `{"event": "ready", "data": {...}}` | Unsolicited event (no ID) |
|
||||
|
||||
Your Rust binary reads JSON lines from stdin and writes JSON lines to stdout. That's it. Stderr is free for logging.
|
||||
|
||||
### Defining Your Commands
|
||||
|
||||
Start by defining a type map of commands your Rust binary supports:
|
||||
|
||||
```typescript
|
||||
import { RustBridge } from '@push.rocks/smartrust';
|
||||
|
||||
// Define your command types
|
||||
type TMyCommands = {
|
||||
start: { params: { port: number; host: string }; result: { pid: number } };
|
||||
stop: { params: {}; result: void };
|
||||
getMetrics: { params: {}; result: { connections: number; uptime: number } };
|
||||
reload: { params: { configPath: string }; result: void };
|
||||
};
|
||||
```
|
||||
|
||||
### Creating and Using the Bridge
|
||||
|
||||
```typescript
|
||||
const bridge = new RustBridge<TMyCommands>({
|
||||
binaryName: 'my-rust-server',
|
||||
envVarName: 'MY_SERVER_BINARY', // optional: env var override
|
||||
platformPackagePrefix: '@myorg/my-server', // optional: platform npm packages
|
||||
});
|
||||
|
||||
// Spawn the binary and wait for it to signal readiness
|
||||
const ok = await bridge.spawn();
|
||||
if (!ok) {
|
||||
console.error('Failed to start Rust binary');
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
// Send type-safe commands — params and return types are inferred!
|
||||
const { pid } = await bridge.sendCommand('start', { port: 8080, host: '0.0.0.0' });
|
||||
console.log(`Server started with PID ${pid}`);
|
||||
|
||||
const metrics = await bridge.sendCommand('getMetrics', {});
|
||||
console.log(`Active connections: ${metrics.connections}`);
|
||||
|
||||
// Listen for events from Rust
|
||||
bridge.on('management:configChanged', (data) => {
|
||||
console.log('Config was changed:', data);
|
||||
});
|
||||
|
||||
// Clean shutdown
|
||||
bridge.kill();
|
||||
```
|
||||
|
||||
### Streaming Commands 🌊
|
||||
|
||||
For commands where the Rust binary sends a series of chunks before a final result, use `sendCommandStreaming`. This is perfect for progressive data processing, log tailing, search results, or any scenario where you want incremental output.
|
||||
|
||||
#### Defining Streaming Commands
|
||||
|
||||
Add a `chunk` field to your command type definition to mark it as streamable:
|
||||
|
||||
```typescript
|
||||
type TMyCommands = {
|
||||
// Regular command (request → response)
|
||||
ping: { params: {}; result: { pong: boolean } };
|
||||
|
||||
// Streaming command (request → chunks... → final result)
|
||||
processData: { params: { count: number }; chunk: { index: number; progress: number }; result: { totalProcessed: number } };
|
||||
tailLogs: { params: { lines: number }; chunk: string; result: { linesRead: number } };
|
||||
};
|
||||
```
|
||||
|
||||
#### Consuming Streams
|
||||
|
||||
```typescript
|
||||
// Returns a StreamingResponse immediately (does NOT block)
|
||||
const stream = bridge.sendCommandStreaming('processData', { count: 1000 });
|
||||
|
||||
// Consume chunks with for-await-of
|
||||
for await (const chunk of stream) {
|
||||
console.log(`Processing item ${chunk.index}, progress: ${chunk.progress}%`);
|
||||
}
|
||||
|
||||
// Get the final result after all chunks are consumed
|
||||
const result = await stream.result;
|
||||
console.log(`Done! Processed ${result.totalProcessed} items`);
|
||||
```
|
||||
|
||||
#### Error Handling in Streams
|
||||
|
||||
Errors propagate to both the iterator and the `.result` promise:
|
||||
|
||||
```typescript
|
||||
const stream = bridge.sendCommandStreaming('processData', { count: 100 });
|
||||
|
||||
try {
|
||||
for await (const chunk of stream) {
|
||||
console.log(chunk);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error('Stream failed:', err.message);
|
||||
}
|
||||
|
||||
// .result also rejects on error
|
||||
try {
|
||||
await stream.result;
|
||||
} catch (err) {
|
||||
console.error('Same error here:', err.message);
|
||||
}
|
||||
```
|
||||
|
||||
#### Stream Timeout
|
||||
|
||||
By default, streaming commands use the same timeout as regular commands (`requestTimeoutMs`). The timeout **resets on each chunk received**, so it acts as an inactivity timeout rather than an absolute timeout. You can configure it separately:
|
||||
|
||||
```typescript
|
||||
const bridge = new RustBridge<TMyCommands>({
|
||||
binaryName: 'my-server',
|
||||
requestTimeoutMs: 30000, // regular command timeout: 30s
|
||||
streamTimeoutMs: 60000, // streaming inactivity timeout: 60s
|
||||
});
|
||||
```
|
||||
|
||||
#### Implementing Streaming on the Rust Side
|
||||
|
||||
Your Rust binary sends stream chunks by writing lines with `"stream": true` before the final response:
|
||||
|
||||
```rust
|
||||
// For each chunk:
|
||||
println!(r#"{{"id":"{}","stream":true,"data":{{"index":{},"progress":{}}}}}"#, req.id, i, pct);
|
||||
io::stdout().flush().unwrap();
|
||||
|
||||
// When done, send the final response (same as non-streaming):
|
||||
println!(r#"{{"id":"{}","success":true,"result":{{"totalProcessed":{}}}}}"#, req.id, total);
|
||||
io::stdout().flush().unwrap();
|
||||
```
|
||||
|
||||
### Binary Locator 🔍
|
||||
|
||||
The `RustBinaryLocator` searches for your binary using a priority-ordered strategy:
|
||||
|
||||
| Priority | Source | Description |
|
||||
|----------|--------|-------------|
|
||||
| 1 | `binaryPath` option | Explicit path — skips all other search |
|
||||
| 2 | Environment variable | e.g. `MY_SERVER_BINARY=/usr/local/bin/server` |
|
||||
| 3 | Platform npm package | e.g. `@myorg/my-server-linux-x64/my-rust-server` |
|
||||
| 4 | Local dev paths | `./rust/target/release/<name>` and `./rust/target/debug/<name>` |
|
||||
| 5 | System PATH | Standard `$PATH` lookup |
|
||||
|
||||
You can also use the locator standalone:
|
||||
|
||||
```typescript
|
||||
import { RustBinaryLocator } from '@push.rocks/smartrust';
|
||||
|
||||
const locator = new RustBinaryLocator({
|
||||
binaryName: 'my-rust-server',
|
||||
envVarName: 'MY_SERVER_BINARY',
|
||||
localPaths: ['/opt/myapp/bin/server'], // custom search paths
|
||||
});
|
||||
|
||||
const binaryPath = await locator.findBinary();
|
||||
// Result is cached — call clearCache() to force re-search
|
||||
```
|
||||
|
||||
### Configuration Reference ⚙️
|
||||
|
||||
The `RustBridge` constructor accepts an `IRustBridgeOptions` object:
|
||||
|
||||
```typescript
|
||||
const bridge = new RustBridge<TMyCommands>({
|
||||
// --- Binary Locator Options ---
|
||||
binaryName: 'my-server', // required: name of the binary
|
||||
binaryPath: '/explicit/path/to/binary', // optional: skip search entirely
|
||||
envVarName: 'MY_SERVER_BINARY', // optional: env var for path override
|
||||
platformPackagePrefix: '@myorg/my-server', // optional: platform npm package prefix
|
||||
localPaths: ['./build/server'], // optional: custom local search paths
|
||||
searchSystemPath: true, // optional: search $PATH (default: true)
|
||||
|
||||
// --- Bridge Options ---
|
||||
cliArgs: ['--management'], // optional: args passed to binary (default: ['--management'])
|
||||
requestTimeoutMs: 30000, // optional: per-request timeout (default: 30000)
|
||||
streamTimeoutMs: 30000, // optional: streaming inactivity timeout (default: requestTimeoutMs)
|
||||
readyTimeoutMs: 10000, // optional: ready event timeout (default: 10000)
|
||||
maxPayloadSize: 50 * 1024 * 1024, // optional: max message size in bytes (default: 50MB)
|
||||
env: { RUST_LOG: 'debug' }, // optional: extra env vars for the child process
|
||||
readyEventName: 'ready', // optional: name of the ready event (default: 'ready')
|
||||
logger: myLogger, // optional: logger implementing IRustBridgeLogger
|
||||
});
|
||||
```
|
||||
|
||||
### Events 📡
|
||||
|
||||
`RustBridge` extends `EventEmitter` and emits the following events:
|
||||
|
||||
| Event | Payload | Description |
|
||||
|-------|---------|-------------|
|
||||
| `ready` | — | Bridge connected and binary reported ready |
|
||||
| `exit` | `(code, signal)` | Rust process exited |
|
||||
| `stderr` | `string` | A line from the binary's stderr |
|
||||
| `management:<name>` | `any` | Custom event from Rust (e.g. `management:configChanged`) |
|
||||
|
||||
### Custom Logger 📝
|
||||
|
||||
Plug in your own logger by implementing the `IRustBridgeLogger` interface:
|
||||
|
||||
```typescript
|
||||
import type { IRustBridgeLogger } from '@push.rocks/smartrust';
|
||||
|
||||
const logger: IRustBridgeLogger = {
|
||||
log(level: string, message: string, data?: Record<string, any>) {
|
||||
console.log(`[${level}] ${message}`, data || '');
|
||||
},
|
||||
};
|
||||
|
||||
const bridge = new RustBridge<TMyCommands>({
|
||||
binaryName: 'my-server',
|
||||
logger,
|
||||
});
|
||||
```
|
||||
|
||||
### Writing the Rust Side 🦀
|
||||
|
||||
Your Rust binary needs to implement a simple protocol:
|
||||
|
||||
1. **On startup**, write a ready event to stdout:
|
||||
```
|
||||
{"event":"ready","data":{"version":"1.0.0"}}\n
|
||||
```
|
||||
|
||||
2. **Read JSON lines from stdin**, parse each as `{"id": "...", "method": "...", "params": {...}}`
|
||||
|
||||
3. **Write JSON responses to stdout**, each as `{"id": "...", "success": true, "result": {...}}\n`
|
||||
|
||||
4. **For streaming commands**, write zero or more `{"id": "...", "stream": true, "data": {...}}\n` chunks before the final response
|
||||
|
||||
5. **Emit events** anytime by writing `{"event": "name", "data": {...}}\n` to stdout
|
||||
|
||||
6. **Use stderr** for logging — it won't interfere with the IPC protocol
|
||||
|
||||
Here's a minimal Rust skeleton:
|
||||
|
||||
```rust
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::io::{self, BufRead, Write};
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct Request {
|
||||
id: String,
|
||||
method: String,
|
||||
params: serde_json::Value,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct Response {
|
||||
id: String,
|
||||
success: bool,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
result: Option<serde_json::Value>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
error: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct StreamChunk {
|
||||
id: String,
|
||||
stream: bool,
|
||||
data: serde_json::Value,
|
||||
}
|
||||
|
||||
fn main() {
|
||||
// Signal ready
|
||||
println!(r#"{{"event":"ready","data":{{"version":"1.0.0"}}}}"#);
|
||||
io::stdout().flush().unwrap();
|
||||
|
||||
let stdin = io::stdin();
|
||||
for line in stdin.lock().lines() {
|
||||
let line = line.unwrap();
|
||||
let req: Request = serde_json::from_str(&line).unwrap();
|
||||
|
||||
match req.method.as_str() {
|
||||
"ping" => {
|
||||
let resp = Response {
|
||||
id: req.id,
|
||||
success: true,
|
||||
result: Some(serde_json::json!({"pong": true})),
|
||||
error: None,
|
||||
};
|
||||
println!("{}", serde_json::to_string(&resp).unwrap());
|
||||
io::stdout().flush().unwrap();
|
||||
}
|
||||
"processData" => {
|
||||
let count = req.params["count"].as_u64().unwrap_or(0);
|
||||
// Send stream chunks
|
||||
for i in 0..count {
|
||||
let chunk = StreamChunk {
|
||||
id: req.id.clone(),
|
||||
stream: true,
|
||||
data: serde_json::json!({"index": i, "progress": ((i+1) * 100 / count)}),
|
||||
};
|
||||
println!("{}", serde_json::to_string(&chunk).unwrap());
|
||||
io::stdout().flush().unwrap();
|
||||
}
|
||||
// Send final response
|
||||
let resp = Response {
|
||||
id: req.id,
|
||||
success: true,
|
||||
result: Some(serde_json::json!({"totalProcessed": count})),
|
||||
error: None,
|
||||
};
|
||||
println!("{}", serde_json::to_string(&resp).unwrap());
|
||||
io::stdout().flush().unwrap();
|
||||
}
|
||||
_ => {
|
||||
let resp = Response {
|
||||
id: req.id,
|
||||
success: false,
|
||||
result: None,
|
||||
error: Some(format!("Unknown method: {}", req.method)),
|
||||
};
|
||||
println!("{}", serde_json::to_string(&resp).unwrap());
|
||||
io::stdout().flush().unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## API Reference 📖
|
||||
|
||||
### `RustBridge<TCommands>`
|
||||
|
||||
| Method / Property | Signature | Description |
|
||||
|---|---|---|
|
||||
| `constructor` | `new RustBridge<T>(options: IRustBridgeOptions)` | Create a new bridge instance |
|
||||
| `spawn()` | `Promise<boolean>` | Spawn the binary and wait for ready; returns `false` on failure |
|
||||
| `sendCommand(method, params)` | `Promise<TCommands[K]['result']>` | Send a typed command and await the response |
|
||||
| `sendCommandStreaming(method, params)` | `StreamingResponse<TChunk, TResult>` | Send a streaming command; returns immediately |
|
||||
| `kill()` | `void` | SIGTERM the process, reject pending requests, force SIGKILL after 5s |
|
||||
| `running` | `boolean` | Whether the bridge is currently connected |
|
||||
|
||||
### `StreamingResponse<TChunk, TResult>`
|
||||
|
||||
| Method / Property | Type | Description |
|
||||
|---|---|---|
|
||||
| `[Symbol.asyncIterator]()` | `AsyncIterator<TChunk>` | Enables `for await...of` consumption of chunks |
|
||||
| `result` | `Promise<TResult>` | Resolves with the final result after stream ends |
|
||||
|
||||
### `RustBinaryLocator`
|
||||
|
||||
| Method / Property | Signature | Description |
|
||||
|---|---|---|
|
||||
| `constructor` | `new RustBinaryLocator(options: IBinaryLocatorOptions, logger?)` | Create a locator instance |
|
||||
| `findBinary()` | `Promise<string \| null>` | Find the binary using the priority search; result is cached |
|
||||
| `clearCache()` | `void` | Clear the cached path to force a fresh search |
|
||||
|
||||
### Exported Interfaces & Types
|
||||
|
||||
| Interface / Type | Description |
|
||||
|---|---|
|
||||
| `IRustBridgeOptions` | Full configuration for `RustBridge` |
|
||||
| `IBinaryLocatorOptions` | Configuration for `RustBinaryLocator` |
|
||||
| `IRustBridgeLogger` | Logger interface: `{ log(level, message, data?) }` |
|
||||
| `IManagementRequest` | IPC request shape: `{ id, method, params }` |
|
||||
| `IManagementResponse` | IPC response shape: `{ id, success, result?, error? }` |
|
||||
| `IManagementEvent` | IPC event shape: `{ event, data }` |
|
||||
| `IManagementStreamChunk` | IPC stream chunk shape: `{ id, stream: true, data }` |
|
||||
| `ICommandDefinition` | Single command definition: `{ params, result }` |
|
||||
| `TCommandMap` | `Record<string, ICommandDefinition>` |
|
||||
| `TStreamingCommandKeys<T>` | Extracts keys from a command map that have a `chunk` field |
|
||||
| `TExtractChunk<T>` | Extracts the chunk type from a streaming command definition |
|
||||
|
||||
## License and Legal Information
|
||||
|
||||
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file.
|
||||
|
||||
**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.
|
||||
|
||||
### Trademarks
|
||||
|
||||
This project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH or third parties, and are not included within the scope of the MIT license granted herein.
|
||||
|
||||
Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines or the guidelines of the respective third-party owners, and any usage must be approved in writing. Third-party trademarks used herein are the property of their respective owners and used only in a descriptive manner, e.g. for an implementation of an API or similar.
|
||||
|
||||
### Company Information
|
||||
|
||||
Task Venture Capital GmbH
|
||||
Registered at District Court Bremen HRB 35230 HB, Germany
|
||||
|
||||
For any legal inquiries or further information, please contact us via email at hello@task.vc.
|
||||
|
||||
By using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.
|
||||
|
||||
@@ -2,22 +2,46 @@
|
||||
|
||||
/**
|
||||
* Mock "Rust binary" for testing the RustBridge IPC protocol.
|
||||
* Reads JSON lines from stdin, writes JSON lines to stdout.
|
||||
* Reads JSON lines from stdin via Buffer-based scanner, writes JSON lines to stdout.
|
||||
* Emits a ready event on startup.
|
||||
*/
|
||||
|
||||
import { createInterface } from 'readline';
|
||||
|
||||
// Emit ready event
|
||||
const readyEvent = JSON.stringify({ event: 'ready', data: { version: '1.0.0' } });
|
||||
process.stdout.write(readyEvent + '\n');
|
||||
|
||||
const rl = createInterface({ input: process.stdin });
|
||||
// Buffer-based newline scanner for stdin (mirrors the RustBridge approach)
|
||||
let stdinBuffer = Buffer.alloc(0);
|
||||
|
||||
rl.on('line', (line) => {
|
||||
process.stdin.on('data', (chunk) => {
|
||||
stdinBuffer = Buffer.concat([stdinBuffer, chunk]);
|
||||
|
||||
let newlineIndex;
|
||||
while ((newlineIndex = stdinBuffer.indexOf(0x0A)) !== -1) {
|
||||
const lineBuffer = stdinBuffer.subarray(0, newlineIndex);
|
||||
stdinBuffer = stdinBuffer.subarray(newlineIndex + 1);
|
||||
const line = lineBuffer.toString('utf8').trim();
|
||||
if (line) {
|
||||
handleLine(line);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* Backpressure-aware write to stdout.
|
||||
*/
|
||||
function writeResponse(data) {
|
||||
const json = JSON.stringify(data) + '\n';
|
||||
if (!process.stdout.write(json)) {
|
||||
// Wait for drain before continuing
|
||||
process.stdout.once('drain', () => {});
|
||||
}
|
||||
}
|
||||
|
||||
function handleLine(line) {
|
||||
let request;
|
||||
try {
|
||||
request = JSON.parse(line.trim());
|
||||
request = JSON.parse(line);
|
||||
} catch {
|
||||
return;
|
||||
}
|
||||
@@ -26,35 +50,53 @@ rl.on('line', (line) => {
|
||||
|
||||
if (method === 'echo') {
|
||||
// Echo back the params as result
|
||||
const response = JSON.stringify({ id, success: true, result: params });
|
||||
process.stdout.write(response + '\n');
|
||||
writeResponse({ id, success: true, result: params });
|
||||
} else if (method === 'largeEcho') {
|
||||
// Echo back params (same as echo, named distinctly for large payload tests)
|
||||
writeResponse({ id, success: true, result: params });
|
||||
} else if (method === 'error') {
|
||||
// Return an error
|
||||
const response = JSON.stringify({ id, success: false, error: 'Test error message' });
|
||||
process.stdout.write(response + '\n');
|
||||
writeResponse({ id, success: false, error: 'Test error message' });
|
||||
} else if (method === 'emitEvent') {
|
||||
// Emit a custom event, then respond with success
|
||||
const event = JSON.stringify({ event: params.eventName, data: params.eventData });
|
||||
process.stdout.write(event + '\n');
|
||||
const response = JSON.stringify({ id, success: true, result: null });
|
||||
process.stdout.write(response + '\n');
|
||||
writeResponse({ event: params.eventName, data: params.eventData });
|
||||
writeResponse({ id, success: true, result: null });
|
||||
} else if (method === 'slow') {
|
||||
// Respond after a delay
|
||||
setTimeout(() => {
|
||||
const response = JSON.stringify({ id, success: true, result: { delayed: true } });
|
||||
process.stdout.write(response + '\n');
|
||||
writeResponse({ id, success: true, result: { delayed: true } });
|
||||
}, 100);
|
||||
} else if (method === 'streamEcho') {
|
||||
// Send params.count stream chunks, then final response
|
||||
const count = params.count || 0;
|
||||
let sent = 0;
|
||||
const interval = setInterval(() => {
|
||||
if (sent < count) {
|
||||
writeResponse({ id, stream: true, data: { index: sent, value: `chunk_${sent}` } });
|
||||
sent++;
|
||||
} else {
|
||||
clearInterval(interval);
|
||||
writeResponse({ id, success: true, result: { totalChunks: count } });
|
||||
}
|
||||
}, 10);
|
||||
} else if (method === 'streamError') {
|
||||
// Send 1 chunk, then error
|
||||
writeResponse({ id, stream: true, data: { index: 0, value: 'before_error' } });
|
||||
setTimeout(() => {
|
||||
writeResponse({ id, success: false, error: 'Stream error after chunk' });
|
||||
}, 20);
|
||||
} else if (method === 'streamEmpty') {
|
||||
// Zero chunks, immediate final response
|
||||
writeResponse({ id, success: true, result: { totalChunks: 0 } });
|
||||
} else if (method === 'exit') {
|
||||
// Graceful exit
|
||||
const response = JSON.stringify({ id, success: true, result: null });
|
||||
process.stdout.write(response + '\n');
|
||||
writeResponse({ id, success: true, result: null });
|
||||
process.exit(0);
|
||||
} else {
|
||||
// Unknown command
|
||||
const response = JSON.stringify({ id, success: false, error: `Unknown method: ${method}` });
|
||||
process.stdout.write(response + '\n');
|
||||
writeResponse({ id, success: false, error: `Unknown method: ${method}` });
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Handle SIGTERM gracefully
|
||||
process.on('SIGTERM', () => {
|
||||
|
||||
@@ -9,10 +9,14 @@ const mockBinaryPath = path.join(testDir, 'helpers/mock-rust-binary.mjs');
|
||||
// Define the command types for our mock binary
|
||||
type TMockCommands = {
|
||||
echo: { params: Record<string, any>; result: Record<string, any> };
|
||||
largeEcho: { params: Record<string, any>; result: Record<string, any> };
|
||||
error: { params: {}; result: never };
|
||||
emitEvent: { params: { eventName: string; eventData: any }; result: null };
|
||||
slow: { params: {}; result: { delayed: boolean } };
|
||||
exit: { params: {}; result: null };
|
||||
streamEcho: { params: { count: number }; chunk: { index: number; value: string }; result: { totalChunks: number } };
|
||||
streamError: { params: {}; chunk: { index: number; value: string }; result: never };
|
||||
streamEmpty: { params: {}; chunk: never; result: { totalChunks: number } };
|
||||
};
|
||||
|
||||
tap.test('should spawn and receive ready event', async () => {
|
||||
@@ -188,4 +192,249 @@ tap.test('should emit exit event when process exits', async () => {
|
||||
expect(bridge.running).toBeFalse();
|
||||
});
|
||||
|
||||
tap.test('should handle 1MB payload round-trip', async () => {
|
||||
const bridge = new RustBridge<TMockCommands>({
|
||||
binaryName: 'node',
|
||||
binaryPath: 'node',
|
||||
cliArgs: [mockBinaryPath],
|
||||
readyTimeoutMs: 5000,
|
||||
requestTimeoutMs: 30000,
|
||||
});
|
||||
|
||||
await bridge.spawn();
|
||||
|
||||
// Create a ~1MB payload
|
||||
const largeString = 'x'.repeat(1024 * 1024);
|
||||
const result = await bridge.sendCommand('largeEcho', { data: largeString });
|
||||
expect(result.data).toEqual(largeString);
|
||||
expect(result.data.length).toEqual(1024 * 1024);
|
||||
|
||||
bridge.kill();
|
||||
});
|
||||
|
||||
tap.test('should handle 10MB payload round-trip', async () => {
|
||||
const bridge = new RustBridge<TMockCommands>({
|
||||
binaryName: 'node',
|
||||
binaryPath: 'node',
|
||||
cliArgs: [mockBinaryPath],
|
||||
readyTimeoutMs: 5000,
|
||||
requestTimeoutMs: 60000,
|
||||
});
|
||||
|
||||
await bridge.spawn();
|
||||
|
||||
// Create a ~10MB payload
|
||||
const largeString = 'y'.repeat(10 * 1024 * 1024);
|
||||
const result = await bridge.sendCommand('largeEcho', { data: largeString });
|
||||
expect(result.data).toEqual(largeString);
|
||||
expect(result.data.length).toEqual(10 * 1024 * 1024);
|
||||
|
||||
bridge.kill();
|
||||
});
|
||||
|
||||
tap.test('should reject outbound messages exceeding maxPayloadSize', async () => {
|
||||
const bridge = new RustBridge<TMockCommands>({
|
||||
binaryName: 'node',
|
||||
binaryPath: 'node',
|
||||
cliArgs: [mockBinaryPath],
|
||||
readyTimeoutMs: 5000,
|
||||
maxPayloadSize: 1000,
|
||||
});
|
||||
|
||||
await bridge.spawn();
|
||||
|
||||
let threw = false;
|
||||
try {
|
||||
await bridge.sendCommand('largeEcho', { data: 'z'.repeat(2000) });
|
||||
} catch (err: any) {
|
||||
threw = true;
|
||||
expect(err.message).toInclude('maxPayloadSize');
|
||||
}
|
||||
expect(threw).toBeTrue();
|
||||
|
||||
bridge.kill();
|
||||
});
|
||||
|
||||
tap.test('should handle multiple large concurrent commands', async () => {
|
||||
const bridge = new RustBridge<TMockCommands>({
|
||||
binaryName: 'node',
|
||||
binaryPath: 'node',
|
||||
cliArgs: [mockBinaryPath],
|
||||
readyTimeoutMs: 5000,
|
||||
requestTimeoutMs: 30000,
|
||||
});
|
||||
|
||||
await bridge.spawn();
|
||||
|
||||
const size = 500 * 1024; // 500KB each
|
||||
const results = await Promise.all([
|
||||
bridge.sendCommand('largeEcho', { data: 'a'.repeat(size), id: 1 }),
|
||||
bridge.sendCommand('largeEcho', { data: 'b'.repeat(size), id: 2 }),
|
||||
bridge.sendCommand('largeEcho', { data: 'c'.repeat(size), id: 3 }),
|
||||
]);
|
||||
|
||||
expect(results[0].data.length).toEqual(size);
|
||||
expect(results[0].data[0]).toEqual('a');
|
||||
expect(results[1].data.length).toEqual(size);
|
||||
expect(results[1].data[0]).toEqual('b');
|
||||
expect(results[2].data.length).toEqual(size);
|
||||
expect(results[2].data[0]).toEqual('c');
|
||||
|
||||
bridge.kill();
|
||||
});
|
||||
|
||||
// === Streaming tests ===
|
||||
|
||||
tap.test('streaming: should receive chunks via for-await-of and final result', async () => {
|
||||
const bridge = new RustBridge<TMockCommands>({
|
||||
binaryName: 'node',
|
||||
binaryPath: 'node',
|
||||
cliArgs: [mockBinaryPath],
|
||||
readyTimeoutMs: 5000,
|
||||
requestTimeoutMs: 10000,
|
||||
});
|
||||
|
||||
await bridge.spawn();
|
||||
|
||||
const stream = bridge.sendCommandStreaming('streamEcho', { count: 5 });
|
||||
const chunks: Array<{ index: number; value: string }> = [];
|
||||
|
||||
for await (const chunk of stream) {
|
||||
chunks.push(chunk);
|
||||
}
|
||||
|
||||
expect(chunks.length).toEqual(5);
|
||||
for (let i = 0; i < 5; i++) {
|
||||
expect(chunks[i].index).toEqual(i);
|
||||
expect(chunks[i].value).toEqual(`chunk_${i}`);
|
||||
}
|
||||
|
||||
const result = await stream.result;
|
||||
expect(result.totalChunks).toEqual(5);
|
||||
|
||||
bridge.kill();
|
||||
});
|
||||
|
||||
tap.test('streaming: should handle zero chunks (immediate result)', async () => {
|
||||
const bridge = new RustBridge<TMockCommands>({
|
||||
binaryName: 'node',
|
||||
binaryPath: 'node',
|
||||
cliArgs: [mockBinaryPath],
|
||||
readyTimeoutMs: 5000,
|
||||
});
|
||||
|
||||
await bridge.spawn();
|
||||
|
||||
const stream = bridge.sendCommandStreaming('streamEmpty', {});
|
||||
const chunks: any[] = [];
|
||||
|
||||
for await (const chunk of stream) {
|
||||
chunks.push(chunk);
|
||||
}
|
||||
|
||||
expect(chunks.length).toEqual(0);
|
||||
|
||||
const result = await stream.result;
|
||||
expect(result.totalChunks).toEqual(0);
|
||||
|
||||
bridge.kill();
|
||||
});
|
||||
|
||||
tap.test('streaming: should propagate error to iterator and .result', async () => {
|
||||
const bridge = new RustBridge<TMockCommands>({
|
||||
binaryName: 'node',
|
||||
binaryPath: 'node',
|
||||
cliArgs: [mockBinaryPath],
|
||||
readyTimeoutMs: 5000,
|
||||
requestTimeoutMs: 10000,
|
||||
});
|
||||
|
||||
await bridge.spawn();
|
||||
|
||||
const stream = bridge.sendCommandStreaming('streamError', {});
|
||||
const chunks: any[] = [];
|
||||
let iteratorError: Error | null = null;
|
||||
|
||||
try {
|
||||
for await (const chunk of stream) {
|
||||
chunks.push(chunk);
|
||||
}
|
||||
} catch (err: any) {
|
||||
iteratorError = err;
|
||||
}
|
||||
|
||||
// Should have received at least one chunk before error
|
||||
expect(chunks.length).toEqual(1);
|
||||
expect(chunks[0].value).toEqual('before_error');
|
||||
|
||||
// Iterator should have thrown
|
||||
expect(iteratorError).toBeTruthy();
|
||||
expect(iteratorError!.message).toInclude('Stream error after chunk');
|
||||
|
||||
// .result should also reject
|
||||
let resultError: Error | null = null;
|
||||
try {
|
||||
await stream.result;
|
||||
} catch (err: any) {
|
||||
resultError = err;
|
||||
}
|
||||
expect(resultError).toBeTruthy();
|
||||
expect(resultError!.message).toInclude('Stream error after chunk');
|
||||
|
||||
bridge.kill();
|
||||
});
|
||||
|
||||
tap.test('streaming: should fail when bridge is not running', async () => {
|
||||
const bridge = new RustBridge<TMockCommands>({
|
||||
binaryName: 'node',
|
||||
binaryPath: 'node',
|
||||
cliArgs: [mockBinaryPath],
|
||||
});
|
||||
|
||||
const stream = bridge.sendCommandStreaming('streamEcho', { count: 3 });
|
||||
|
||||
let resultError: Error | null = null;
|
||||
try {
|
||||
await stream.result;
|
||||
} catch (err: any) {
|
||||
resultError = err;
|
||||
}
|
||||
expect(resultError).toBeTruthy();
|
||||
expect(resultError!.message).toInclude('not running');
|
||||
});
|
||||
|
||||
tap.test('streaming: should fail when killed mid-stream', async () => {
|
||||
const bridge = new RustBridge<TMockCommands>({
|
||||
binaryName: 'node',
|
||||
binaryPath: 'node',
|
||||
cliArgs: [mockBinaryPath],
|
||||
readyTimeoutMs: 5000,
|
||||
requestTimeoutMs: 30000,
|
||||
});
|
||||
|
||||
await bridge.spawn();
|
||||
|
||||
// Request many chunks so we can kill mid-stream
|
||||
const stream = bridge.sendCommandStreaming('streamEcho', { count: 100 });
|
||||
const chunks: any[] = [];
|
||||
let iteratorError: Error | null = null;
|
||||
|
||||
// Kill after a short delay
|
||||
setTimeout(() => {
|
||||
bridge.kill();
|
||||
}, 50);
|
||||
|
||||
try {
|
||||
for await (const chunk of stream) {
|
||||
chunks.push(chunk);
|
||||
}
|
||||
} catch (err: any) {
|
||||
iteratorError = err;
|
||||
}
|
||||
|
||||
// Should have gotten some chunks but not all
|
||||
expect(iteratorError).toBeTruthy();
|
||||
expect(iteratorError!.message).toInclude('killed');
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartrust',
|
||||
version: '1.1.0',
|
||||
version: '1.2.0',
|
||||
description: 'a bridge between JS engines and rust'
|
||||
}
|
||||
|
||||
@@ -108,7 +108,8 @@ export class RustBinaryLocator {
|
||||
const packageName = `${platformPackagePrefix}-${platform}-${arch}`;
|
||||
|
||||
try {
|
||||
const packagePath = require.resolve(`${packageName}/${binaryName}`);
|
||||
const resolved = import.meta.resolve(`${packageName}/${binaryName}`);
|
||||
const packagePath = plugins.url.fileURLToPath(resolved);
|
||||
if (await this.isExecutable(packagePath)) {
|
||||
return packagePath;
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import * as plugins from './plugins.js';
|
||||
import { RustBinaryLocator } from './classes.rustbinarylocator.js';
|
||||
import { StreamingResponse } from './classes.streamingresponse.js';
|
||||
import type {
|
||||
IRustBridgeOptions,
|
||||
IRustBridgeLogger,
|
||||
@@ -7,6 +8,8 @@ import type {
|
||||
IManagementRequest,
|
||||
IManagementResponse,
|
||||
IManagementEvent,
|
||||
TStreamingCommandKeys,
|
||||
TExtractChunk,
|
||||
} from './interfaces/index.js';
|
||||
|
||||
const defaultLogger: IRustBridgeLogger = {
|
||||
@@ -21,14 +24,16 @@ const defaultLogger: IRustBridgeLogger = {
|
||||
*/
|
||||
export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plugins.events.EventEmitter {
|
||||
private locator: RustBinaryLocator;
|
||||
private options: Required<Pick<IRustBridgeOptions, 'cliArgs' | 'requestTimeoutMs' | 'readyTimeoutMs' | 'readyEventName'>> & IRustBridgeOptions;
|
||||
private options: Required<Pick<IRustBridgeOptions, 'cliArgs' | 'requestTimeoutMs' | 'readyTimeoutMs' | 'readyEventName' | 'maxPayloadSize'>> & IRustBridgeOptions;
|
||||
private logger: IRustBridgeLogger;
|
||||
private childProcess: plugins.childProcess.ChildProcess | null = null;
|
||||
private readlineInterface: plugins.readline.Interface | null = null;
|
||||
private stdoutBuffer: Buffer = Buffer.alloc(0);
|
||||
private stderrRemainder: string = '';
|
||||
private pendingRequests = new Map<string, {
|
||||
resolve: (value: any) => void;
|
||||
reject: (error: Error) => void;
|
||||
timer: ReturnType<typeof setTimeout>;
|
||||
streaming?: StreamingResponse<any, any>;
|
||||
}>();
|
||||
private requestCounter = 0;
|
||||
private isRunning = false;
|
||||
@@ -42,6 +47,7 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
||||
requestTimeoutMs: 30000,
|
||||
readyTimeoutMs: 10000,
|
||||
readyEventName: 'ready',
|
||||
maxPayloadSize: 50 * 1024 * 1024,
|
||||
...options,
|
||||
};
|
||||
this.locator = new RustBinaryLocator(options, this.logger);
|
||||
@@ -68,24 +74,34 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
||||
env,
|
||||
});
|
||||
|
||||
// Handle stderr
|
||||
// Handle stderr with cross-chunk buffering
|
||||
this.childProcess.stderr?.on('data', (data: Buffer) => {
|
||||
const lines = data.toString().split('\n').filter((l: string) => l.trim());
|
||||
this.stderrRemainder += data.toString();
|
||||
const lines = this.stderrRemainder.split('\n');
|
||||
// Keep the last element (incomplete line) as remainder
|
||||
this.stderrRemainder = lines.pop()!;
|
||||
for (const line of lines) {
|
||||
this.logger.log('debug', `[${this.options.binaryName}] ${line}`);
|
||||
this.emit('stderr', line);
|
||||
const trimmed = line.trim();
|
||||
if (trimmed) {
|
||||
this.logger.log('debug', `[${this.options.binaryName}] ${trimmed}`);
|
||||
this.emit('stderr', trimmed);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Handle stdout via readline for line-delimited JSON
|
||||
this.readlineInterface = plugins.readline.createInterface({ input: this.childProcess.stdout! });
|
||||
this.readlineInterface.on('line', (line: string) => {
|
||||
this.handleLine(line.trim());
|
||||
// Handle stdout via Buffer-based newline scanner
|
||||
this.childProcess.stdout!.on('data', (chunk: Buffer) => {
|
||||
this.handleStdoutChunk(chunk);
|
||||
});
|
||||
|
||||
// Handle process exit
|
||||
this.childProcess.on('exit', (code, signal) => {
|
||||
this.logger.log('info', `Process exited (code=${code}, signal=${signal})`);
|
||||
// Flush any remaining stderr
|
||||
if (this.stderrRemainder.trim()) {
|
||||
this.logger.log('debug', `[${this.options.binaryName}] ${this.stderrRemainder.trim()}`);
|
||||
this.emit('stderr', this.stderrRemainder.trim());
|
||||
}
|
||||
this.cleanup();
|
||||
this.emit('exit', code, signal);
|
||||
});
|
||||
@@ -130,6 +146,15 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
||||
|
||||
const id = `req_${++this.requestCounter}`;
|
||||
const request: IManagementRequest = { id, method, params };
|
||||
const json = JSON.stringify(request);
|
||||
|
||||
// Check outbound payload size
|
||||
const byteLength = Buffer.byteLength(json, 'utf8');
|
||||
if (byteLength > this.options.maxPayloadSize) {
|
||||
throw new Error(
|
||||
`Outbound message exceeds maxPayloadSize (${byteLength} > ${this.options.maxPayloadSize})`
|
||||
);
|
||||
}
|
||||
|
||||
return new Promise<TCommands[K]['result']>((resolve, reject) => {
|
||||
const timer = setTimeout(() => {
|
||||
@@ -139,15 +164,62 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
||||
|
||||
this.pendingRequests.set(id, { resolve, reject, timer });
|
||||
|
||||
const json = JSON.stringify(request) + '\n';
|
||||
this.childProcess!.stdin!.write(json, (err) => {
|
||||
if (err) {
|
||||
this.writeToStdin(json + '\n').catch((err) => {
|
||||
clearTimeout(timer);
|
||||
this.pendingRequests.delete(id);
|
||||
reject(new Error(`Failed to write to stdin: ${err.message}`));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a streaming command to the Rust process.
|
||||
* Returns a StreamingResponse that yields chunks via `for await...of`
|
||||
* and exposes `.result` for the final response.
|
||||
*/
|
||||
public sendCommandStreaming<K extends string & TStreamingCommandKeys<TCommands>>(
|
||||
method: K,
|
||||
params: TCommands[K]['params'],
|
||||
): StreamingResponse<TExtractChunk<TCommands[K]>, TCommands[K]['result']> {
|
||||
const streaming = new StreamingResponse<TExtractChunk<TCommands[K]>, TCommands[K]['result']>();
|
||||
|
||||
if (!this.childProcess || !this.isRunning) {
|
||||
streaming.fail(new Error(`${this.options.binaryName} bridge is not running`));
|
||||
return streaming;
|
||||
}
|
||||
|
||||
const id = `req_${++this.requestCounter}`;
|
||||
const request: IManagementRequest = { id, method, params };
|
||||
const json = JSON.stringify(request);
|
||||
|
||||
const byteLength = Buffer.byteLength(json, 'utf8');
|
||||
if (byteLength > this.options.maxPayloadSize) {
|
||||
streaming.fail(
|
||||
new Error(`Outbound message exceeds maxPayloadSize (${byteLength} > ${this.options.maxPayloadSize})`)
|
||||
);
|
||||
return streaming;
|
||||
}
|
||||
|
||||
const timeoutMs = this.options.streamTimeoutMs ?? this.options.requestTimeoutMs;
|
||||
const timer = setTimeout(() => {
|
||||
this.pendingRequests.delete(id);
|
||||
streaming.fail(new Error(`Streaming command '${method}' timed out after ${timeoutMs}ms`));
|
||||
}, timeoutMs);
|
||||
|
||||
this.pendingRequests.set(id, {
|
||||
resolve: (result: any) => streaming.finish(result),
|
||||
reject: (error: Error) => streaming.fail(error),
|
||||
timer,
|
||||
streaming,
|
||||
});
|
||||
|
||||
this.writeToStdin(json + '\n').catch((err) => {
|
||||
clearTimeout(timer);
|
||||
this.pendingRequests.delete(id);
|
||||
streaming.fail(new Error(`Failed to write to stdin: ${err.message}`));
|
||||
});
|
||||
|
||||
return streaming;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -159,11 +231,9 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
||||
this.childProcess = null;
|
||||
this.isRunning = false;
|
||||
|
||||
// Close readline
|
||||
if (this.readlineInterface) {
|
||||
this.readlineInterface.close();
|
||||
this.readlineInterface = null;
|
||||
}
|
||||
// Clear buffers
|
||||
this.stdoutBuffer = Buffer.alloc(0);
|
||||
this.stderrRemainder = '';
|
||||
|
||||
// Reject pending requests
|
||||
for (const [, pending] of this.pendingRequests) {
|
||||
@@ -203,6 +273,62 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
||||
return this.isRunning;
|
||||
}
|
||||
|
||||
/**
|
||||
* Buffer-based newline scanner for stdout chunks.
|
||||
* Replaces readline to handle large payloads without buffering entire lines in a separate abstraction.
|
||||
*/
|
||||
private handleStdoutChunk(chunk: Buffer): void {
|
||||
this.stdoutBuffer = Buffer.concat([this.stdoutBuffer, chunk]);
|
||||
|
||||
let newlineIndex: number;
|
||||
while ((newlineIndex = this.stdoutBuffer.indexOf(0x0A)) !== -1) {
|
||||
const lineBuffer = this.stdoutBuffer.subarray(0, newlineIndex);
|
||||
this.stdoutBuffer = this.stdoutBuffer.subarray(newlineIndex + 1);
|
||||
|
||||
if (lineBuffer.length > this.options.maxPayloadSize) {
|
||||
this.logger.log('error', `Inbound message exceeds maxPayloadSize (${lineBuffer.length} bytes), dropping`);
|
||||
continue;
|
||||
}
|
||||
|
||||
const line = lineBuffer.toString('utf8').trim();
|
||||
this.handleLine(line);
|
||||
}
|
||||
|
||||
// If accumulated buffer exceeds maxPayloadSize (sender never sends newline), clear to prevent OOM
|
||||
if (this.stdoutBuffer.length > this.options.maxPayloadSize) {
|
||||
this.logger.log('error', `Stdout buffer exceeded maxPayloadSize (${this.stdoutBuffer.length} bytes) without newline, clearing`);
|
||||
this.stdoutBuffer = Buffer.alloc(0);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write data to stdin with backpressure support.
|
||||
* Waits for drain if the internal buffer is full.
|
||||
*/
|
||||
private writeToStdin(data: string): Promise<void> {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
if (!this.childProcess?.stdin) {
|
||||
reject(new Error('stdin not available'));
|
||||
return;
|
||||
}
|
||||
|
||||
const canContinue = this.childProcess.stdin.write(data, 'utf8', (err) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
}
|
||||
});
|
||||
|
||||
if (canContinue) {
|
||||
resolve();
|
||||
} else {
|
||||
// Wait for drain before resolving
|
||||
this.childProcess.stdin.once('drain', () => {
|
||||
resolve();
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private handleLine(line: string): void {
|
||||
if (!line) return;
|
||||
|
||||
@@ -221,6 +347,22 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
||||
return;
|
||||
}
|
||||
|
||||
// Stream chunk (has 'id' + stream === true + 'data')
|
||||
if ('id' in parsed && parsed.stream === true && 'data' in parsed) {
|
||||
const pending = this.pendingRequests.get(parsed.id);
|
||||
if (pending?.streaming) {
|
||||
// Reset inactivity timeout
|
||||
clearTimeout(pending.timer);
|
||||
const timeoutMs = this.options.streamTimeoutMs ?? this.options.requestTimeoutMs;
|
||||
pending.timer = setTimeout(() => {
|
||||
this.pendingRequests.delete(parsed.id);
|
||||
pending.reject(new Error(`Streaming command timed out after ${timeoutMs}ms of inactivity`));
|
||||
}, timeoutMs);
|
||||
pending.streaming.pushChunk(parsed.data);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Otherwise it's a response (has 'id' field)
|
||||
if ('id' in parsed) {
|
||||
const response = parsed as IManagementResponse;
|
||||
@@ -240,11 +382,8 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
||||
private cleanup(): void {
|
||||
this.isRunning = false;
|
||||
this.childProcess = null;
|
||||
|
||||
if (this.readlineInterface) {
|
||||
this.readlineInterface.close();
|
||||
this.readlineInterface = null;
|
||||
}
|
||||
this.stdoutBuffer = Buffer.alloc(0);
|
||||
this.stderrRemainder = '';
|
||||
|
||||
// Reject all pending requests
|
||||
for (const [, pending] of this.pendingRequests) {
|
||||
|
||||
110
ts/classes.streamingresponse.ts
Normal file
110
ts/classes.streamingresponse.ts
Normal file
@@ -0,0 +1,110 @@
|
||||
/**
|
||||
* Represents a streaming response from a Rust bridge command.
|
||||
* Implements AsyncIterable to allow `for await...of` consumption of chunks,
|
||||
* and exposes `.result` for the final response once the stream ends.
|
||||
*
|
||||
* @typeParam TChunk - Type of each streamed chunk
|
||||
* @typeParam TResult - Type of the final result
|
||||
*/
|
||||
export class StreamingResponse<TChunk, TResult> implements AsyncIterable<TChunk> {
|
||||
/** Resolves with the final result when the stream ends successfully. */
|
||||
public readonly result: Promise<TResult>;
|
||||
|
||||
private resolveResult!: (value: TResult) => void;
|
||||
private rejectResult!: (error: Error) => void;
|
||||
|
||||
/** Buffered chunks not yet consumed by the iterator. */
|
||||
private buffer: TChunk[] = [];
|
||||
/** Waiting consumer resolve callback (when iterator is ahead of producer). */
|
||||
private waiting: ((value: IteratorResult<TChunk>) => void) | null = null;
|
||||
/** Waiting consumer reject callback. */
|
||||
private waitingReject: ((error: Error) => void) | null = null;
|
||||
|
||||
private done = false;
|
||||
private error: Error | null = null;
|
||||
|
||||
constructor() {
|
||||
this.result = new Promise<TResult>((resolve, reject) => {
|
||||
this.resolveResult = resolve;
|
||||
this.rejectResult = reject;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Push a chunk into the stream. Called internally by RustBridge.
|
||||
*/
|
||||
public pushChunk(chunk: TChunk): void {
|
||||
if (this.done) return;
|
||||
|
||||
if (this.waiting) {
|
||||
// A consumer is waiting — deliver immediately
|
||||
const resolve = this.waiting;
|
||||
this.waiting = null;
|
||||
this.waitingReject = null;
|
||||
resolve({ value: chunk, done: false });
|
||||
} else {
|
||||
// No consumer waiting — buffer the chunk
|
||||
this.buffer.push(chunk);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* End the stream successfully with a final result. Called internally by RustBridge.
|
||||
*/
|
||||
public finish(result: TResult): void {
|
||||
if (this.done) return;
|
||||
this.done = true;
|
||||
this.resolveResult(result);
|
||||
|
||||
// If a consumer is waiting, signal end of iteration
|
||||
if (this.waiting) {
|
||||
const resolve = this.waiting;
|
||||
this.waiting = null;
|
||||
this.waitingReject = null;
|
||||
resolve({ value: undefined as any, done: true });
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* End the stream with an error. Called internally by RustBridge.
|
||||
*/
|
||||
public fail(error: Error): void {
|
||||
if (this.done) return;
|
||||
this.done = true;
|
||||
this.error = error;
|
||||
this.rejectResult(error);
|
||||
|
||||
// If a consumer is waiting, reject it
|
||||
if (this.waitingReject) {
|
||||
const reject = this.waitingReject;
|
||||
this.waiting = null;
|
||||
this.waitingReject = null;
|
||||
reject(error);
|
||||
}
|
||||
}
|
||||
|
||||
[Symbol.asyncIterator](): AsyncIterator<TChunk> {
|
||||
return {
|
||||
next: (): Promise<IteratorResult<TChunk>> => {
|
||||
// If there are buffered chunks, deliver one
|
||||
if (this.buffer.length > 0) {
|
||||
return Promise.resolve({ value: this.buffer.shift()!, done: false });
|
||||
}
|
||||
|
||||
// If the stream is done, signal end
|
||||
if (this.done) {
|
||||
if (this.error) {
|
||||
return Promise.reject(this.error);
|
||||
}
|
||||
return Promise.resolve({ value: undefined as any, done: true });
|
||||
}
|
||||
|
||||
// No buffered chunks and not done — wait for the next push
|
||||
return new Promise<IteratorResult<TChunk>>((resolve, reject) => {
|
||||
this.waiting = resolve;
|
||||
this.waitingReject = reject;
|
||||
});
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -1,3 +1,4 @@
|
||||
export { RustBridge } from './classes.rustbridge.js';
|
||||
export { RustBinaryLocator } from './classes.rustbinarylocator.js';
|
||||
export { StreamingResponse } from './classes.streamingresponse.js';
|
||||
export * from './interfaces/index.js';
|
||||
|
||||
@@ -39,4 +39,9 @@ export interface IRustBridgeOptions extends IBinaryLocatorOptions {
|
||||
readyEventName?: string;
|
||||
/** Optional logger instance */
|
||||
logger?: IRustBridgeLogger;
|
||||
/** Maximum message size in bytes (default: 50MB). Messages exceeding this are rejected. */
|
||||
maxPayloadSize?: number;
|
||||
/** Inactivity timeout for streaming commands in ms (default: same as requestTimeoutMs).
|
||||
* Resets on each chunk received. */
|
||||
streamTimeoutMs?: number;
|
||||
}
|
||||
|
||||
@@ -38,3 +38,25 @@ export interface ICommandDefinition<TParams = any, TResult = any> {
|
||||
* Used to type-safe the bridge's sendCommand method.
|
||||
*/
|
||||
export type TCommandMap = Record<string, ICommandDefinition>;
|
||||
|
||||
/**
|
||||
* Stream chunk message received from the Rust binary during a streaming command.
|
||||
*/
|
||||
export interface IManagementStreamChunk {
|
||||
id: string;
|
||||
stream: true;
|
||||
data: any;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract keys from a command map whose definitions include a `chunk` field,
|
||||
* indicating they support streaming responses.
|
||||
*/
|
||||
export type TStreamingCommandKeys<TCommands extends TCommandMap> = {
|
||||
[K in keyof TCommands]: TCommands[K] extends { chunk: any } ? K : never;
|
||||
}[keyof TCommands];
|
||||
|
||||
/**
|
||||
* Extract the chunk type from a command definition that has a `chunk` field.
|
||||
*/
|
||||
export type TExtractChunk<TDef> = TDef extends { chunk: infer C } ? C : never;
|
||||
|
||||
@@ -2,10 +2,10 @@
|
||||
import * as path from 'path';
|
||||
import * as fs from 'fs';
|
||||
import * as childProcess from 'child_process';
|
||||
import * as readline from 'readline';
|
||||
import * as events from 'events';
|
||||
import * as url from 'url';
|
||||
|
||||
export { path, fs, childProcess, readline, events };
|
||||
export { path, fs, childProcess, events, url };
|
||||
|
||||
// @push.rocks scope
|
||||
import * as smartpath from '@push.rocks/smartpath';
|
||||
|
||||
Reference in New Issue
Block a user