12 Commits

Author SHA1 Message Date
d762c26565 v1.3.0
Some checks failed
Default (tags) / security (push) Successful in 27s
Default (tags) / test (push) Failing after 3m58s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-02-26 08:44:28 +00:00
deda8cc4ee feat(transport): introduce transport abstraction and socket-mode support for RustBridge 2026-02-26 08:44:28 +00:00
0c39e157c2 v1.2.1
Some checks failed
Default (tags) / security (push) Successful in 34s
Default (tags) / test (push) Failing after 3m58s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-02-12 21:25:11 +00:00
b7e3e30ce5 fix(rust-binary-locator): auto-fix missing execute permission for located Rust binaries 2026-02-12 21:25:11 +00:00
35971a395f v1.2.0
Some checks failed
Default (tags) / security (push) Successful in 30s
Default (tags) / test (push) Failing after 3m57s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-02-11 00:12:56 +00:00
5fb991ff51 feat(rustbridge): add streaming responses and robust large-payload/backpressure handling to RustBridge 2026-02-11 00:12:56 +00:00
dcb88ef4b5 v1.1.2
Some checks failed
Default (tags) / security (push) Successful in 39s
Default (tags) / test (push) Failing after 3m58s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-02-10 22:21:18 +00:00
6e4947ef7d fix(rust-binary-locator): use import.meta.resolve and url.fileURLToPath to locate bundled Rust binary in ESM environments 2026-02-10 22:21:17 +00:00
38249ea20f v1.1.1
Some checks failed
Default (tags) / security (push) Successful in 22s
Default (tags) / test (push) Failing after 3m57s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-02-10 09:13:37 +00:00
9d30a283b5 fix(readme): update README with comprehensive documentation, usage examples, API reference, installation instructions, and legal/company information 2026-02-10 09:13:37 +00:00
65f8e98eb3 v1.1.0
Some checks failed
Default (tags) / security (push) Successful in 32s
Default (tags) / test (push) Failing after 3m57s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-02-10 09:10:18 +00:00
40dec91940 feat(rustbridge): add RustBridge and RustBinaryLocator with typed IPC interfaces, plugins, tests and mock runner; export from index; add npm registries 2026-02-10 09:10:18 +00:00
24 changed files with 2824 additions and 16 deletions

View File

@@ -1,5 +1,60 @@
# Changelog # Changelog
## 2026-02-26 - 1.3.0 - feat(transport)
introduce transport abstraction and socket-mode support for RustBridge
- Add IRustTransport interface and two transport implementations: StdioTransport (spawns child process and uses stdin/stdout) and SocketTransport (connects to Unix socket / Windows named pipe).
- Refactor RustBridge to use a transport abstraction (connectWithTransport) and add connect(socketPath) to attach to an existing daemon via socket.
- Introduce LineScanner: a buffer-based newline scanner used by both transports to handle large/newline-delimited messages and avoid OOMs.
- Add socket connection options (autoReconnect, reconnectBaseDelayMs, reconnectMaxDelayMs, maxReconnectAttempts) and implement auto-reconnect/backoff behavior in SocketTransport.
- Implement backpressure-aware write semantics and proper disconnect/cleanup for both transports; RustBridge.kill() now disconnects the transport instead of directly managing processes.
- Add tests and tooling: socket transport tests, line scanner tests, and a mock-socket-server.mjs helper script for testing socket mode.
- Export new symbols (StdioTransport, SocketTransport, LineScanner) and update plugins to expose net; update interfaces to export transport types.
## 2026-02-12 - 1.2.1 - fix(rust-binary-locator)
auto-fix missing execute permission for located Rust binaries
- If a located binary exists but lacks the execute bit, attempt to chmod it to 0o755 and treat it as executable.
- Logs an info message when the auto-fix is applied: 'Auto-fixed missing execute permission on: <filePath>'.
- Addresses cases where npm/pnpm installs remove the execute permission from bundled binaries.
## 2026-02-11 - 1.2.0 - feat(rustbridge)
add streaming responses and robust large-payload/backpressure handling to RustBridge
- Introduce StreamingResponse type and export it (for-await-of iterator + .result promise)
- Add sendCommandStreaming API to send streaming commands and receive chunks + final result
- Implement buffer-based stdout newline scanner to handle large messages and avoid readline limits
- Add backpressure-aware writeToStdin to wait for drain when writing large outbound payloads
- Add maxPayloadSize option and enforce outbound/inbound size checks to prevent OOMs
- Add streamTimeoutMs (inactivity timeout) and reset timeout on each received chunk
- Improve stderr handling (cross-chunk buffering and trimmed emits)
- Update mock test binary and extensive tests for streaming, large payloads, concurrency, and error cases
- Add TypeScript types for streaming commands (TStreamingCommandKeys, TExtractChunk, IManagementStreamChunk)
## 2026-02-10 - 1.1.2 - fix(rust-binary-locator)
use import.meta.resolve and url.fileURLToPath to locate bundled Rust binary in ESM environments
- Replace require.resolve with import.meta.resolve to support ESM module resolution
- Convert resolved file URL to a filesystem path using url.fileURLToPath
- Export the url module from ts/plugins to provide fileURLToPath
## 2026-02-10 - 1.1.1 - fix(readme)
update README with comprehensive documentation, usage examples, API reference, installation instructions, and legal/company information
- Rewrote readme.md (≈ +298 3 lines) to add detailed install, overview, IPC protocol, command definition examples, usage, API reference, issue reporting & security guidance, and legal/trademark/company information.
- Documentation-only change — no source code modified.
- Current package version is 1.1.0; recommend a patch release
## 2026-02-10 - 1.1.0 - feat(rustbridge)
add RustBridge and RustBinaryLocator with typed IPC interfaces, plugins, tests and mock runner; export from index; add npm registries
- Introduce RustBridge: spawn and manage a child binary, JSON-over-stdin/stdout request/response handling, events, timeouts, pending request tracking, kill/cleanup logic.
- Introduce RustBinaryLocator: multi-strategy binary discovery (explicit path, env var, platform-specific package, local build paths, system PATH) with caching and logger hooks.
- Add IPC and config TypeScript interfaces (IManagementRequest/Response/Event, ICommandDefinition, IBinaryLocatorOptions, IRustBridgeOptions) and re-export via interfaces/index.ts.
- Update ts/plugins.ts to export fs, child_process, readline and events for easier native integration.
- Add tests for RustBridge and RustBinaryLocator plus a test helper mock-rust-binary.mjs to simulate the IPC protocol and exercise commands, events, timeouts and locator behaviors.
- Update ts/index.ts to export RustBridge and RustBinaryLocator and export interfaces; update npmextra.json to include internal Verdaccio registry alongside npmjs.org.
## 2026-02-08 - 1.0.2 - fix() ## 2026-02-08 - 1.0.2 - fix()
no changes no changes

View File

@@ -11,10 +11,14 @@
"projectDomain": "push.rocks" "projectDomain": "push.rocks"
}, },
"release": { "release": {
"accessLevel": "public" "accessLevel": "public",
"registries": [
"https://verdaccio.lossless.digital",
"https://registry.npmjs.org"
]
} }
}, },
"@ship.zone/szci": { "@ship.zone/szci": {
"npmGlobalTools": [] "npmGlobalTools": []
} }
} }

View File

