Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 1ec6caca3b | |||
| a29201b1c5 | |||
| d762c26565 | |||
| deda8cc4ee | |||
| 0c39e157c2 | |||
| b7e3e30ce5 | |||
| 35971a395f | |||
| 5fb991ff51 | |||
| dcb88ef4b5 | |||
| 6e4947ef7d |
49
changelog.md
49
changelog.md
@@ -1,5 +1,54 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
|
## 2026-02-26 - 1.3.1 - fix(readme)
|
||||||
|
document socket transport and clarify stdio/socket differences in README
|
||||||
|
|
||||||
|
- Add 'Two Transport Modes' section documenting stdio (spawn) and socket (connect) modes
|
||||||
|
- Add examples for connect(), socket usage, and auto-reconnect with exponential backoff
|
||||||
|
- Clarify protocol is transport-agnostic and update ready/stream/event descriptions
|
||||||
|
- Update event docs: mark stderr as stdio-only and add 'reconnected' event for socket transports
|
||||||
|
- Clarify kill() behavior for both stdio and socket transports
|
||||||
|
- Add API reference entries for SocketTransport, StdioTransport, ISocketConnectOptions, IRustTransport, and LineScanner
|
||||||
|
- Add platform notes, architecture diagram, and minimal Rust/socket usage guidance
|
||||||
|
|
||||||
|
## 2026-02-26 - 1.3.0 - feat(transport)
|
||||||
|
introduce transport abstraction and socket-mode support for RustBridge
|
||||||
|
|
||||||
|
- Add IRustTransport interface and two transport implementations: StdioTransport (spawns child process and uses stdin/stdout) and SocketTransport (connects to Unix socket / Windows named pipe).
|
||||||
|
- Refactor RustBridge to use a transport abstraction (connectWithTransport) and add connect(socketPath) to attach to an existing daemon via socket.
|
||||||
|
- Introduce LineScanner: a buffer-based newline scanner used by both transports to handle large/newline-delimited messages and avoid OOMs.
|
||||||
|
- Add socket connection options (autoReconnect, reconnectBaseDelayMs, reconnectMaxDelayMs, maxReconnectAttempts) and implement auto-reconnect/backoff behavior in SocketTransport.
|
||||||
|
- Implement backpressure-aware write semantics and proper disconnect/cleanup for both transports; RustBridge.kill() now disconnects the transport instead of directly managing processes.
|
||||||
|
- Add tests and tooling: socket transport tests, line scanner tests, and a mock-socket-server.mjs helper script for testing socket mode.
|
||||||
|
- Export new symbols (StdioTransport, SocketTransport, LineScanner) and update plugins to expose net; update interfaces to export transport types.
|
||||||
|
|
||||||
|
## 2026-02-12 - 1.2.1 - fix(rust-binary-locator)
|
||||||
|
auto-fix missing execute permission for located Rust binaries
|
||||||
|
|
||||||
|
- If a located binary exists but lacks the execute bit, attempt to chmod it to 0o755 and treat it as executable.
|
||||||
|
- Logs an info message when the auto-fix is applied: 'Auto-fixed missing execute permission on: <filePath>'.
|
||||||
|
- Addresses cases where npm/pnpm installs remove the execute permission from bundled binaries.
|
||||||
|
|
||||||
|
## 2026-02-11 - 1.2.0 - feat(rustbridge)
|
||||||
|
add streaming responses and robust large-payload/backpressure handling to RustBridge
|
||||||
|
|
||||||
|
- Introduce StreamingResponse type and export it (for-await-of iterator + .result promise)
|
||||||
|
- Add sendCommandStreaming API to send streaming commands and receive chunks + final result
|
||||||
|
- Implement buffer-based stdout newline scanner to handle large messages and avoid readline limits
|
||||||
|
- Add backpressure-aware writeToStdin to wait for drain when writing large outbound payloads
|
||||||
|
- Add maxPayloadSize option and enforce outbound/inbound size checks to prevent OOMs
|
||||||
|
- Add streamTimeoutMs (inactivity timeout) and reset timeout on each received chunk
|
||||||
|
- Improve stderr handling (cross-chunk buffering and trimmed emits)
|
||||||
|
- Update mock test binary and extensive tests for streaming, large payloads, concurrency, and error cases
|
||||||
|
- Add TypeScript types for streaming commands (TStreamingCommandKeys, TExtractChunk, IManagementStreamChunk)
|
||||||
|
|
||||||
|
## 2026-02-10 - 1.1.2 - fix(rust-binary-locator)
|
||||||
|
use import.meta.resolve and url.fileURLToPath to locate bundled Rust binary in ESM environments
|
||||||
|
|
||||||
|
- Replace require.resolve with import.meta.resolve to support ESM module resolution
|
||||||
|
- Convert resolved file URL to a filesystem path using url.fileURLToPath
|
||||||
|
- Export the url module from ts/plugins to provide fileURLToPath
|
||||||
|
|
||||||
## 2026-02-10 - 1.1.1 - fix(readme)
|
## 2026-02-10 - 1.1.1 - fix(readme)
|
||||||
update README with comprehensive documentation, usage examples, API reference, installation instructions, and legal/company information
|
update README with comprehensive documentation, usage examples, API reference, installation instructions, and legal/company information
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@push.rocks/smartrust",
|
"name": "@push.rocks/smartrust",
|
||||||
"version": "1.1.1",
|
"version": "1.3.1",
|
||||||
"private": false,
|
"private": false,
|
||||||
"description": "a bridge between JS engines and rust",
|
"description": "a bridge between JS engines and rust",
|
||||||
"main": "dist_ts/index.js",
|
"main": "dist_ts/index.js",
|
||||||
|
|||||||
368
readme.md
368
readme.md
@@ -1,6 +1,6 @@
|
|||||||
# @push.rocks/smartrust
|
# @push.rocks/smartrust
|
||||||
|
|
||||||
A type-safe, standardized bridge between TypeScript and Rust binaries via JSON-over-stdin/stdout IPC.
|
A type-safe, production-ready bridge between TypeScript and Rust binaries — with support for **stdio** (child process) and **socket** (Unix socket / Windows named pipe) transports, request/response, streaming, and event patterns.
|
||||||
|
|
||||||
## Issue Reporting and Security
|
## Issue Reporting and Security
|
||||||
|
|
||||||
@@ -16,40 +16,51 @@ pnpm install @push.rocks/smartrust
|
|||||||
|
|
||||||
## Overview 🔭
|
## Overview 🔭
|
||||||
|
|
||||||
`@push.rocks/smartrust` provides a production-ready bridge for TypeScript applications that need to communicate with Rust binaries. It handles the entire lifecycle — binary discovery, process spawning, request/response correlation with timeouts, event streaming, and graceful shutdown — so you can focus on your command definitions instead of IPC plumbing.
|
`@push.rocks/smartrust` provides a complete bridge for TypeScript applications that need to communicate with Rust binaries. It handles the entire lifecycle — binary discovery, process spawning **or socket connection**, request/response correlation, **streaming responses**, event pub/sub, and graceful shutdown — so you can focus on your command definitions instead of IPC plumbing.
|
||||||
|
|
||||||
### Why?
|
### Two Transport Modes 🔌
|
||||||
|
|
||||||
|
| Mode | Method | Use Case |
|
||||||
|
|------|--------|----------|
|
||||||
|
| **Stdio** | `bridge.spawn()` | Spawn the Rust binary as a child process. Communicate via stdin/stdout. |
|
||||||
|
| **Socket** | `bridge.connect(path)` | Connect to an **already-running** Rust daemon via Unix socket or Windows named pipe. |
|
||||||
|
|
||||||
|
The JSON protocol is identical in both modes — only the transport layer changes. Socket mode enables use cases where the Rust binary runs as a **privileged system service** (e.g., a VPN daemon needing root for TUN devices, a network proxy binding to privileged ports) while the TypeScript app connects to it unprivileged.
|
||||||
|
|
||||||
|
### Why? 🤔
|
||||||
|
|
||||||
If you're integrating Rust into a Node.js project, you'll inevitably need:
|
If you're integrating Rust into a Node.js project, you'll inevitably need:
|
||||||
- A way to **find** the compiled Rust binary across different environments (dev, CI, production, platform packages)
|
- A way to **find** the compiled Rust binary across different environments (dev, CI, production, platform packages)
|
||||||
- A way to **spawn** it and establish reliable two-way communication
|
- A way to **spawn it** or **connect to it** and establish reliable two-way communication
|
||||||
- **Type-safe** request/response patterns with proper error handling
|
- **Type-safe** request/response patterns with proper error handling
|
||||||
|
- **Streaming responses** for progressive data processing, log tailing, or chunked transfers
|
||||||
- **Event streaming** from Rust to TypeScript
|
- **Event streaming** from Rust to TypeScript
|
||||||
- **Graceful lifecycle management** (ready detection, clean shutdown, force kill)
|
- **Graceful lifecycle management** (ready detection, clean shutdown, auto-reconnection)
|
||||||
|
|
||||||
`smartrust` wraps all of this into two classes: `RustBridge` and `RustBinaryLocator`.
|
`smartrust` wraps all of this into a clean API: `RustBridge`, `RustBinaryLocator`, `StreamingResponse`, and pluggable transports.
|
||||||
|
|
||||||
## Usage 🚀
|
## Usage 🚀
|
||||||
|
|
||||||
### The IPC Protocol
|
### The IPC Protocol
|
||||||
|
|
||||||
`smartrust` uses a simple, newline-delimited JSON protocol over stdin/stdout:
|
`smartrust` uses a simple, newline-delimited JSON protocol:
|
||||||
|
|
||||||
| Direction | Format | Description |
|
| Direction | Format | Description |
|
||||||
|-----------|--------|-------------|
|
|-----------|--------|-------------|
|
||||||
| **TS → Rust** (Request) | `{"id": "req_1", "method": "start", "params": {...}}` | Command with unique ID |
|
| **TS → Rust** (Request) | `{"id": "req_1", "method": "start", "params": {...}}` | Command with unique ID |
|
||||||
| **Rust → TS** (Response) | `{"id": "req_1", "success": true, "result": {...}}` | Response correlated by ID |
|
| **Rust → TS** (Response) | `{"id": "req_1", "success": true, "result": {...}}` | Final response correlated by ID |
|
||||||
| **Rust → TS** (Error) | `{"id": "req_1", "success": false, "error": "msg"}` | Error correlated by ID |
|
| **Rust → TS** (Error) | `{"id": "req_1", "success": false, "error": "msg"}` | Error correlated by ID |
|
||||||
|
| **Rust → TS** (Stream Chunk) | `{"id": "req_1", "stream": true, "data": {...}}` | Intermediate chunk (zero or more) |
|
||||||
| **Rust → TS** (Event) | `{"event": "ready", "data": {...}}` | Unsolicited event (no ID) |
|
| **Rust → TS** (Event) | `{"event": "ready", "data": {...}}` | Unsolicited event (no ID) |
|
||||||
|
|
||||||
Your Rust binary reads JSON lines from stdin and writes JSON lines to stdout. That's it. Stderr is free for logging.
|
This protocol works identically over stdio and socket transports. Your Rust binary reads JSON lines from one end and writes JSON lines to the other. That's it.
|
||||||
|
|
||||||
### Defining Your Commands
|
### Defining Your Commands
|
||||||
|
|
||||||
Start by defining a type map of commands your Rust binary supports:
|
Start by defining a type map of commands your Rust binary supports:
|
||||||
|
|
||||||
```typescript
|
```typescript
|
||||||
import { RustBridge, type ICommandDefinition } from '@push.rocks/smartrust';
|
import { RustBridge } from '@push.rocks/smartrust';
|
||||||
|
|
||||||
// Define your command types
|
// Define your command types
|
||||||
type TMyCommands = {
|
type TMyCommands = {
|
||||||
@@ -60,7 +71,9 @@ type TMyCommands = {
|
|||||||
};
|
};
|
||||||
```
|
```
|
||||||
|
|
||||||
### Creating and Using the Bridge
|
### Stdio Mode — Spawn a Child Process
|
||||||
|
|
||||||
|
This is the classic mode. The bridge spawns the Rust binary and communicates via stdin/stdout:
|
||||||
|
|
||||||
```typescript
|
```typescript
|
||||||
const bridge = new RustBridge<TMyCommands>({
|
const bridge = new RustBridge<TMyCommands>({
|
||||||
@@ -88,11 +101,147 @@ bridge.on('management:configChanged', (data) => {
|
|||||||
console.log('Config was changed:', data);
|
console.log('Config was changed:', data);
|
||||||
});
|
});
|
||||||
|
|
||||||
// Clean shutdown
|
// Clean shutdown (SIGTERM → SIGKILL after 5s)
|
||||||
bridge.kill();
|
bridge.kill();
|
||||||
```
|
```
|
||||||
|
|
||||||
### Binary Locator
|
### Socket Mode — Connect to a Running Daemon 🔗
|
||||||
|
|
||||||
|
When the Rust binary runs as a system service (e.g., via `systemd`, `launchd`, or a Windows Service), use `connect()` to talk to it over a Unix socket or named pipe:
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
const bridge = new RustBridge<TMyCommands>({
|
||||||
|
binaryName: 'my-daemon', // used for logging / error messages
|
||||||
|
});
|
||||||
|
|
||||||
|
// Connect to the daemon's management socket
|
||||||
|
const ok = await bridge.connect('/var/run/my-daemon.sock');
|
||||||
|
if (!ok) {
|
||||||
|
console.error('Failed to connect to daemon');
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Same API as stdio mode — completely transparent!
|
||||||
|
const { pid } = await bridge.sendCommand('start', { port: 8080, host: '0.0.0.0' });
|
||||||
|
const metrics = await bridge.sendCommand('getMetrics', {});
|
||||||
|
|
||||||
|
// kill() closes the socket — it does NOT kill the daemon
|
||||||
|
bridge.kill();
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Auto-Reconnect
|
||||||
|
|
||||||
|
For long-running applications, enable automatic reconnection with exponential backoff:
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
const ok = await bridge.connect('/var/run/my-daemon.sock', {
|
||||||
|
autoReconnect: true, // reconnect on unexpected disconnect
|
||||||
|
reconnectBaseDelayMs: 100, // initial retry delay (doubles each attempt)
|
||||||
|
reconnectMaxDelayMs: 30000, // max retry delay cap
|
||||||
|
maxReconnectAttempts: 10, // give up after 10 attempts
|
||||||
|
});
|
||||||
|
|
||||||
|
// Listen for reconnection events
|
||||||
|
bridge.on('reconnected', () => {
|
||||||
|
console.log('Reconnected to daemon!');
|
||||||
|
});
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Platform Notes
|
||||||
|
|
||||||
|
| Platform | Socket Path Format | Example |
|
||||||
|
|----------|-------------------|---------|
|
||||||
|
| **Linux** | `/var/run/<name>.sock` or `$XDG_RUNTIME_DIR/<name>.sock` | `/var/run/my-daemon.sock` |
|
||||||
|
| **macOS** | `/var/run/<name>.sock` | `/var/run/my-daemon.sock` |
|
||||||
|
| **Windows** | `\\.\pipe\<name>` | `\\.\pipe\my-daemon` |
|
||||||
|
|
||||||
|
Node.js `net.connect()` handles all formats transparently — no platform-specific code needed.
|
||||||
|
|
||||||
|
### Streaming Commands 🌊
|
||||||
|
|
||||||
|
For commands where the Rust binary sends a series of chunks before a final result, use `sendCommandStreaming`. This is perfect for progressive data processing, log tailing, search results, or any scenario where you want incremental output.
|
||||||
|
|
||||||
|
#### Defining Streaming Commands
|
||||||
|
|
||||||
|
Add a `chunk` field to your command type definition to mark it as streamable:
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
type TMyCommands = {
|
||||||
|
// Regular command (request → response)
|
||||||
|
ping: { params: {}; result: { pong: boolean } };
|
||||||
|
|
||||||
|
// Streaming command (request → chunks... → final result)
|
||||||
|
processData: { params: { count: number }; chunk: { index: number; progress: number }; result: { totalProcessed: number } };
|
||||||
|
tailLogs: { params: { lines: number }; chunk: string; result: { linesRead: number } };
|
||||||
|
};
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Consuming Streams
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
// Returns a StreamingResponse immediately (does NOT block)
|
||||||
|
const stream = bridge.sendCommandStreaming('processData', { count: 1000 });
|
||||||
|
|
||||||
|
// Consume chunks with for-await-of
|
||||||
|
for await (const chunk of stream) {
|
||||||
|
console.log(`Processing item ${chunk.index}, progress: ${chunk.progress}%`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the final result after all chunks are consumed
|
||||||
|
const result = await stream.result;
|
||||||
|
console.log(`Done! Processed ${result.totalProcessed} items`);
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Error Handling in Streams
|
||||||
|
|
||||||
|
Errors propagate to both the iterator and the `.result` promise:
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
const stream = bridge.sendCommandStreaming('processData', { count: 100 });
|
||||||
|
|
||||||
|
try {
|
||||||
|
for await (const chunk of stream) {
|
||||||
|
console.log(chunk);
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
console.error('Stream failed:', err.message);
|
||||||
|
}
|
||||||
|
|
||||||
|
// .result also rejects on error
|
||||||
|
try {
|
||||||
|
await stream.result;
|
||||||
|
} catch (err) {
|
||||||
|
console.error('Same error here:', err.message);
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Stream Timeout
|
||||||
|
|
||||||
|
By default, streaming commands use the same timeout as regular commands (`requestTimeoutMs`). The timeout **resets on each chunk received**, so it acts as an inactivity timeout rather than an absolute timeout. You can configure it separately:
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
const bridge = new RustBridge<TMyCommands>({
|
||||||
|
binaryName: 'my-server',
|
||||||
|
requestTimeoutMs: 30000, // regular command timeout: 30s
|
||||||
|
streamTimeoutMs: 60000, // streaming inactivity timeout: 60s
|
||||||
|
});
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Implementing Streaming on the Rust Side
|
||||||
|
|
||||||
|
Your Rust binary sends stream chunks by writing lines with `"stream": true` before the final response:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
// For each chunk:
|
||||||
|
println!(r#"{{"id":"{}","stream":true,"data":{{"index":{},"progress":{}}}}}"#, req.id, i, pct);
|
||||||
|
io::stdout().flush().unwrap();
|
||||||
|
|
||||||
|
// When done, send the final response (same as non-streaming):
|
||||||
|
println!(r#"{{"id":"{}","success":true,"result":{{"totalProcessed":{}}}}}"#, req.id, total);
|
||||||
|
io::stdout().flush().unwrap();
|
||||||
|
```
|
||||||
|
|
||||||
|
### Binary Locator 🔍
|
||||||
|
|
||||||
The `RustBinaryLocator` searches for your binary using a priority-ordered strategy:
|
The `RustBinaryLocator` searches for your binary using a priority-ordered strategy:
|
||||||
|
|
||||||
@@ -119,7 +268,7 @@ const binaryPath = await locator.findBinary();
|
|||||||
// Result is cached — call clearCache() to force re-search
|
// Result is cached — call clearCache() to force re-search
|
||||||
```
|
```
|
||||||
|
|
||||||
### Configuration Reference
|
### Configuration Reference ⚙️
|
||||||
|
|
||||||
The `RustBridge` constructor accepts an `IRustBridgeOptions` object:
|
The `RustBridge` constructor accepts an `IRustBridgeOptions` object:
|
||||||
|
|
||||||
@@ -136,25 +285,39 @@ const bridge = new RustBridge<TMyCommands>({
|
|||||||
// --- Bridge Options ---
|
// --- Bridge Options ---
|
||||||
cliArgs: ['--management'], // optional: args passed to binary (default: ['--management'])
|
cliArgs: ['--management'], // optional: args passed to binary (default: ['--management'])
|
||||||
requestTimeoutMs: 30000, // optional: per-request timeout (default: 30000)
|
requestTimeoutMs: 30000, // optional: per-request timeout (default: 30000)
|
||||||
|
streamTimeoutMs: 30000, // optional: streaming inactivity timeout (default: requestTimeoutMs)
|
||||||
readyTimeoutMs: 10000, // optional: ready event timeout (default: 10000)
|
readyTimeoutMs: 10000, // optional: ready event timeout (default: 10000)
|
||||||
|
maxPayloadSize: 50 * 1024 * 1024, // optional: max message size in bytes (default: 50MB)
|
||||||
env: { RUST_LOG: 'debug' }, // optional: extra env vars for the child process
|
env: { RUST_LOG: 'debug' }, // optional: extra env vars for the child process
|
||||||
readyEventName: 'ready', // optional: name of the ready event (default: 'ready')
|
readyEventName: 'ready', // optional: name of the ready event (default: 'ready')
|
||||||
logger: myLogger, // optional: logger implementing IRustBridgeLogger
|
logger: myLogger, // optional: logger implementing IRustBridgeLogger
|
||||||
});
|
});
|
||||||
```
|
```
|
||||||
|
|
||||||
### Events
|
Socket connection options (passed to `bridge.connect()`):
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
interface ISocketConnectOptions {
|
||||||
|
autoReconnect?: boolean; // default: false
|
||||||
|
reconnectBaseDelayMs?: number; // default: 100
|
||||||
|
reconnectMaxDelayMs?: number; // default: 30000
|
||||||
|
maxReconnectAttempts?: number; // default: 10
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Events 📡
|
||||||
|
|
||||||
`RustBridge` extends `EventEmitter` and emits the following events:
|
`RustBridge` extends `EventEmitter` and emits the following events:
|
||||||
|
|
||||||
| Event | Payload | Description |
|
| Event | Payload | Description |
|
||||||
|-------|---------|-------------|
|
|-------|---------|-------------|
|
||||||
| `ready` | — | Bridge connected and binary reported ready |
|
| `ready` | — | Bridge connected and binary reported ready |
|
||||||
| `exit` | `(code, signal)` | Rust process exited |
|
| `exit` | `(code, signal)` | Transport closed (process exited or socket disconnected) |
|
||||||
| `stderr` | `string` | A line from the binary's stderr |
|
| `stderr` | `string` | A line from the binary's stderr (stdio mode only) |
|
||||||
|
| `reconnected` | — | Socket transport reconnected after unexpected disconnect |
|
||||||
| `management:<name>` | `any` | Custom event from Rust (e.g. `management:configChanged`) |
|
| `management:<name>` | `any` | Custom event from Rust (e.g. `management:configChanged`) |
|
||||||
|
|
||||||
### Custom Logger
|
### Custom Logger 📝
|
||||||
|
|
||||||
Plug in your own logger by implementing the `IRustBridgeLogger` interface:
|
Plug in your own logger by implementing the `IRustBridgeLogger` interface:
|
||||||
|
|
||||||
@@ -173,9 +336,11 @@ const bridge = new RustBridge<TMyCommands>({
|
|||||||
});
|
});
|
||||||
```
|
```
|
||||||
|
|
||||||
### Writing the Rust Side
|
### Writing the Rust Side 🦀
|
||||||
|
|
||||||
Your Rust binary needs to implement a simple protocol:
|
Your Rust binary needs to implement a simple protocol. The transport (stdio or socket) doesn't change the message format — only how connections are established.
|
||||||
|
|
||||||
|
#### Stdio Mode (Child Process)
|
||||||
|
|
||||||
1. **On startup**, write a ready event to stdout:
|
1. **On startup**, write a ready event to stdout:
|
||||||
```
|
```
|
||||||
@@ -186,11 +351,20 @@ Your Rust binary needs to implement a simple protocol:
|
|||||||
|
|
||||||
3. **Write JSON responses to stdout**, each as `{"id": "...", "success": true, "result": {...}}\n`
|
3. **Write JSON responses to stdout**, each as `{"id": "...", "success": true, "result": {...}}\n`
|
||||||
|
|
||||||
4. **Emit events** anytime by writing `{"event": "name", "data": {...}}\n` to stdout
|
4. **For streaming commands**, write zero or more `{"id": "...", "stream": true, "data": {...}}\n` chunks before the final response
|
||||||
|
|
||||||
5. **Use stderr** for logging — it won't interfere with the IPC protocol
|
5. **Emit events** anytime by writing `{"event": "name", "data": {...}}\n` to stdout
|
||||||
|
|
||||||
Here's a minimal Rust skeleton:
|
6. **Use stderr** for logging — it won't interfere with the IPC protocol
|
||||||
|
|
||||||
|
#### Socket Mode (Daemon)
|
||||||
|
|
||||||
|
1. **Listen** on a Unix socket (e.g., `/var/run/my-daemon.sock`) or Windows named pipe
|
||||||
|
2. **On each new client connection**, send the `{"event":"ready","data":{...}}\n` event
|
||||||
|
3. Read/write JSON lines on the socket (same protocol as stdio)
|
||||||
|
4. Support multiple concurrent clients — each connection is independent
|
||||||
|
|
||||||
|
Here's a minimal Rust skeleton (stdio mode):
|
||||||
|
|
||||||
```rust
|
```rust
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
@@ -213,6 +387,13 @@ struct Response {
|
|||||||
error: Option<String>,
|
error: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize)]
|
||||||
|
struct StreamChunk {
|
||||||
|
id: String,
|
||||||
|
stream: bool,
|
||||||
|
data: serde_json::Value,
|
||||||
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
// Signal ready
|
// Signal ready
|
||||||
println!(r#"{{"event":"ready","data":{{"version":"1.0.0"}}}}"#);
|
println!(r#"{{"event":"ready","data":{{"version":"1.0.0"}}}}"#);
|
||||||
@@ -223,28 +404,81 @@ fn main() {
|
|||||||
let line = line.unwrap();
|
let line = line.unwrap();
|
||||||
let req: Request = serde_json::from_str(&line).unwrap();
|
let req: Request = serde_json::from_str(&line).unwrap();
|
||||||
|
|
||||||
let response = match req.method.as_str() {
|
match req.method.as_str() {
|
||||||
"ping" => Response {
|
"ping" => {
|
||||||
id: req.id,
|
let resp = Response {
|
||||||
success: true,
|
id: req.id,
|
||||||
result: Some(serde_json::json!({"pong": true})),
|
success: true,
|
||||||
error: None,
|
result: Some(serde_json::json!({"pong": true})),
|
||||||
},
|
error: None,
|
||||||
_ => Response {
|
};
|
||||||
id: req.id,
|
println!("{}", serde_json::to_string(&resp).unwrap());
|
||||||
success: false,
|
io::stdout().flush().unwrap();
|
||||||
result: None,
|
}
|
||||||
error: Some(format!("Unknown method: {}", req.method)),
|
"processData" => {
|
||||||
},
|
let count = req.params["count"].as_u64().unwrap_or(0);
|
||||||
};
|
// Send stream chunks
|
||||||
|
for i in 0..count {
|
||||||
let json = serde_json::to_string(&response).unwrap();
|
let chunk = StreamChunk {
|
||||||
println!("{json}");
|
id: req.id.clone(),
|
||||||
io::stdout().flush().unwrap();
|
stream: true,
|
||||||
|
data: serde_json::json!({"index": i, "progress": ((i+1) * 100 / count)}),
|
||||||
|
};
|
||||||
|
println!("{}", serde_json::to_string(&chunk).unwrap());
|
||||||
|
io::stdout().flush().unwrap();
|
||||||
|
}
|
||||||
|
// Send final response
|
||||||
|
let resp = Response {
|
||||||
|
id: req.id,
|
||||||
|
success: true,
|
||||||
|
result: Some(serde_json::json!({"totalProcessed": count})),
|
||||||
|
error: None,
|
||||||
|
};
|
||||||
|
println!("{}", serde_json::to_string(&resp).unwrap());
|
||||||
|
io::stdout().flush().unwrap();
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
let resp = Response {
|
||||||
|
id: req.id,
|
||||||
|
success: false,
|
||||||
|
result: None,
|
||||||
|
error: Some(format!("Unknown method: {}", req.method)),
|
||||||
|
};
|
||||||
|
println!("{}", serde_json::to_string(&resp).unwrap());
|
||||||
|
io::stdout().flush().unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## Architecture 🏗️
|
||||||
|
|
||||||
|
```
|
||||||
|
┌──────────────────────────────────────────────────────┐
|
||||||
|
│ RustBridge<T> │
|
||||||
|
│ (protocol layer: handleLine, sendCommand, events) │
|
||||||
|
├──────────────┬───────────────────────────────────────┤
|
||||||
|
│ StdioTransport │ SocketTransport │
|
||||||
|
│ spawn() + │ net.connect() + auto-reconnect │
|
||||||
|
│ stdin/stdout │ Unix socket / named pipe │
|
||||||
|
├──────────────┴───────────────────────────────────────┤
|
||||||
|
│ IRustTransport interface │
|
||||||
|
│ connect() / write() / disconnect() │
|
||||||
|
├──────────────────────────────────────────────────────┤
|
||||||
|
│ LineScanner (shared newline scanner) │
|
||||||
|
├──────────────────────────────────────────────────────┤
|
||||||
|
│ RustBinaryLocator │
|
||||||
|
│ (binary search — stdio mode only) │
|
||||||
|
└──────────────────────────────────────────────────────┘
|
||||||
|
```
|
||||||
|
|
||||||
|
- **`RustBridge`** — The main class. Protocol-level logic (JSON parsing, request correlation, streaming, events) is transport-agnostic.
|
||||||
|
- **`StdioTransport`** — Spawns a child process, manages stdin/stdout/stderr, handles SIGTERM/SIGKILL.
|
||||||
|
- **`SocketTransport`** — Connects to an existing Unix socket or named pipe, with optional auto-reconnect and exponential backoff.
|
||||||
|
- **`LineScanner`** — Shared buffer-based newline scanner used by both transports for efficient message framing.
|
||||||
|
- **`RustBinaryLocator`** — Priority-ordered binary search (used by stdio mode only).
|
||||||
|
|
||||||
## API Reference 📖
|
## API Reference 📖
|
||||||
|
|
||||||
### `RustBridge<TCommands>`
|
### `RustBridge<TCommands>`
|
||||||
@@ -252,10 +486,19 @@ fn main() {
|
|||||||
| Method / Property | Signature | Description |
|
| Method / Property | Signature | Description |
|
||||||
|---|---|---|
|
|---|---|---|
|
||||||
| `constructor` | `new RustBridge<T>(options: IRustBridgeOptions)` | Create a new bridge instance |
|
| `constructor` | `new RustBridge<T>(options: IRustBridgeOptions)` | Create a new bridge instance |
|
||||||
| `spawn()` | `Promise<boolean>` | Spawn the binary and wait for ready; returns `false` on failure |
|
| `spawn()` | `Promise<boolean>` | **Stdio mode**: Spawn the binary and wait for ready; returns `false` on failure |
|
||||||
|
| `connect(socketPath, options?)` | `Promise<boolean>` | **Socket mode**: Connect to a running daemon; returns `false` on failure |
|
||||||
| `sendCommand(method, params)` | `Promise<TCommands[K]['result']>` | Send a typed command and await the response |
|
| `sendCommand(method, params)` | `Promise<TCommands[K]['result']>` | Send a typed command and await the response |
|
||||||
| `kill()` | `void` | SIGTERM the process, reject pending requests, force SIGKILL after 5s |
|
| `sendCommandStreaming(method, params)` | `StreamingResponse<TChunk, TResult>` | Send a streaming command; returns immediately |
|
||||||
| `running` | `boolean` | Whether the bridge is currently connected |
|
| `kill()` | `void` | Stdio: SIGTERM the process, SIGKILL after 5s. Socket: close the connection (daemon stays alive) |
|
||||||
|
| `running` | `boolean` | Whether the bridge is currently connected and ready |
|
||||||
|
|
||||||
|
### `StreamingResponse<TChunk, TResult>`
|
||||||
|
|
||||||
|
| Method / Property | Type | Description |
|
||||||
|
|---|---|---|
|
||||||
|
| `[Symbol.asyncIterator]()` | `AsyncIterator<TChunk>` | Enables `for await...of` consumption of chunks |
|
||||||
|
| `result` | `Promise<TResult>` | Resolves with the final result after stream ends |
|
||||||
|
|
||||||
### `RustBinaryLocator`
|
### `RustBinaryLocator`
|
||||||
|
|
||||||
@@ -265,18 +508,51 @@ fn main() {
|
|||||||
| `findBinary()` | `Promise<string \| null>` | Find the binary using the priority search; result is cached |
|
| `findBinary()` | `Promise<string \| null>` | Find the binary using the priority search; result is cached |
|
||||||
| `clearCache()` | `void` | Clear the cached path to force a fresh search |
|
| `clearCache()` | `void` | Clear the cached path to force a fresh search |
|
||||||
|
|
||||||
### Exported Interfaces
|
### `StdioTransport`
|
||||||
|
|
||||||
| Interface | Description |
|
| Method / Property | Signature | Description |
|
||||||
|
|---|---|---|
|
||||||
|
| `constructor` | `new StdioTransport(options: IStdioTransportOptions)` | Create a stdio transport |
|
||||||
|
| `connect()` | `Promise<void>` | Spawn the child process |
|
||||||
|
| `write(data)` | `Promise<void>` | Write to stdin with backpressure handling |
|
||||||
|
| `disconnect()` | `void` | Kill the process (SIGTERM → SIGKILL after 5s) |
|
||||||
|
| `connected` | `boolean` | Whether the process is running |
|
||||||
|
|
||||||
|
### `SocketTransport`
|
||||||
|
|
||||||
|
| Method / Property | Signature | Description |
|
||||||
|
|---|---|---|
|
||||||
|
| `constructor` | `new SocketTransport(options: ISocketTransportOptions)` | Create a socket transport |
|
||||||
|
| `connect()` | `Promise<void>` | Connect to the Unix socket / named pipe |
|
||||||
|
| `write(data)` | `Promise<void>` | Write to socket with backpressure handling |
|
||||||
|
| `disconnect()` | `void` | Close the socket (does not kill the daemon) |
|
||||||
|
| `connected` | `boolean` | Whether the socket is connected |
|
||||||
|
|
||||||
|
### `LineScanner`
|
||||||
|
|
||||||
|
| Method / Property | Signature | Description |
|
||||||
|
|---|---|---|
|
||||||
|
| `constructor` | `new LineScanner(maxPayloadSize, logger)` | Create a line scanner |
|
||||||
|
| `push(chunk, onLine)` | `void` | Feed a `Buffer` chunk; calls `onLine` for each complete line |
|
||||||
|
| `clear()` | `void` | Reset the internal buffer |
|
||||||
|
|
||||||
|
### Exported Interfaces & Types
|
||||||
|
|
||||||
|
| Interface / Type | Description |
|
||||||
|---|---|
|
|---|---|
|
||||||
| `IRustBridgeOptions` | Full configuration for `RustBridge` |
|
| `IRustBridgeOptions` | Full configuration for `RustBridge` |
|
||||||
| `IBinaryLocatorOptions` | Configuration for `RustBinaryLocator` |
|
| `IBinaryLocatorOptions` | Configuration for `RustBinaryLocator` |
|
||||||
|
| `ISocketConnectOptions` | Socket connection options (reconnect settings) |
|
||||||
| `IRustBridgeLogger` | Logger interface: `{ log(level, message, data?) }` |
|
| `IRustBridgeLogger` | Logger interface: `{ log(level, message, data?) }` |
|
||||||
|
| `IRustTransport` | Transport interface (extends `EventEmitter`) |
|
||||||
| `IManagementRequest` | IPC request shape: `{ id, method, params }` |
|
| `IManagementRequest` | IPC request shape: `{ id, method, params }` |
|
||||||
| `IManagementResponse` | IPC response shape: `{ id, success, result?, error? }` |
|
| `IManagementResponse` | IPC response shape: `{ id, success, result?, error? }` |
|
||||||
| `IManagementEvent` | IPC event shape: `{ event, data }` |
|
| `IManagementEvent` | IPC event shape: `{ event, data }` |
|
||||||
|
| `IManagementStreamChunk` | IPC stream chunk shape: `{ id, stream: true, data }` |
|
||||||
| `ICommandDefinition` | Single command definition: `{ params, result }` |
|
| `ICommandDefinition` | Single command definition: `{ params, result }` |
|
||||||
| `TCommandMap` | `Record<string, ICommandDefinition>` |
|
| `TCommandMap` | `Record<string, ICommandDefinition>` |
|
||||||
|
| `TStreamingCommandKeys<T>` | Extracts keys from a command map that have a `chunk` field |
|
||||||
|
| `TExtractChunk<T>` | Extracts the chunk type from a streaming command definition |
|
||||||
|
|
||||||
## License and Legal Information
|
## License and Legal Information
|
||||||
|
|
||||||
|
|||||||
@@ -2,22 +2,46 @@
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Mock "Rust binary" for testing the RustBridge IPC protocol.
|
* Mock "Rust binary" for testing the RustBridge IPC protocol.
|
||||||
* Reads JSON lines from stdin, writes JSON lines to stdout.
|
* Reads JSON lines from stdin via Buffer-based scanner, writes JSON lines to stdout.
|
||||||
* Emits a ready event on startup.
|
* Emits a ready event on startup.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { createInterface } from 'readline';
|
|
||||||
|
|
||||||
// Emit ready event
|
// Emit ready event
|
||||||
const readyEvent = JSON.stringify({ event: 'ready', data: { version: '1.0.0' } });
|
const readyEvent = JSON.stringify({ event: 'ready', data: { version: '1.0.0' } });
|
||||||
process.stdout.write(readyEvent + '\n');
|
process.stdout.write(readyEvent + '\n');
|
||||||
|
|
||||||
const rl = createInterface({ input: process.stdin });
|
// Buffer-based newline scanner for stdin (mirrors the RustBridge approach)
|
||||||
|
let stdinBuffer = Buffer.alloc(0);
|
||||||
|
|
||||||
rl.on('line', (line) => {
|
process.stdin.on('data', (chunk) => {
|
||||||
|
stdinBuffer = Buffer.concat([stdinBuffer, chunk]);
|
||||||
|
|
||||||
|
let newlineIndex;
|
||||||
|
while ((newlineIndex = stdinBuffer.indexOf(0x0A)) !== -1) {
|
||||||
|
const lineBuffer = stdinBuffer.subarray(0, newlineIndex);
|
||||||
|
stdinBuffer = stdinBuffer.subarray(newlineIndex + 1);
|
||||||
|
const line = lineBuffer.toString('utf8').trim();
|
||||||
|
if (line) {
|
||||||
|
handleLine(line);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Backpressure-aware write to stdout.
|
||||||
|
*/
|
||||||
|
function writeResponse(data) {
|
||||||
|
const json = JSON.stringify(data) + '\n';
|
||||||
|
if (!process.stdout.write(json)) {
|
||||||
|
// Wait for drain before continuing
|
||||||
|
process.stdout.once('drain', () => {});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function handleLine(line) {
|
||||||
let request;
|
let request;
|
||||||
try {
|
try {
|
||||||
request = JSON.parse(line.trim());
|
request = JSON.parse(line);
|
||||||
} catch {
|
} catch {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -26,35 +50,53 @@ rl.on('line', (line) => {
|
|||||||
|
|
||||||
if (method === 'echo') {
|
if (method === 'echo') {
|
||||||
// Echo back the params as result
|
// Echo back the params as result
|
||||||
const response = JSON.stringify({ id, success: true, result: params });
|
writeResponse({ id, success: true, result: params });
|
||||||
process.stdout.write(response + '\n');
|
} else if (method === 'largeEcho') {
|
||||||
|
// Echo back params (same as echo, named distinctly for large payload tests)
|
||||||
|
writeResponse({ id, success: true, result: params });
|
||||||
} else if (method === 'error') {
|
} else if (method === 'error') {
|
||||||
// Return an error
|
// Return an error
|
||||||
const response = JSON.stringify({ id, success: false, error: 'Test error message' });
|
writeResponse({ id, success: false, error: 'Test error message' });
|
||||||
process.stdout.write(response + '\n');
|
|
||||||
} else if (method === 'emitEvent') {
|
} else if (method === 'emitEvent') {
|
||||||
// Emit a custom event, then respond with success
|
// Emit a custom event, then respond with success
|
||||||
const event = JSON.stringify({ event: params.eventName, data: params.eventData });
|
writeResponse({ event: params.eventName, data: params.eventData });
|
||||||
process.stdout.write(event + '\n');
|
writeResponse({ id, success: true, result: null });
|
||||||
const response = JSON.stringify({ id, success: true, result: null });
|
|
||||||
process.stdout.write(response + '\n');
|
|
||||||
} else if (method === 'slow') {
|
} else if (method === 'slow') {
|
||||||
// Respond after a delay
|
// Respond after a delay
|
||||||
setTimeout(() => {
|
setTimeout(() => {
|
||||||
const response = JSON.stringify({ id, success: true, result: { delayed: true } });
|
writeResponse({ id, success: true, result: { delayed: true } });
|
||||||
process.stdout.write(response + '\n');
|
|
||||||
}, 100);
|
}, 100);
|
||||||
|
} else if (method === 'streamEcho') {
|
||||||
|
// Send params.count stream chunks, then final response
|
||||||
|
const count = params.count || 0;
|
||||||
|
let sent = 0;
|
||||||
|
const interval = setInterval(() => {
|
||||||
|
if (sent < count) {
|
||||||
|
writeResponse({ id, stream: true, data: { index: sent, value: `chunk_${sent}` } });
|
||||||
|
sent++;
|
||||||
|
} else {
|
||||||
|
clearInterval(interval);
|
||||||
|
writeResponse({ id, success: true, result: { totalChunks: count } });
|
||||||
|
}
|
||||||
|
}, 10);
|
||||||
|
} else if (method === 'streamError') {
|
||||||
|
// Send 1 chunk, then error
|
||||||
|
writeResponse({ id, stream: true, data: { index: 0, value: 'before_error' } });
|
||||||
|
setTimeout(() => {
|
||||||
|
writeResponse({ id, success: false, error: 'Stream error after chunk' });
|
||||||
|
}, 20);
|
||||||
|
} else if (method === 'streamEmpty') {
|
||||||
|
// Zero chunks, immediate final response
|
||||||
|
writeResponse({ id, success: true, result: { totalChunks: 0 } });
|
||||||
} else if (method === 'exit') {
|
} else if (method === 'exit') {
|
||||||
// Graceful exit
|
// Graceful exit
|
||||||
const response = JSON.stringify({ id, success: true, result: null });
|
writeResponse({ id, success: true, result: null });
|
||||||
process.stdout.write(response + '\n');
|
|
||||||
process.exit(0);
|
process.exit(0);
|
||||||
} else {
|
} else {
|
||||||
// Unknown command
|
// Unknown command
|
||||||
const response = JSON.stringify({ id, success: false, error: `Unknown method: ${method}` });
|
writeResponse({ id, success: false, error: `Unknown method: ${method}` });
|
||||||
process.stdout.write(response + '\n');
|
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
|
|
||||||
// Handle SIGTERM gracefully
|
// Handle SIGTERM gracefully
|
||||||
process.on('SIGTERM', () => {
|
process.on('SIGTERM', () => {
|
||||||
|
|||||||
124
test/helpers/mock-socket-server.mjs
Normal file
124
test/helpers/mock-socket-server.mjs
Normal file
@@ -0,0 +1,124 @@
|
|||||||
|
#!/usr/bin/env node
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Mock "Rust daemon" for testing the SocketTransport and RustBridge socket mode.
|
||||||
|
* Creates a Unix socket server, accepts connections, and speaks the same
|
||||||
|
* JSON-over-newline IPC protocol as mock-rust-binary.mjs.
|
||||||
|
*
|
||||||
|
* Usage: node mock-socket-server.mjs <socket-path>
|
||||||
|
* Signals readiness by writing a JSON line to stdout: {"socketPath":"...","ready":true}
|
||||||
|
*/
|
||||||
|
|
||||||
|
import * as net from 'net';
|
||||||
|
import * as fs from 'fs';
|
||||||
|
|
||||||
|
const socketPath = process.argv[2];
|
||||||
|
if (!socketPath) {
|
||||||
|
process.stderr.write('Usage: mock-socket-server.mjs <socket-path>\n');
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove stale socket file
|
||||||
|
try { fs.unlinkSync(socketPath); } catch { /* ignore */ }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Backpressure-aware write to a socket.
|
||||||
|
*/
|
||||||
|
function writeResponse(conn, data) {
|
||||||
|
const json = JSON.stringify(data) + '\n';
|
||||||
|
if (!conn.write(json)) {
|
||||||
|
conn.once('drain', () => {});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function handleLine(line, conn) {
|
||||||
|
let request;
|
||||||
|
try {
|
||||||
|
request = JSON.parse(line);
|
||||||
|
} catch {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const { id, method, params } = request;
|
||||||
|
|
||||||
|
if (method === 'echo') {
|
||||||
|
writeResponse(conn, { id, success: true, result: params });
|
||||||
|
} else if (method === 'largeEcho') {
|
||||||
|
writeResponse(conn, { id, success: true, result: params });
|
||||||
|
} else if (method === 'error') {
|
||||||
|
writeResponse(conn, { id, success: false, error: 'Test error message' });
|
||||||
|
} else if (method === 'emitEvent') {
|
||||||
|
writeResponse(conn, { event: params.eventName, data: params.eventData });
|
||||||
|
writeResponse(conn, { id, success: true, result: null });
|
||||||
|
} else if (method === 'slow') {
|
||||||
|
setTimeout(() => {
|
||||||
|
writeResponse(conn, { id, success: true, result: { delayed: true } });
|
||||||
|
}, 100);
|
||||||
|
} else if (method === 'streamEcho') {
|
||||||
|
const count = params.count || 0;
|
||||||
|
let sent = 0;
|
||||||
|
const interval = setInterval(() => {
|
||||||
|
if (sent < count) {
|
||||||
|
writeResponse(conn, { id, stream: true, data: { index: sent, value: `chunk_${sent}` } });
|
||||||
|
sent++;
|
||||||
|
} else {
|
||||||
|
clearInterval(interval);
|
||||||
|
writeResponse(conn, { id, success: true, result: { totalChunks: count } });
|
||||||
|
}
|
||||||
|
}, 10);
|
||||||
|
} else if (method === 'streamError') {
|
||||||
|
writeResponse(conn, { id, stream: true, data: { index: 0, value: 'before_error' } });
|
||||||
|
setTimeout(() => {
|
||||||
|
writeResponse(conn, { id, success: false, error: 'Stream error after chunk' });
|
||||||
|
}, 20);
|
||||||
|
} else if (method === 'streamEmpty') {
|
||||||
|
writeResponse(conn, { id, success: true, result: { totalChunks: 0 } });
|
||||||
|
} else if (method === 'exit') {
|
||||||
|
writeResponse(conn, { id, success: true, result: null });
|
||||||
|
// In socket mode, 'exit' just closes this connection, not the server
|
||||||
|
setTimeout(() => conn.end(), 50);
|
||||||
|
} else {
|
||||||
|
writeResponse(conn, { id, success: false, error: `Unknown method: ${method}` });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const server = net.createServer((conn) => {
|
||||||
|
// Send ready event on each new connection
|
||||||
|
writeResponse(conn, { event: 'ready', data: { version: '1.0.0' } });
|
||||||
|
|
||||||
|
// Buffer-based newline scanner for incoming data
|
||||||
|
let buffer = Buffer.alloc(0);
|
||||||
|
conn.on('data', (chunk) => {
|
||||||
|
buffer = Buffer.concat([buffer, chunk]);
|
||||||
|
let idx;
|
||||||
|
while ((idx = buffer.indexOf(0x0A)) !== -1) {
|
||||||
|
const lineBuffer = buffer.subarray(0, idx);
|
||||||
|
buffer = buffer.subarray(idx + 1);
|
||||||
|
const line = lineBuffer.toString('utf8').trim();
|
||||||
|
if (line) {
|
||||||
|
handleLine(line, conn);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
conn.on('error', () => { /* ignore client errors */ });
|
||||||
|
});
|
||||||
|
|
||||||
|
server.listen(socketPath, () => {
|
||||||
|
// Signal to parent that the server is ready
|
||||||
|
process.stdout.write(JSON.stringify({ socketPath, ready: true }) + '\n');
|
||||||
|
});
|
||||||
|
|
||||||
|
// Handle SIGTERM gracefully
|
||||||
|
process.on('SIGTERM', () => {
|
||||||
|
server.close();
|
||||||
|
try { fs.unlinkSync(socketPath); } catch { /* ignore */ }
|
||||||
|
process.exit(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
// Handle SIGINT
|
||||||
|
process.on('SIGINT', () => {
|
||||||
|
server.close();
|
||||||
|
try { fs.unlinkSync(socketPath); } catch { /* ignore */ }
|
||||||
|
process.exit(0);
|
||||||
|
});
|
||||||
89
test/test.linescanner.node.ts
Normal file
89
test/test.linescanner.node.ts
Normal file
@@ -0,0 +1,89 @@
|
|||||||
|
import { expect, tap } from '@git.zone/tstest/tapbundle';
|
||||||
|
import { LineScanner } from '../ts/classes.linescanner.js';
|
||||||
|
|
||||||
|
const noopLogger = { log() {} };
|
||||||
|
|
||||||
|
tap.test('should parse a single complete line', async () => {
|
||||||
|
const scanner = new LineScanner(1024 * 1024, noopLogger);
|
||||||
|
const lines: string[] = [];
|
||||||
|
scanner.push(Buffer.from('{"hello":"world"}\n'), (line) => lines.push(line));
|
||||||
|
expect(lines.length).toEqual(1);
|
||||||
|
expect(lines[0]).toEqual('{"hello":"world"}');
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('should parse multiple lines in one chunk', async () => {
|
||||||
|
const scanner = new LineScanner(1024 * 1024, noopLogger);
|
||||||
|
const lines: string[] = [];
|
||||||
|
scanner.push(Buffer.from('{"a":1}\n{"b":2}\n{"c":3}\n'), (line) => lines.push(line));
|
||||||
|
expect(lines.length).toEqual(3);
|
||||||
|
expect(lines[0]).toEqual('{"a":1}');
|
||||||
|
expect(lines[1]).toEqual('{"b":2}');
|
||||||
|
expect(lines[2]).toEqual('{"c":3}');
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('should handle a line split across chunks', async () => {
|
||||||
|
const scanner = new LineScanner(1024 * 1024, noopLogger);
|
||||||
|
const lines: string[] = [];
|
||||||
|
scanner.push(Buffer.from('{"hel'), (line) => lines.push(line));
|
||||||
|
expect(lines.length).toEqual(0);
|
||||||
|
scanner.push(Buffer.from('lo":"world"}\n'), (line) => lines.push(line));
|
||||||
|
expect(lines.length).toEqual(1);
|
||||||
|
expect(lines[0]).toEqual('{"hello":"world"}');
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('should drop oversized lines', async () => {
|
||||||
|
const scanner = new LineScanner(100, noopLogger);
|
||||||
|
const lines: string[] = [];
|
||||||
|
// Line is 200 chars + newline, exceeds maxPayloadSize of 100
|
||||||
|
const oversized = 'x'.repeat(200) + '\n';
|
||||||
|
scanner.push(Buffer.from(oversized), (line) => lines.push(line));
|
||||||
|
expect(lines.length).toEqual(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('should clear buffer on OOM (no newline, exceeds max)', async () => {
|
||||||
|
const scanner = new LineScanner(100, noopLogger);
|
||||||
|
const lines: string[] = [];
|
||||||
|
// Push 200 bytes without any newline — exceeds maxPayloadSize
|
||||||
|
scanner.push(Buffer.from('x'.repeat(200)), (line) => lines.push(line));
|
||||||
|
expect(lines.length).toEqual(0);
|
||||||
|
// After clearing, should work normally again
|
||||||
|
scanner.push(Buffer.from('{"ok":true}\n'), (line) => lines.push(line));
|
||||||
|
expect(lines.length).toEqual(1);
|
||||||
|
expect(lines[0]).toEqual('{"ok":true}');
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('should skip empty lines', async () => {
|
||||||
|
const scanner = new LineScanner(1024 * 1024, noopLogger);
|
||||||
|
const lines: string[] = [];
|
||||||
|
scanner.push(Buffer.from('\n\n{"a":1}\n\n'), (line) => lines.push(line));
|
||||||
|
expect(lines.length).toEqual(1);
|
||||||
|
expect(lines[0]).toEqual('{"a":1}');
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('should handle mixed complete and partial lines', async () => {
|
||||||
|
const scanner = new LineScanner(1024 * 1024, noopLogger);
|
||||||
|
const lines: string[] = [];
|
||||||
|
// First chunk: one complete line + start of second line
|
||||||
|
scanner.push(Buffer.from('{"first":1}\n{"sec'), (line) => lines.push(line));
|
||||||
|
expect(lines.length).toEqual(1);
|
||||||
|
// Second chunk: end of second line + complete third line
|
||||||
|
scanner.push(Buffer.from('ond":2}\n{"third":3}\n'), (line) => lines.push(line));
|
||||||
|
expect(lines.length).toEqual(3);
|
||||||
|
expect(lines[1]).toEqual('{"second":2}');
|
||||||
|
expect(lines[2]).toEqual('{"third":3}');
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('clear should reset the buffer', async () => {
|
||||||
|
const scanner = new LineScanner(1024 * 1024, noopLogger);
|
||||||
|
const lines: string[] = [];
|
||||||
|
// Push partial data
|
||||||
|
scanner.push(Buffer.from('{"partial":'), (line) => lines.push(line));
|
||||||
|
// Clear
|
||||||
|
scanner.clear();
|
||||||
|
// Now push a complete new line — old partial should not affect it
|
||||||
|
scanner.push(Buffer.from('{"fresh":true}\n'), (line) => lines.push(line));
|
||||||
|
expect(lines.length).toEqual(1);
|
||||||
|
expect(lines[0]).toEqual('{"fresh":true}');
|
||||||
|
});
|
||||||
|
|
||||||
|
export default tap.start();
|
||||||
@@ -9,10 +9,14 @@ const mockBinaryPath = path.join(testDir, 'helpers/mock-rust-binary.mjs');
|
|||||||
// Define the command types for our mock binary
|
// Define the command types for our mock binary
|
||||||
type TMockCommands = {
|
type TMockCommands = {
|
||||||
echo: { params: Record<string, any>; result: Record<string, any> };
|
echo: { params: Record<string, any>; result: Record<string, any> };
|
||||||
|
largeEcho: { params: Record<string, any>; result: Record<string, any> };
|
||||||
error: { params: {}; result: never };
|
error: { params: {}; result: never };
|
||||||
emitEvent: { params: { eventName: string; eventData: any }; result: null };
|
emitEvent: { params: { eventName: string; eventData: any }; result: null };
|
||||||
slow: { params: {}; result: { delayed: boolean } };
|
slow: { params: {}; result: { delayed: boolean } };
|
||||||
exit: { params: {}; result: null };
|
exit: { params: {}; result: null };
|
||||||
|
streamEcho: { params: { count: number }; chunk: { index: number; value: string }; result: { totalChunks: number } };
|
||||||
|
streamError: { params: {}; chunk: { index: number; value: string }; result: never };
|
||||||
|
streamEmpty: { params: {}; chunk: never; result: { totalChunks: number } };
|
||||||
};
|
};
|
||||||
|
|
||||||
tap.test('should spawn and receive ready event', async () => {
|
tap.test('should spawn and receive ready event', async () => {
|
||||||
@@ -188,4 +192,249 @@ tap.test('should emit exit event when process exits', async () => {
|
|||||||
expect(bridge.running).toBeFalse();
|
expect(bridge.running).toBeFalse();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
tap.test('should handle 1MB payload round-trip', async () => {
|
||||||
|
const bridge = new RustBridge<TMockCommands>({
|
||||||
|
binaryName: 'node',
|
||||||
|
binaryPath: 'node',
|
||||||
|
cliArgs: [mockBinaryPath],
|
||||||
|
readyTimeoutMs: 5000,
|
||||||
|
requestTimeoutMs: 30000,
|
||||||
|
});
|
||||||
|
|
||||||
|
await bridge.spawn();
|
||||||
|
|
||||||
|
// Create a ~1MB payload
|
||||||
|
const largeString = 'x'.repeat(1024 * 1024);
|
||||||
|
const result = await bridge.sendCommand('largeEcho', { data: largeString });
|
||||||
|
expect(result.data).toEqual(largeString);
|
||||||
|
expect(result.data.length).toEqual(1024 * 1024);
|
||||||
|
|
||||||
|
bridge.kill();
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('should handle 10MB payload round-trip', async () => {
|
||||||
|
const bridge = new RustBridge<TMockCommands>({
|
||||||
|
binaryName: 'node',
|
||||||
|
binaryPath: 'node',
|
||||||
|
cliArgs: [mockBinaryPath],
|
||||||
|
readyTimeoutMs: 5000,
|
||||||
|
requestTimeoutMs: 60000,
|
||||||
|
});
|
||||||
|
|
||||||
|
await bridge.spawn();
|
||||||
|
|
||||||
|
// Create a ~10MB payload
|
||||||
|
const largeString = 'y'.repeat(10 * 1024 * 1024);
|
||||||
|
const result = await bridge.sendCommand('largeEcho', { data: largeString });
|
||||||
|
expect(result.data).toEqual(largeString);
|
||||||
|
expect(result.data.length).toEqual(10 * 1024 * 1024);
|
||||||
|
|
||||||
|
bridge.kill();
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('should reject outbound messages exceeding maxPayloadSize', async () => {
|
||||||
|
const bridge = new RustBridge<TMockCommands>({
|
||||||
|
binaryName: 'node',
|
||||||
|
binaryPath: 'node',
|
||||||
|
cliArgs: [mockBinaryPath],
|
||||||
|
readyTimeoutMs: 5000,
|
||||||
|
maxPayloadSize: 1000,
|
||||||
|
});
|
||||||
|
|
||||||
|
await bridge.spawn();
|
||||||
|
|
||||||
|
let threw = false;
|
||||||
|
try {
|
||||||
|
await bridge.sendCommand('largeEcho', { data: 'z'.repeat(2000) });
|
||||||
|
} catch (err: any) {
|
||||||
|
threw = true;
|
||||||
|
expect(err.message).toInclude('maxPayloadSize');
|
||||||
|
}
|
||||||
|
expect(threw).toBeTrue();
|
||||||
|
|
||||||
|
bridge.kill();
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('should handle multiple large concurrent commands', async () => {
|
||||||
|
const bridge = new RustBridge<TMockCommands>({
|
||||||
|
binaryName: 'node',
|
||||||
|
binaryPath: 'node',
|
||||||
|
cliArgs: [mockBinaryPath],
|
||||||
|
readyTimeoutMs: 5000,
|
||||||
|
requestTimeoutMs: 30000,
|
||||||
|
});
|
||||||
|
|
||||||
|
await bridge.spawn();
|
||||||
|
|
||||||
|
const size = 500 * 1024; // 500KB each
|
||||||
|
const results = await Promise.all([
|
||||||
|
bridge.sendCommand('largeEcho', { data: 'a'.repeat(size), id: 1 }),
|
||||||
|
bridge.sendCommand('largeEcho', { data: 'b'.repeat(size), id: 2 }),
|
||||||
|
bridge.sendCommand('largeEcho', { data: 'c'.repeat(size), id: 3 }),
|
||||||
|
]);
|
||||||
|
|
||||||
|
expect(results[0].data.length).toEqual(size);
|
||||||
|
expect(results[0].data[0]).toEqual('a');
|
||||||
|
expect(results[1].data.length).toEqual(size);
|
||||||
|
expect(results[1].data[0]).toEqual('b');
|
||||||
|
expect(results[2].data.length).toEqual(size);
|
||||||
|
expect(results[2].data[0]).toEqual('c');
|
||||||
|
|
||||||
|
bridge.kill();
|
||||||
|
});
|
||||||
|
|
||||||
|
// === Streaming tests ===
|
||||||
|
|
||||||
|
tap.test('streaming: should receive chunks via for-await-of and final result', async () => {
|
||||||
|
const bridge = new RustBridge<TMockCommands>({
|
||||||
|
binaryName: 'node',
|
||||||
|
binaryPath: 'node',
|
||||||
|
cliArgs: [mockBinaryPath],
|
||||||
|
readyTimeoutMs: 5000,
|
||||||
|
requestTimeoutMs: 10000,
|
||||||
|
});
|
||||||
|
|
||||||
|
await bridge.spawn();
|
||||||
|
|
||||||
|
const stream = bridge.sendCommandStreaming('streamEcho', { count: 5 });
|
||||||
|
const chunks: Array<{ index: number; value: string }> = [];
|
||||||
|
|
||||||
|
for await (const chunk of stream) {
|
||||||
|
chunks.push(chunk);
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(chunks.length).toEqual(5);
|
||||||
|
for (let i = 0; i < 5; i++) {
|
||||||
|
expect(chunks[i].index).toEqual(i);
|
||||||
|
expect(chunks[i].value).toEqual(`chunk_${i}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
const result = await stream.result;
|
||||||
|
expect(result.totalChunks).toEqual(5);
|
||||||
|
|
||||||
|
bridge.kill();
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('streaming: should handle zero chunks (immediate result)', async () => {
|
||||||
|
const bridge = new RustBridge<TMockCommands>({
|
||||||
|
binaryName: 'node',
|
||||||
|
binaryPath: 'node',
|
||||||
|
cliArgs: [mockBinaryPath],
|
||||||
|
readyTimeoutMs: 5000,
|
||||||
|
});
|
||||||
|
|
||||||
|
await bridge.spawn();
|
||||||
|
|
||||||
|
const stream = bridge.sendCommandStreaming('streamEmpty', {});
|
||||||
|
const chunks: any[] = [];
|
||||||
|
|
||||||
|
for await (const chunk of stream) {
|
||||||
|
chunks.push(chunk);
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(chunks.length).toEqual(0);
|
||||||
|
|
||||||
|
const result = await stream.result;
|
||||||
|
expect(result.totalChunks).toEqual(0);
|
||||||
|
|
||||||
|
bridge.kill();
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('streaming: should propagate error to iterator and .result', async () => {
|
||||||
|
const bridge = new RustBridge<TMockCommands>({
|
||||||
|
binaryName: 'node',
|
||||||
|
binaryPath: 'node',
|
||||||
|
cliArgs: [mockBinaryPath],
|
||||||
|
readyTimeoutMs: 5000,
|
||||||
|
requestTimeoutMs: 10000,
|
||||||
|
});
|
||||||
|
|
||||||
|
await bridge.spawn();
|
||||||
|
|
||||||
|
const stream = bridge.sendCommandStreaming('streamError', {});
|
||||||
|
const chunks: any[] = [];
|
||||||
|
let iteratorError: Error | null = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
for await (const chunk of stream) {
|
||||||
|
chunks.push(chunk);
|
||||||
|
}
|
||||||
|
} catch (err: any) {
|
||||||
|
iteratorError = err;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should have received at least one chunk before error
|
||||||
|
expect(chunks.length).toEqual(1);
|
||||||
|
expect(chunks[0].value).toEqual('before_error');
|
||||||
|
|
||||||
|
// Iterator should have thrown
|
||||||
|
expect(iteratorError).toBeTruthy();
|
||||||
|
expect(iteratorError!.message).toInclude('Stream error after chunk');
|
||||||
|
|
||||||
|
// .result should also reject
|
||||||
|
let resultError: Error | null = null;
|
||||||
|
try {
|
||||||
|
await stream.result;
|
||||||
|
} catch (err: any) {
|
||||||
|
resultError = err;
|
||||||
|
}
|
||||||
|
expect(resultError).toBeTruthy();
|
||||||
|
expect(resultError!.message).toInclude('Stream error after chunk');
|
||||||
|
|
||||||
|
bridge.kill();
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('streaming: should fail when bridge is not running', async () => {
|
||||||
|
const bridge = new RustBridge<TMockCommands>({
|
||||||
|
binaryName: 'node',
|
||||||
|
binaryPath: 'node',
|
||||||
|
cliArgs: [mockBinaryPath],
|
||||||
|
});
|
||||||
|
|
||||||
|
const stream = bridge.sendCommandStreaming('streamEcho', { count: 3 });
|
||||||
|
|
||||||
|
let resultError: Error | null = null;
|
||||||
|
try {
|
||||||
|
await stream.result;
|
||||||
|
} catch (err: any) {
|
||||||
|
resultError = err;
|
||||||
|
}
|
||||||
|
expect(resultError).toBeTruthy();
|
||||||
|
expect(resultError!.message).toInclude('not running');
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('streaming: should fail when killed mid-stream', async () => {
|
||||||
|
const bridge = new RustBridge<TMockCommands>({
|
||||||
|
binaryName: 'node',
|
||||||
|
binaryPath: 'node',
|
||||||
|
cliArgs: [mockBinaryPath],
|
||||||
|
readyTimeoutMs: 5000,
|
||||||
|
requestTimeoutMs: 30000,
|
||||||
|
});
|
||||||
|
|
||||||
|
await bridge.spawn();
|
||||||
|
|
||||||
|
// Request many chunks so we can kill mid-stream
|
||||||
|
const stream = bridge.sendCommandStreaming('streamEcho', { count: 100 });
|
||||||
|
const chunks: any[] = [];
|
||||||
|
let iteratorError: Error | null = null;
|
||||||
|
|
||||||
|
// Kill after a short delay
|
||||||
|
setTimeout(() => {
|
||||||
|
bridge.kill();
|
||||||
|
}, 50);
|
||||||
|
|
||||||
|
try {
|
||||||
|
for await (const chunk of stream) {
|
||||||
|
chunks.push(chunk);
|
||||||
|
}
|
||||||
|
} catch (err: any) {
|
||||||
|
iteratorError = err;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should have gotten some chunks but not all
|
||||||
|
expect(iteratorError).toBeTruthy();
|
||||||
|
expect(iteratorError!.message).toInclude('killed');
|
||||||
|
});
|
||||||
|
|
||||||
export default tap.start();
|
export default tap.start();
|
||||||
|
|||||||
312
test/test.sockettransport.node.ts
Normal file
312
test/test.sockettransport.node.ts
Normal file
@@ -0,0 +1,312 @@
|
|||||||
|
import { expect, tap } from '@git.zone/tstest/tapbundle';
|
||||||
|
import * as path from 'path';
|
||||||
|
import * as fs from 'fs';
|
||||||
|
import * as childProcess from 'child_process';
|
||||||
|
import * as os from 'os';
|
||||||
|
import { RustBridge } from '../ts/classes.rustbridge.js';
|
||||||
|
import type { ICommandDefinition } from '../ts/interfaces/index.js';
|
||||||
|
|
||||||
|
const testDir = path.resolve(path.dirname(new URL(import.meta.url).pathname));
|
||||||
|
const mockServerPath = path.join(testDir, 'helpers/mock-socket-server.mjs');
|
||||||
|
|
||||||
|
// Define the command types for our mock binary
|
||||||
|
type TMockCommands = {
|
||||||
|
echo: { params: Record<string, any>; result: Record<string, any> };
|
||||||
|
largeEcho: { params: Record<string, any>; result: Record<string, any> };
|
||||||
|
error: { params: {}; result: never };
|
||||||
|
emitEvent: { params: { eventName: string; eventData: any }; result: null };
|
||||||
|
slow: { params: {}; result: { delayed: boolean } };
|
||||||
|
exit: { params: {}; result: null };
|
||||||
|
streamEcho: { params: { count: number }; chunk: { index: number; value: string }; result: { totalChunks: number } };
|
||||||
|
streamError: { params: {}; chunk: { index: number; value: string }; result: never };
|
||||||
|
streamEmpty: { params: {}; chunk: never; result: { totalChunks: number } };
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start the mock socket server and return the socket path.
|
||||||
|
* Returns { proc, socketPath }.
|
||||||
|
*/
|
||||||
|
async function startMockServer(testName: string): Promise<{ proc: childProcess.ChildProcess; socketPath: string }> {
|
||||||
|
const socketPath = path.join(os.tmpdir(), `smartrust-test-${Date.now()}-${testName}.sock`);
|
||||||
|
|
||||||
|
const proc = childProcess.spawn('node', [mockServerPath, socketPath], {
|
||||||
|
stdio: ['pipe', 'pipe', 'pipe'],
|
||||||
|
});
|
||||||
|
|
||||||
|
// Wait for the server to signal readiness via stdout
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
let stdoutData = '';
|
||||||
|
const timeout = setTimeout(() => {
|
||||||
|
proc.kill();
|
||||||
|
reject(new Error('Mock server did not start within 5s'));
|
||||||
|
}, 5000);
|
||||||
|
|
||||||
|
proc.stdout!.on('data', (data: Buffer) => {
|
||||||
|
stdoutData += data.toString();
|
||||||
|
const lines = stdoutData.split('\n');
|
||||||
|
for (const line of lines) {
|
||||||
|
if (line.trim()) {
|
||||||
|
try {
|
||||||
|
const parsed = JSON.parse(line.trim());
|
||||||
|
if (parsed.ready) {
|
||||||
|
clearTimeout(timeout);
|
||||||
|
resolve({ proc, socketPath: parsed.socketPath });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} catch { /* not JSON yet */ }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
proc.on('error', (err) => {
|
||||||
|
clearTimeout(timeout);
|
||||||
|
reject(err);
|
||||||
|
});
|
||||||
|
|
||||||
|
proc.on('exit', (code) => {
|
||||||
|
clearTimeout(timeout);
|
||||||
|
reject(new Error(`Mock server exited with code ${code}`));
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function stopMockServer(proc: childProcess.ChildProcess, socketPath: string) {
|
||||||
|
try { proc.kill('SIGTERM'); } catch { /* ignore */ }
|
||||||
|
try { fs.unlinkSync(socketPath); } catch { /* ignore */ }
|
||||||
|
}
|
||||||
|
|
||||||
|
// === Socket Transport Tests via RustBridge.connect() ===
|
||||||
|
|
||||||
|
tap.test('socket: should connect and receive ready event', async () => {
|
||||||
|
const { proc, socketPath } = await startMockServer('connect-ready');
|
||||||
|
try {
|
||||||
|
const bridge = new RustBridge<TMockCommands>({
|
||||||
|
binaryName: 'mock-daemon',
|
||||||
|
readyTimeoutMs: 5000,
|
||||||
|
});
|
||||||
|
|
||||||
|
const result = await bridge.connect(socketPath);
|
||||||
|
expect(result).toBeTrue();
|
||||||
|
expect(bridge.running).toBeTrue();
|
||||||
|
|
||||||
|
bridge.kill();
|
||||||
|
expect(bridge.running).toBeFalse();
|
||||||
|
} finally {
|
||||||
|
stopMockServer(proc, socketPath);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('socket: should send command and receive response', async () => {
|
||||||
|
const { proc, socketPath } = await startMockServer('send-command');
|
||||||
|
try {
|
||||||
|
const bridge = new RustBridge<TMockCommands>({
|
||||||
|
binaryName: 'mock-daemon',
|
||||||
|
readyTimeoutMs: 5000,
|
||||||
|
});
|
||||||
|
|
||||||
|
await bridge.connect(socketPath);
|
||||||
|
|
||||||
|
const result = await bridge.sendCommand('echo', { hello: 'world', num: 42 });
|
||||||
|
expect(result).toEqual({ hello: 'world', num: 42 });
|
||||||
|
|
||||||
|
bridge.kill();
|
||||||
|
} finally {
|
||||||
|
stopMockServer(proc, socketPath);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('socket: should handle error responses', async () => {
|
||||||
|
const { proc, socketPath } = await startMockServer('error-response');
|
||||||
|
try {
|
||||||
|
const bridge = new RustBridge<TMockCommands>({
|
||||||
|
binaryName: 'mock-daemon',
|
||||||
|
readyTimeoutMs: 5000,
|
||||||
|
});
|
||||||
|
|
||||||
|
await bridge.connect(socketPath);
|
||||||
|
|
||||||
|
let threw = false;
|
||||||
|
try {
|
||||||
|
await bridge.sendCommand('error', {});
|
||||||
|
} catch (err: any) {
|
||||||
|
threw = true;
|
||||||
|
expect(err.message).toInclude('Test error message');
|
||||||
|
}
|
||||||
|
expect(threw).toBeTrue();
|
||||||
|
|
||||||
|
bridge.kill();
|
||||||
|
} finally {
|
||||||
|
stopMockServer(proc, socketPath);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('socket: should receive custom events', async () => {
|
||||||
|
const { proc, socketPath } = await startMockServer('custom-events');
|
||||||
|
try {
|
||||||
|
const bridge = new RustBridge<TMockCommands>({
|
||||||
|
binaryName: 'mock-daemon',
|
||||||
|
readyTimeoutMs: 5000,
|
||||||
|
});
|
||||||
|
|
||||||
|
await bridge.connect(socketPath);
|
||||||
|
|
||||||
|
const eventPromise = new Promise<any>((resolve) => {
|
||||||
|
bridge.once('management:testEvent', (data) => resolve(data));
|
||||||
|
});
|
||||||
|
|
||||||
|
await bridge.sendCommand('emitEvent', {
|
||||||
|
eventName: 'testEvent',
|
||||||
|
eventData: { key: 'value' },
|
||||||
|
});
|
||||||
|
|
||||||
|
const eventData = await eventPromise;
|
||||||
|
expect(eventData).toEqual({ key: 'value' });
|
||||||
|
|
||||||
|
bridge.kill();
|
||||||
|
} finally {
|
||||||
|
stopMockServer(proc, socketPath);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('socket: should handle multiple concurrent commands', async () => {
|
||||||
|
const { proc, socketPath } = await startMockServer('concurrent');
|
||||||
|
try {
|
||||||
|
const bridge = new RustBridge<TMockCommands>({
|
||||||
|
binaryName: 'mock-daemon',
|
||||||
|
readyTimeoutMs: 5000,
|
||||||
|
});
|
||||||
|
|
||||||
|
await bridge.connect(socketPath);
|
||||||
|
|
||||||
|
const results = await Promise.all([
|
||||||
|
bridge.sendCommand('echo', { id: 1 }),
|
||||||
|
bridge.sendCommand('echo', { id: 2 }),
|
||||||
|
bridge.sendCommand('echo', { id: 3 }),
|
||||||
|
]);
|
||||||
|
|
||||||
|
expect(results[0]).toEqual({ id: 1 });
|
||||||
|
expect(results[1]).toEqual({ id: 2 });
|
||||||
|
expect(results[2]).toEqual({ id: 3 });
|
||||||
|
|
||||||
|
bridge.kill();
|
||||||
|
} finally {
|
||||||
|
stopMockServer(proc, socketPath);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('socket: should handle 1MB payload round-trip', async () => {
|
||||||
|
const { proc, socketPath } = await startMockServer('large-payload');
|
||||||
|
try {
|
||||||
|
const bridge = new RustBridge<TMockCommands>({
|
||||||
|
binaryName: 'mock-daemon',
|
||||||
|
readyTimeoutMs: 5000,
|
||||||
|
requestTimeoutMs: 30000,
|
||||||
|
});
|
||||||
|
|
||||||
|
await bridge.connect(socketPath);
|
||||||
|
|
||||||
|
const largeString = 'x'.repeat(1024 * 1024);
|
||||||
|
const result = await bridge.sendCommand('largeEcho', { data: largeString });
|
||||||
|
expect(result.data).toEqual(largeString);
|
||||||
|
expect(result.data.length).toEqual(1024 * 1024);
|
||||||
|
|
||||||
|
bridge.kill();
|
||||||
|
} finally {
|
||||||
|
stopMockServer(proc, socketPath);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('socket: should disconnect without killing the daemon', async () => {
|
||||||
|
const { proc, socketPath } = await startMockServer('no-kill-daemon');
|
||||||
|
try {
|
||||||
|
// First connection
|
||||||
|
const bridge1 = new RustBridge<TMockCommands>({
|
||||||
|
binaryName: 'mock-daemon',
|
||||||
|
readyTimeoutMs: 5000,
|
||||||
|
});
|
||||||
|
await bridge1.connect(socketPath);
|
||||||
|
expect(bridge1.running).toBeTrue();
|
||||||
|
|
||||||
|
// Disconnect
|
||||||
|
bridge1.kill();
|
||||||
|
expect(bridge1.running).toBeFalse();
|
||||||
|
|
||||||
|
// Second connection — daemon should still be alive
|
||||||
|
const bridge2 = new RustBridge<TMockCommands>({
|
||||||
|
binaryName: 'mock-daemon',
|
||||||
|
readyTimeoutMs: 5000,
|
||||||
|
});
|
||||||
|
const result = await bridge2.connect(socketPath);
|
||||||
|
expect(result).toBeTrue();
|
||||||
|
expect(bridge2.running).toBeTrue();
|
||||||
|
|
||||||
|
// Verify the daemon is functional
|
||||||
|
const echoResult = await bridge2.sendCommand('echo', { reconnected: true });
|
||||||
|
expect(echoResult).toEqual({ reconnected: true });
|
||||||
|
|
||||||
|
bridge2.kill();
|
||||||
|
} finally {
|
||||||
|
stopMockServer(proc, socketPath);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('socket: should stream responses via socket', async () => {
|
||||||
|
const { proc, socketPath } = await startMockServer('streaming');
|
||||||
|
try {
|
||||||
|
const bridge = new RustBridge<TMockCommands>({
|
||||||
|
binaryName: 'mock-daemon',
|
||||||
|
readyTimeoutMs: 5000,
|
||||||
|
requestTimeoutMs: 10000,
|
||||||
|
});
|
||||||
|
|
||||||
|
await bridge.connect(socketPath);
|
||||||
|
|
||||||
|
const stream = bridge.sendCommandStreaming('streamEcho', { count: 5 });
|
||||||
|
const chunks: Array<{ index: number; value: string }> = [];
|
||||||
|
|
||||||
|
for await (const chunk of stream) {
|
||||||
|
chunks.push(chunk);
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(chunks.length).toEqual(5);
|
||||||
|
for (let i = 0; i < 5; i++) {
|
||||||
|
expect(chunks[i].index).toEqual(i);
|
||||||
|
expect(chunks[i].value).toEqual(`chunk_${i}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
const result = await stream.result;
|
||||||
|
expect(result.totalChunks).toEqual(5);
|
||||||
|
|
||||||
|
bridge.kill();
|
||||||
|
} finally {
|
||||||
|
stopMockServer(proc, socketPath);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('socket: should return false when socket path does not exist', async () => {
|
||||||
|
const bridge = new RustBridge<TMockCommands>({
|
||||||
|
binaryName: 'mock-daemon',
|
||||||
|
readyTimeoutMs: 3000,
|
||||||
|
});
|
||||||
|
|
||||||
|
const result = await bridge.connect('/tmp/nonexistent-smartrust-test.sock');
|
||||||
|
expect(result).toBeFalse();
|
||||||
|
expect(bridge.running).toBeFalse();
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('socket: should throw when sending command while not connected', async () => {
|
||||||
|
const bridge = new RustBridge<TMockCommands>({
|
||||||
|
binaryName: 'mock-daemon',
|
||||||
|
});
|
||||||
|
|
||||||
|
let threw = false;
|
||||||
|
try {
|
||||||
|
await bridge.sendCommand('echo', {});
|
||||||
|
} catch (err: any) {
|
||||||
|
threw = true;
|
||||||
|
expect(err.message).toInclude('not running');
|
||||||
|
}
|
||||||
|
expect(threw).toBeTrue();
|
||||||
|
});
|
||||||
|
|
||||||
|
export default tap.start();
|
||||||
@@ -3,6 +3,6 @@
|
|||||||
*/
|
*/
|
||||||
export const commitinfo = {
|
export const commitinfo = {
|
||||||
name: '@push.rocks/smartrust',
|
name: '@push.rocks/smartrust',
|
||||||
version: '1.1.1',
|
version: '1.3.1',
|
||||||
description: 'a bridge between JS engines and rust'
|
description: 'a bridge between JS engines and rust'
|
||||||
}
|
}
|
||||||
|
|||||||
51
ts/classes.linescanner.ts
Normal file
51
ts/classes.linescanner.ts
Normal file
@@ -0,0 +1,51 @@
|
|||||||
|
import type { IRustBridgeLogger } from './interfaces/index.js';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Buffer-based newline scanner for streaming binary data.
|
||||||
|
* Accumulates chunks and emits complete lines via callback.
|
||||||
|
* Used by both StdioTransport and SocketTransport.
|
||||||
|
*/
|
||||||
|
export class LineScanner {
|
||||||
|
private buffer: Buffer = Buffer.alloc(0);
|
||||||
|
private maxPayloadSize: number;
|
||||||
|
private logger: IRustBridgeLogger;
|
||||||
|
|
||||||
|
constructor(maxPayloadSize: number, logger: IRustBridgeLogger) {
|
||||||
|
this.maxPayloadSize = maxPayloadSize;
|
||||||
|
this.logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Feed a chunk of data. Calls onLine for each complete newline-terminated line found.
|
||||||
|
*/
|
||||||
|
public push(chunk: Buffer, onLine: (line: string) => void): void {
|
||||||
|
this.buffer = Buffer.concat([this.buffer, chunk]);
|
||||||
|
|
||||||
|
let newlineIndex: number;
|
||||||
|
while ((newlineIndex = this.buffer.indexOf(0x0A)) !== -1) {
|
||||||
|
const lineBuffer = this.buffer.subarray(0, newlineIndex);
|
||||||
|
this.buffer = this.buffer.subarray(newlineIndex + 1);
|
||||||
|
|
||||||
|
if (lineBuffer.length > this.maxPayloadSize) {
|
||||||
|
this.logger.log('error', `Inbound message exceeds maxPayloadSize (${lineBuffer.length} bytes), dropping`);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const line = lineBuffer.toString('utf8').trim();
|
||||||
|
if (line) {
|
||||||
|
onLine(line);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prevent OOM if sender never sends newline
|
||||||
|
if (this.buffer.length > this.maxPayloadSize) {
|
||||||
|
this.logger.log('error', `Buffer exceeded maxPayloadSize (${this.buffer.length} bytes) without newline, clearing`);
|
||||||
|
this.buffer = Buffer.alloc(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Reset the internal buffer. */
|
||||||
|
public clear(): void {
|
||||||
|
this.buffer = Buffer.alloc(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -108,7 +108,8 @@ export class RustBinaryLocator {
|
|||||||
const packageName = `${platformPackagePrefix}-${platform}-${arch}`;
|
const packageName = `${platformPackagePrefix}-${platform}-${arch}`;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const packagePath = require.resolve(`${packageName}/${binaryName}`);
|
const resolved = import.meta.resolve(`${packageName}/${binaryName}`);
|
||||||
|
const packagePath = plugins.url.fileURLToPath(resolved);
|
||||||
if (await this.isExecutable(packagePath)) {
|
if (await this.isExecutable(packagePath)) {
|
||||||
return packagePath;
|
return packagePath;
|
||||||
}
|
}
|
||||||
@@ -123,7 +124,16 @@ export class RustBinaryLocator {
|
|||||||
await plugins.fs.promises.access(filePath, plugins.fs.constants.X_OK);
|
await plugins.fs.promises.access(filePath, plugins.fs.constants.X_OK);
|
||||||
return true;
|
return true;
|
||||||
} catch {
|
} catch {
|
||||||
return false;
|
// File may exist but lack execute bit (common after npm/pnpm install).
|
||||||
|
// Try to make it executable.
|
||||||
|
try {
|
||||||
|
await plugins.fs.promises.access(filePath, plugins.fs.constants.F_OK);
|
||||||
|
await plugins.fs.promises.chmod(filePath, 0o755);
|
||||||
|
this.logger.log('info', `Auto-fixed missing execute permission on: ${filePath}`);
|
||||||
|
return true;
|
||||||
|
} catch {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,12 +1,19 @@
|
|||||||
import * as plugins from './plugins.js';
|
import * as plugins from './plugins.js';
|
||||||
import { RustBinaryLocator } from './classes.rustbinarylocator.js';
|
import { RustBinaryLocator } from './classes.rustbinarylocator.js';
|
||||||
|
import { StreamingResponse } from './classes.streamingresponse.js';
|
||||||
|
import { StdioTransport } from './classes.stdiotransport.js';
|
||||||
|
import { SocketTransport } from './classes.sockettransport.js';
|
||||||
import type {
|
import type {
|
||||||
IRustBridgeOptions,
|
IRustBridgeOptions,
|
||||||
IRustBridgeLogger,
|
IRustBridgeLogger,
|
||||||
|
ISocketConnectOptions,
|
||||||
TCommandMap,
|
TCommandMap,
|
||||||
IManagementRequest,
|
IManagementRequest,
|
||||||
IManagementResponse,
|
IManagementResponse,
|
||||||
IManagementEvent,
|
IManagementEvent,
|
||||||
|
TStreamingCommandKeys,
|
||||||
|
TExtractChunk,
|
||||||
|
IRustTransport,
|
||||||
} from './interfaces/index.js';
|
} from './interfaces/index.js';
|
||||||
|
|
||||||
const defaultLogger: IRustBridgeLogger = {
|
const defaultLogger: IRustBridgeLogger = {
|
||||||
@@ -15,20 +22,21 @@ const defaultLogger: IRustBridgeLogger = {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Generic bridge between TypeScript and a Rust binary.
|
* Generic bridge between TypeScript and a Rust binary.
|
||||||
* Communicates via JSON-over-stdin/stdout IPC protocol.
|
* Communicates via JSON-over-stdin/stdout IPC protocol (stdio mode)
|
||||||
|
* or JSON-over-Unix-socket/named-pipe (socket mode).
|
||||||
*
|
*
|
||||||
* @typeParam TCommands - Map of command names to their param/result types
|
* @typeParam TCommands - Map of command names to their param/result types
|
||||||
*/
|
*/
|
||||||
export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plugins.events.EventEmitter {
|
export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plugins.events.EventEmitter {
|
||||||
private locator: RustBinaryLocator;
|
private locator: RustBinaryLocator;
|
||||||
private options: Required<Pick<IRustBridgeOptions, 'cliArgs' | 'requestTimeoutMs' | 'readyTimeoutMs' | 'readyEventName'>> & IRustBridgeOptions;
|
private options: Required<Pick<IRustBridgeOptions, 'cliArgs' | 'requestTimeoutMs' | 'readyTimeoutMs' | 'readyEventName' | 'maxPayloadSize'>> & IRustBridgeOptions;
|
||||||
private logger: IRustBridgeLogger;
|
private logger: IRustBridgeLogger;
|
||||||
private childProcess: plugins.childProcess.ChildProcess | null = null;
|
private transport: IRustTransport | null = null;
|
||||||
private readlineInterface: plugins.readline.Interface | null = null;
|
|
||||||
private pendingRequests = new Map<string, {
|
private pendingRequests = new Map<string, {
|
||||||
resolve: (value: any) => void;
|
resolve: (value: any) => void;
|
||||||
reject: (error: Error) => void;
|
reject: (error: Error) => void;
|
||||||
timer: ReturnType<typeof setTimeout>;
|
timer: ReturnType<typeof setTimeout>;
|
||||||
|
streaming?: StreamingResponse<any, any>;
|
||||||
}>();
|
}>();
|
||||||
private requestCounter = 0;
|
private requestCounter = 0;
|
||||||
private isRunning = false;
|
private isRunning = false;
|
||||||
@@ -42,6 +50,7 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
|||||||
requestTimeoutMs: 30000,
|
requestTimeoutMs: 30000,
|
||||||
readyTimeoutMs: 10000,
|
readyTimeoutMs: 10000,
|
||||||
readyEventName: 'ready',
|
readyEventName: 'ready',
|
||||||
|
maxPayloadSize: 50 * 1024 * 1024,
|
||||||
...options,
|
...options,
|
||||||
};
|
};
|
||||||
this.locator = new RustBinaryLocator(options, this.logger);
|
this.locator = new RustBinaryLocator(options, this.logger);
|
||||||
@@ -57,61 +66,93 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const transport = new StdioTransport({
|
||||||
|
binaryPath: this.binaryPath,
|
||||||
|
cliArgs: this.options.cliArgs,
|
||||||
|
env: this.options.env,
|
||||||
|
maxPayloadSize: this.options.maxPayloadSize,
|
||||||
|
logger: this.logger,
|
||||||
|
});
|
||||||
|
|
||||||
|
return this.connectWithTransport(transport);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Connect to an already-running Rust daemon via Unix socket or named pipe.
|
||||||
|
* Returns true if the connection was established and the daemon signaled readiness.
|
||||||
|
*
|
||||||
|
* @param socketPath - Path to Unix socket or Windows named pipe
|
||||||
|
* @param socketOptions - Optional socket connection options (reconnect, etc.)
|
||||||
|
*/
|
||||||
|
public async connect(socketPath: string, socketOptions?: ISocketConnectOptions): Promise<boolean> {
|
||||||
|
const transport = new SocketTransport({
|
||||||
|
socketPath,
|
||||||
|
maxPayloadSize: this.options.maxPayloadSize,
|
||||||
|
logger: this.logger,
|
||||||
|
autoReconnect: socketOptions?.autoReconnect,
|
||||||
|
reconnectBaseDelayMs: socketOptions?.reconnectBaseDelayMs,
|
||||||
|
reconnectMaxDelayMs: socketOptions?.reconnectMaxDelayMs,
|
||||||
|
maxReconnectAttempts: socketOptions?.maxReconnectAttempts,
|
||||||
|
});
|
||||||
|
|
||||||
|
return this.connectWithTransport(transport);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Internal: wire up any transport and wait for the ready handshake.
|
||||||
|
*/
|
||||||
|
private connectWithTransport(transport: IRustTransport): Promise<boolean> {
|
||||||
return new Promise<boolean>((resolve) => {
|
return new Promise<boolean>((resolve) => {
|
||||||
try {
|
try {
|
||||||
const env = this.options.env
|
this.transport = transport;
|
||||||
? { ...process.env, ...this.options.env }
|
|
||||||
: { ...process.env };
|
|
||||||
|
|
||||||
this.childProcess = plugins.childProcess.spawn(this.binaryPath!, this.options.cliArgs, {
|
// Wire transport events
|
||||||
stdio: ['pipe', 'pipe', 'pipe'],
|
transport.on('line', (line: string) => this.handleLine(line));
|
||||||
env,
|
|
||||||
|
transport.on('stderr', (line: string) => {
|
||||||
|
this.logger.log('debug', `[${this.options.binaryName}] ${line}`);
|
||||||
|
this.emit('stderr', line);
|
||||||
});
|
});
|
||||||
|
|
||||||
// Handle stderr
|
transport.on('close', (...args: any[]) => {
|
||||||
this.childProcess.stderr?.on('data', (data: Buffer) => {
|
this.logger.log('info', `Transport closed`);
|
||||||
const lines = data.toString().split('\n').filter((l: string) => l.trim());
|
|
||||||
for (const line of lines) {
|
|
||||||
this.logger.log('debug', `[${this.options.binaryName}] ${line}`);
|
|
||||||
this.emit('stderr', line);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Handle stdout via readline for line-delimited JSON
|
|
||||||
this.readlineInterface = plugins.readline.createInterface({ input: this.childProcess.stdout! });
|
|
||||||
this.readlineInterface.on('line', (line: string) => {
|
|
||||||
this.handleLine(line.trim());
|
|
||||||
});
|
|
||||||
|
|
||||||
// Handle process exit
|
|
||||||
this.childProcess.on('exit', (code, signal) => {
|
|
||||||
this.logger.log('info', `Process exited (code=${code}, signal=${signal})`);
|
|
||||||
this.cleanup();
|
this.cleanup();
|
||||||
this.emit('exit', code, signal);
|
this.emit('exit', ...args);
|
||||||
});
|
});
|
||||||
|
|
||||||
this.childProcess.on('error', (err) => {
|
transport.on('error', (err: Error) => {
|
||||||
this.logger.log('error', `Process error: ${err.message}`);
|
this.logger.log('error', `Transport error: ${err.message}`);
|
||||||
this.cleanup();
|
this.cleanup();
|
||||||
resolve(false);
|
resolve(false);
|
||||||
});
|
});
|
||||||
|
|
||||||
// Wait for the ready event
|
transport.on('reconnected', () => {
|
||||||
const readyTimeout = setTimeout(() => {
|
this.logger.log('info', 'Transport reconnected, waiting for ready event');
|
||||||
this.logger.log('error', `Process did not send ready event within ${this.options.readyTimeoutMs}ms`);
|
this.emit('reconnected');
|
||||||
this.kill();
|
});
|
||||||
resolve(false);
|
|
||||||
}, this.options.readyTimeoutMs);
|
|
||||||
|
|
||||||
this.once(`management:${this.options.readyEventName}`, () => {
|
// Connect the transport
|
||||||
clearTimeout(readyTimeout);
|
transport.connect().then(() => {
|
||||||
this.isRunning = true;
|
// Wait for the ready event from the protocol layer
|
||||||
this.logger.log('info', `Bridge connected to ${this.options.binaryName}`);
|
const readyTimeout = setTimeout(() => {
|
||||||
this.emit('ready');
|
this.logger.log('error', `Process did not send ready event within ${this.options.readyTimeoutMs}ms`);
|
||||||
resolve(true);
|
this.kill();
|
||||||
|
resolve(false);
|
||||||
|
}, this.options.readyTimeoutMs);
|
||||||
|
|
||||||
|
this.once(`management:${this.options.readyEventName}`, () => {
|
||||||
|
clearTimeout(readyTimeout);
|
||||||
|
this.isRunning = true;
|
||||||
|
this.logger.log('info', `Bridge connected to ${this.options.binaryName}`);
|
||||||
|
this.emit('ready');
|
||||||
|
resolve(true);
|
||||||
|
});
|
||||||
|
}).catch((err: Error) => {
|
||||||
|
this.logger.log('error', `Transport connect failed: ${err.message}`);
|
||||||
|
resolve(false);
|
||||||
});
|
});
|
||||||
} catch (err: any) {
|
} catch (err: any) {
|
||||||
this.logger.log('error', `Failed to spawn: ${err.message}`);
|
this.logger.log('error', `Failed to connect: ${err.message}`);
|
||||||
resolve(false);
|
resolve(false);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -124,12 +165,21 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
|||||||
method: K,
|
method: K,
|
||||||
params: TCommands[K]['params'],
|
params: TCommands[K]['params'],
|
||||||
): Promise<TCommands[K]['result']> {
|
): Promise<TCommands[K]['result']> {
|
||||||
if (!this.childProcess || !this.isRunning) {
|
if (!this.transport?.connected || !this.isRunning) {
|
||||||
throw new Error(`${this.options.binaryName} bridge is not running`);
|
throw new Error(`${this.options.binaryName} bridge is not running`);
|
||||||
}
|
}
|
||||||
|
|
||||||
const id = `req_${++this.requestCounter}`;
|
const id = `req_${++this.requestCounter}`;
|
||||||
const request: IManagementRequest = { id, method, params };
|
const request: IManagementRequest = { id, method, params };
|
||||||
|
const json = JSON.stringify(request);
|
||||||
|
|
||||||
|
// Check outbound payload size
|
||||||
|
const byteLength = Buffer.byteLength(json, 'utf8');
|
||||||
|
if (byteLength > this.options.maxPayloadSize) {
|
||||||
|
throw new Error(
|
||||||
|
`Outbound message exceeds maxPayloadSize (${byteLength} > ${this.options.maxPayloadSize})`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
return new Promise<TCommands[K]['result']>((resolve, reject) => {
|
return new Promise<TCommands[K]['result']>((resolve, reject) => {
|
||||||
const timer = setTimeout(() => {
|
const timer = setTimeout(() => {
|
||||||
@@ -139,32 +189,75 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
|||||||
|
|
||||||
this.pendingRequests.set(id, { resolve, reject, timer });
|
this.pendingRequests.set(id, { resolve, reject, timer });
|
||||||
|
|
||||||
const json = JSON.stringify(request) + '\n';
|
this.transport!.write(json + '\n').catch((err) => {
|
||||||
this.childProcess!.stdin!.write(json, (err) => {
|
clearTimeout(timer);
|
||||||
if (err) {
|
this.pendingRequests.delete(id);
|
||||||
clearTimeout(timer);
|
reject(new Error(`Failed to write to transport: ${err.message}`));
|
||||||
this.pendingRequests.delete(id);
|
|
||||||
reject(new Error(`Failed to write to stdin: ${err.message}`));
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Kill the Rust process and clean up all resources.
|
* Send a streaming command to the Rust process.
|
||||||
|
* Returns a StreamingResponse that yields chunks via `for await...of`
|
||||||
|
* and exposes `.result` for the final response.
|
||||||
|
*/
|
||||||
|
public sendCommandStreaming<K extends string & TStreamingCommandKeys<TCommands>>(
|
||||||
|
method: K,
|
||||||
|
params: TCommands[K]['params'],
|
||||||
|
): StreamingResponse<TExtractChunk<TCommands[K]>, TCommands[K]['result']> {
|
||||||
|
const streaming = new StreamingResponse<TExtractChunk<TCommands[K]>, TCommands[K]['result']>();
|
||||||
|
|
||||||
|
if (!this.transport?.connected || !this.isRunning) {
|
||||||
|
streaming.fail(new Error(`${this.options.binaryName} bridge is not running`));
|
||||||
|
return streaming;
|
||||||
|
}
|
||||||
|
|
||||||
|
const id = `req_${++this.requestCounter}`;
|
||||||
|
const request: IManagementRequest = { id, method, params };
|
||||||
|
const json = JSON.stringify(request);
|
||||||
|
|
||||||
|
const byteLength = Buffer.byteLength(json, 'utf8');
|
||||||
|
if (byteLength > this.options.maxPayloadSize) {
|
||||||
|
streaming.fail(
|
||||||
|
new Error(`Outbound message exceeds maxPayloadSize (${byteLength} > ${this.options.maxPayloadSize})`)
|
||||||
|
);
|
||||||
|
return streaming;
|
||||||
|
}
|
||||||
|
|
||||||
|
const timeoutMs = this.options.streamTimeoutMs ?? this.options.requestTimeoutMs;
|
||||||
|
const timer = setTimeout(() => {
|
||||||
|
this.pendingRequests.delete(id);
|
||||||
|
streaming.fail(new Error(`Streaming command '${method}' timed out after ${timeoutMs}ms`));
|
||||||
|
}, timeoutMs);
|
||||||
|
|
||||||
|
this.pendingRequests.set(id, {
|
||||||
|
resolve: (result: any) => streaming.finish(result),
|
||||||
|
reject: (error: Error) => streaming.fail(error),
|
||||||
|
timer,
|
||||||
|
streaming,
|
||||||
|
});
|
||||||
|
|
||||||
|
this.transport!.write(json + '\n').catch((err) => {
|
||||||
|
clearTimeout(timer);
|
||||||
|
this.pendingRequests.delete(id);
|
||||||
|
streaming.fail(new Error(`Failed to write to transport: ${err.message}`));
|
||||||
|
});
|
||||||
|
|
||||||
|
return streaming;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Kill the connection and clean up all resources.
|
||||||
|
* For stdio: kills the child process (SIGTERM, then SIGKILL).
|
||||||
|
* For socket: closes the socket connection (does NOT kill the daemon).
|
||||||
*/
|
*/
|
||||||
public kill(): void {
|
public kill(): void {
|
||||||
if (this.childProcess) {
|
if (this.transport) {
|
||||||
const proc = this.childProcess;
|
const transport = this.transport;
|
||||||
this.childProcess = null;
|
this.transport = null;
|
||||||
this.isRunning = false;
|
this.isRunning = false;
|
||||||
|
|
||||||
// Close readline
|
|
||||||
if (this.readlineInterface) {
|
|
||||||
this.readlineInterface.close();
|
|
||||||
this.readlineInterface = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reject pending requests
|
// Reject pending requests
|
||||||
for (const [, pending] of this.pendingRequests) {
|
for (const [, pending] of this.pendingRequests) {
|
||||||
clearTimeout(pending.timer);
|
clearTimeout(pending.timer);
|
||||||
@@ -172,27 +265,8 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
|||||||
}
|
}
|
||||||
this.pendingRequests.clear();
|
this.pendingRequests.clear();
|
||||||
|
|
||||||
// Remove all listeners
|
transport.removeAllListeners();
|
||||||
proc.removeAllListeners();
|
transport.disconnect();
|
||||||
proc.stdout?.removeAllListeners();
|
|
||||||
proc.stderr?.removeAllListeners();
|
|
||||||
proc.stdin?.removeAllListeners();
|
|
||||||
|
|
||||||
// Kill the process
|
|
||||||
try { proc.kill('SIGTERM'); } catch { /* already dead */ }
|
|
||||||
|
|
||||||
// Destroy stdio pipes
|
|
||||||
try { proc.stdin?.destroy(); } catch { /* ignore */ }
|
|
||||||
try { proc.stdout?.destroy(); } catch { /* ignore */ }
|
|
||||||
try { proc.stderr?.destroy(); } catch { /* ignore */ }
|
|
||||||
|
|
||||||
// Unref so Node doesn't wait
|
|
||||||
try { proc.unref(); } catch { /* ignore */ }
|
|
||||||
|
|
||||||
// Force kill after 5 seconds
|
|
||||||
setTimeout(() => {
|
|
||||||
try { proc.kill('SIGKILL'); } catch { /* already dead */ }
|
|
||||||
}, 5000).unref();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -221,6 +295,22 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Stream chunk (has 'id' + stream === true + 'data')
|
||||||
|
if ('id' in parsed && parsed.stream === true && 'data' in parsed) {
|
||||||
|
const pending = this.pendingRequests.get(parsed.id);
|
||||||
|
if (pending?.streaming) {
|
||||||
|
// Reset inactivity timeout
|
||||||
|
clearTimeout(pending.timer);
|
||||||
|
const timeoutMs = this.options.streamTimeoutMs ?? this.options.requestTimeoutMs;
|
||||||
|
pending.timer = setTimeout(() => {
|
||||||
|
this.pendingRequests.delete(parsed.id);
|
||||||
|
pending.reject(new Error(`Streaming command timed out after ${timeoutMs}ms of inactivity`));
|
||||||
|
}, timeoutMs);
|
||||||
|
pending.streaming.pushChunk(parsed.data);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Otherwise it's a response (has 'id' field)
|
// Otherwise it's a response (has 'id' field)
|
||||||
if ('id' in parsed) {
|
if ('id' in parsed) {
|
||||||
const response = parsed as IManagementResponse;
|
const response = parsed as IManagementResponse;
|
||||||
@@ -239,12 +329,7 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
|
|||||||
|
|
||||||
private cleanup(): void {
|
private cleanup(): void {
|
||||||
this.isRunning = false;
|
this.isRunning = false;
|
||||||
this.childProcess = null;
|
this.transport = null;
|
||||||
|
|
||||||
if (this.readlineInterface) {
|
|
||||||
this.readlineInterface.close();
|
|
||||||
this.readlineInterface = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reject all pending requests
|
// Reject all pending requests
|
||||||
for (const [, pending] of this.pendingRequests) {
|
for (const [, pending] of this.pendingRequests) {
|
||||||
|
|||||||
187
ts/classes.sockettransport.ts
Normal file
187
ts/classes.sockettransport.ts
Normal file
@@ -0,0 +1,187 @@
|
|||||||
|
import * as plugins from './plugins.js';
|
||||||
|
import { LineScanner } from './classes.linescanner.js';
|
||||||
|
import type { IRustBridgeLogger } from './interfaces/index.js';
|
||||||
|
import type { IRustTransport } from './interfaces/transport.js';
|
||||||
|
|
||||||
|
export interface ISocketTransportOptions {
|
||||||
|
/** Path to Unix socket (Linux/macOS) or named pipe path (Windows) */
|
||||||
|
socketPath: string;
|
||||||
|
/** Maximum inbound message size in bytes */
|
||||||
|
maxPayloadSize: number;
|
||||||
|
/** Logger instance */
|
||||||
|
logger: IRustBridgeLogger;
|
||||||
|
/** Enable auto-reconnect on unexpected disconnect (default: false) */
|
||||||
|
autoReconnect?: boolean;
|
||||||
|
/** Initial delay between reconnect attempts in ms (default: 100) */
|
||||||
|
reconnectBaseDelayMs?: number;
|
||||||
|
/** Maximum delay between reconnect attempts in ms (default: 30000) */
|
||||||
|
reconnectMaxDelayMs?: number;
|
||||||
|
/** Maximum number of reconnect attempts before giving up (default: 10) */
|
||||||
|
maxReconnectAttempts?: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface IResolvedSocketTransportOptions {
|
||||||
|
socketPath: string;
|
||||||
|
maxPayloadSize: number;
|
||||||
|
logger: IRustBridgeLogger;
|
||||||
|
autoReconnect: boolean;
|
||||||
|
reconnectBaseDelayMs: number;
|
||||||
|
reconnectMaxDelayMs: number;
|
||||||
|
maxReconnectAttempts: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transport that connects to an already-running process via Unix socket or Windows named pipe.
|
||||||
|
* The JSON-over-newline protocol is identical to stdio; only the transport changes.
|
||||||
|
*/
|
||||||
|
export class SocketTransport extends plugins.events.EventEmitter implements IRustTransport {
|
||||||
|
private options: IResolvedSocketTransportOptions;
|
||||||
|
private socket: plugins.net.Socket | null = null;
|
||||||
|
private lineScanner: LineScanner;
|
||||||
|
private _connected: boolean = false;
|
||||||
|
private intentionalDisconnect: boolean = false;
|
||||||
|
private reconnectAttempts: number = 0;
|
||||||
|
private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
|
||||||
|
|
||||||
|
constructor(options: ISocketTransportOptions) {
|
||||||
|
super();
|
||||||
|
this.options = {
|
||||||
|
autoReconnect: false,
|
||||||
|
reconnectBaseDelayMs: 100,
|
||||||
|
reconnectMaxDelayMs: 30000,
|
||||||
|
maxReconnectAttempts: 10,
|
||||||
|
...options,
|
||||||
|
};
|
||||||
|
this.lineScanner = new LineScanner(options.maxPayloadSize, options.logger);
|
||||||
|
}
|
||||||
|
|
||||||
|
public get connected(): boolean {
|
||||||
|
return this._connected;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Connect to the socket. Resolves when the TCP/Unix connection is established.
|
||||||
|
*/
|
||||||
|
public async connect(): Promise<void> {
|
||||||
|
this.intentionalDisconnect = false;
|
||||||
|
this.reconnectAttempts = 0;
|
||||||
|
return this.doConnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
private doConnect(): Promise<void> {
|
||||||
|
return new Promise<void>((resolve, reject) => {
|
||||||
|
let settled = false;
|
||||||
|
|
||||||
|
this.socket = plugins.net.connect({ path: this.options.socketPath });
|
||||||
|
|
||||||
|
this.socket.on('connect', () => {
|
||||||
|
if (!settled) {
|
||||||
|
settled = true;
|
||||||
|
this._connected = true;
|
||||||
|
this.reconnectAttempts = 0;
|
||||||
|
resolve();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
this.socket.on('data', (chunk: Buffer) => {
|
||||||
|
this.lineScanner.push(chunk, (line) => {
|
||||||
|
this.emit('line', line);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
this.socket.on('close', () => {
|
||||||
|
const wasConnected = this._connected;
|
||||||
|
this._connected = false;
|
||||||
|
this.lineScanner.clear();
|
||||||
|
|
||||||
|
if (!this.intentionalDisconnect && wasConnected && this.options.autoReconnect) {
|
||||||
|
this.attemptReconnect();
|
||||||
|
}
|
||||||
|
this.emit('close');
|
||||||
|
});
|
||||||
|
|
||||||
|
this.socket.on('error', (err: Error) => {
|
||||||
|
this._connected = false;
|
||||||
|
if (!settled) {
|
||||||
|
settled = true;
|
||||||
|
reject(err);
|
||||||
|
} else if (!this.intentionalDisconnect) {
|
||||||
|
this.emit('error', err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private attemptReconnect(): void {
|
||||||
|
if (this.reconnectAttempts >= this.options.maxReconnectAttempts) {
|
||||||
|
this.options.logger.log('error', `Max reconnect attempts (${this.options.maxReconnectAttempts}) reached`);
|
||||||
|
this.emit('reconnect_failed');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const delay = Math.min(
|
||||||
|
this.options.reconnectBaseDelayMs * Math.pow(2, this.reconnectAttempts),
|
||||||
|
this.options.reconnectMaxDelayMs,
|
||||||
|
);
|
||||||
|
this.reconnectAttempts++;
|
||||||
|
|
||||||
|
this.options.logger.log('info', `Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts}/${this.options.maxReconnectAttempts})`);
|
||||||
|
|
||||||
|
this.reconnectTimer = setTimeout(async () => {
|
||||||
|
this.reconnectTimer = null;
|
||||||
|
try {
|
||||||
|
await this.doConnect();
|
||||||
|
this.emit('reconnected');
|
||||||
|
} catch {
|
||||||
|
// doConnect rejected — the 'close' handler on the new socket will trigger another attempt
|
||||||
|
}
|
||||||
|
}, delay);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Write data to the socket with backpressure support.
|
||||||
|
*/
|
||||||
|
public async write(data: string): Promise<void> {
|
||||||
|
return new Promise<void>((resolve, reject) => {
|
||||||
|
if (!this.socket || !this._connected) {
|
||||||
|
reject(new Error('Socket not connected'));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const canContinue = this.socket.write(data, 'utf8', (err) => {
|
||||||
|
if (err) {
|
||||||
|
reject(err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if (canContinue) {
|
||||||
|
resolve();
|
||||||
|
} else {
|
||||||
|
this.socket.once('drain', () => {
|
||||||
|
resolve();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Close the socket connection. Does NOT kill the remote daemon.
|
||||||
|
*/
|
||||||
|
public disconnect(): void {
|
||||||
|
this.intentionalDisconnect = true;
|
||||||
|
|
||||||
|
if (this.reconnectTimer) {
|
||||||
|
clearTimeout(this.reconnectTimer);
|
||||||
|
this.reconnectTimer = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.socket) {
|
||||||
|
const sock = this.socket;
|
||||||
|
this.socket = null;
|
||||||
|
this._connected = false;
|
||||||
|
this.lineScanner.clear();
|
||||||
|
sock.removeAllListeners();
|
||||||
|
sock.destroy();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
149
ts/classes.stdiotransport.ts
Normal file
149
ts/classes.stdiotransport.ts
Normal file
@@ -0,0 +1,149 @@
|
|||||||
|
import * as plugins from './plugins.js';
|
||||||
|
import { LineScanner } from './classes.linescanner.js';
|
||||||
|
import type { IRustBridgeLogger } from './interfaces/index.js';
|
||||||
|
import type { IRustTransport } from './interfaces/transport.js';
|
||||||
|
|
||||||
|
export interface IStdioTransportOptions {
|
||||||
|
binaryPath: string;
|
||||||
|
cliArgs: string[];
|
||||||
|
env?: Record<string, string>;
|
||||||
|
maxPayloadSize: number;
|
||||||
|
logger: IRustBridgeLogger;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transport that spawns a child process and communicates via stdin/stdout.
|
||||||
|
* Extracted from the original RustBridge process management logic.
|
||||||
|
*/
|
||||||
|
export class StdioTransport extends plugins.events.EventEmitter implements IRustTransport {
|
||||||
|
private options: IStdioTransportOptions;
|
||||||
|
private childProcess: plugins.childProcess.ChildProcess | null = null;
|
||||||
|
private lineScanner: LineScanner;
|
||||||
|
private stderrRemainder: string = '';
|
||||||
|
private _connected: boolean = false;
|
||||||
|
|
||||||
|
constructor(options: IStdioTransportOptions) {
|
||||||
|
super();
|
||||||
|
this.options = options;
|
||||||
|
this.lineScanner = new LineScanner(options.maxPayloadSize, options.logger);
|
||||||
|
}
|
||||||
|
|
||||||
|
public get connected(): boolean {
|
||||||
|
return this._connected;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Spawn the child process. Resolves when the process is running (not necessarily ready).
|
||||||
|
*/
|
||||||
|
public async connect(): Promise<void> {
|
||||||
|
const env = this.options.env
|
||||||
|
? { ...process.env, ...this.options.env }
|
||||||
|
: { ...process.env };
|
||||||
|
|
||||||
|
this.childProcess = plugins.childProcess.spawn(
|
||||||
|
this.options.binaryPath,
|
||||||
|
this.options.cliArgs,
|
||||||
|
{ stdio: ['pipe', 'pipe', 'pipe'], env },
|
||||||
|
);
|
||||||
|
|
||||||
|
// Handle stderr with cross-chunk buffering
|
||||||
|
this.childProcess.stderr?.on('data', (data: Buffer) => {
|
||||||
|
this.stderrRemainder += data.toString();
|
||||||
|
const lines = this.stderrRemainder.split('\n');
|
||||||
|
this.stderrRemainder = lines.pop()!;
|
||||||
|
for (const line of lines) {
|
||||||
|
const trimmed = line.trim();
|
||||||
|
if (trimmed) {
|
||||||
|
this.emit('stderr', trimmed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Handle stdout via LineScanner
|
||||||
|
this.childProcess.stdout!.on('data', (chunk: Buffer) => {
|
||||||
|
this.lineScanner.push(chunk, (line) => {
|
||||||
|
this.emit('line', line);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
// Handle process exit
|
||||||
|
this.childProcess.on('exit', (code: number | null, signal: string | null) => {
|
||||||
|
// Flush remaining stderr
|
||||||
|
if (this.stderrRemainder.trim()) {
|
||||||
|
this.emit('stderr', this.stderrRemainder.trim());
|
||||||
|
}
|
||||||
|
this._connected = false;
|
||||||
|
this.lineScanner.clear();
|
||||||
|
this.stderrRemainder = '';
|
||||||
|
this.emit('close', code, signal);
|
||||||
|
});
|
||||||
|
|
||||||
|
this.childProcess.on('error', (err: Error) => {
|
||||||
|
this._connected = false;
|
||||||
|
this.emit('error', err);
|
||||||
|
});
|
||||||
|
|
||||||
|
this._connected = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Write data to stdin with backpressure support.
|
||||||
|
*/
|
||||||
|
public async write(data: string): Promise<void> {
|
||||||
|
return new Promise<void>((resolve, reject) => {
|
||||||
|
if (!this.childProcess?.stdin) {
|
||||||
|
reject(new Error('stdin not available'));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const canContinue = this.childProcess.stdin.write(data, 'utf8', (err) => {
|
||||||
|
if (err) {
|
||||||
|
reject(err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if (canContinue) {
|
||||||
|
resolve();
|
||||||
|
} else {
|
||||||
|
this.childProcess.stdin.once('drain', () => {
|
||||||
|
resolve();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Kill the child process. Sends SIGTERM, then SIGKILL after 5s.
|
||||||
|
*/
|
||||||
|
public disconnect(): void {
|
||||||
|
if (!this.childProcess) return;
|
||||||
|
|
||||||
|
const proc = this.childProcess;
|
||||||
|
this.childProcess = null;
|
||||||
|
this._connected = false;
|
||||||
|
this.lineScanner.clear();
|
||||||
|
this.stderrRemainder = '';
|
||||||
|
|
||||||
|
// Remove all listeners
|
||||||
|
proc.removeAllListeners();
|
||||||
|
proc.stdout?.removeAllListeners();
|
||||||
|
proc.stderr?.removeAllListeners();
|
||||||
|
proc.stdin?.removeAllListeners();
|
||||||
|
|
||||||
|
// Kill the process
|
||||||
|
try { proc.kill('SIGTERM'); } catch { /* already dead */ }
|
||||||
|
|
||||||
|
// Destroy stdio pipes
|
||||||
|
try { proc.stdin?.destroy(); } catch { /* ignore */ }
|
||||||
|
try { proc.stdout?.destroy(); } catch { /* ignore */ }
|
||||||
|
try { proc.stderr?.destroy(); } catch { /* ignore */ }
|
||||||
|
|
||||||
|
// Unref so Node doesn't wait
|
||||||
|
try { proc.unref(); } catch { /* ignore */ }
|
||||||
|
|
||||||
|
// Force kill after 5 seconds
|
||||||
|
setTimeout(() => {
|
||||||
|
try { proc.kill('SIGKILL'); } catch { /* already dead */ }
|
||||||
|
}, 5000).unref();
|
||||||
|
}
|
||||||
|
}
|
||||||
110
ts/classes.streamingresponse.ts
Normal file
110
ts/classes.streamingresponse.ts
Normal file
@@ -0,0 +1,110 @@
|
|||||||
|
/**
|
||||||
|
* Represents a streaming response from a Rust bridge command.
|
||||||
|
* Implements AsyncIterable to allow `for await...of` consumption of chunks,
|
||||||
|
* and exposes `.result` for the final response once the stream ends.
|
||||||
|
*
|
||||||
|
* @typeParam TChunk - Type of each streamed chunk
|
||||||
|
* @typeParam TResult - Type of the final result
|
||||||
|
*/
|
||||||
|
export class StreamingResponse<TChunk, TResult> implements AsyncIterable<TChunk> {
|
||||||
|
/** Resolves with the final result when the stream ends successfully. */
|
||||||
|
public readonly result: Promise<TResult>;
|
||||||
|
|
||||||
|
private resolveResult!: (value: TResult) => void;
|
||||||
|
private rejectResult!: (error: Error) => void;
|
||||||
|
|
||||||
|
/** Buffered chunks not yet consumed by the iterator. */
|
||||||
|
private buffer: TChunk[] = [];
|
||||||
|
/** Waiting consumer resolve callback (when iterator is ahead of producer). */
|
||||||
|
private waiting: ((value: IteratorResult<TChunk>) => void) | null = null;
|
||||||
|
/** Waiting consumer reject callback. */
|
||||||
|
private waitingReject: ((error: Error) => void) | null = null;
|
||||||
|
|
||||||
|
private done = false;
|
||||||
|
private error: Error | null = null;
|
||||||
|
|
||||||
|
constructor() {
|
||||||
|
this.result = new Promise<TResult>((resolve, reject) => {
|
||||||
|
this.resolveResult = resolve;
|
||||||
|
this.rejectResult = reject;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Push a chunk into the stream. Called internally by RustBridge.
|
||||||
|
*/
|
||||||
|
public pushChunk(chunk: TChunk): void {
|
||||||
|
if (this.done) return;
|
||||||
|
|
||||||
|
if (this.waiting) {
|
||||||
|
// A consumer is waiting — deliver immediately
|
||||||
|
const resolve = this.waiting;
|
||||||
|
this.waiting = null;
|
||||||
|
this.waitingReject = null;
|
||||||
|
resolve({ value: chunk, done: false });
|
||||||
|
} else {
|
||||||
|
// No consumer waiting — buffer the chunk
|
||||||
|
this.buffer.push(chunk);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* End the stream successfully with a final result. Called internally by RustBridge.
|
||||||
|
*/
|
||||||
|
public finish(result: TResult): void {
|
||||||
|
if (this.done) return;
|
||||||
|
this.done = true;
|
||||||
|
this.resolveResult(result);
|
||||||
|
|
||||||
|
// If a consumer is waiting, signal end of iteration
|
||||||
|
if (this.waiting) {
|
||||||
|
const resolve = this.waiting;
|
||||||
|
this.waiting = null;
|
||||||
|
this.waitingReject = null;
|
||||||
|
resolve({ value: undefined as any, done: true });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* End the stream with an error. Called internally by RustBridge.
|
||||||
|
*/
|
||||||
|
public fail(error: Error): void {
|
||||||
|
if (this.done) return;
|
||||||
|
this.done = true;
|
||||||
|
this.error = error;
|
||||||
|
this.rejectResult(error);
|
||||||
|
|
||||||
|
// If a consumer is waiting, reject it
|
||||||
|
if (this.waitingReject) {
|
||||||
|
const reject = this.waitingReject;
|
||||||
|
this.waiting = null;
|
||||||
|
this.waitingReject = null;
|
||||||
|
reject(error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
[Symbol.asyncIterator](): AsyncIterator<TChunk> {
|
||||||
|
return {
|
||||||
|
next: (): Promise<IteratorResult<TChunk>> => {
|
||||||
|
// If there are buffered chunks, deliver one
|
||||||
|
if (this.buffer.length > 0) {
|
||||||
|
return Promise.resolve({ value: this.buffer.shift()!, done: false });
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the stream is done, signal end
|
||||||
|
if (this.done) {
|
||||||
|
if (this.error) {
|
||||||
|
return Promise.reject(this.error);
|
||||||
|
}
|
||||||
|
return Promise.resolve({ value: undefined as any, done: true });
|
||||||
|
}
|
||||||
|
|
||||||
|
// No buffered chunks and not done — wait for the next push
|
||||||
|
return new Promise<IteratorResult<TChunk>>((resolve, reject) => {
|
||||||
|
this.waiting = resolve;
|
||||||
|
this.waitingReject = reject;
|
||||||
|
});
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,3 +1,7 @@
|
|||||||
export { RustBridge } from './classes.rustbridge.js';
|
export { RustBridge } from './classes.rustbridge.js';
|
||||||
export { RustBinaryLocator } from './classes.rustbinarylocator.js';
|
export { RustBinaryLocator } from './classes.rustbinarylocator.js';
|
||||||
|
export { StreamingResponse } from './classes.streamingresponse.js';
|
||||||
|
export { StdioTransport } from './classes.stdiotransport.js';
|
||||||
|
export { SocketTransport } from './classes.sockettransport.js';
|
||||||
|
export { LineScanner } from './classes.linescanner.js';
|
||||||
export * from './interfaces/index.js';
|
export * from './interfaces/index.js';
|
||||||
|
|||||||
@@ -39,4 +39,23 @@ export interface IRustBridgeOptions extends IBinaryLocatorOptions {
|
|||||||
readyEventName?: string;
|
readyEventName?: string;
|
||||||
/** Optional logger instance */
|
/** Optional logger instance */
|
||||||
logger?: IRustBridgeLogger;
|
logger?: IRustBridgeLogger;
|
||||||
|
/** Maximum message size in bytes (default: 50MB). Messages exceeding this are rejected. */
|
||||||
|
maxPayloadSize?: number;
|
||||||
|
/** Inactivity timeout for streaming commands in ms (default: same as requestTimeoutMs).
|
||||||
|
* Resets on each chunk received. */
|
||||||
|
streamTimeoutMs?: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Options for connecting to an already-running daemon via Unix socket or named pipe.
|
||||||
|
*/
|
||||||
|
export interface ISocketConnectOptions {
|
||||||
|
/** Enable auto-reconnect on unexpected disconnect (default: false) */
|
||||||
|
autoReconnect?: boolean;
|
||||||
|
/** Initial delay between reconnect attempts in ms (default: 100) */
|
||||||
|
reconnectBaseDelayMs?: number;
|
||||||
|
/** Maximum delay between reconnect attempts in ms (default: 30000) */
|
||||||
|
reconnectMaxDelayMs?: number;
|
||||||
|
/** Maximum number of reconnect attempts before giving up (default: 10) */
|
||||||
|
maxReconnectAttempts?: number;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,2 +1,3 @@
|
|||||||
export * from './ipc.js';
|
export * from './ipc.js';
|
||||||
export * from './config.js';
|
export * from './config.js';
|
||||||
|
export * from './transport.js';
|
||||||
|
|||||||
@@ -38,3 +38,25 @@ export interface ICommandDefinition<TParams = any, TResult = any> {
|
|||||||
* Used to type-safe the bridge's sendCommand method.
|
* Used to type-safe the bridge's sendCommand method.
|
||||||
*/
|
*/
|
||||||
export type TCommandMap = Record<string, ICommandDefinition>;
|
export type TCommandMap = Record<string, ICommandDefinition>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stream chunk message received from the Rust binary during a streaming command.
|
||||||
|
*/
|
||||||
|
export interface IManagementStreamChunk {
|
||||||
|
id: string;
|
||||||
|
stream: true;
|
||||||
|
data: any;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extract keys from a command map whose definitions include a `chunk` field,
|
||||||
|
* indicating they support streaming responses.
|
||||||
|
*/
|
||||||
|
export type TStreamingCommandKeys<TCommands extends TCommandMap> = {
|
||||||
|
[K in keyof TCommands]: TCommands[K] extends { chunk: any } ? K : never;
|
||||||
|
}[keyof TCommands];
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extract the chunk type from a command definition that has a `chunk` field.
|
||||||
|
*/
|
||||||
|
export type TExtractChunk<TDef> = TDef extends { chunk: infer C } ? C : never;
|
||||||
|
|||||||
26
ts/interfaces/transport.ts
Normal file
26
ts/interfaces/transport.ts
Normal file
@@ -0,0 +1,26 @@
|
|||||||
|
import type * as plugins from '../plugins.js';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Abstract transport for communicating with a Rust process.
|
||||||
|
* Both stdio and socket transports implement this interface.
|
||||||
|
*
|
||||||
|
* Events emitted:
|
||||||
|
* - 'line' (line: string) — a complete newline-terminated JSON line received
|
||||||
|
* - 'error' (err: Error) — transport-level error
|
||||||
|
* - 'close' (...args: any[]) — transport has closed/disconnected
|
||||||
|
* - 'stderr' (line: string) — stderr output (stdio transport only)
|
||||||
|
* - 'reconnected' () — transport reconnected after unexpected disconnect (socket only)
|
||||||
|
*/
|
||||||
|
export interface IRustTransport extends plugins.events.EventEmitter {
|
||||||
|
/** Connect the transport (spawn process or connect to socket). Resolves when I/O channel is open. */
|
||||||
|
connect(): Promise<void>;
|
||||||
|
|
||||||
|
/** Write a string to the transport. Handles backpressure. */
|
||||||
|
write(data: string): Promise<void>;
|
||||||
|
|
||||||
|
/** Disconnect the transport. For stdio: kills the process. For socket: closes the connection. */
|
||||||
|
disconnect(): void;
|
||||||
|
|
||||||
|
/** Whether the transport is currently connected and writable. */
|
||||||
|
readonly connected: boolean;
|
||||||
|
}
|
||||||
@@ -2,10 +2,11 @@
|
|||||||
import * as path from 'path';
|
import * as path from 'path';
|
||||||
import * as fs from 'fs';
|
import * as fs from 'fs';
|
||||||
import * as childProcess from 'child_process';
|
import * as childProcess from 'child_process';
|
||||||
import * as readline from 'readline';
|
|
||||||
import * as events from 'events';
|
import * as events from 'events';
|
||||||
|
import * as url from 'url';
|
||||||
|
import * as net from 'net';
|
||||||
|
|
||||||
export { path, fs, childProcess, readline, events };
|
export { path, fs, childProcess, events, url, net };
|
||||||
|
|
||||||
// @push.rocks scope
|
// @push.rocks scope
|
||||||
import * as smartpath from '@push.rocks/smartpath';
|
import * as smartpath from '@push.rocks/smartpath';
|
||||||
|
|||||||
Reference in New Issue
Block a user