@@ -1,6 +1,6 @@
{ {
"name": "@push.rocks/smartrust", "name": "@push.rocks/smartrust",
"version": "1.0.2", "version": "1.3.0",
"private": false, "private": false,
"description": "a bridge between JS engines and rust", "description": "a bridge between JS engines and rust",
"main": "dist_ts/index.js", "main": "dist_ts/index.js",

435
readme.md
View File

@@ -1,5 +1,434 @@
# @push.rocks/smartrust # @push.rocks/smartrust
a bridge between JS engines and rust
## How to create the docs A type-safe, production-ready bridge between TypeScript and Rust binaries via JSON-over-stdin/stdout IPC — with support for request/response, streaming, and event patterns.
To create docs run gitzone aidoc.
## Issue Reporting and Security
For reporting bugs, issues, or security vulnerabilities, please visit [community.foss.global/](https://community.foss.global/). This is the central community hub for all issue reporting. Developers who sign and comply with our contribution agreement and go through identification can also get a [code.foss.global/](https://code.foss.global/) account to submit Pull Requests directly.
## Install 📦
```bash
npm install @push.rocks/smartrust
# or
pnpm install @push.rocks/smartrust
```
## Overview 🔭
`@push.rocks/smartrust` provides a complete bridge for TypeScript applications that need to communicate with Rust binaries. It handles the entire lifecycle — binary discovery, process spawning, request/response correlation, **streaming responses**, event pub/sub, and graceful shutdown — so you can focus on your command definitions instead of IPC plumbing.
### Why? 🤔
If you're integrating Rust into a Node.js project, you'll inevitably need:
- A way to **find** the compiled Rust binary across different environments (dev, CI, production, platform packages)
- A way to **spawn** it and establish reliable two-way communication
- **Type-safe** request/response patterns with proper error handling
- **Streaming responses** for progressive data processing, log tailing, or chunked transfers
- **Event streaming** from Rust to TypeScript
- **Graceful lifecycle management** (ready detection, clean shutdown, force kill)
`smartrust` wraps all of this into three classes: `RustBridge`, `RustBinaryLocator`, and `StreamingResponse`.
## Usage 🚀
### The IPC Protocol
`smartrust` uses a simple, newline-delimited JSON protocol over stdin/stdout:
| Direction | Format | Description |
|-----------|--------|-------------|
| **TS → Rust** (Request) | `{"id": "req_1", "method": "start", "params": {...}}` | Command with unique ID |
| **Rust → TS** (Response) | `{"id": "req_1", "success": true, "result": {...}}` | Final response correlated by ID |
| **Rust → TS** (Error) | `{"id": "req_1", "success": false, "error": "msg"}` | Error correlated by ID |
| **Rust → TS** (Stream Chunk) | `{"id": "req_1", "stream": true, "data": {...}}` | Intermediate chunk (zero or more) |
| **Rust → TS** (Event) | `{"event": "ready", "data": {...}}` | Unsolicited event (no ID) |
Your Rust binary reads JSON lines from stdin and writes JSON lines to stdout. That's it. Stderr is free for logging.
### Defining Your Commands
Start by defining a type map of commands your Rust binary supports:
```typescript
import { RustBridge } from '@push.rocks/smartrust';
// Define your command types
type TMyCommands = {
start: { params: { port: number; host: string }; result: { pid: number } };
stop: { params: {}; result: void };
getMetrics: { params: {}; result: { connections: number; uptime: number } };
reload: { params: { configPath: string }; result: void };
};
```
### Creating and Using the Bridge
```typescript
const bridge = new RustBridge<TMyCommands>({
binaryName: 'my-rust-server',
envVarName: 'MY_SERVER_BINARY', // optional: env var override
platformPackagePrefix: '@myorg/my-server', // optional: platform npm packages
});
// Spawn the binary and wait for it to signal readiness
const ok = await bridge.spawn();
if (!ok) {
console.error('Failed to start Rust binary');
process.exit(1);
}
// Send type-safe commands — params and return types are inferred!
const { pid } = await bridge.sendCommand('start', { port: 8080, host: '0.0.0.0' });
console.log(`Server started with PID ${pid}`);
const metrics = await bridge.sendCommand('getMetrics', {});
console.log(`Active connections: ${metrics.connections}`);
// Listen for events from Rust
bridge.on('management:configChanged', (data) => {
console.log('Config was changed:', data);
});
// Clean shutdown
bridge.kill();
```
### Streaming Commands 🌊
For commands where the Rust binary sends a series of chunks before a final result, use `sendCommandStreaming`. This is perfect for progressive data processing, log tailing, search results, or any scenario where you want incremental output.
#### Defining Streaming Commands
Add a `chunk` field to your command type definition to mark it as streamable:
```typescript
type TMyCommands = {
// Regular command (request → response)
ping: { params: {}; result: { pong: boolean } };
// Streaming command (request → chunks... → final result)
processData: { params: { count: number }; chunk: { index: number; progress: number }; result: { totalProcessed: number } };
tailLogs: { params: { lines: number }; chunk: string; result: { linesRead: number } };
};
```
#### Consuming Streams
```typescript
// Returns a StreamingResponse immediately (does NOT block)
const stream = bridge.sendCommandStreaming('processData', { count: 1000 });
// Consume chunks with for-await-of
for await (const chunk of stream) {
console.log(`Processing item ${chunk.index}, progress: ${chunk.progress}%`);
}
// Get the final result after all chunks are consumed
const result = await stream.result;
console.log(`Done! Processed ${result.totalProcessed} items`);
```
#### Error Handling in Streams
Errors propagate to both the iterator and the `.result` promise:
```typescript
const stream = bridge.sendCommandStreaming('processData', { count: 100 });
try {
for await (const chunk of stream) {
console.log(chunk);
}
} catch (err) {
console.error('Stream failed:', err.message);
}
// .result also rejects on error
try {
await stream.result;
} catch (err) {
console.error('Same error here:', err.message);
}
```
#### Stream Timeout
By default, streaming commands use the same timeout as regular commands (`requestTimeoutMs`). The timeout **resets on each chunk received**, so it acts as an inactivity timeout rather than an absolute timeout. You can configure it separately:
```typescript
const bridge = new RustBridge<TMyCommands>({
binaryName: 'my-server',
requestTimeoutMs: 30000, // regular command timeout: 30s
streamTimeoutMs: 60000, // streaming inactivity timeout: 60s
});
```
#### Implementing Streaming on the Rust Side
Your Rust binary sends stream chunks by writing lines with `"stream": true` before the final response:
```rust
// For each chunk:
println!(r#"{{"id":"{}","stream":true,"data":{{"index":{},"progress":{}}}}}"#, req.id, i, pct);
io::stdout().flush().unwrap();
// When done, send the final response (same as non-streaming):
println!(r#"{{"id":"{}","success":true,"result":{{"totalProcessed":{}}}}}"#, req.id, total);
io::stdout().flush().unwrap();
```
### Binary Locator 🔍
The `RustBinaryLocator` searches for your binary using a priority-ordered strategy:
| Priority | Source | Description |
|----------|--------|-------------|
| 1 | `binaryPath` option | Explicit path — skips all other search |
| 2 | Environment variable | e.g. `MY_SERVER_BINARY=/usr/local/bin/server` |
| 3 | Platform npm package | e.g. `@myorg/my-server-linux-x64/my-rust-server` |
| 4 | Local dev paths | `./rust/target/release/<name>` and `./rust/target/debug/<name>` |
| 5 | System PATH | Standard `$PATH` lookup |
You can also use the locator standalone:
```typescript
import { RustBinaryLocator } from '@push.rocks/smartrust';
const locator = new RustBinaryLocator({
binaryName: 'my-rust-server',
envVarName: 'MY_SERVER_BINARY',
localPaths: ['/opt/myapp/bin/server'], // custom search paths
});
const binaryPath = await locator.findBinary();
// Result is cached — call clearCache() to force re-search
```
### Configuration Reference ⚙️
The `RustBridge` constructor accepts an `IRustBridgeOptions` object:
```typescript
const bridge = new RustBridge<TMyCommands>({
// --- Binary Locator Options ---
binaryName: 'my-server', // required: name of the binary
binaryPath: '/explicit/path/to/binary', // optional: skip search entirely
envVarName: 'MY_SERVER_BINARY', // optional: env var for path override
platformPackagePrefix: '@myorg/my-server', // optional: platform npm package prefix
localPaths: ['./build/server'], // optional: custom local search paths
searchSystemPath: true, // optional: search $PATH (default: true)
// --- Bridge Options ---
cliArgs: ['--management'], // optional: args passed to binary (default: ['--management'])
requestTimeoutMs: 30000, // optional: per-request timeout (default: 30000)
streamTimeoutMs: 30000, // optional: streaming inactivity timeout (default: requestTimeoutMs)
readyTimeoutMs: 10000, // optional: ready event timeout (default: 10000)
maxPayloadSize: 50 * 1024 * 1024, // optional: max message size in bytes (default: 50MB)
env: { RUST_LOG: 'debug' }, // optional: extra env vars for the child process
readyEventName: 'ready', // optional: name of the ready event (default: 'ready')
logger: myLogger, // optional: logger implementing IRustBridgeLogger
});
```
### Events 📡
`RustBridge` extends `EventEmitter` and emits the following events:
| Event | Payload | Description |
|-------|---------|-------------|
| `ready` | — | Bridge connected and binary reported ready |
| `exit` | `(code, signal)` | Rust process exited |
| `stderr` | `string` | A line from the binary's stderr |
| `management:<name>` | `any` | Custom event from Rust (e.g. `management:configChanged`) |
### Custom Logger 📝
Plug in your own logger by implementing the `IRustBridgeLogger` interface:
```typescript
import type { IRustBridgeLogger } from '@push.rocks/smartrust';
const logger: IRustBridgeLogger = {
log(level: string, message: string, data?: Record<string, any>) {
console.log(`[${level}] ${message}`, data || '');
},
};
const bridge = new RustBridge<TMyCommands>({
binaryName: 'my-server',
logger,
});
```
### Writing the Rust Side 🦀
Your Rust binary needs to implement a simple protocol:
1. **On startup**, write a ready event to stdout:
```
{"event":"ready","data":{"version":"1.0.0"}}\n
```
2. **Read JSON lines from stdin**, parse each as `{"id": "...", "method": "...", "params": {...}}`
3. **Write JSON responses to stdout**, each as `{"id": "...", "success": true, "result": {...}}\n`
4. **For streaming commands**, write zero or more `{"id": "...", "stream": true, "data": {...}}\n` chunks before the final response
5. **Emit events** anytime by writing `{"event": "name", "data": {...}}\n` to stdout
6. **Use stderr** for logging — it won't interfere with the IPC protocol
Here's a minimal Rust skeleton:
```rust
use serde::{Deserialize, Serialize};
use std::io::{self, BufRead, Write};
#[derive(Deserialize)]
struct Request {
id: String,
method: String,
params: serde_json::Value,
}
#[derive(Serialize)]
struct Response {
id: String,
success: bool,
#[serde(skip_serializing_if = "Option::is_none")]
result: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<String>,
}
#[derive(Serialize)]
struct StreamChunk {
id: String,
stream: bool,
data: serde_json::Value,
}
fn main() {
// Signal ready
println!(r#"{{"event":"ready","data":{{"version":"1.0.0"}}}}"#);
io::stdout().flush().unwrap();
let stdin = io::stdin();
for line in stdin.lock().lines() {
let line = line.unwrap();
let req: Request = serde_json::from_str(&line).unwrap();
match req.method.as_str() {
"ping" => {
let resp = Response {
id: req.id,
success: true,
result: Some(serde_json::json!({"pong": true})),
error: None,
};
println!("{}", serde_json::to_string(&resp).unwrap());
io::stdout().flush().unwrap();
}
"processData" => {
let count = req.params["count"].as_u64().unwrap_or(0);
// Send stream chunks
for i in 0..count {
let chunk = StreamChunk {
id: req.id.clone(),
stream: true,
data: serde_json::json!({"index": i, "progress": ((i+1) * 100 / count)}),
};
println!("{}", serde_json::to_string(&chunk).unwrap());
io::stdout().flush().unwrap();
}
// Send final response
let resp = Response {
id: req.id,
success: true,
result: Some(serde_json::json!({"totalProcessed": count})),
error: None,
};
println!("{}", serde_json::to_string(&resp).unwrap());
io::stdout().flush().unwrap();
}
_ => {
let resp = Response {
id: req.id,
success: false,
result: None,
error: Some(format!("Unknown method: {}", req.method)),
};
println!("{}", serde_json::to_string(&resp).unwrap());
io::stdout().flush().unwrap();
}
}
}
}
```
## API Reference 📖
### `RustBridge<TCommands>`
| Method / Property | Signature | Description |
|---|---|---|
| `constructor` | `new RustBridge<T>(options: IRustBridgeOptions)` | Create a new bridge instance |
| `spawn()` | `Promise<boolean>` | Spawn the binary and wait for ready; returns `false` on failure |
| `sendCommand(method, params)` | `Promise<TCommands[K]['result']>` | Send a typed command and await the response |
| `sendCommandStreaming(method, params)` | `StreamingResponse<TChunk, TResult>` | Send a streaming command; returns immediately |
| `kill()` | `void` | SIGTERM the process, reject pending requests, force SIGKILL after 5s |
| `running` | `boolean` | Whether the bridge is currently connected |
### `StreamingResponse<TChunk, TResult>`
| Method / Property | Type | Description |
|---|---|---|
| `[Symbol.asyncIterator]()` | `AsyncIterator<TChunk>` | Enables `for await...of` consumption of chunks |
| `result` | `Promise<TResult>` | Resolves with the final result after stream ends |
### `RustBinaryLocator`
| Method / Property | Signature | Description |
|---|---|---|
| `constructor` | `new RustBinaryLocator(options: IBinaryLocatorOptions, logger?)` | Create a locator instance |
| `findBinary()` | `Promise<string \| null>` | Find the binary using the priority search; result is cached |
| `clearCache()` | `void` | Clear the cached path to force a fresh search |
### Exported Interfaces & Types
| Interface / Type | Description |
|---|---|
| `IRustBridgeOptions` | Full configuration for `RustBridge` |
| `IBinaryLocatorOptions` | Configuration for `RustBinaryLocator` |
| `IRustBridgeLogger` | Logger interface: `{ log(level, message, data?) }` |
| `IManagementRequest` | IPC request shape: `{ id, method, params }` |
| `IManagementResponse` | IPC response shape: `{ id, success, result?, error? }` |
| `IManagementEvent` | IPC event shape: `{ event, data }` |
| `IManagementStreamChunk` | IPC stream chunk shape: `{ id, stream: true, data }` |
| `ICommandDefinition` | Single command definition: `{ params, result }` |
| `TCommandMap` | `Record<string, ICommandDefinition>` |
| `TStreamingCommandKeys<T>` | Extracts keys from a command map that have a `chunk` field |
| `TExtractChunk<T>` | Extracts the chunk type from a streaming command definition |
## License and Legal Information
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file.
**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.
### Trademarks
This project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH or third parties, and are not included within the scope of the MIT license granted herein.
Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines or the guidelines of the respective third-party owners, and any usage must be approved in writing. Third-party trademarks used herein are the property of their respective owners and used only in a descriptive manner, e.g. for an implementation of an API or similar.
### Company Information
Task Venture Capital GmbH
Registered at District Court Bremen HRB 35230 HB, Germany
For any legal inquiries or further information, please contact us via email at hello@task.vc.
By using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.

104
test/helpers/mock-rust-binary.mjs Executable file
View File

@@ -0,0 +1,104 @@
#!/usr/bin/env node
/**
* Mock "Rust binary" for testing the RustBridge IPC protocol.
* Reads JSON lines from stdin via Buffer-based scanner, writes JSON lines to stdout.
* Emits a ready event on startup.
*/
// Emit ready event
const readyEvent = JSON.stringify({ event: 'ready', data: { version: '1.0.0' } });
process.stdout.write(readyEvent + '\n');
// Buffer-based newline scanner for stdin (mirrors the RustBridge approach)
let stdinBuffer = Buffer.alloc(0);
process.stdin.on('data', (chunk) => {
stdinBuffer = Buffer.concat([stdinBuffer, chunk]);
let newlineIndex;
while ((newlineIndex = stdinBuffer.indexOf(0x0A)) !== -1) {
const lineBuffer = stdinBuffer.subarray(0, newlineIndex);
stdinBuffer = stdinBuffer.subarray(newlineIndex + 1);
const line = lineBuffer.toString('utf8').trim();
if (line) {
handleLine(line);
}
}
});
/**
* Backpressure-aware write to stdout.
*/
function writeResponse(data) {
const json = JSON.stringify(data) + '\n';
if (!process.stdout.write(json)) {
// Wait for drain before continuing
process.stdout.once('drain', () => {});
}
}
function handleLine(line) {
let request;
try {
request = JSON.parse(line);
} catch {
return;
}
const { id, method, params } = request;
if (method === 'echo') {
// Echo back the params as result
writeResponse({ id, success: true, result: params });
} else if (method === 'largeEcho') {
// Echo back params (same as echo, named distinctly for large payload tests)
writeResponse({ id, success: true, result: params });
} else if (method === 'error') {
// Return an error
writeResponse({ id, success: false, error: 'Test error message' });
} else if (method === 'emitEvent') {
// Emit a custom event, then respond with success
writeResponse({ event: params.eventName, data: params.eventData });
writeResponse({ id, success: true, result: null });
} else if (method === 'slow') {
// Respond after a delay
setTimeout(() => {
writeResponse({ id, success: true, result: { delayed: true } });
}, 100);
} else if (method === 'streamEcho') {
// Send params.count stream chunks, then final response
const count = params.count || 0;
let sent = 0;
const interval = setInterval(() => {
if (sent < count) {
writeResponse({ id, stream: true, data: { index: sent, value: `chunk_${sent}` } });
sent++;
} else {
clearInterval(interval);
writeResponse({ id, success: true, result: { totalChunks: count } });
}
}, 10);
} else if (method === 'streamError') {
// Send 1 chunk, then error
writeResponse({ id, stream: true, data: { index: 0, value: 'before_error' } });
setTimeout(() => {
writeResponse({ id, success: false, error: 'Stream error after chunk' });
}, 20);
} else if (method === 'streamEmpty') {
// Zero chunks, immediate final response
writeResponse({ id, success: true, result: { totalChunks: 0 } });
} else if (method === 'exit') {
// Graceful exit
writeResponse({ id, success: true, result: null });
process.exit(0);
} else {
// Unknown command
writeResponse({ id, success: false, error: `Unknown method: ${method}` });
}
}
// Handle SIGTERM gracefully
process.on('SIGTERM', () => {
process.exit(0);
});

View File

@@ -0,0 +1,124 @@
#!/usr/bin/env node
/**
* Mock "Rust daemon" for testing the SocketTransport and RustBridge socket mode.
* Creates a Unix socket server, accepts connections, and speaks the same
* JSON-over-newline IPC protocol as mock-rust-binary.mjs.
*
* Usage: node mock-socket-server.mjs <socket-path>
* Signals readiness by writing a JSON line to stdout: {"socketPath":"...","ready":true}
*/
import * as net from 'net';
import * as fs from 'fs';
const socketPath = process.argv[2];
if (!socketPath) {
process.stderr.write('Usage: mock-socket-server.mjs <socket-path>\n');
process.exit(1);
}
// Remove stale socket file
try { fs.unlinkSync(socketPath); } catch { /* ignore */ }
/**
* Backpressure-aware write to a socket.
*/
function writeResponse(conn, data) {
const json = JSON.stringify(data) + '\n';
if (!conn.write(json)) {
conn.once('drain', () => {});
}
}
function handleLine(line, conn) {
let request;
try {
request = JSON.parse(line);
} catch {
return;
}
const { id, method, params } = request;
if (method === 'echo') {
writeResponse(conn, { id, success: true, result: params });
} else if (method === 'largeEcho') {
writeResponse(conn, { id, success: true, result: params });
} else if (method === 'error') {
writeResponse(conn, { id, success: false, error: 'Test error message' });
} else if (method === 'emitEvent') {
writeResponse(conn, { event: params.eventName, data: params.eventData });
writeResponse(conn, { id, success: true, result: null });
} else if (method === 'slow') {
setTimeout(() => {
writeResponse(conn, { id, success: true, result: { delayed: true } });
}, 100);
} else if (method === 'streamEcho') {
const count = params.count || 0;
let sent = 0;
const interval = setInterval(() => {
if (sent < count) {
writeResponse(conn, { id, stream: true, data: { index: sent, value: `chunk_${sent}` } });
sent++;
} else {
clearInterval(interval);
writeResponse(conn, { id, success: true, result: { totalChunks: count } });
}
}, 10);
} else if (method === 'streamError') {
writeResponse(conn, { id, stream: true, data: { index: 0, value: 'before_error' } });
setTimeout(() => {
writeResponse(conn, { id, success: false, error: 'Stream error after chunk' });
}, 20);
} else if (method === 'streamEmpty') {
writeResponse(conn, { id, success: true, result: { totalChunks: 0 } });
} else if (method === 'exit') {
writeResponse(conn, { id, success: true, result: null });
// In socket mode, 'exit' just closes this connection, not the server
setTimeout(() => conn.end(), 50);
} else {
writeResponse(conn, { id, success: false, error: `Unknown method: ${method}` });
}
}
const server = net.createServer((conn) => {
// Send ready event on each new connection
writeResponse(conn, { event: 'ready', data: { version: '1.0.0' } });
// Buffer-based newline scanner for incoming data
let buffer = Buffer.alloc(0);
conn.on('data', (chunk) => {
buffer = Buffer.concat([buffer, chunk]);
let idx;
while ((idx = buffer.indexOf(0x0A)) !== -1) {
const lineBuffer = buffer.subarray(0, idx);
buffer = buffer.subarray(idx + 1);
const line = lineBuffer.toString('utf8').trim();
if (line) {
handleLine(line, conn);
}
}
});
conn.on('error', () => { /* ignore client errors */ });
});
server.listen(socketPath, () => {
// Signal to parent that the server is ready
process.stdout.write(JSON.stringify({ socketPath, ready: true }) + '\n');
});
// Handle SIGTERM gracefully
process.on('SIGTERM', () => {
server.close();
try { fs.unlinkSync(socketPath); } catch { /* ignore */ }
process.exit(0);
});
// Handle SIGINT
process.on('SIGINT', () => {
server.close();
try { fs.unlinkSync(socketPath); } catch { /* ignore */ }
process.exit(0);
});

View File

@@ -0,0 +1,89 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import { LineScanner } from '../ts/classes.linescanner.js';
const noopLogger = { log() {} };
tap.test('should parse a single complete line', async () => {
const scanner = new LineScanner(1024 * 1024, noopLogger);
const lines: string[] = [];
scanner.push(Buffer.from('{"hello":"world"}\n'), (line) => lines.push(line));
expect(lines.length).toEqual(1);
expect(lines[0]).toEqual('{"hello":"world"}');
});
tap.test('should parse multiple lines in one chunk', async () => {
const scanner = new LineScanner(1024 * 1024, noopLogger);
const lines: string[] = [];
scanner.push(Buffer.from('{"a":1}\n{"b":2}\n{"c":3}\n'), (line) => lines.push(line));
expect(lines.length).toEqual(3);
expect(lines[0]).toEqual('{"a":1}');
expect(lines[1]).toEqual('{"b":2}');
expect(lines[2]).toEqual('{"c":3}');
});
tap.test('should handle a line split across chunks', async () => {
const scanner = new LineScanner(1024 * 1024, noopLogger);
const lines: string[] = [];
scanner.push(Buffer.from('{"hel'), (line) => lines.push(line));
expect(lines.length).toEqual(0);
scanner.push(Buffer.from('lo":"world"}\n'), (line) => lines.push(line));
expect(lines.length).toEqual(1);
expect(lines[0]).toEqual('{"hello":"world"}');
});
tap.test('should drop oversized lines', async () => {
const scanner = new LineScanner(100, noopLogger);
const lines: string[] = [];
// Line is 200 chars + newline, exceeds maxPayloadSize of 100
const oversized = 'x'.repeat(200) + '\n';
scanner.push(Buffer.from(oversized), (line) => lines.push(line));
expect(lines.length).toEqual(0);
});
tap.test('should clear buffer on OOM (no newline, exceeds max)', async () => {
const scanner = new LineScanner(100, noopLogger);
const lines: string[] = [];
// Push 200 bytes without any newline — exceeds maxPayloadSize
scanner.push(Buffer.from('x'.repeat(200)), (line) => lines.push(line));
expect(lines.length).toEqual(0);
// After clearing, should work normally again
scanner.push(Buffer.from('{"ok":true}\n'), (line) => lines.push(line));
expect(lines.length).toEqual(1);
expect(lines[0]).toEqual('{"ok":true}');
});
tap.test('should skip empty lines', async () => {
const scanner = new LineScanner(1024 * 1024, noopLogger);
const lines: string[] = [];
scanner.push(Buffer.from('\n\n{"a":1}\n\n'), (line) => lines.push(line));
expect(lines.length).toEqual(1);
expect(lines[0]).toEqual('{"a":1}');
});
tap.test('should handle mixed complete and partial lines', async () => {
const scanner = new LineScanner(1024 * 1024, noopLogger);
const lines: string[] = [];
// First chunk: one complete line + start of second line
scanner.push(Buffer.from('{"first":1}\n{"sec'), (line) => lines.push(line));
expect(lines.length).toEqual(1);
// Second chunk: end of second line + complete third line
scanner.push(Buffer.from('ond":2}\n{"third":3}\n'), (line) => lines.push(line));
expect(lines.length).toEqual(3);
expect(lines[1]).toEqual('{"second":2}');
expect(lines[2]).toEqual('{"third":3}');
});
tap.test('clear should reset the buffer', async () => {
const scanner = new LineScanner(1024 * 1024, noopLogger);
const lines: string[] = [];
// Push partial data
scanner.push(Buffer.from('{"partial":'), (line) => lines.push(line));
// Clear
scanner.clear();
// Now push a complete new line — old partial should not affect it
scanner.push(Buffer.from('{"fresh":true}\n'), (line) => lines.push(line));
expect(lines.length).toEqual(1);
expect(lines[0]).toEqual('{"fresh":true}');
});
export default tap.start();

View File

@@ -0,0 +1,98 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as path from 'path';
import * as fs from 'fs';
import { RustBinaryLocator } from '../ts/classes.rustbinarylocator.js';
const testDir = path.resolve(path.dirname(new URL(import.meta.url).pathname));
tap.test('should return null when no binary is found', async () => {
const locator = new RustBinaryLocator({
binaryName: 'nonexistent-binary-xyz',
searchSystemPath: false,
});
const result = await locator.findBinary();
expect(result).toBeNull();
});
tap.test('should use explicit binaryPath when provided', async () => {
const mockBinaryPath = path.join(testDir, 'helpers/mock-rust-binary.mjs');
const locator = new RustBinaryLocator({
binaryName: 'mock-rust-binary',
binaryPath: mockBinaryPath,
searchSystemPath: false,
});
const result = await locator.findBinary();
expect(result).toEqual(mockBinaryPath);
});
tap.test('should cache the result', async () => {
const mockBinaryPath = path.join(testDir, 'helpers/mock-rust-binary.mjs');
const locator = new RustBinaryLocator({
binaryName: 'mock-rust-binary',
binaryPath: mockBinaryPath,
searchSystemPath: false,
});
const first = await locator.findBinary();
const second = await locator.findBinary();
expect(first).toEqual(second);
expect(first).toEqual(mockBinaryPath);
});
tap.test('should clear cache', async () => {
const mockBinaryPath = path.join(testDir, 'helpers/mock-rust-binary.mjs');
const locator = new RustBinaryLocator({
binaryName: 'mock-rust-binary',
binaryPath: mockBinaryPath,
searchSystemPath: false,
});
const first = await locator.findBinary();
expect(first).toEqual(mockBinaryPath);
locator.clearCache();
// After clearing, next call should re-search and still find it
const second = await locator.findBinary();
expect(second).toEqual(mockBinaryPath);
});
tap.test('should fall back to env var when binaryPath not set', async () => {
const mockBinaryPath = path.join(testDir, 'helpers/mock-rust-binary.mjs');
const envVar = 'TEST_SMARTRUST_BINARY_' + Date.now();
process.env[envVar] = mockBinaryPath;
const locator = new RustBinaryLocator({
binaryName: 'mock-rust-binary',
envVarName: envVar,
searchSystemPath: false,
});
const result = await locator.findBinary();
expect(result).toEqual(mockBinaryPath);
delete process.env[envVar];
});
tap.test('should find binary in local paths', async () => {
const mockBinaryPath = path.join(testDir, 'helpers/mock-rust-binary.mjs');
const locator = new RustBinaryLocator({
binaryName: 'mock-rust-binary',
localPaths: ['/nonexistent/path/binary', mockBinaryPath],
searchSystemPath: false,
});
const result = await locator.findBinary();
expect(result).toEqual(mockBinaryPath);
});
tap.test('should find node in system PATH', async () => {
const locator = new RustBinaryLocator({
binaryName: 'node',
searchSystemPath: true,
});
const result = await locator.findBinary();
expect(result).not.toBeNull();
});
export default tap.start();

View File

@@ -0,0 +1,440 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as path from 'path';
import { RustBridge } from '../ts/classes.rustbridge.js';
import type { ICommandDefinition } from '../ts/interfaces/index.js';
const testDir = path.resolve(path.dirname(new URL(import.meta.url).pathname));
const mockBinaryPath = path.join(testDir, 'helpers/mock-rust-binary.mjs');
// Define the command types for our mock binary
type TMockCommands = {
echo: { params: Record<string, any>; result: Record<string, any> };
largeEcho: { params: Record<string, any>; result: Record<string, any> };
error: { params: {}; result: never };
emitEvent: { params: { eventName: string; eventData: any }; result: null };
slow: { params: {}; result: { delayed: boolean } };
exit: { params: {}; result: null };
streamEcho: { params: { count: number }; chunk: { index: number; value: string }; result: { totalChunks: number } };
streamError: { params: {}; chunk: { index: number; value: string }; result: never };
streamEmpty: { params: {}; chunk: never; result: { totalChunks: number } };
};
tap.test('should spawn and receive ready event', async () => {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'node',
binaryPath: 'node',
cliArgs: [mockBinaryPath],
readyTimeoutMs: 5000,
});
const result = await bridge.spawn();
expect(result).toBeTrue();
expect(bridge.running).toBeTrue();
bridge.kill();
expect(bridge.running).toBeFalse();
});
tap.test('should send command and receive response', async () => {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'node',
binaryPath: 'node',
cliArgs: [mockBinaryPath],
readyTimeoutMs: 5000,
});
await bridge.spawn();
const result = await bridge.sendCommand('echo', { hello: 'world', num: 42 });
expect(result).toEqual({ hello: 'world', num: 42 });
bridge.kill();
});
tap.test('should handle error responses', async () => {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'node',
binaryPath: 'node',
cliArgs: [mockBinaryPath],
readyTimeoutMs: 5000,
});
await bridge.spawn();
let threw = false;
try {
await bridge.sendCommand('error', {});
} catch (err: any) {
threw = true;
expect(err.message).toInclude('Test error message');
}
expect(threw).toBeTrue();
bridge.kill();
});
tap.test('should receive custom events from the binary', async () => {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'node',
binaryPath: 'node',
cliArgs: [mockBinaryPath],
readyTimeoutMs: 5000,
});
await bridge.spawn();
const eventPromise = new Promise<any>((resolve) => {
bridge.once('management:testEvent', (data) => {
resolve(data);
});
});
await bridge.sendCommand('emitEvent', {
eventName: 'testEvent',
eventData: { key: 'value' },
});
const eventData = await eventPromise;
expect(eventData).toEqual({ key: 'value' });
bridge.kill();
});
tap.test('should handle delayed responses', async () => {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'node',
binaryPath: 'node',
cliArgs: [mockBinaryPath],
readyTimeoutMs: 5000,
requestTimeoutMs: 5000,
});
await bridge.spawn();
const result = await bridge.sendCommand('slow', {});
expect(result).toEqual({ delayed: true });
bridge.kill();
});
tap.test('should handle multiple concurrent commands', async () => {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'node',
binaryPath: 'node',
cliArgs: [mockBinaryPath],
readyTimeoutMs: 5000,
});
await bridge.spawn();
const results = await Promise.all([
bridge.sendCommand('echo', { id: 1 }),
bridge.sendCommand('echo', { id: 2 }),
bridge.sendCommand('echo', { id: 3 }),
]);
expect(results[0]).toEqual({ id: 1 });
expect(results[1]).toEqual({ id: 2 });
expect(results[2]).toEqual({ id: 3 });
bridge.kill();
});
tap.test('should throw when sending command while not running', async () => {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'node',
binaryPath: 'node',
cliArgs: [mockBinaryPath],
});
let threw = false;
try {
await bridge.sendCommand('echo', {});
} catch (err: any) {
threw = true;
expect(err.message).toInclude('not running');
}
expect(threw).toBeTrue();
});
tap.test('should return false when binary not found', async () => {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'nonexistent-binary-xyz',
searchSystemPath: false,
});
const result = await bridge.spawn();
expect(result).toBeFalse();
expect(bridge.running).toBeFalse();
});
tap.test('should emit exit event when process exits', async () => {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'node',
binaryPath: 'node',
cliArgs: [mockBinaryPath],
readyTimeoutMs: 5000,
});
await bridge.spawn();
const exitPromise = new Promise<number | null>((resolve) => {
bridge.once('exit', (code) => {
resolve(code);
});
});
// Tell mock binary to exit
await bridge.sendCommand('exit', {});
const exitCode = await exitPromise;
expect(exitCode).toEqual(0);
expect(bridge.running).toBeFalse();
});
tap.test('should handle 1MB payload round-trip', async () => {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'node',
binaryPath: 'node',
cliArgs: [mockBinaryPath],
readyTimeoutMs: 5000,
requestTimeoutMs: 30000,
});
await bridge.spawn();
// Create a ~1MB payload
const largeString = 'x'.repeat(1024 * 1024);
const result = await bridge.sendCommand('largeEcho', { data: largeString });
expect(result.data).toEqual(largeString);
expect(result.data.length).toEqual(1024 * 1024);
bridge.kill();
});
tap.test('should handle 10MB payload round-trip', async () => {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'node',
binaryPath: 'node',
cliArgs: [mockBinaryPath],
readyTimeoutMs: 5000,
requestTimeoutMs: 60000,
});
await bridge.spawn();
// Create a ~10MB payload
const largeString = 'y'.repeat(10 * 1024 * 1024);
const result = await bridge.sendCommand('largeEcho', { data: largeString });
expect(result.data).toEqual(largeString);
expect(result.data.length).toEqual(10 * 1024 * 1024);
bridge.kill();
});
tap.test('should reject outbound messages exceeding maxPayloadSize', async () => {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'node',
binaryPath: 'node',
cliArgs: [mockBinaryPath],
readyTimeoutMs: 5000,
maxPayloadSize: 1000,
});
await bridge.spawn();
let threw = false;
try {
await bridge.sendCommand('largeEcho', { data: 'z'.repeat(2000) });
} catch (err: any) {
threw = true;
expect(err.message).toInclude('maxPayloadSize');
}
expect(threw).toBeTrue();
bridge.kill();
});
tap.test('should handle multiple large concurrent commands', async () => {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'node',
binaryPath: 'node',
cliArgs: [mockBinaryPath],
readyTimeoutMs: 5000,
requestTimeoutMs: 30000,
});
await bridge.spawn();
const size = 500 * 1024; // 500KB each
const results = await Promise.all([
bridge.sendCommand('largeEcho', { data: 'a'.repeat(size), id: 1 }),
bridge.sendCommand('largeEcho', { data: 'b'.repeat(size), id: 2 }),
bridge.sendCommand('largeEcho', { data: 'c'.repeat(size), id: 3 }),
]);
expect(results[0].data.length).toEqual(size);
expect(results[0].data[0]).toEqual('a');
expect(results[1].data.length).toEqual(size);
expect(results[1].data[0]).toEqual('b');
expect(results[2].data.length).toEqual(size);
expect(results[2].data[0]).toEqual('c');
bridge.kill();
});
// === Streaming tests ===
tap.test('streaming: should receive chunks via for-await-of and final result', async () => {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'node',
binaryPath: 'node',
cliArgs: [mockBinaryPath],
readyTimeoutMs: 5000,
requestTimeoutMs: 10000,
});
await bridge.spawn();
const stream = bridge.sendCommandStreaming('streamEcho', { count: 5 });
const chunks: Array<{ index: number; value: string }> = [];
for await (const chunk of stream) {
chunks.push(chunk);
}
expect(chunks.length).toEqual(5);
for (let i = 0; i < 5; i++) {
expect(chunks[i].index).toEqual(i);
expect(chunks[i].value).toEqual(`chunk_${i}`);
}
const result = await stream.result;
expect(result.totalChunks).toEqual(5);
bridge.kill();
});
tap.test('streaming: should handle zero chunks (immediate result)', async () => {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'node',
binaryPath: 'node',
cliArgs: [mockBinaryPath],
readyTimeoutMs: 5000,
});
await bridge.spawn();
const stream = bridge.sendCommandStreaming('streamEmpty', {});
const chunks: any[] = [];
for await (const chunk of stream) {
chunks.push(chunk);
}
expect(chunks.length).toEqual(0);
const result = await stream.result;
expect(result.totalChunks).toEqual(0);
bridge.kill();
});
tap.test('streaming: should propagate error to iterator and .result', async () => {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'node',
binaryPath: 'node',
cliArgs: [mockBinaryPath],
readyTimeoutMs: 5000,
requestTimeoutMs: 10000,
});
await bridge.spawn();
const stream = bridge.sendCommandStreaming('streamError', {});
const chunks: any[] = [];
let iteratorError: Error | null = null;
try {
for await (const chunk of stream) {
chunks.push(chunk);
}
} catch (err: any) {
iteratorError = err;
}
// Should have received at least one chunk before error
expect(chunks.length).toEqual(1);
expect(chunks[0].value).toEqual('before_error');
// Iterator should have thrown
expect(iteratorError).toBeTruthy();
expect(iteratorError!.message).toInclude('Stream error after chunk');
// .result should also reject
let resultError: Error | null = null;
try {
await stream.result;
} catch (err: any) {
resultError = err;
}
expect(resultError).toBeTruthy();
expect(resultError!.message).toInclude('Stream error after chunk');
bridge.kill();
});
tap.test('streaming: should fail when bridge is not running', async () => {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'node',
binaryPath: 'node',
cliArgs: [mockBinaryPath],
});
const stream = bridge.sendCommandStreaming('streamEcho', { count: 3 });
let resultError: Error | null = null;
try {
await stream.result;
} catch (err: any) {
resultError = err;
}
expect(resultError).toBeTruthy();
expect(resultError!.message).toInclude('not running');
});
tap.test('streaming: should fail when killed mid-stream', async () => {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'node',
binaryPath: 'node',
cliArgs: [mockBinaryPath],
readyTimeoutMs: 5000,
requestTimeoutMs: 30000,
});
await bridge.spawn();
// Request many chunks so we can kill mid-stream
const stream = bridge.sendCommandStreaming('streamEcho', { count: 100 });
const chunks: any[] = [];
let iteratorError: Error | null = null;
// Kill after a short delay
setTimeout(() => {
bridge.kill();
}, 50);
try {
for await (const chunk of stream) {
chunks.push(chunk);
}
} catch (err: any) {
iteratorError = err;
}
// Should have gotten some chunks but not all
expect(iteratorError).toBeTruthy();
expect(iteratorError!.message).toInclude('killed');
});
export default tap.start();

View File

@@ -0,0 +1,312 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as path from 'path';
import * as fs from 'fs';
import * as childProcess from 'child_process';
import * as os from 'os';
import { RustBridge } from '../ts/classes.rustbridge.js';
import type { ICommandDefinition } from '../ts/interfaces/index.js';
const testDir = path.resolve(path.dirname(new URL(import.meta.url).pathname));
const mockServerPath = path.join(testDir, 'helpers/mock-socket-server.mjs');
// Define the command types for our mock binary
type TMockCommands = {
echo: { params: Record<string, any>; result: Record<string, any> };
largeEcho: { params: Record<string, any>; result: Record<string, any> };
error: { params: {}; result: never };
emitEvent: { params: { eventName: string; eventData: any }; result: null };
slow: { params: {}; result: { delayed: boolean } };
exit: { params: {}; result: null };
streamEcho: { params: { count: number }; chunk: { index: number; value: string }; result: { totalChunks: number } };
streamError: { params: {}; chunk: { index: number; value: string }; result: never };
streamEmpty: { params: {}; chunk: never; result: { totalChunks: number } };
};
/**
* Start the mock socket server and return the socket path.
* Returns { proc, socketPath }.
*/
async function startMockServer(testName: string): Promise<{ proc: childProcess.ChildProcess; socketPath: string }> {
const socketPath = path.join(os.tmpdir(), `smartrust-test-${Date.now()}-${testName}.sock`);
const proc = childProcess.spawn('node', [mockServerPath, socketPath], {
stdio: ['pipe', 'pipe', 'pipe'],
});
// Wait for the server to signal readiness via stdout
return new Promise((resolve, reject) => {
let stdoutData = '';
const timeout = setTimeout(() => {
proc.kill();
reject(new Error('Mock server did not start within 5s'));
}, 5000);
proc.stdout!.on('data', (data: Buffer) => {
stdoutData += data.toString();
const lines = stdoutData.split('\n');
for (const line of lines) {
if (line.trim()) {
try {
const parsed = JSON.parse(line.trim());
if (parsed.ready) {
clearTimeout(timeout);
resolve({ proc, socketPath: parsed.socketPath });
return;
}
} catch { /* not JSON yet */ }
}
}
});
proc.on('error', (err) => {
clearTimeout(timeout);
reject(err);
});
proc.on('exit', (code) => {
clearTimeout(timeout);
reject(new Error(`Mock server exited with code ${code}`));
});
});
}
function stopMockServer(proc: childProcess.ChildProcess, socketPath: string) {
try { proc.kill('SIGTERM'); } catch { /* ignore */ }
try { fs.unlinkSync(socketPath); } catch { /* ignore */ }
}
// === Socket Transport Tests via RustBridge.connect() ===
tap.test('socket: should connect and receive ready event', async () => {
const { proc, socketPath } = await startMockServer('connect-ready');
try {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'mock-daemon',
readyTimeoutMs: 5000,
});
const result = await bridge.connect(socketPath);
expect(result).toBeTrue();
expect(bridge.running).toBeTrue();
bridge.kill();
expect(bridge.running).toBeFalse();
} finally {
stopMockServer(proc, socketPath);
}
});
tap.test('socket: should send command and receive response', async () => {
const { proc, socketPath } = await startMockServer('send-command');
try {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'mock-daemon',
readyTimeoutMs: 5000,
});
await bridge.connect(socketPath);
const result = await bridge.sendCommand('echo', { hello: 'world', num: 42 });
expect(result).toEqual({ hello: 'world', num: 42 });
bridge.kill();
} finally {
stopMockServer(proc, socketPath);
}
});
tap.test('socket: should handle error responses', async () => {
const { proc, socketPath } = await startMockServer('error-response');
try {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'mock-daemon',
readyTimeoutMs: 5000,
});
await bridge.connect(socketPath);
let threw = false;
try {
await bridge.sendCommand('error', {});
} catch (err: any) {
threw = true;
expect(err.message).toInclude('Test error message');
}
expect(threw).toBeTrue();
bridge.kill();
} finally {
stopMockServer(proc, socketPath);
}
});
tap.test('socket: should receive custom events', async () => {
const { proc, socketPath } = await startMockServer('custom-events');
try {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'mock-daemon',
readyTimeoutMs: 5000,
});
await bridge.connect(socketPath);
const eventPromise = new Promise<any>((resolve) => {
bridge.once('management:testEvent', (data) => resolve(data));
});
await bridge.sendCommand('emitEvent', {
eventName: 'testEvent',
eventData: { key: 'value' },
});
const eventData = await eventPromise;
expect(eventData).toEqual({ key: 'value' });
bridge.kill();
} finally {
stopMockServer(proc, socketPath);
}
});
tap.test('socket: should handle multiple concurrent commands', async () => {
const { proc, socketPath } = await startMockServer('concurrent');
try {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'mock-daemon',
readyTimeoutMs: 5000,
});
await bridge.connect(socketPath);
const results = await Promise.all([
bridge.sendCommand('echo', { id: 1 }),
bridge.sendCommand('echo', { id: 2 }),
bridge.sendCommand('echo', { id: 3 }),
]);
expect(results[0]).toEqual({ id: 1 });
expect(results[1]).toEqual({ id: 2 });
expect(results[2]).toEqual({ id: 3 });
bridge.kill();
} finally {
stopMockServer(proc, socketPath);
}
});
tap.test('socket: should handle 1MB payload round-trip', async () => {
const { proc, socketPath } = await startMockServer('large-payload');
try {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'mock-daemon',
readyTimeoutMs: 5000,
requestTimeoutMs: 30000,
});
await bridge.connect(socketPath);
const largeString = 'x'.repeat(1024 * 1024);
const result = await bridge.sendCommand('largeEcho', { data: largeString });
expect(result.data).toEqual(largeString);
expect(result.data.length).toEqual(1024 * 1024);
bridge.kill();
} finally {
stopMockServer(proc, socketPath);
}
});
tap.test('socket: should disconnect without killing the daemon', async () => {
const { proc, socketPath } = await startMockServer('no-kill-daemon');
try {
// First connection
const bridge1 = new RustBridge<TMockCommands>({
binaryName: 'mock-daemon',
readyTimeoutMs: 5000,
});
await bridge1.connect(socketPath);
expect(bridge1.running).toBeTrue();
// Disconnect
bridge1.kill();
expect(bridge1.running).toBeFalse();
// Second connection — daemon should still be alive
const bridge2 = new RustBridge<TMockCommands>({
binaryName: 'mock-daemon',
readyTimeoutMs: 5000,
});
const result = await bridge2.connect(socketPath);
expect(result).toBeTrue();
expect(bridge2.running).toBeTrue();
// Verify the daemon is functional
const echoResult = await bridge2.sendCommand('echo', { reconnected: true });
expect(echoResult).toEqual({ reconnected: true });
bridge2.kill();
} finally {
stopMockServer(proc, socketPath);
}
});
tap.test('socket: should stream responses via socket', async () => {
const { proc, socketPath } = await startMockServer('streaming');
try {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'mock-daemon',
readyTimeoutMs: 5000,
requestTimeoutMs: 10000,
});
await bridge.connect(socketPath);
const stream = bridge.sendCommandStreaming('streamEcho', { count: 5 });
const chunks: Array<{ index: number; value: string }> = [];
for await (const chunk of stream) {
chunks.push(chunk);
}
expect(chunks.length).toEqual(5);
for (let i = 0; i < 5; i++) {
expect(chunks[i].index).toEqual(i);
expect(chunks[i].value).toEqual(`chunk_${i}`);
}
const result = await stream.result;
expect(result.totalChunks).toEqual(5);
bridge.kill();
} finally {
stopMockServer(proc, socketPath);
}
});
tap.test('socket: should return false when socket path does not exist', async () => {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'mock-daemon',
readyTimeoutMs: 3000,
});
const result = await bridge.connect('/tmp/nonexistent-smartrust-test.sock');
expect(result).toBeFalse();
expect(bridge.running).toBeFalse();
});
tap.test('socket: should throw when sending command while not connected', async () => {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'mock-daemon',
});
let threw = false;
try {
await bridge.sendCommand('echo', {});
} catch (err: any) {
threw = true;
expect(err.message).toInclude('not running');
}
expect(threw).toBeTrue();
});
export default tap.start();

View File

@@ -1,8 +1,12 @@
import { expect, tap } from '@git.zone/tstest/tapbundle'; import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as smartrust from '../ts/index.js' import * as smartrust from '../ts/index.js';
tap.test('first test', async () => { tap.test('should export RustBridge', async () => {
console.log(smartrust) expect(smartrust.RustBridge).toBeTypeOf('function');
}) });
export default tap.start() tap.test('should export RustBinaryLocator', async () => {
expect(smartrust.RustBinaryLocator).toBeTypeOf('function');
});
export default tap.start();

View File

@@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartrust', name: '@push.rocks/smartrust',
version: '1.0.2', version: '1.3.0',
description: 'a bridge between JS engines and rust' description: 'a bridge between JS engines and rust'
} }

51
ts/classes.linescanner.ts Normal file
View File

@@ -0,0 +1,51 @@
import type { IRustBridgeLogger } from './interfaces/index.js';
/**
* Buffer-based newline scanner for streaming binary data.
* Accumulates chunks and emits complete lines via callback.
* Used by both StdioTransport and SocketTransport.
*/
export class LineScanner {
private buffer: Buffer = Buffer.alloc(0);
private maxPayloadSize: number;
private logger: IRustBridgeLogger;
constructor(maxPayloadSize: number, logger: IRustBridgeLogger) {
this.maxPayloadSize = maxPayloadSize;
this.logger = logger;
}
/**
* Feed a chunk of data. Calls onLine for each complete newline-terminated line found.
*/
public push(chunk: Buffer, onLine: (line: string) => void): void {
this.buffer = Buffer.concat([this.buffer, chunk]);
let newlineIndex: number;
while ((newlineIndex = this.buffer.indexOf(0x0A)) !== -1) {
const lineBuffer = this.buffer.subarray(0, newlineIndex);
this.buffer = this.buffer.subarray(newlineIndex + 1);
if (lineBuffer.length > this.maxPayloadSize) {
this.logger.log('error', `Inbound message exceeds maxPayloadSize (${lineBuffer.length} bytes), dropping`);
continue;
}
const line = lineBuffer.toString('utf8').trim();
if (line) {
onLine(line);
}
}
// Prevent OOM if sender never sends newline
if (this.buffer.length > this.maxPayloadSize) {
this.logger.log('error', `Buffer exceeded maxPayloadSize (${this.buffer.length} bytes) without newline, clearing`);
this.buffer = Buffer.alloc(0);
}
}
/** Reset the internal buffer. */
public clear(): void {
this.buffer = Buffer.alloc(0);
}
}

View File

@@ -0,0 +1,150 @@
import * as plugins from './plugins.js';
import type { IBinaryLocatorOptions, IRustBridgeLogger } from './interfaces/index.js';
const defaultLogger: IRustBridgeLogger = {
log() {},
};
/**
* Locates a Rust binary using a priority-ordered search strategy:
* 1. Explicit binaryPath override
* 2. Environment variable
* 3. Platform-specific npm package
* 4. Local development build paths
* 5. System PATH
*/
export class RustBinaryLocator {
private options: IBinaryLocatorOptions;
private logger: IRustBridgeLogger;
private cachedPath: string | null = null;
constructor(options: IBinaryLocatorOptions, logger?: IRustBridgeLogger) {
this.options = options;
this.logger = logger || defaultLogger;
}
/**
* Find the binary path.
* Returns null if no binary is available.
*/
public async findBinary(): Promise<string | null> {
if (this.cachedPath !== null) {
return this.cachedPath;
}
const path = await this.searchBinary();
this.cachedPath = path;
return path;
}
/**
* Clear the cached binary path.
*/
public clearCache(): void {
this.cachedPath = null;
}
private async searchBinary(): Promise<string | null> {
const { binaryName } = this.options;
// 1. Explicit binary path override
if (this.options.binaryPath) {
if (await this.isExecutable(this.options.binaryPath)) {
this.logger.log('info', `Binary found via explicit path: ${this.options.binaryPath}`);
return this.options.binaryPath;
}
this.logger.log('warn', `Explicit binary path not executable: ${this.options.binaryPath}`);
}
// 2. Environment variable override
if (this.options.envVarName) {
const envPath = process.env[this.options.envVarName];
if (envPath) {
if (await this.isExecutable(envPath)) {
this.logger.log('info', `Binary found via ${this.options.envVarName}: ${envPath}`);
return envPath;
}
this.logger.log('warn', `${this.options.envVarName} set but not executable: ${envPath}`);
}
}
// 3. Platform-specific npm package
if (this.options.platformPackagePrefix) {
const platformBinary = await this.findPlatformPackageBinary();
if (platformBinary) {
this.logger.log('info', `Binary found in platform package: ${platformBinary}`);
return platformBinary;
}
}
// 4. Local development build paths
const localPaths = this.options.localPaths || [
plugins.path.resolve(process.cwd(), `rust/target/release/${binaryName}`),
plugins.path.resolve(process.cwd(), `rust/target/debug/${binaryName}`),
];
for (const localPath of localPaths) {
if (await this.isExecutable(localPath)) {
this.logger.log('info', `Binary found at local path: ${localPath}`);
return localPath;
}
}
// 5. System PATH
if (this.options.searchSystemPath !== false) {
const systemPath = await this.findInPath(binaryName);
if (systemPath) {
this.logger.log('info', `Binary found in system PATH: ${systemPath}`);
return systemPath;
}
}
this.logger.log('error', `No binary '${binaryName}' found. Provide an explicit path, set an env var, install the platform package, or build from source.`);
return null;
}
private async findPlatformPackageBinary(): Promise<string | null> {
const { binaryName, platformPackagePrefix } = this.options;
const platform = process.platform;
const arch = process.arch;
const packageName = `${platformPackagePrefix}-${platform}-${arch}`;
try {
const resolved = import.meta.resolve(`${packageName}/${binaryName}`);
const packagePath = plugins.url.fileURLToPath(resolved);
if (await this.isExecutable(packagePath)) {
return packagePath;
}
} catch {
// Package not installed - expected for development
}
return null;
}
private async isExecutable(filePath: string): Promise<boolean> {
try {
await plugins.fs.promises.access(filePath, plugins.fs.constants.X_OK);
return true;
} catch {
// File may exist but lack execute bit (common after npm/pnpm install).
// Try to make it executable.
try {
await plugins.fs.promises.access(filePath, plugins.fs.constants.F_OK);
await plugins.fs.promises.chmod(filePath, 0o755);
this.logger.log('info', `Auto-fixed missing execute permission on: ${filePath}`);
return true;
} catch {
return false;
}
}
}
private async findInPath(binaryName: string): Promise<string | null> {
const pathDirs = (process.env.PATH || '').split(plugins.path.delimiter);
for (const dir of pathDirs) {
const fullPath = plugins.path.join(dir, binaryName);
if (await this.isExecutable(fullPath)) {
return fullPath;
}
}
return null;
}
}

341
ts/classes.rustbridge.ts Normal file
View File

@@ -0,0 +1,341 @@
import * as plugins from './plugins.js';
import { RustBinaryLocator } from './classes.rustbinarylocator.js';
import { StreamingResponse } from './classes.streamingresponse.js';
import { StdioTransport } from './classes.stdiotransport.js';
import { SocketTransport } from './classes.sockettransport.js';
import type {
IRustBridgeOptions,
IRustBridgeLogger,
ISocketConnectOptions,
TCommandMap,
IManagementRequest,
IManagementResponse,
IManagementEvent,
TStreamingCommandKeys,
TExtractChunk,
IRustTransport,
} from './interfaces/index.js';
const defaultLogger: IRustBridgeLogger = {
log() {},
};
/**
* Generic bridge between TypeScript and a Rust binary.
* Communicates via JSON-over-stdin/stdout IPC protocol (stdio mode)
* or JSON-over-Unix-socket/named-pipe (socket mode).
*
* @typeParam TCommands - Map of command names to their param/result types
*/
export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plugins.events.EventEmitter {
private locator: RustBinaryLocator;
private options: Required<Pick<IRustBridgeOptions, 'cliArgs' | 'requestTimeoutMs' | 'readyTimeoutMs' | 'readyEventName' | 'maxPayloadSize'>> & IRustBridgeOptions;
private logger: IRustBridgeLogger;
private transport: IRustTransport | null = null;
private pendingRequests = new Map<string, {
resolve: (value: any) => void;
reject: (error: Error) => void;
timer: ReturnType<typeof setTimeout>;
streaming?: StreamingResponse<any, any>;
}>();
private requestCounter = 0;
private isRunning = false;
private binaryPath: string | null = null;
constructor(options: IRustBridgeOptions) {
super();
this.logger = options.logger || defaultLogger;
this.options = {
cliArgs: ['--management'],
requestTimeoutMs: 30000,
readyTimeoutMs: 10000,
readyEventName: 'ready',
maxPayloadSize: 50 * 1024 * 1024,
...options,
};
this.locator = new RustBinaryLocator(options, this.logger);
}
/**
* Spawn the Rust binary and wait for it to signal readiness.
* Returns true if the binary was found and spawned successfully.
*/
public async spawn(): Promise<boolean> {
this.binaryPath = await this.locator.findBinary();
if (!this.binaryPath) {
return false;
}
const transport = new StdioTransport({
binaryPath: this.binaryPath,
cliArgs: this.options.cliArgs,
env: this.options.env,
maxPayloadSize: this.options.maxPayloadSize,
logger: this.logger,
});
return this.connectWithTransport(transport);
}
/**
* Connect to an already-running Rust daemon via Unix socket or named pipe.
* Returns true if the connection was established and the daemon signaled readiness.
*
* @param socketPath - Path to Unix socket or Windows named pipe
* @param socketOptions - Optional socket connection options (reconnect, etc.)
*/
public async connect(socketPath: string, socketOptions?: ISocketConnectOptions): Promise<boolean> {
const transport = new SocketTransport({
socketPath,
maxPayloadSize: this.options.maxPayloadSize,
logger: this.logger,
autoReconnect: socketOptions?.autoReconnect,
reconnectBaseDelayMs: socketOptions?.reconnectBaseDelayMs,
reconnectMaxDelayMs: socketOptions?.reconnectMaxDelayMs,
maxReconnectAttempts: socketOptions?.maxReconnectAttempts,
});
return this.connectWithTransport(transport);
}
/**
* Internal: wire up any transport and wait for the ready handshake.
*/
private connectWithTransport(transport: IRustTransport): Promise<boolean> {
return new Promise<boolean>((resolve) => {
try {
this.transport = transport;
// Wire transport events
transport.on('line', (line: string) => this.handleLine(line));
transport.on('stderr', (line: string) => {
this.logger.log('debug', `[${this.options.binaryName}] ${line}`);
this.emit('stderr', line);
});
transport.on('close', (...args: any[]) => {
this.logger.log('info', `Transport closed`);
this.cleanup();
this.emit('exit', ...args);
});
transport.on('error', (err: Error) => {
this.logger.log('error', `Transport error: ${err.message}`);
this.cleanup();
resolve(false);
});
transport.on('reconnected', () => {
this.logger.log('info', 'Transport reconnected, waiting for ready event');
this.emit('reconnected');
});
// Connect the transport
transport.connect().then(() => {
// Wait for the ready event from the protocol layer
const readyTimeout = setTimeout(() => {
this.logger.log('error', `Process did not send ready event within ${this.options.readyTimeoutMs}ms`);
this.kill();
resolve(false);
}, this.options.readyTimeoutMs);
this.once(`management:${this.options.readyEventName}`, () => {
clearTimeout(readyTimeout);
this.isRunning = true;
this.logger.log('info', `Bridge connected to ${this.options.binaryName}`);
this.emit('ready');
resolve(true);
});
}).catch((err: Error) => {
this.logger.log('error', `Transport connect failed: ${err.message}`);
resolve(false);
});
} catch (err: any) {
this.logger.log('error', `Failed to connect: ${err.message}`);
resolve(false);
}
});
}
/**
* Send a typed command to the Rust process and wait for the response.
*/
public async sendCommand<K extends string & keyof TCommands>(
method: K,
params: TCommands[K]['params'],
): Promise<TCommands[K]['result']> {
if (!this.transport?.connected || !this.isRunning) {
throw new Error(`${this.options.binaryName} bridge is not running`);
}
const id = `req_${++this.requestCounter}`;
const request: IManagementRequest = { id, method, params };
const json = JSON.stringify(request);
// Check outbound payload size
const byteLength = Buffer.byteLength(json, 'utf8');
if (byteLength > this.options.maxPayloadSize) {
throw new Error(
`Outbound message exceeds maxPayloadSize (${byteLength} > ${this.options.maxPayloadSize})`
);
}
return new Promise<TCommands[K]['result']>((resolve, reject) => {
const timer = setTimeout(() => {
this.pendingRequests.delete(id);
reject(new Error(`Command '${method}' timed out after ${this.options.requestTimeoutMs}ms`));
}, this.options.requestTimeoutMs);
this.pendingRequests.set(id, { resolve, reject, timer });
this.transport!.write(json + '\n').catch((err) => {
clearTimeout(timer);
this.pendingRequests.delete(id);
reject(new Error(`Failed to write to transport: ${err.message}`));
});
});
}
/**
* Send a streaming command to the Rust process.
* Returns a StreamingResponse that yields chunks via `for await...of`
* and exposes `.result` for the final response.
*/
public sendCommandStreaming<K extends string & TStreamingCommandKeys<TCommands>>(
method: K,
params: TCommands[K]['params'],
): StreamingResponse<TExtractChunk<TCommands[K]>, TCommands[K]['result']> {
const streaming = new StreamingResponse<TExtractChunk<TCommands[K]>, TCommands[K]['result']>();
if (!this.transport?.connected || !this.isRunning) {
streaming.fail(new Error(`${this.options.binaryName} bridge is not running`));
return streaming;
}
const id = `req_${++this.requestCounter}`;
const request: IManagementRequest = { id, method, params };
const json = JSON.stringify(request);
const byteLength = Buffer.byteLength(json, 'utf8');
if (byteLength > this.options.maxPayloadSize) {
streaming.fail(
new Error(`Outbound message exceeds maxPayloadSize (${byteLength} > ${this.options.maxPayloadSize})`)
);
return streaming;
}
const timeoutMs = this.options.streamTimeoutMs ?? this.options.requestTimeoutMs;
const timer = setTimeout(() => {
this.pendingRequests.delete(id);
streaming.fail(new Error(`Streaming command '${method}' timed out after ${timeoutMs}ms`));
}, timeoutMs);
this.pendingRequests.set(id, {
resolve: (result: any) => streaming.finish(result),
reject: (error: Error) => streaming.fail(error),
timer,
streaming,
});
this.transport!.write(json + '\n').catch((err) => {
clearTimeout(timer);
this.pendingRequests.delete(id);
streaming.fail(new Error(`Failed to write to transport: ${err.message}`));
});
return streaming;
}
/**
* Kill the connection and clean up all resources.
* For stdio: kills the child process (SIGTERM, then SIGKILL).
* For socket: closes the socket connection (does NOT kill the daemon).
*/
public kill(): void {
if (this.transport) {
const transport = this.transport;
this.transport = null;
this.isRunning = false;
// Reject pending requests
for (const [, pending] of this.pendingRequests) {
clearTimeout(pending.timer);
pending.reject(new Error(`${this.options.binaryName} process killed`));
}
this.pendingRequests.clear();
transport.removeAllListeners();
transport.disconnect();
}
}
/**
* Whether the bridge is currently running.
*/
public get running(): boolean {
return this.isRunning;
}
private handleLine(line: string): void {
if (!line) return;
let parsed: any;
try {
parsed = JSON.parse(line);
} catch {
this.logger.log('warn', `Non-JSON output: ${line}`);
return;
}
// Check if it's an event (has 'event' field, no 'id')
if ('event' in parsed && !('id' in parsed)) {
const event = parsed as IManagementEvent;
this.emit(`management:${event.event}`, event.data);
return;
}
// Stream chunk (has 'id' + stream === true + 'data')
if ('id' in parsed && parsed.stream === true && 'data' in parsed) {
const pending = this.pendingRequests.get(parsed.id);
if (pending?.streaming) {
// Reset inactivity timeout
clearTimeout(pending.timer);
const timeoutMs = this.options.streamTimeoutMs ?? this.options.requestTimeoutMs;
pending.timer = setTimeout(() => {
this.pendingRequests.delete(parsed.id);
pending.reject(new Error(`Streaming command timed out after ${timeoutMs}ms of inactivity`));
}, timeoutMs);
pending.streaming.pushChunk(parsed.data);
}
return;
}
// Otherwise it's a response (has 'id' field)
if ('id' in parsed) {
const response = parsed as IManagementResponse;
const pending = this.pendingRequests.get(response.id);
if (pending) {
clearTimeout(pending.timer);
this.pendingRequests.delete(response.id);
if (response.success) {
pending.resolve(response.result);
} else {
pending.reject(new Error(response.error || 'Unknown error from Rust process'));
}
}
}
}
private cleanup(): void {
this.isRunning = false;
this.transport = null;
// Reject all pending requests
for (const [, pending] of this.pendingRequests) {
clearTimeout(pending.timer);
pending.reject(new Error(`${this.options.binaryName} process exited`));
}
this.pendingRequests.clear();
}
}

View File

@@ -0,0 +1,187 @@
import * as plugins from './plugins.js';
import { LineScanner } from './classes.linescanner.js';
import type { IRustBridgeLogger } from './interfaces/index.js';
import type { IRustTransport } from './interfaces/transport.js';
export interface ISocketTransportOptions {
/** Path to Unix socket (Linux/macOS) or named pipe path (Windows) */
socketPath: string;
/** Maximum inbound message size in bytes */
maxPayloadSize: number;
/** Logger instance */
logger: IRustBridgeLogger;
/** Enable auto-reconnect on unexpected disconnect (default: false) */
autoReconnect?: boolean;
/** Initial delay between reconnect attempts in ms (default: 100) */
reconnectBaseDelayMs?: number;
/** Maximum delay between reconnect attempts in ms (default: 30000) */
reconnectMaxDelayMs?: number;
/** Maximum number of reconnect attempts before giving up (default: 10) */
maxReconnectAttempts?: number;
}
interface IResolvedSocketTransportOptions {
socketPath: string;
maxPayloadSize: number;
logger: IRustBridgeLogger;
autoReconnect: boolean;
reconnectBaseDelayMs: number;
reconnectMaxDelayMs: number;
maxReconnectAttempts: number;
}
/**
* Transport that connects to an already-running process via Unix socket or Windows named pipe.
* The JSON-over-newline protocol is identical to stdio; only the transport changes.
*/
export class SocketTransport extends plugins.events.EventEmitter implements IRustTransport {
private options: IResolvedSocketTransportOptions;
private socket: plugins.net.Socket | null = null;
private lineScanner: LineScanner;
private _connected: boolean = false;
private intentionalDisconnect: boolean = false;
private reconnectAttempts: number = 0;
private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
constructor(options: ISocketTransportOptions) {
super();
this.options = {
autoReconnect: false,
reconnectBaseDelayMs: 100,
reconnectMaxDelayMs: 30000,
maxReconnectAttempts: 10,
...options,
};
this.lineScanner = new LineScanner(options.maxPayloadSize, options.logger);
}
public get connected(): boolean {
return this._connected;
}
/**
* Connect to the socket. Resolves when the TCP/Unix connection is established.
*/
public async connect(): Promise<void> {
this.intentionalDisconnect = false;
this.reconnectAttempts = 0;
return this.doConnect();
}
private doConnect(): Promise<void> {
return new Promise<void>((resolve, reject) => {
let settled = false;
this.socket = plugins.net.connect({ path: this.options.socketPath });
this.socket.on('connect', () => {
if (!settled) {
settled = true;
this._connected = true;
this.reconnectAttempts = 0;
resolve();
}
});
this.socket.on('data', (chunk: Buffer) => {
this.lineScanner.push(chunk, (line) => {
this.emit('line', line);
});
});
this.socket.on('close', () => {
const wasConnected = this._connected;
this._connected = false;
this.lineScanner.clear();
if (!this.intentionalDisconnect && wasConnected && this.options.autoReconnect) {
this.attemptReconnect();
}
this.emit('close');
});
this.socket.on('error', (err: Error) => {
this._connected = false;
if (!settled) {
settled = true;
reject(err);
} else if (!this.intentionalDisconnect) {
this.emit('error', err);
}
});
});
}
private attemptReconnect(): void {
if (this.reconnectAttempts >= this.options.maxReconnectAttempts) {
this.options.logger.log('error', `Max reconnect attempts (${this.options.maxReconnectAttempts}) reached`);
this.emit('reconnect_failed');
return;
}
const delay = Math.min(
this.options.reconnectBaseDelayMs * Math.pow(2, this.reconnectAttempts),
this.options.reconnectMaxDelayMs,
);
this.reconnectAttempts++;
this.options.logger.log('info', `Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts}/${this.options.maxReconnectAttempts})`);
this.reconnectTimer = setTimeout(async () => {
this.reconnectTimer = null;
try {
await this.doConnect();
this.emit('reconnected');
} catch {
// doConnect rejected — the 'close' handler on the new socket will trigger another attempt
}
}, delay);
}
/**
* Write data to the socket with backpressure support.
*/
public async write(data: string): Promise<void> {
return new Promise<void>((resolve, reject) => {
if (!this.socket || !this._connected) {
reject(new Error('Socket not connected'));
return;
}
const canContinue = this.socket.write(data, 'utf8', (err) => {
if (err) {
reject(err);
}
});
if (canContinue) {
resolve();
} else {
this.socket.once('drain', () => {
resolve();
});
}
});
}
/**
* Close the socket connection. Does NOT kill the remote daemon.
*/
public disconnect(): void {
this.intentionalDisconnect = true;
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
if (this.socket) {
const sock = this.socket;
this.socket = null;
this._connected = false;
this.lineScanner.clear();
sock.removeAllListeners();
sock.destroy();
}
}
}

View File

@@ -0,0 +1,149 @@
import * as plugins from './plugins.js';
import { LineScanner } from './classes.linescanner.js';
import type { IRustBridgeLogger } from './interfaces/index.js';
import type { IRustTransport } from './interfaces/transport.js';
export interface IStdioTransportOptions {
binaryPath: string;
cliArgs: string[];
env?: Record<string, string>;
maxPayloadSize: number;
logger: IRustBridgeLogger;
}
/**
* Transport that spawns a child process and communicates via stdin/stdout.
* Extracted from the original RustBridge process management logic.
*/
export class StdioTransport extends plugins.events.EventEmitter implements IRustTransport {
private options: IStdioTransportOptions;
private childProcess: plugins.childProcess.ChildProcess | null = null;
private lineScanner: LineScanner;
private stderrRemainder: string = '';
private _connected: boolean = false;
constructor(options: IStdioTransportOptions) {
super();
this.options = options;
this.lineScanner = new LineScanner(options.maxPayloadSize, options.logger);
}
public get connected(): boolean {
return this._connected;
}
/**
* Spawn the child process. Resolves when the process is running (not necessarily ready).
*/
public async connect(): Promise<void> {
const env = this.options.env
? { ...process.env, ...this.options.env }
: { ...process.env };
this.childProcess = plugins.childProcess.spawn(
this.options.binaryPath,
this.options.cliArgs,
{ stdio: ['pipe', 'pipe', 'pipe'], env },
);
// Handle stderr with cross-chunk buffering
this.childProcess.stderr?.on('data', (data: Buffer) => {
this.stderrRemainder += data.toString();
const lines = this.stderrRemainder.split('\n');
this.stderrRemainder = lines.pop()!;
for (const line of lines) {
const trimmed = line.trim();
if (trimmed) {
this.emit('stderr', trimmed);
}
}
});
// Handle stdout via LineScanner
this.childProcess.stdout!.on('data', (chunk: Buffer) => {
this.lineScanner.push(chunk, (line) => {
this.emit('line', line);
});
});
// Handle process exit
this.childProcess.on('exit', (code: number | null, signal: string | null) => {
// Flush remaining stderr
if (this.stderrRemainder.trim()) {
this.emit('stderr', this.stderrRemainder.trim());
}
this._connected = false;
this.lineScanner.clear();
this.stderrRemainder = '';
this.emit('close', code, signal);
});
this.childProcess.on('error', (err: Error) => {
this._connected = false;
this.emit('error', err);
});
this._connected = true;
}
/**
* Write data to stdin with backpressure support.
*/
public async write(data: string): Promise<void> {
return new Promise<void>((resolve, reject) => {
if (!this.childProcess?.stdin) {
reject(new Error('stdin not available'));
return;
}
const canContinue = this.childProcess.stdin.write(data, 'utf8', (err) => {
if (err) {
reject(err);
}
});
if (canContinue) {
resolve();
} else {
this.childProcess.stdin.once('drain', () => {
resolve();
});
}
});
}
/**
* Kill the child process. Sends SIGTERM, then SIGKILL after 5s.
*/
public disconnect(): void {
if (!this.childProcess) return;
const proc = this.childProcess;
this.childProcess = null;
this._connected = false;
this.lineScanner.clear();
this.stderrRemainder = '';
// Remove all listeners
proc.removeAllListeners();
proc.stdout?.removeAllListeners();
proc.stderr?.removeAllListeners();
proc.stdin?.removeAllListeners();
// Kill the process
try { proc.kill('SIGTERM'); } catch { /* already dead */ }
// Destroy stdio pipes
try { proc.stdin?.destroy(); } catch { /* ignore */ }
try { proc.stdout?.destroy(); } catch { /* ignore */ }
try { proc.stderr?.destroy(); } catch { /* ignore */ }
// Unref so Node doesn't wait
try { proc.unref(); } catch { /* ignore */ }
// Force kill after 5 seconds
setTimeout(() => {
try { proc.kill('SIGKILL'); } catch { /* already dead */ }
}, 5000).unref();
}
}

View File

@@ -0,0 +1,110 @@
/**
* Represents a streaming response from a Rust bridge command.
* Implements AsyncIterable to allow `for await...of` consumption of chunks,
* and exposes `.result` for the final response once the stream ends.
*
* @typeParam TChunk - Type of each streamed chunk
* @typeParam TResult - Type of the final result
*/
export class StreamingResponse<TChunk, TResult> implements AsyncIterable<TChunk> {
/** Resolves with the final result when the stream ends successfully. */
public readonly result: Promise<TResult>;
private resolveResult!: (value: TResult) => void;
private rejectResult!: (error: Error) => void;
/** Buffered chunks not yet consumed by the iterator. */
private buffer: TChunk[] = [];
/** Waiting consumer resolve callback (when iterator is ahead of producer). */
private waiting: ((value: IteratorResult<TChunk>) => void) | null = null;
/** Waiting consumer reject callback. */
private waitingReject: ((error: Error) => void) | null = null;
private done = false;
private error: Error | null = null;
constructor() {
this.result = new Promise<TResult>((resolve, reject) => {
this.resolveResult = resolve;
this.rejectResult = reject;
});
}
/**
* Push a chunk into the stream. Called internally by RustBridge.
*/
public pushChunk(chunk: TChunk): void {
if (this.done) return;
if (this.waiting) {
// A consumer is waiting — deliver immediately
const resolve = this.waiting;
this.waiting = null;
this.waitingReject = null;
resolve({ value: chunk, done: false });
} else {
// No consumer waiting — buffer the chunk
this.buffer.push(chunk);
}
}
/**
* End the stream successfully with a final result. Called internally by RustBridge.
*/
public finish(result: TResult): void {
if (this.done) return;
this.done = true;
this.resolveResult(result);
// If a consumer is waiting, signal end of iteration
if (this.waiting) {
const resolve = this.waiting;
this.waiting = null;
this.waitingReject = null;
resolve({ value: undefined as any, done: true });
}
}
/**
* End the stream with an error. Called internally by RustBridge.
*/
public fail(error: Error): void {
if (this.done) return;
this.done = true;
this.error = error;
this.rejectResult(error);
// If a consumer is waiting, reject it
if (this.waitingReject) {
const reject = this.waitingReject;
this.waiting = null;
this.waitingReject = null;
reject(error);
}
}
[Symbol.asyncIterator](): AsyncIterator<TChunk> {
return {
next: (): Promise<IteratorResult<TChunk>> => {
// If there are buffered chunks, deliver one
if (this.buffer.length > 0) {
return Promise.resolve({ value: this.buffer.shift()!, done: false });
}
// If the stream is done, signal end
if (this.done) {
if (this.error) {
return Promise.reject(this.error);
}
return Promise.resolve({ value: undefined as any, done: true });
}
// No buffered chunks and not done — wait for the next push
return new Promise<IteratorResult<TChunk>>((resolve, reject) => {
this.waiting = resolve;
this.waitingReject = reject;
});
},
};
}
}

View File

@@ -1,3 +1,7 @@
import * as plugins from './plugins.js'; export { RustBridge } from './classes.rustbridge.js';
export { RustBinaryLocator } from './classes.rustbinarylocator.js';
export let demoExport = 'Hi there! :) This is an exported string'; export { StreamingResponse } from './classes.streamingresponse.js';
export { StdioTransport } from './classes.stdiotransport.js';
export { SocketTransport } from './classes.sockettransport.js';
export { LineScanner } from './classes.linescanner.js';
export * from './interfaces/index.js';

61
ts/interfaces/config.ts Normal file
View File

@@ -0,0 +1,61 @@
/**
* Minimal logger interface for the bridge.
*/
export interface IRustBridgeLogger {
log(level: string, message: string, data?: Record<string, any>): void;
}
/**
* Options for locating a Rust binary.
*/
export interface IBinaryLocatorOptions {
/** Name of the binary (e.g., 'rustproxy') */
binaryName: string;
/** Environment variable to check for explicit binary path (e.g., 'SMARTPROXY_RUST_BINARY') */
envVarName?: string;
/** Prefix for platform-specific npm packages (e.g., '@push.rocks/smartproxy') */
platformPackagePrefix?: string;
/** Additional local paths to search (defaults to ./rust/target/release/<binaryName> and ./rust/target/debug/<binaryName>) */
localPaths?: string[];
/** Whether to search the system PATH (default: true) */
searchSystemPath?: boolean;
/** Explicit binary path override - skips all other search */
binaryPath?: string;
}
/**
* Options for the RustBridge.
*/
export interface IRustBridgeOptions extends IBinaryLocatorOptions {
/** CLI arguments passed to the binary (default: ['--management']) */
cliArgs?: string[];
/** Timeout for individual requests in ms (default: 30000) */
requestTimeoutMs?: number;
/** Timeout for the ready event during spawn in ms (default: 10000) */
readyTimeoutMs?: number;
/** Additional environment variables for the child process */
env?: Record<string, string>;
/** Name of the ready event emitted by the Rust binary (default: 'ready') */
readyEventName?: string;
/** Optional logger instance */
logger?: IRustBridgeLogger;
/** Maximum message size in bytes (default: 50MB). Messages exceeding this are rejected. */
maxPayloadSize?: number;
/** Inactivity timeout for streaming commands in ms (default: same as requestTimeoutMs).
* Resets on each chunk received. */
streamTimeoutMs?: number;
}
/**
* Options for connecting to an already-running daemon via Unix socket or named pipe.
*/
export interface ISocketConnectOptions {
/** Enable auto-reconnect on unexpected disconnect (default: false) */
autoReconnect?: boolean;
/** Initial delay between reconnect attempts in ms (default: 100) */
reconnectBaseDelayMs?: number;
/** Maximum delay between reconnect attempts in ms (default: 30000) */
reconnectMaxDelayMs?: number;
/** Maximum number of reconnect attempts before giving up (default: 10) */
maxReconnectAttempts?: number;
}

3
ts/interfaces/index.ts Normal file
View File

@@ -0,0 +1,3 @@
export * from './ipc.js';
export * from './config.js';
export * from './transport.js';

62
ts/interfaces/ipc.ts Normal file
View File

@@ -0,0 +1,62 @@
/**
* Management request sent to the Rust binary via stdin.
*/
export interface IManagementRequest {
id: string;
method: string;
params: Record<string, any>;
}
/**
* Management response received from the Rust binary via stdout.
*/
export interface IManagementResponse {
id: string;
success: boolean;
result?: any;
error?: string;
}
/**
* Management event received from the Rust binary (unsolicited, no id field).
*/
export interface IManagementEvent {
event: string;
data: any;
}
/**
* Definition of a single command supported by a Rust binary.
*/
export interface ICommandDefinition<TParams = any, TResult = any> {
params: TParams;
result: TResult;
}
/**
* Map of command names to their definitions.
* Used to type-safe the bridge's sendCommand method.
*/
export type TCommandMap = Record<string, ICommandDefinition>;
/**
* Stream chunk message received from the Rust binary during a streaming command.
*/
export interface IManagementStreamChunk {
id: string;
stream: true;
data: any;
}
/**
* Extract keys from a command map whose definitions include a `chunk` field,
* indicating they support streaming responses.
*/
export type TStreamingCommandKeys<TCommands extends TCommandMap> = {
[K in keyof TCommands]: TCommands[K] extends { chunk: any } ? K : never;
}[keyof TCommands];
/**
* Extract the chunk type from a command definition that has a `chunk` field.
*/
export type TExtractChunk<TDef> = TDef extends { chunk: infer C } ? C : never;

View File

@@ -0,0 +1,26 @@
import type * as plugins from '../plugins.js';
/**
* Abstract transport for communicating with a Rust process.
* Both stdio and socket transports implement this interface.
*
* Events emitted:
* - 'line' (line: string) — a complete newline-terminated JSON line received
* - 'error' (err: Error) — transport-level error
* - 'close' (...args: any[]) — transport has closed/disconnected
* - 'stderr' (line: string) — stderr output (stdio transport only)
* - 'reconnected' () — transport reconnected after unexpected disconnect (socket only)
*/
export interface IRustTransport extends plugins.events.EventEmitter {
/** Connect the transport (spawn process or connect to socket). Resolves when I/O channel is open. */
connect(): Promise<void>;
/** Write a string to the transport. Handles backpressure. */
write(data: string): Promise<void>;
/** Disconnect the transport. For stdio: kills the process. For socket: closes the connection. */
disconnect(): void;
/** Whether the transport is currently connected and writable. */
readonly connected: boolean;
}

View File

@@ -1,7 +1,12 @@
// native scope // native scope
import * as path from 'path'; import * as path from 'path';
import * as fs from 'fs';
import * as childProcess from 'child_process';
import * as events from 'events';
import * as url from 'url';
import * as net from 'net';
export { path }; export { path, fs, childProcess, events, url, net };
// @push.rocks scope // @push.rocks scope
import * as smartpath from '@push.rocks/smartpath'; import * as smartpath from '@push.rocks/smartpath';