10 Commits

Author SHA1 Message Date
jkunz d69c336c81 v1.4.0
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-04-30 08:39:35 +00:00
jkunz f4673c3029 feat(package): improve package metadata and standardize project configuration 2026-04-30 08:39:35 +00:00
jkunz d079f245ef v1.3.2
Default (tags) / security (push) Successful in 27s
Default (tags) / test (push) Failing after 3m56s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-14 23:42:26 +00:00
jkunz eb2bf4ba98 fix(rustbinarylocator): support resolving platform-suffixed local Rust binaries 2026-03-14 23:42:26 +00:00
jkunz 1ec6caca3b v1.3.1
Default (tags) / security (push) Successful in 26s
Default (tags) / test (push) Failing after 3m58s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-02-26 08:50:53 +00:00
jkunz a29201b1c5 fix(readme): document socket transport and clarify stdio/socket differences in README 2026-02-26 08:50:53 +00:00
jkunz d762c26565 v1.3.0
Default (tags) / security (push) Successful in 27s
Default (tags) / test (push) Failing after 3m58s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-02-26 08:44:28 +00:00
jkunz deda8cc4ee feat(transport): introduce transport abstraction and socket-mode support for RustBridge 2026-02-26 08:44:28 +00:00
jkunz 0c39e157c2 v1.2.1
Default (tags) / security (push) Successful in 34s
Default (tags) / test (push) Failing after 3m58s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-02-12 21:25:11 +00:00
jkunz b7e3e30ce5 fix(rust-binary-locator): auto-fix missing execute permission for located Rust binaries 2026-02-12 21:25:11 +00:00
22 changed files with 3154 additions and 1836 deletions
+37
View File
@@ -0,0 +1,37 @@
{
"@git.zone/cli": {
"projectType": "npm",
"module": {
"githost": "code.foss.global",
"gitscope": "push.rocks",
"gitrepo": "smartrust",
"description": "A type-safe bridge between JavaScript engines and Rust binaries.",
"npmPackagename": "@push.rocks/smartrust",
"license": "MIT",
"projectDomain": "push.rocks",
"keywords": [
"rust",
"typescript",
"ipc",
"bridge",
"stdio",
"socket",
"binary",
"transport"
]
},
"release": {
"accessLevel": "public",
"registries": [
"https://verdaccio.lossless.digital",
"https://registry.npmjs.org"
]
}
},
"@git.zone/tsdoc": {
"legal": "\n## License and Legal Information\n\nThis repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [license](./license) file.\n\n**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.\n\n### Trademarks\n\nThis project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH or third parties, and are not included within the scope of the MIT license granted herein.\n\nUse of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines or the guidelines of the respective third-party owners, and any usage must be approved in writing. Third-party trademarks used herein are the property of their respective owners and used only in a descriptive manner, e.g. for an implementation of an API or similar.\n\n### Company Information\n\nTask Venture Capital GmbH\nRegistered at District Court Bremen HRB 35230 HB, Germany\n\nFor any legal inquiries or further information, please contact us via email at hello@task.vc.\n\nBy using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.\n"
},
"@ship.zone/szci": {
"npmGlobalTools": []
}
}
+44
View File
@@ -1,5 +1,49 @@
# Changelog # Changelog
## 2026-04-30 - 1.4.0 - feat(package)
improve package metadata and standardize project configuration
- replace npmextra.json with .smartconfig.json and expand project metadata
- add package publishing metadata including files, keywords, homepage, repository, bugs, and packageManager
- tighten TypeScript configuration with noImplicitAny and explicit node types
- update build tooling dependencies and remove the allowimplicitany build flag
- add a license file and align readme references and install instructions
## 2026-03-14 - 1.3.2 - fix(rustbinarylocator)
support resolving platform-suffixed local Rust binaries
- Checks local Rust build paths for binaries with platform suffixes such as _linux_amd64 in addition to unsuffixed names
- Adds platform and architecture suffix mapping for linux, darwin, windows, x64, and arm64
## 2026-02-26 - 1.3.1 - fix(readme)
document socket transport and clarify stdio/socket differences in README
- Add 'Two Transport Modes' section documenting stdio (spawn) and socket (connect) modes
- Add examples for connect(), socket usage, and auto-reconnect with exponential backoff
- Clarify protocol is transport-agnostic and update ready/stream/event descriptions
- Update event docs: mark stderr as stdio-only and add 'reconnected' event for socket transports
- Clarify kill() behavior for both stdio and socket transports
- Add API reference entries for SocketTransport, StdioTransport, ISocketConnectOptions, IRustTransport, and LineScanner
- Add platform notes, architecture diagram, and minimal Rust/socket usage guidance
## 2026-02-26 - 1.3.0 - feat(transport)
introduce transport abstraction and socket-mode support for RustBridge
- Add IRustTransport interface and two transport implementations: StdioTransport (spawns child process and uses stdin/stdout) and SocketTransport (connects to Unix socket / Windows named pipe).
- Refactor RustBridge to use a transport abstraction (connectWithTransport) and add connect(socketPath) to attach to an existing daemon via socket.
- Introduce LineScanner: a buffer-based newline scanner used by both transports to handle large/newline-delimited messages and avoid OOMs.
- Add socket connection options (autoReconnect, reconnectBaseDelayMs, reconnectMaxDelayMs, maxReconnectAttempts) and implement auto-reconnect/backoff behavior in SocketTransport.
- Implement backpressure-aware write semantics and proper disconnect/cleanup for both transports; RustBridge.kill() now disconnects the transport instead of directly managing processes.
- Add tests and tooling: socket transport tests, line scanner tests, and a mock-socket-server.mjs helper script for testing socket mode.
- Export new symbols (StdioTransport, SocketTransport, LineScanner) and update plugins to expose net; update interfaces to export transport types.
## 2026-02-12 - 1.2.1 - fix(rust-binary-locator)
auto-fix missing execute permission for located Rust binaries
- If a located binary exists but lacks the execute bit, attempt to chmod it to 0o755 and treat it as executable.
- Logs an info message when the auto-fix is applied: 'Auto-fixed missing execute permission on: <filePath>'.
- Addresses cases where npm/pnpm installs remove the execute permission from bundled binaries.
## 2026-02-11 - 1.2.0 - feat(rustbridge) ## 2026-02-11 - 1.2.0 - feat(rustbridge)
add streaming responses and robust large-payload/backpressure handling to RustBridge add streaming responses and robust large-payload/backpressure handling to RustBridge
+19
View File
@@ -0,0 +1,19 @@
Copyright (c) 2014 Task Venture Capital GmbH (hello@task.vc)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
-24
View File
@@ -1,24 +0,0 @@
{
"@git.zone/cli": {
"projectType": "npm",
"module": {
"githost": "code.foss.global",
"gitscope": "push.rocks",
"gitrepo": "smartrust",
"description": "a bridge between JS engines and rust",
"npmPackagename": "@push.rocks/smartrust",
"license": "MIT",
"projectDomain": "push.rocks"
},
"release": {
"accessLevel": "public",
"registries": [
"https://verdaccio.lossless.digital",
"https://registry.npmjs.org"
]
}
},
"@ship.zone/szci": {
"npmGlobalTools": []
}
}
+38 -8
View File
@@ -1,8 +1,8 @@
{ {
"name": "@push.rocks/smartrust", "name": "@push.rocks/smartrust",
"version": "1.2.0", "version": "1.4.0",
"private": false, "private": false,
"description": "a bridge between JS engines and rust", "description": "A type-safe bridge between JavaScript engines and Rust binaries.",
"main": "dist_ts/index.js", "main": "dist_ts/index.js",
"typings": "dist_ts/index.d.ts", "typings": "dist_ts/index.d.ts",
"type": "module", "type": "module",
@@ -10,16 +10,46 @@
"license": "MIT", "license": "MIT",
"scripts": { "scripts": {
"test": "(tstest test/ --verbose --logfile --timeout 60)", "test": "(tstest test/ --verbose --logfile --timeout 60)",
"build": "(tsbuild tsfolders --allowimplicitany)", "build": "(tsbuild tsfolders)",
"buildDocs": "(tsdoc)" "buildDocs": "(tsdoc)"
}, },
"devDependencies": { "devDependencies": {
"@git.zone/tsbuild": "^4.1.2", "@git.zone/tsbuild": "^4.4.0",
"@git.zone/tsrun": "^2.0.1", "@git.zone/tsrun": "^2.0.2",
"@git.zone/tstest": "^3.1.8", "@git.zone/tstest": "^3.6.3",
"@types/node": "^25.2.0" "@types/node": "^25.6.0"
}, },
"dependencies": { "dependencies": {
"@push.rocks/smartpath": "^6.0.0" "@push.rocks/smartpath": "^6.0.0"
} },
"files": [
"ts/**/*",
"dist/**/*",
"dist_*/**/*",
"dist_ts/**/*",
"assets/**/*",
"cli.js",
".smartconfig.json",
"license",
"readme.md"
],
"keywords": [
"rust",
"typescript",
"ipc",
"bridge",
"stdio",
"socket",
"binary",
"transport"
],
"homepage": "https://code.foss.global/push.rocks/smartrust#readme",
"repository": {
"type": "git",
"url": "https://code.foss.global/push.rocks/smartrust.git"
},
"bugs": {
"url": "https://code.foss.global/push.rocks/smartrust/issues"
},
"packageManager": "pnpm@10.28.2"
} }
+1772 -1631
View File
File diff suppressed because it is too large Load Diff
+160 -20
View File
@@ -1,6 +1,6 @@
# @push.rocks/smartrust # @push.rocks/smartrust
A type-safe, production-ready bridge between TypeScript and Rust binaries via JSON-over-stdin/stdout IPC — with support for request/response, streaming, and event patterns. A type-safe, production-ready bridge between TypeScript and Rust binaries — with support for **stdio** (child process) and **socket** (Unix socket / Windows named pipe) transports, request/response, streaming, and event patterns.
## Issue Reporting and Security ## Issue Reporting and Security
@@ -9,32 +9,39 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community
## Install 📦 ## Install 📦
```bash ```bash
npm install @push.rocks/smartrust pnpm add @push.rocks/smartrust
# or
pnpm install @push.rocks/smartrust
``` ```
## Overview 🔭 ## Overview 🔭
`@push.rocks/smartrust` provides a complete bridge for TypeScript applications that need to communicate with Rust binaries. It handles the entire lifecycle — binary discovery, process spawning, request/response correlation, **streaming responses**, event pub/sub, and graceful shutdown — so you can focus on your command definitions instead of IPC plumbing. `@push.rocks/smartrust` provides a complete bridge for TypeScript applications that need to communicate with Rust binaries. It handles the entire lifecycle — binary discovery, process spawning **or socket connection**, request/response correlation, **streaming responses**, event pub/sub, and graceful shutdown — so you can focus on your command definitions instead of IPC plumbing.
### Two Transport Modes 🔌
| Mode | Method | Use Case |
|------|--------|----------|
| **Stdio** | `bridge.spawn()` | Spawn the Rust binary as a child process. Communicate via stdin/stdout. |
| **Socket** | `bridge.connect(path)` | Connect to an **already-running** Rust daemon via Unix socket or Windows named pipe. |
The JSON protocol is identical in both modes — only the transport layer changes. Socket mode enables use cases where the Rust binary runs as a **privileged system service** (e.g., a VPN daemon needing root for TUN devices, a network proxy binding to privileged ports) while the TypeScript app connects to it unprivileged.
### Why? 🤔 ### Why? 🤔
If you're integrating Rust into a Node.js project, you'll inevitably need: If you're integrating Rust into a Node.js project, you'll inevitably need:
- A way to **find** the compiled Rust binary across different environments (dev, CI, production, platform packages) - A way to **find** the compiled Rust binary across different environments (dev, CI, production, platform packages)
- A way to **spawn** it and establish reliable two-way communication - A way to **spawn it** or **connect to it** and establish reliable two-way communication
- **Type-safe** request/response patterns with proper error handling - **Type-safe** request/response patterns with proper error handling
- **Streaming responses** for progressive data processing, log tailing, or chunked transfers - **Streaming responses** for progressive data processing, log tailing, or chunked transfers
- **Event streaming** from Rust to TypeScript - **Event streaming** from Rust to TypeScript
- **Graceful lifecycle management** (ready detection, clean shutdown, force kill) - **Graceful lifecycle management** (ready detection, clean shutdown, auto-reconnection)
`smartrust` wraps all of this into three classes: `RustBridge`, `RustBinaryLocator`, and `StreamingResponse`. `smartrust` wraps all of this into a clean API: `RustBridge`, `RustBinaryLocator`, `StreamingResponse`, and pluggable transports.
## Usage 🚀 ## Usage 🚀
### The IPC Protocol ### The IPC Protocol
`smartrust` uses a simple, newline-delimited JSON protocol over stdin/stdout: `smartrust` uses a simple, newline-delimited JSON protocol:
| Direction | Format | Description | | Direction | Format | Description |
|-----------|--------|-------------| |-----------|--------|-------------|
@@ -44,7 +51,7 @@ If you're integrating Rust into a Node.js project, you'll inevitably need:
| **Rust → TS** (Stream Chunk) | `{"id": "req_1", "stream": true, "data": {...}}` | Intermediate chunk (zero or more) | | **Rust → TS** (Stream Chunk) | `{"id": "req_1", "stream": true, "data": {...}}` | Intermediate chunk (zero or more) |
| **Rust → TS** (Event) | `{"event": "ready", "data": {...}}` | Unsolicited event (no ID) | | **Rust → TS** (Event) | `{"event": "ready", "data": {...}}` | Unsolicited event (no ID) |
Your Rust binary reads JSON lines from stdin and writes JSON lines to stdout. That's it. Stderr is free for logging. This protocol works identically over stdio and socket transports. Your Rust binary reads JSON lines from one end and writes JSON lines to the other. That's it.
### Defining Your Commands ### Defining Your Commands
@@ -62,7 +69,9 @@ type TMyCommands = {
}; };
``` ```
### Creating and Using the Bridge ### Stdio Mode — Spawn a Child Process
This is the classic mode. The bridge spawns the Rust binary and communicates via stdin/stdout:
```typescript ```typescript
const bridge = new RustBridge<TMyCommands>({ const bridge = new RustBridge<TMyCommands>({
@@ -90,10 +99,62 @@ bridge.on('management:configChanged', (data) => {
console.log('Config was changed:', data); console.log('Config was changed:', data);
}); });
// Clean shutdown // Clean shutdown (SIGTERM → SIGKILL after 5s)
bridge.kill(); bridge.kill();
``` ```
### Socket Mode — Connect to a Running Daemon 🔗
When the Rust binary runs as a system service (e.g., via `systemd`, `launchd`, or a Windows Service), use `connect()` to talk to it over a Unix socket or named pipe:
```typescript
const bridge = new RustBridge<TMyCommands>({
binaryName: 'my-daemon', // used for logging / error messages
});
// Connect to the daemon's management socket
const ok = await bridge.connect('/var/run/my-daemon.sock');
if (!ok) {
console.error('Failed to connect to daemon');
process.exit(1);
}
// Same API as stdio mode — completely transparent!
const { pid } = await bridge.sendCommand('start', { port: 8080, host: '0.0.0.0' });
const metrics = await bridge.sendCommand('getMetrics', {});
// kill() closes the socket — it does NOT kill the daemon
bridge.kill();
```
#### Auto-Reconnect
For long-running applications, enable automatic reconnection with exponential backoff:
```typescript
const ok = await bridge.connect('/var/run/my-daemon.sock', {
autoReconnect: true, // reconnect on unexpected disconnect
reconnectBaseDelayMs: 100, // initial retry delay (doubles each attempt)
reconnectMaxDelayMs: 30000, // max retry delay cap
maxReconnectAttempts: 10, // give up after 10 attempts
});
// Listen for reconnection events
bridge.on('reconnected', () => {
console.log('Reconnected to daemon!');
});
```
#### Platform Notes
| Platform | Socket Path Format | Example |
|----------|-------------------|---------|
| **Linux** | `/var/run/<name>.sock` or `$XDG_RUNTIME_DIR/<name>.sock` | `/var/run/my-daemon.sock` |
| **macOS** | `/var/run/<name>.sock` | `/var/run/my-daemon.sock` |
| **Windows** | `\\.\pipe\<name>` | `\\.\pipe\my-daemon` |
Node.js `net.connect()` handles all formats transparently — no platform-specific code needed.
### Streaming Commands 🌊 ### Streaming Commands 🌊
For commands where the Rust binary sends a series of chunks before a final result, use `sendCommandStreaming`. This is perfect for progressive data processing, log tailing, search results, or any scenario where you want incremental output. For commands where the Rust binary sends a series of chunks before a final result, use `sendCommandStreaming`. This is perfect for progressive data processing, log tailing, search results, or any scenario where you want incremental output.
@@ -231,6 +292,17 @@ const bridge = new RustBridge<TMyCommands>({
}); });
``` ```
Socket connection options (passed to `bridge.connect()`):
```typescript
interface ISocketConnectOptions {
autoReconnect?: boolean; // default: false
reconnectBaseDelayMs?: number; // default: 100
reconnectMaxDelayMs?: number; // default: 30000
maxReconnectAttempts?: number; // default: 10
}
```
### Events 📡 ### Events 📡
`RustBridge` extends `EventEmitter` and emits the following events: `RustBridge` extends `EventEmitter` and emits the following events:
@@ -238,8 +310,9 @@ const bridge = new RustBridge<TMyCommands>({
| Event | Payload | Description | | Event | Payload | Description |
|-------|---------|-------------| |-------|---------|-------------|
| `ready` | — | Bridge connected and binary reported ready | | `ready` | — | Bridge connected and binary reported ready |
| `exit` | `(code, signal)` | Rust process exited | | `exit` | `(code, signal)` | Transport closed (process exited or socket disconnected) |
| `stderr` | `string` | A line from the binary's stderr | | `stderr` | `string` | A line from the binary's stderr (stdio mode only) |
| `reconnected` | — | Socket transport reconnected after unexpected disconnect |
| `management:<name>` | `any` | Custom event from Rust (e.g. `management:configChanged`) | | `management:<name>` | `any` | Custom event from Rust (e.g. `management:configChanged`) |
### Custom Logger 📝 ### Custom Logger 📝
@@ -263,7 +336,9 @@ const bridge = new RustBridge<TMyCommands>({
### Writing the Rust Side 🦀 ### Writing the Rust Side 🦀
Your Rust binary needs to implement a simple protocol: Your Rust binary needs to implement a simple protocol. The transport (stdio or socket) doesn't change the message format — only how connections are established.
#### Stdio Mode (Child Process)
1. **On startup**, write a ready event to stdout: 1. **On startup**, write a ready event to stdout:
``` ```
@@ -280,7 +355,14 @@ Your Rust binary needs to implement a simple protocol:
6. **Use stderr** for logging — it won't interfere with the IPC protocol 6. **Use stderr** for logging — it won't interfere with the IPC protocol
Here's a minimal Rust skeleton: #### Socket Mode (Daemon)
1. **Listen** on a Unix socket (e.g., `/var/run/my-daemon.sock`) or Windows named pipe
2. **On each new client connection**, send the `{"event":"ready","data":{...}}\n` event
3. Read/write JSON lines on the socket (same protocol as stdio)
4. Support multiple concurrent clients — each connection is independent
Here's a minimal Rust skeleton (stdio mode):
```rust ```rust
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -368,6 +450,33 @@ fn main() {
} }
``` ```
## Architecture 🏗️
```
┌──────────────────────────────────────────────────────┐
│ RustBridge<T> │
│ (protocol layer: handleLine, sendCommand, events) │
├──────────────┬───────────────────────────────────────┤
│ StdioTransport │ SocketTransport │
│ spawn() + │ net.connect() + auto-reconnect │
│ stdin/stdout │ Unix socket / named pipe │
├──────────────┴───────────────────────────────────────┤
│ IRustTransport interface │
│ connect() / write() / disconnect() │
├──────────────────────────────────────────────────────┤
│ LineScanner (shared newline scanner) │
├──────────────────────────────────────────────────────┤
│ RustBinaryLocator │
│ (binary search — stdio mode only) │
└──────────────────────────────────────────────────────┘
```
- **`RustBridge`** — The main class. Protocol-level logic (JSON parsing, request correlation, streaming, events) is transport-agnostic.
- **`StdioTransport`** — Spawns a child process, manages stdin/stdout/stderr, handles SIGTERM/SIGKILL.
- **`SocketTransport`** — Connects to an existing Unix socket or named pipe, with optional auto-reconnect and exponential backoff.
- **`LineScanner`** — Shared buffer-based newline scanner used by both transports for efficient message framing.
- **`RustBinaryLocator`** — Priority-ordered binary search (used by stdio mode only).
## API Reference 📖 ## API Reference 📖
### `RustBridge<TCommands>` ### `RustBridge<TCommands>`
@@ -375,11 +484,12 @@ fn main() {
| Method / Property | Signature | Description | | Method / Property | Signature | Description |
|---|---|---| |---|---|---|
| `constructor` | `new RustBridge<T>(options: IRustBridgeOptions)` | Create a new bridge instance | | `constructor` | `new RustBridge<T>(options: IRustBridgeOptions)` | Create a new bridge instance |
| `spawn()` | `Promise<boolean>` | Spawn the binary and wait for ready; returns `false` on failure | | `spawn()` | `Promise<boolean>` | **Stdio mode**: Spawn the binary and wait for ready; returns `false` on failure |
| `connect(socketPath, options?)` | `Promise<boolean>` | **Socket mode**: Connect to a running daemon; returns `false` on failure |
| `sendCommand(method, params)` | `Promise<TCommands[K]['result']>` | Send a typed command and await the response | | `sendCommand(method, params)` | `Promise<TCommands[K]['result']>` | Send a typed command and await the response |
| `sendCommandStreaming(method, params)` | `StreamingResponse<TChunk, TResult>` | Send a streaming command; returns immediately | | `sendCommandStreaming(method, params)` | `StreamingResponse<TChunk, TResult>` | Send a streaming command; returns immediately |
| `kill()` | `void` | SIGTERM the process, reject pending requests, force SIGKILL after 5s | | `kill()` | `void` | Stdio: SIGTERM the process, SIGKILL after 5s. Socket: close the connection (daemon stays alive) |
| `running` | `boolean` | Whether the bridge is currently connected | | `running` | `boolean` | Whether the bridge is currently connected and ready |
### `StreamingResponse<TChunk, TResult>` ### `StreamingResponse<TChunk, TResult>`
@@ -396,13 +506,43 @@ fn main() {
| `findBinary()` | `Promise<string \| null>` | Find the binary using the priority search; result is cached | | `findBinary()` | `Promise<string \| null>` | Find the binary using the priority search; result is cached |
| `clearCache()` | `void` | Clear the cached path to force a fresh search | | `clearCache()` | `void` | Clear the cached path to force a fresh search |
### `StdioTransport`
| Method / Property | Signature | Description |
|---|---|---|
| `constructor` | `new StdioTransport(options: IStdioTransportOptions)` | Create a stdio transport |
| `connect()` | `Promise<void>` | Spawn the child process |
| `write(data)` | `Promise<void>` | Write to stdin with backpressure handling |
| `disconnect()` | `void` | Kill the process (SIGTERM → SIGKILL after 5s) |
| `connected` | `boolean` | Whether the process is running |
### `SocketTransport`
| Method / Property | Signature | Description |
|---|---|---|
| `constructor` | `new SocketTransport(options: ISocketTransportOptions)` | Create a socket transport |
| `connect()` | `Promise<void>` | Connect to the Unix socket / named pipe |
| `write(data)` | `Promise<void>` | Write to socket with backpressure handling |
| `disconnect()` | `void` | Close the socket (does not kill the daemon) |
| `connected` | `boolean` | Whether the socket is connected |
### `LineScanner`
| Method / Property | Signature | Description |
|---|---|---|
| `constructor` | `new LineScanner(maxPayloadSize, logger)` | Create a line scanner |
| `push(chunk, onLine)` | `void` | Feed a `Buffer` chunk; calls `onLine` for each complete line |
| `clear()` | `void` | Reset the internal buffer |
### Exported Interfaces & Types ### Exported Interfaces & Types
| Interface / Type | Description | | Interface / Type | Description |
|---|---| |---|---|
| `IRustBridgeOptions` | Full configuration for `RustBridge` | | `IRustBridgeOptions` | Full configuration for `RustBridge` |
| `IBinaryLocatorOptions` | Configuration for `RustBinaryLocator` | | `IBinaryLocatorOptions` | Configuration for `RustBinaryLocator` |
| `ISocketConnectOptions` | Socket connection options (reconnect settings) |
| `IRustBridgeLogger` | Logger interface: `{ log(level, message, data?) }` | | `IRustBridgeLogger` | Logger interface: `{ log(level, message, data?) }` |
| `IRustTransport` | Transport interface (extends `EventEmitter`) |
| `IManagementRequest` | IPC request shape: `{ id, method, params }` | | `IManagementRequest` | IPC request shape: `{ id, method, params }` |
| `IManagementResponse` | IPC response shape: `{ id, success, result?, error? }` | | `IManagementResponse` | IPC response shape: `{ id, success, result?, error? }` |
| `IManagementEvent` | IPC event shape: `{ event, data }` | | `IManagementEvent` | IPC event shape: `{ event, data }` |
@@ -414,7 +554,7 @@ fn main() {
## License and Legal Information ## License and Legal Information
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file. This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [license](./license) file.
**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file. **Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.
+124
View File
@@ -0,0 +1,124 @@
#!/usr/bin/env node
/**
* Mock "Rust daemon" for testing the SocketTransport and RustBridge socket mode.
* Creates a Unix socket server, accepts connections, and speaks the same
* JSON-over-newline IPC protocol as mock-rust-binary.mjs.
*
* Usage: node mock-socket-server.mjs <socket-path>
* Signals readiness by writing a JSON line to stdout: {"socketPath":"...","ready":true}
*/
import * as net from 'net';
import * as fs from 'fs';
const socketPath = process.argv[2];
if (!socketPath) {
process.stderr.write('Usage: mock-socket-server.mjs <socket-path>\n');
process.exit(1);
}
// Remove stale socket file
try { fs.unlinkSync(socketPath); } catch { /* ignore */ }
/**
* Backpressure-aware write to a socket.
*/
function writeResponse(conn, data) {
const json = JSON.stringify(data) + '\n';
if (!conn.write(json)) {
conn.once('drain', () => {});
}
}
function handleLine(line, conn) {
let request;
try {
request = JSON.parse(line);
} catch {
return;
}
const { id, method, params } = request;
if (method === 'echo') {
writeResponse(conn, { id, success: true, result: params });
} else if (method === 'largeEcho') {
writeResponse(conn, { id, success: true, result: params });
} else if (method === 'error') {
writeResponse(conn, { id, success: false, error: 'Test error message' });
} else if (method === 'emitEvent') {
writeResponse(conn, { event: params.eventName, data: params.eventData });
writeResponse(conn, { id, success: true, result: null });
} else if (method === 'slow') {
setTimeout(() => {
writeResponse(conn, { id, success: true, result: { delayed: true } });
}, 100);
} else if (method === 'streamEcho') {
const count = params.count || 0;
let sent = 0;
const interval = setInterval(() => {
if (sent < count) {
writeResponse(conn, { id, stream: true, data: { index: sent, value: `chunk_${sent}` } });
sent++;
} else {
clearInterval(interval);
writeResponse(conn, { id, success: true, result: { totalChunks: count } });
}
}, 10);
} else if (method === 'streamError') {
writeResponse(conn, { id, stream: true, data: { index: 0, value: 'before_error' } });
setTimeout(() => {
writeResponse(conn, { id, success: false, error: 'Stream error after chunk' });
}, 20);
} else if (method === 'streamEmpty') {
writeResponse(conn, { id, success: true, result: { totalChunks: 0 } });
} else if (method === 'exit') {
writeResponse(conn, { id, success: true, result: null });
// In socket mode, 'exit' just closes this connection, not the server
setTimeout(() => conn.end(), 50);
} else {
writeResponse(conn, { id, success: false, error: `Unknown method: ${method}` });
}
}
const server = net.createServer((conn) => {
// Send ready event on each new connection
writeResponse(conn, { event: 'ready', data: { version: '1.0.0' } });
// Buffer-based newline scanner for incoming data
let buffer = Buffer.alloc(0);
conn.on('data', (chunk) => {
buffer = Buffer.concat([buffer, chunk]);
let idx;
while ((idx = buffer.indexOf(0x0A)) !== -1) {
const lineBuffer = buffer.subarray(0, idx);
buffer = buffer.subarray(idx + 1);
const line = lineBuffer.toString('utf8').trim();
if (line) {
handleLine(line, conn);
}
}
});
conn.on('error', () => { /* ignore client errors */ });
});
server.listen(socketPath, () => {
// Signal to parent that the server is ready
process.stdout.write(JSON.stringify({ socketPath, ready: true }) + '\n');
});
// Handle SIGTERM gracefully
process.on('SIGTERM', () => {
server.close();
try { fs.unlinkSync(socketPath); } catch { /* ignore */ }
process.exit(0);
});
// Handle SIGINT
process.on('SIGINT', () => {
server.close();
try { fs.unlinkSync(socketPath); } catch { /* ignore */ }
process.exit(0);
});
+89
View File
@@ -0,0 +1,89 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import { LineScanner } from '../ts/classes.linescanner.js';
const noopLogger = { log() {} };
tap.test('should parse a single complete line', async () => {
const scanner = new LineScanner(1024 * 1024, noopLogger);
const lines: string[] = [];
scanner.push(Buffer.from('{"hello":"world"}\n'), (line) => lines.push(line));
expect(lines.length).toEqual(1);
expect(lines[0]).toEqual('{"hello":"world"}');
});
tap.test('should parse multiple lines in one chunk', async () => {
const scanner = new LineScanner(1024 * 1024, noopLogger);
const lines: string[] = [];
scanner.push(Buffer.from('{"a":1}\n{"b":2}\n{"c":3}\n'), (line) => lines.push(line));
expect(lines.length).toEqual(3);
expect(lines[0]).toEqual('{"a":1}');
expect(lines[1]).toEqual('{"b":2}');
expect(lines[2]).toEqual('{"c":3}');
});
tap.test('should handle a line split across chunks', async () => {
const scanner = new LineScanner(1024 * 1024, noopLogger);
const lines: string[] = [];
scanner.push(Buffer.from('{"hel'), (line) => lines.push(line));
expect(lines.length).toEqual(0);
scanner.push(Buffer.from('lo":"world"}\n'), (line) => lines.push(line));
expect(lines.length).toEqual(1);
expect(lines[0]).toEqual('{"hello":"world"}');
});
tap.test('should drop oversized lines', async () => {
const scanner = new LineScanner(100, noopLogger);
const lines: string[] = [];
// Line is 200 chars + newline, exceeds maxPayloadSize of 100
const oversized = 'x'.repeat(200) + '\n';
scanner.push(Buffer.from(oversized), (line) => lines.push(line));
expect(lines.length).toEqual(0);
});
tap.test('should clear buffer on OOM (no newline, exceeds max)', async () => {
const scanner = new LineScanner(100, noopLogger);
const lines: string[] = [];
// Push 200 bytes without any newline — exceeds maxPayloadSize
scanner.push(Buffer.from('x'.repeat(200)), (line) => lines.push(line));
expect(lines.length).toEqual(0);
// After clearing, should work normally again
scanner.push(Buffer.from('{"ok":true}\n'), (line) => lines.push(line));
expect(lines.length).toEqual(1);
expect(lines[0]).toEqual('{"ok":true}');
});
tap.test('should skip empty lines', async () => {
const scanner = new LineScanner(1024 * 1024, noopLogger);
const lines: string[] = [];
scanner.push(Buffer.from('\n\n{"a":1}\n\n'), (line) => lines.push(line));
expect(lines.length).toEqual(1);
expect(lines[0]).toEqual('{"a":1}');
});
tap.test('should handle mixed complete and partial lines', async () => {
const scanner = new LineScanner(1024 * 1024, noopLogger);
const lines: string[] = [];
// First chunk: one complete line + start of second line
scanner.push(Buffer.from('{"first":1}\n{"sec'), (line) => lines.push(line));
expect(lines.length).toEqual(1);
// Second chunk: end of second line + complete third line
scanner.push(Buffer.from('ond":2}\n{"third":3}\n'), (line) => lines.push(line));
expect(lines.length).toEqual(3);
expect(lines[1]).toEqual('{"second":2}');
expect(lines[2]).toEqual('{"third":3}');
});
tap.test('clear should reset the buffer', async () => {
const scanner = new LineScanner(1024 * 1024, noopLogger);
const lines: string[] = [];
// Push partial data
scanner.push(Buffer.from('{"partial":'), (line) => lines.push(line));
// Clear
scanner.clear();
// Now push a complete new line — old partial should not affect it
scanner.push(Buffer.from('{"fresh":true}\n'), (line) => lines.push(line));
expect(lines.length).toEqual(1);
expect(lines[0]).toEqual('{"fresh":true}');
});
export default tap.start();
+312
View File
@@ -0,0 +1,312 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as path from 'path';
import * as fs from 'fs';
import * as childProcess from 'child_process';
import * as os from 'os';
import { RustBridge } from '../ts/classes.rustbridge.js';
import type { ICommandDefinition } from '../ts/interfaces/index.js';
const testDir = path.resolve(path.dirname(new URL(import.meta.url).pathname));
const mockServerPath = path.join(testDir, 'helpers/mock-socket-server.mjs');
// Define the command types for our mock binary
type TMockCommands = {
echo: { params: Record<string, any>; result: Record<string, any> };
largeEcho: { params: Record<string, any>; result: Record<string, any> };
error: { params: {}; result: never };
emitEvent: { params: { eventName: string; eventData: any }; result: null };
slow: { params: {}; result: { delayed: boolean } };
exit: { params: {}; result: null };
streamEcho: { params: { count: number }; chunk: { index: number; value: string }; result: { totalChunks: number } };
streamError: { params: {}; chunk: { index: number; value: string }; result: never };
streamEmpty: { params: {}; chunk: never; result: { totalChunks: number } };
};
/**
* Start the mock socket server and return the socket path.
* Returns { proc, socketPath }.
*/
async function startMockServer(testName: string): Promise<{ proc: childProcess.ChildProcess; socketPath: string }> {
const socketPath = path.join(os.tmpdir(), `smartrust-test-${Date.now()}-${testName}.sock`);
const proc = childProcess.spawn('node', [mockServerPath, socketPath], {
stdio: ['pipe', 'pipe', 'pipe'],
});
// Wait for the server to signal readiness via stdout
return new Promise((resolve, reject) => {
let stdoutData = '';
const timeout = setTimeout(() => {
proc.kill();
reject(new Error('Mock server did not start within 5s'));
}, 5000);
proc.stdout!.on('data', (data: Buffer) => {
stdoutData += data.toString();
const lines = stdoutData.split('\n');
for (const line of lines) {
if (line.trim()) {
try {
const parsed = JSON.parse(line.trim());
if (parsed.ready) {
clearTimeout(timeout);
resolve({ proc, socketPath: parsed.socketPath });
return;
}
} catch { /* not JSON yet */ }
}
}
});
proc.on('error', (err) => {
clearTimeout(timeout);
reject(err);
});
proc.on('exit', (code) => {
clearTimeout(timeout);
reject(new Error(`Mock server exited with code ${code}`));
});
});
}
function stopMockServer(proc: childProcess.ChildProcess, socketPath: string) {
try { proc.kill('SIGTERM'); } catch { /* ignore */ }
try { fs.unlinkSync(socketPath); } catch { /* ignore */ }
}
// === Socket Transport Tests via RustBridge.connect() ===
tap.test('socket: should connect and receive ready event', async () => {
const { proc, socketPath } = await startMockServer('connect-ready');
try {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'mock-daemon',
readyTimeoutMs: 5000,
});
const result = await bridge.connect(socketPath);
expect(result).toBeTrue();
expect(bridge.running).toBeTrue();
bridge.kill();
expect(bridge.running).toBeFalse();
} finally {
stopMockServer(proc, socketPath);
}
});
tap.test('socket: should send command and receive response', async () => {
const { proc, socketPath } = await startMockServer('send-command');
try {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'mock-daemon',
readyTimeoutMs: 5000,
});
await bridge.connect(socketPath);
const result = await bridge.sendCommand('echo', { hello: 'world', num: 42 });
expect(result).toEqual({ hello: 'world', num: 42 });
bridge.kill();
} finally {
stopMockServer(proc, socketPath);
}
});
tap.test('socket: should handle error responses', async () => {
const { proc, socketPath } = await startMockServer('error-response');
try {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'mock-daemon',
readyTimeoutMs: 5000,
});
await bridge.connect(socketPath);
let threw = false;
try {
await bridge.sendCommand('error', {});
} catch (err: any) {
threw = true;
expect(err.message).toInclude('Test error message');
}
expect(threw).toBeTrue();
bridge.kill();
} finally {
stopMockServer(proc, socketPath);
}
});
tap.test('socket: should receive custom events', async () => {
const { proc, socketPath } = await startMockServer('custom-events');
try {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'mock-daemon',
readyTimeoutMs: 5000,
});
await bridge.connect(socketPath);
const eventPromise = new Promise<any>((resolve) => {
bridge.once('management:testEvent', (data) => resolve(data));
});
await bridge.sendCommand('emitEvent', {
eventName: 'testEvent',
eventData: { key: 'value' },
});
const eventData = await eventPromise;
expect(eventData).toEqual({ key: 'value' });
bridge.kill();
} finally {
stopMockServer(proc, socketPath);
}
});
tap.test('socket: should handle multiple concurrent commands', async () => {
const { proc, socketPath } = await startMockServer('concurrent');
try {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'mock-daemon',
readyTimeoutMs: 5000,
});
await bridge.connect(socketPath);
const results = await Promise.all([
bridge.sendCommand('echo', { id: 1 }),
bridge.sendCommand('echo', { id: 2 }),
bridge.sendCommand('echo', { id: 3 }),
]);
expect(results[0]).toEqual({ id: 1 });
expect(results[1]).toEqual({ id: 2 });
expect(results[2]).toEqual({ id: 3 });
bridge.kill();
} finally {
stopMockServer(proc, socketPath);
}
});
tap.test('socket: should handle 1MB payload round-trip', async () => {
const { proc, socketPath } = await startMockServer('large-payload');
try {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'mock-daemon',
readyTimeoutMs: 5000,
requestTimeoutMs: 30000,
});
await bridge.connect(socketPath);
const largeString = 'x'.repeat(1024 * 1024);
const result = await bridge.sendCommand('largeEcho', { data: largeString });
expect(result.data).toEqual(largeString);
expect(result.data.length).toEqual(1024 * 1024);
bridge.kill();
} finally {
stopMockServer(proc, socketPath);
}
});
tap.test('socket: should disconnect without killing the daemon', async () => {
const { proc, socketPath } = await startMockServer('no-kill-daemon');
try {
// First connection
const bridge1 = new RustBridge<TMockCommands>({
binaryName: 'mock-daemon',
readyTimeoutMs: 5000,
});
await bridge1.connect(socketPath);
expect(bridge1.running).toBeTrue();
// Disconnect
bridge1.kill();
expect(bridge1.running).toBeFalse();
// Second connection — daemon should still be alive
const bridge2 = new RustBridge<TMockCommands>({
binaryName: 'mock-daemon',
readyTimeoutMs: 5000,
});
const result = await bridge2.connect(socketPath);
expect(result).toBeTrue();
expect(bridge2.running).toBeTrue();
// Verify the daemon is functional
const echoResult = await bridge2.sendCommand('echo', { reconnected: true });
expect(echoResult).toEqual({ reconnected: true });
bridge2.kill();
} finally {
stopMockServer(proc, socketPath);
}
});
tap.test('socket: should stream responses via socket', async () => {
const { proc, socketPath } = await startMockServer('streaming');
try {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'mock-daemon',
readyTimeoutMs: 5000,
requestTimeoutMs: 10000,
});
await bridge.connect(socketPath);
const stream = bridge.sendCommandStreaming('streamEcho', { count: 5 });
const chunks: Array<{ index: number; value: string }> = [];
for await (const chunk of stream) {
chunks.push(chunk);
}
expect(chunks.length).toEqual(5);
for (let i = 0; i < 5; i++) {
expect(chunks[i].index).toEqual(i);
expect(chunks[i].value).toEqual(`chunk_${i}`);
}
const result = await stream.result;
expect(result.totalChunks).toEqual(5);
bridge.kill();
} finally {
stopMockServer(proc, socketPath);
}
});
tap.test('socket: should return false when socket path does not exist', async () => {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'mock-daemon',
readyTimeoutMs: 3000,
});
const result = await bridge.connect('/tmp/nonexistent-smartrust-test.sock');
expect(result).toBeFalse();
expect(bridge.running).toBeFalse();
});
tap.test('socket: should throw when sending command while not connected', async () => {
const bridge = new RustBridge<TMockCommands>({
binaryName: 'mock-daemon',
});
let threw = false;
try {
await bridge.sendCommand('echo', {});
} catch (err: any) {
threw = true;
expect(err.message).toInclude('not running');
}
expect(threw).toBeTrue();
});
export default tap.start();
+2 -2
View File
@@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartrust', name: '@push.rocks/smartrust',
version: '1.2.0', version: '1.4.0',
description: 'a bridge between JS engines and rust' description: 'A type-safe bridge between JavaScript engines and Rust binaries.'
} }
+51
View File
@@ -0,0 +1,51 @@
import type { IRustBridgeLogger } from './interfaces/index.js';
/**
* Buffer-based newline scanner for streaming binary data.
* Accumulates chunks and emits complete lines via callback.
* Used by both StdioTransport and SocketTransport.
*/
export class LineScanner {
private buffer: Buffer = Buffer.alloc(0);
private maxPayloadSize: number;
private logger: IRustBridgeLogger;
constructor(maxPayloadSize: number, logger: IRustBridgeLogger) {
this.maxPayloadSize = maxPayloadSize;
this.logger = logger;
}
/**
* Feed a chunk of data. Calls onLine for each complete newline-terminated line found.
*/
public push(chunk: Buffer, onLine: (line: string) => void): void {
this.buffer = Buffer.concat([this.buffer, chunk]);
let newlineIndex: number;
while ((newlineIndex = this.buffer.indexOf(0x0A)) !== -1) {
const lineBuffer = this.buffer.subarray(0, newlineIndex);
this.buffer = this.buffer.subarray(newlineIndex + 1);
if (lineBuffer.length > this.maxPayloadSize) {
this.logger.log('error', `Inbound message exceeds maxPayloadSize (${lineBuffer.length} bytes), dropping`);
continue;
}
const line = lineBuffer.toString('utf8').trim();
if (line) {
onLine(line);
}
}
// Prevent OOM if sender never sends newline
if (this.buffer.length > this.maxPayloadSize) {
this.logger.log('error', `Buffer exceeded maxPayloadSize (${this.buffer.length} bytes) without newline, clearing`);
this.buffer = Buffer.alloc(0);
}
}
/** Reset the internal buffer. */
public clear(): void {
this.buffer = Buffer.alloc(0);
}
}
+24
View File
@@ -81,11 +81,18 @@ export class RustBinaryLocator {
plugins.path.resolve(process.cwd(), `rust/target/release/${binaryName}`), plugins.path.resolve(process.cwd(), `rust/target/release/${binaryName}`),
plugins.path.resolve(process.cwd(), `rust/target/debug/${binaryName}`), plugins.path.resolve(process.cwd(), `rust/target/debug/${binaryName}`),
]; ];
const platformSuffix = this.getPlatformSuffix();
for (const localPath of localPaths) { for (const localPath of localPaths) {
if (await this.isExecutable(localPath)) { if (await this.isExecutable(localPath)) {
this.logger.log('info', `Binary found at local path: ${localPath}`); this.logger.log('info', `Binary found at local path: ${localPath}`);
return localPath; return localPath;
} }
// Also try with platform suffix (tsrust convention: binaryName_linux_amd64)
const suffixedPath = `${localPath}_${platformSuffix}`;
if (await this.isExecutable(suffixedPath)) {
this.logger.log('info', `Binary found at local path (platform-suffixed): ${suffixedPath}`);
return suffixedPath;
}
} }
// 5. System PATH // 5. System PATH
@@ -123,10 +130,27 @@ export class RustBinaryLocator {
try { try {
await plugins.fs.promises.access(filePath, plugins.fs.constants.X_OK); await plugins.fs.promises.access(filePath, plugins.fs.constants.X_OK);
return true; return true;
} catch {
// File may exist but lack execute bit (common after npm/pnpm install).
// Try to make it executable.
try {
await plugins.fs.promises.access(filePath, plugins.fs.constants.F_OK);
await plugins.fs.promises.chmod(filePath, 0o755);
this.logger.log('info', `Auto-fixed missing execute permission on: ${filePath}`);
return true;
} catch { } catch {
return false; return false;
} }
} }
}
private getPlatformSuffix(): string {
const archMap: Record<string, string> = { x64: 'amd64', arm64: 'arm64' };
const platformMap: Record<string, string> = { linux: 'linux', darwin: 'darwin', win32: 'windows' };
const platform = platformMap[process.platform] || process.platform;
const arch = archMap[process.arch] || process.arch;
return `${platform}_${arch}`;
}
private async findInPath(binaryName: string): Promise<string | null> { private async findInPath(binaryName: string): Promise<string | null> {
const pathDirs = (process.env.PATH || '').split(plugins.path.delimiter); const pathDirs = (process.env.PATH || '').split(plugins.path.delimiter);
+83 -137
View File
@@ -1,15 +1,19 @@
import * as plugins from './plugins.js'; import * as plugins from './plugins.js';
import { RustBinaryLocator } from './classes.rustbinarylocator.js'; import { RustBinaryLocator } from './classes.rustbinarylocator.js';
import { StreamingResponse } from './classes.streamingresponse.js'; import { StreamingResponse } from './classes.streamingresponse.js';
import { StdioTransport } from './classes.stdiotransport.js';
import { SocketTransport } from './classes.sockettransport.js';
import type { import type {
IRustBridgeOptions, IRustBridgeOptions,
IRustBridgeLogger, IRustBridgeLogger,
ISocketConnectOptions,
TCommandMap, TCommandMap,
IManagementRequest, IManagementRequest,
IManagementResponse, IManagementResponse,
IManagementEvent, IManagementEvent,
TStreamingCommandKeys, TStreamingCommandKeys,
TExtractChunk, TExtractChunk,
IRustTransport,
} from './interfaces/index.js'; } from './interfaces/index.js';
const defaultLogger: IRustBridgeLogger = { const defaultLogger: IRustBridgeLogger = {
@@ -18,7 +22,8 @@ const defaultLogger: IRustBridgeLogger = {
/** /**
* Generic bridge between TypeScript and a Rust binary. * Generic bridge between TypeScript and a Rust binary.
* Communicates via JSON-over-stdin/stdout IPC protocol. * Communicates via JSON-over-stdin/stdout IPC protocol (stdio mode)
* or JSON-over-Unix-socket/named-pipe (socket mode).
* *
* @typeParam TCommands - Map of command names to their param/result types * @typeParam TCommands - Map of command names to their param/result types
*/ */
@@ -26,9 +31,7 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
private locator: RustBinaryLocator; private locator: RustBinaryLocator;
private options: Required<Pick<IRustBridgeOptions, 'cliArgs' | 'requestTimeoutMs' | 'readyTimeoutMs' | 'readyEventName' | 'maxPayloadSize'>> & IRustBridgeOptions; private options: Required<Pick<IRustBridgeOptions, 'cliArgs' | 'requestTimeoutMs' | 'readyTimeoutMs' | 'readyEventName' | 'maxPayloadSize'>> & IRustBridgeOptions;
private logger: IRustBridgeLogger; private logger: IRustBridgeLogger;
private childProcess: plugins.childProcess.ChildProcess | null = null; private transport: IRustTransport | null = null;
private stdoutBuffer: Buffer = Buffer.alloc(0);
private stderrRemainder: string = '';
private pendingRequests = new Map<string, { private pendingRequests = new Map<string, {
resolve: (value: any) => void; resolve: (value: any) => void;
reject: (error: Error) => void; reject: (error: Error) => void;
@@ -63,56 +66,74 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
return false; return false;
} }
const transport = new StdioTransport({
binaryPath: this.binaryPath,
cliArgs: this.options.cliArgs,
env: this.options.env,
maxPayloadSize: this.options.maxPayloadSize,
logger: this.logger,
});
return this.connectWithTransport(transport);
}
/**
* Connect to an already-running Rust daemon via Unix socket or named pipe.
* Returns true if the connection was established and the daemon signaled readiness.
*
* @param socketPath - Path to Unix socket or Windows named pipe
* @param socketOptions - Optional socket connection options (reconnect, etc.)
*/
public async connect(socketPath: string, socketOptions?: ISocketConnectOptions): Promise<boolean> {
const transport = new SocketTransport({
socketPath,
maxPayloadSize: this.options.maxPayloadSize,
logger: this.logger,
autoReconnect: socketOptions?.autoReconnect,
reconnectBaseDelayMs: socketOptions?.reconnectBaseDelayMs,
reconnectMaxDelayMs: socketOptions?.reconnectMaxDelayMs,
maxReconnectAttempts: socketOptions?.maxReconnectAttempts,
});
return this.connectWithTransport(transport);
}
/**
* Internal: wire up any transport and wait for the ready handshake.
*/
private connectWithTransport(transport: IRustTransport): Promise<boolean> {
return new Promise<boolean>((resolve) => { return new Promise<boolean>((resolve) => {
try { try {
const env = this.options.env this.transport = transport;
? { ...process.env, ...this.options.env }
: { ...process.env };
this.childProcess = plugins.childProcess.spawn(this.binaryPath!, this.options.cliArgs, { // Wire transport events
stdio: ['pipe', 'pipe', 'pipe'], transport.on('line', (line: string) => this.handleLine(line));
env,
transport.on('stderr', (line: string) => {
this.logger.log('debug', `[${this.options.binaryName}] ${line}`);
this.emit('stderr', line);
}); });
// Handle stderr with cross-chunk buffering transport.on('close', (...args: any[]) => {
this.childProcess.stderr?.on('data', (data: Buffer) => { this.logger.log('info', `Transport closed`);
this.stderrRemainder += data.toString();
const lines = this.stderrRemainder.split('\n');
// Keep the last element (incomplete line) as remainder
this.stderrRemainder = lines.pop()!;
for (const line of lines) {
const trimmed = line.trim();
if (trimmed) {
this.logger.log('debug', `[${this.options.binaryName}] ${trimmed}`);
this.emit('stderr', trimmed);
}
}
});
// Handle stdout via Buffer-based newline scanner
this.childProcess.stdout!.on('data', (chunk: Buffer) => {
this.handleStdoutChunk(chunk);
});
// Handle process exit
this.childProcess.on('exit', (code, signal) => {
this.logger.log('info', `Process exited (code=${code}, signal=${signal})`);
// Flush any remaining stderr
if (this.stderrRemainder.trim()) {
this.logger.log('debug', `[${this.options.binaryName}] ${this.stderrRemainder.trim()}`);
this.emit('stderr', this.stderrRemainder.trim());
}
this.cleanup(); this.cleanup();
this.emit('exit', code, signal); this.emit('exit', ...args);
}); });
this.childProcess.on('error', (err) => { transport.on('error', (err: Error) => {
this.logger.log('error', `Process error: ${err.message}`); this.logger.log('error', `Transport error: ${err.message}`);
this.cleanup(); this.cleanup();
resolve(false); resolve(false);
}); });
// Wait for the ready event transport.on('reconnected', () => {
this.logger.log('info', 'Transport reconnected, waiting for ready event');
this.emit('reconnected');
});
// Connect the transport
transport.connect().then(() => {
// Wait for the ready event from the protocol layer
const readyTimeout = setTimeout(() => { const readyTimeout = setTimeout(() => {
this.logger.log('error', `Process did not send ready event within ${this.options.readyTimeoutMs}ms`); this.logger.log('error', `Process did not send ready event within ${this.options.readyTimeoutMs}ms`);
this.kill(); this.kill();
@@ -126,8 +147,12 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
this.emit('ready'); this.emit('ready');
resolve(true); resolve(true);
}); });
}).catch((err: Error) => {
this.logger.log('error', `Transport connect failed: ${err.message}`);
resolve(false);
});
} catch (err: any) { } catch (err: any) {
this.logger.log('error', `Failed to spawn: ${err.message}`); this.logger.log('error', `Failed to connect: ${err.message}`);
resolve(false); resolve(false);
} }
}); });
@@ -140,7 +165,7 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
method: K, method: K,
params: TCommands[K]['params'], params: TCommands[K]['params'],
): Promise<TCommands[K]['result']> { ): Promise<TCommands[K]['result']> {
if (!this.childProcess || !this.isRunning) { if (!this.transport?.connected || !this.isRunning) {
throw new Error(`${this.options.binaryName} bridge is not running`); throw new Error(`${this.options.binaryName} bridge is not running`);
} }
@@ -164,10 +189,10 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
this.pendingRequests.set(id, { resolve, reject, timer }); this.pendingRequests.set(id, { resolve, reject, timer });
this.writeToStdin(json + '\n').catch((err) => { this.transport!.write(json + '\n').catch((err) => {
clearTimeout(timer); clearTimeout(timer);
this.pendingRequests.delete(id); this.pendingRequests.delete(id);
reject(new Error(`Failed to write to stdin: ${err.message}`)); reject(new Error(`Failed to write to transport: ${err.message}`));
}); });
}); });
} }
@@ -183,7 +208,7 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
): StreamingResponse<TExtractChunk<TCommands[K]>, TCommands[K]['result']> { ): StreamingResponse<TExtractChunk<TCommands[K]>, TCommands[K]['result']> {
const streaming = new StreamingResponse<TExtractChunk<TCommands[K]>, TCommands[K]['result']>(); const streaming = new StreamingResponse<TExtractChunk<TCommands[K]>, TCommands[K]['result']>();
if (!this.childProcess || !this.isRunning) { if (!this.transport?.connected || !this.isRunning) {
streaming.fail(new Error(`${this.options.binaryName} bridge is not running`)); streaming.fail(new Error(`${this.options.binaryName} bridge is not running`));
return streaming; return streaming;
} }
@@ -213,28 +238,26 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
streaming, streaming,
}); });
this.writeToStdin(json + '\n').catch((err) => { this.transport!.write(json + '\n').catch((err) => {
clearTimeout(timer); clearTimeout(timer);
this.pendingRequests.delete(id); this.pendingRequests.delete(id);
streaming.fail(new Error(`Failed to write to stdin: ${err.message}`)); streaming.fail(new Error(`Failed to write to transport: ${err.message}`));
}); });
return streaming; return streaming;
} }
/** /**
* Kill the Rust process and clean up all resources. * Kill the connection and clean up all resources.
* For stdio: kills the child process (SIGTERM, then SIGKILL).
* For socket: closes the socket connection (does NOT kill the daemon).
*/ */
public kill(): void { public kill(): void {
if (this.childProcess) { if (this.transport) {
const proc = this.childProcess; const transport = this.transport;
this.childProcess = null; this.transport = null;
this.isRunning = false; this.isRunning = false;
// Clear buffers
this.stdoutBuffer = Buffer.alloc(0);
this.stderrRemainder = '';
// Reject pending requests // Reject pending requests
for (const [, pending] of this.pendingRequests) { for (const [, pending] of this.pendingRequests) {
clearTimeout(pending.timer); clearTimeout(pending.timer);
@@ -242,27 +265,8 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
} }
this.pendingRequests.clear(); this.pendingRequests.clear();
// Remove all listeners transport.removeAllListeners();
proc.removeAllListeners(); transport.disconnect();
proc.stdout?.removeAllListeners();
proc.stderr?.removeAllListeners();
proc.stdin?.removeAllListeners();
// Kill the process
try { proc.kill('SIGTERM'); } catch { /* already dead */ }
// Destroy stdio pipes
try { proc.stdin?.destroy(); } catch { /* ignore */ }
try { proc.stdout?.destroy(); } catch { /* ignore */ }
try { proc.stderr?.destroy(); } catch { /* ignore */ }
// Unref so Node doesn't wait
try { proc.unref(); } catch { /* ignore */ }
// Force kill after 5 seconds
setTimeout(() => {
try { proc.kill('SIGKILL'); } catch { /* already dead */ }
}, 5000).unref();
} }
} }
@@ -273,62 +277,6 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
return this.isRunning; return this.isRunning;
} }
/**
* Buffer-based newline scanner for stdout chunks.
* Replaces readline to handle large payloads without buffering entire lines in a separate abstraction.
*/
private handleStdoutChunk(chunk: Buffer): void {
this.stdoutBuffer = Buffer.concat([this.stdoutBuffer, chunk]);
let newlineIndex: number;
while ((newlineIndex = this.stdoutBuffer.indexOf(0x0A)) !== -1) {
const lineBuffer = this.stdoutBuffer.subarray(0, newlineIndex);
this.stdoutBuffer = this.stdoutBuffer.subarray(newlineIndex + 1);
if (lineBuffer.length > this.options.maxPayloadSize) {
this.logger.log('error', `Inbound message exceeds maxPayloadSize (${lineBuffer.length} bytes), dropping`);
continue;
}
const line = lineBuffer.toString('utf8').trim();
this.handleLine(line);
}
// If accumulated buffer exceeds maxPayloadSize (sender never sends newline), clear to prevent OOM
if (this.stdoutBuffer.length > this.options.maxPayloadSize) {
this.logger.log('error', `Stdout buffer exceeded maxPayloadSize (${this.stdoutBuffer.length} bytes) without newline, clearing`);
this.stdoutBuffer = Buffer.alloc(0);
}
}
/**
* Write data to stdin with backpressure support.
* Waits for drain if the internal buffer is full.
*/
private writeToStdin(data: string): Promise<void> {
return new Promise<void>((resolve, reject) => {
if (!this.childProcess?.stdin) {
reject(new Error('stdin not available'));
return;
}
const canContinue = this.childProcess.stdin.write(data, 'utf8', (err) => {
if (err) {
reject(err);
}
});
if (canContinue) {
resolve();
} else {
// Wait for drain before resolving
this.childProcess.stdin.once('drain', () => {
resolve();
});
}
});
}
private handleLine(line: string): void { private handleLine(line: string): void {
if (!line) return; if (!line) return;
@@ -381,9 +329,7 @@ export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plu
private cleanup(): void { private cleanup(): void {
this.isRunning = false; this.isRunning = false;
this.childProcess = null; this.transport = null;
this.stdoutBuffer = Buffer.alloc(0);
this.stderrRemainder = '';
// Reject all pending requests // Reject all pending requests
for (const [, pending] of this.pendingRequests) { for (const [, pending] of this.pendingRequests) {
+187
View File
@@ -0,0 +1,187 @@
import * as plugins from './plugins.js';
import { LineScanner } from './classes.linescanner.js';
import type { IRustBridgeLogger } from './interfaces/index.js';
import type { IRustTransport } from './interfaces/transport.js';
export interface ISocketTransportOptions {
/** Path to Unix socket (Linux/macOS) or named pipe path (Windows) */
socketPath: string;
/** Maximum inbound message size in bytes */
maxPayloadSize: number;
/** Logger instance */
logger: IRustBridgeLogger;
/** Enable auto-reconnect on unexpected disconnect (default: false) */
autoReconnect?: boolean;
/** Initial delay between reconnect attempts in ms (default: 100) */
reconnectBaseDelayMs?: number;
/** Maximum delay between reconnect attempts in ms (default: 30000) */
reconnectMaxDelayMs?: number;
/** Maximum number of reconnect attempts before giving up (default: 10) */
maxReconnectAttempts?: number;
}
interface IResolvedSocketTransportOptions {
socketPath: string;
maxPayloadSize: number;
logger: IRustBridgeLogger;
autoReconnect: boolean;
reconnectBaseDelayMs: number;
reconnectMaxDelayMs: number;
maxReconnectAttempts: number;
}
/**
* Transport that connects to an already-running process via Unix socket or Windows named pipe.
* The JSON-over-newline protocol is identical to stdio; only the transport changes.
*/
export class SocketTransport extends plugins.events.EventEmitter implements IRustTransport {
private options: IResolvedSocketTransportOptions;
private socket: plugins.net.Socket | null = null;
private lineScanner: LineScanner;
private _connected: boolean = false;
private intentionalDisconnect: boolean = false;
private reconnectAttempts: number = 0;
private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
constructor(options: ISocketTransportOptions) {
super();
this.options = {
autoReconnect: false,
reconnectBaseDelayMs: 100,
reconnectMaxDelayMs: 30000,
maxReconnectAttempts: 10,
...options,
};
this.lineScanner = new LineScanner(options.maxPayloadSize, options.logger);
}
public get connected(): boolean {
return this._connected;
}
/**
* Connect to the socket. Resolves when the TCP/Unix connection is established.
*/
public async connect(): Promise<void> {
this.intentionalDisconnect = false;
this.reconnectAttempts = 0;
return this.doConnect();
}
private doConnect(): Promise<void> {
return new Promise<void>((resolve, reject) => {
let settled = false;
this.socket = plugins.net.connect({ path: this.options.socketPath });
this.socket.on('connect', () => {
if (!settled) {
settled = true;
this._connected = true;
this.reconnectAttempts = 0;
resolve();
}
});
this.socket.on('data', (chunk: Buffer) => {
this.lineScanner.push(chunk, (line) => {
this.emit('line', line);
});
});
this.socket.on('close', () => {
const wasConnected = this._connected;
this._connected = false;
this.lineScanner.clear();
if (!this.intentionalDisconnect && wasConnected && this.options.autoReconnect) {
this.attemptReconnect();
}
this.emit('close');
});
this.socket.on('error', (err: Error) => {
this._connected = false;
if (!settled) {
settled = true;
reject(err);
} else if (!this.intentionalDisconnect) {
this.emit('error', err);
}
});
});
}
private attemptReconnect(): void {
if (this.reconnectAttempts >= this.options.maxReconnectAttempts) {
this.options.logger.log('error', `Max reconnect attempts (${this.options.maxReconnectAttempts}) reached`);
this.emit('reconnect_failed');
return;
}
const delay = Math.min(
this.options.reconnectBaseDelayMs * Math.pow(2, this.reconnectAttempts),
this.options.reconnectMaxDelayMs,
);
this.reconnectAttempts++;
this.options.logger.log('info', `Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts}/${this.options.maxReconnectAttempts})`);
this.reconnectTimer = setTimeout(async () => {
this.reconnectTimer = null;
try {
await this.doConnect();
this.emit('reconnected');
} catch {
// doConnect rejected — the 'close' handler on the new socket will trigger another attempt
}
}, delay);
}
/**
* Write data to the socket with backpressure support.
*/
public async write(data: string): Promise<void> {
return new Promise<void>((resolve, reject) => {
if (!this.socket || !this._connected) {
reject(new Error('Socket not connected'));
return;
}
const canContinue = this.socket.write(data, 'utf8', (err) => {
if (err) {
reject(err);
}
});
if (canContinue) {
resolve();
} else {
this.socket.once('drain', () => {
resolve();
});
}
});
}
/**
* Close the socket connection. Does NOT kill the remote daemon.
*/
public disconnect(): void {
this.intentionalDisconnect = true;
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
if (this.socket) {
const sock = this.socket;
this.socket = null;
this._connected = false;
this.lineScanner.clear();
sock.removeAllListeners();
sock.destroy();
}
}
}
+149
View File
@@ -0,0 +1,149 @@
import * as plugins from './plugins.js';
import { LineScanner } from './classes.linescanner.js';
import type { IRustBridgeLogger } from './interfaces/index.js';
import type { IRustTransport } from './interfaces/transport.js';
export interface IStdioTransportOptions {
binaryPath: string;
cliArgs: string[];
env?: Record<string, string>;
maxPayloadSize: number;
logger: IRustBridgeLogger;
}
/**
* Transport that spawns a child process and communicates via stdin/stdout.
* Extracted from the original RustBridge process management logic.
*/
export class StdioTransport extends plugins.events.EventEmitter implements IRustTransport {
private options: IStdioTransportOptions;
private childProcess: plugins.childProcess.ChildProcess | null = null;
private lineScanner: LineScanner;
private stderrRemainder: string = '';
private _connected: boolean = false;
constructor(options: IStdioTransportOptions) {
super();
this.options = options;
this.lineScanner = new LineScanner(options.maxPayloadSize, options.logger);
}
public get connected(): boolean {
return this._connected;
}
/**
* Spawn the child process. Resolves when the process is running (not necessarily ready).
*/
public async connect(): Promise<void> {
const env = this.options.env
? { ...process.env, ...this.options.env }
: { ...process.env };
this.childProcess = plugins.childProcess.spawn(
this.options.binaryPath,
this.options.cliArgs,
{ stdio: ['pipe', 'pipe', 'pipe'], env },
);
// Handle stderr with cross-chunk buffering
this.childProcess.stderr?.on('data', (data: Buffer) => {
this.stderrRemainder += data.toString();
const lines = this.stderrRemainder.split('\n');
this.stderrRemainder = lines.pop()!;
for (const line of lines) {
const trimmed = line.trim();
if (trimmed) {
this.emit('stderr', trimmed);
}
}
});
// Handle stdout via LineScanner
this.childProcess.stdout!.on('data', (chunk: Buffer) => {
this.lineScanner.push(chunk, (line) => {
this.emit('line', line);
});
});
// Handle process exit
this.childProcess.on('exit', (code: number | null, signal: string | null) => {
// Flush remaining stderr
if (this.stderrRemainder.trim()) {
this.emit('stderr', this.stderrRemainder.trim());
}
this._connected = false;
this.lineScanner.clear();
this.stderrRemainder = '';
this.emit('close', code, signal);
});
this.childProcess.on('error', (err: Error) => {
this._connected = false;
this.emit('error', err);
});
this._connected = true;
}
/**
* Write data to stdin with backpressure support.
*/
public async write(data: string): Promise<void> {
return new Promise<void>((resolve, reject) => {
if (!this.childProcess?.stdin) {
reject(new Error('stdin not available'));
return;
}
const canContinue = this.childProcess.stdin.write(data, 'utf8', (err) => {
if (err) {
reject(err);
}
});
if (canContinue) {
resolve();
} else {
this.childProcess.stdin.once('drain', () => {
resolve();
});
}
});
}
/**
* Kill the child process. Sends SIGTERM, then SIGKILL after 5s.
*/
public disconnect(): void {
if (!this.childProcess) return;
const proc = this.childProcess;
this.childProcess = null;
this._connected = false;
this.lineScanner.clear();
this.stderrRemainder = '';
// Remove all listeners
proc.removeAllListeners();
proc.stdout?.removeAllListeners();
proc.stderr?.removeAllListeners();
proc.stdin?.removeAllListeners();
// Kill the process
try { proc.kill('SIGTERM'); } catch { /* already dead */ }
// Destroy stdio pipes
try { proc.stdin?.destroy(); } catch { /* ignore */ }
try { proc.stdout?.destroy(); } catch { /* ignore */ }
try { proc.stderr?.destroy(); } catch { /* ignore */ }
// Unref so Node doesn't wait
try { proc.unref(); } catch { /* ignore */ }
// Force kill after 5 seconds
setTimeout(() => {
try { proc.kill('SIGKILL'); } catch { /* already dead */ }
}, 5000).unref();
}
}
+3
View File
@@ -1,4 +1,7 @@
export { RustBridge } from './classes.rustbridge.js'; export { RustBridge } from './classes.rustbridge.js';
export { RustBinaryLocator } from './classes.rustbinarylocator.js'; export { RustBinaryLocator } from './classes.rustbinarylocator.js';
export { StreamingResponse } from './classes.streamingresponse.js'; export { StreamingResponse } from './classes.streamingresponse.js';
export { StdioTransport } from './classes.stdiotransport.js';
export { SocketTransport } from './classes.sockettransport.js';
export { LineScanner } from './classes.linescanner.js';
export * from './interfaces/index.js'; export * from './interfaces/index.js';
+14
View File
@@ -45,3 +45,17 @@ export interface IRustBridgeOptions extends IBinaryLocatorOptions {
* Resets on each chunk received. */ * Resets on each chunk received. */
streamTimeoutMs?: number; streamTimeoutMs?: number;
} }
/**
* Options for connecting to an already-running daemon via Unix socket or named pipe.
*/
export interface ISocketConnectOptions {
/** Enable auto-reconnect on unexpected disconnect (default: false) */
autoReconnect?: boolean;
/** Initial delay between reconnect attempts in ms (default: 100) */
reconnectBaseDelayMs?: number;
/** Maximum delay between reconnect attempts in ms (default: 30000) */
reconnectMaxDelayMs?: number;
/** Maximum number of reconnect attempts before giving up (default: 10) */
maxReconnectAttempts?: number;
}
+1
View File
@@ -1,2 +1,3 @@
export * from './ipc.js'; export * from './ipc.js';
export * from './config.js'; export * from './config.js';
export * from './transport.js';
+26
View File
@@ -0,0 +1,26 @@
import type * as plugins from '../plugins.js';
/**
* Abstract transport for communicating with a Rust process.
* Both stdio and socket transports implement this interface.
*
* Events emitted:
* - 'line' (line: string) — a complete newline-terminated JSON line received
* - 'error' (err: Error) — transport-level error
* - 'close' (...args: any[]) — transport has closed/disconnected
* - 'stderr' (line: string) — stderr output (stdio transport only)
* - 'reconnected' () — transport reconnected after unexpected disconnect (socket only)
*/
export interface IRustTransport extends plugins.events.EventEmitter {
/** Connect the transport (spawn process or connect to socket). Resolves when I/O channel is open. */
connect(): Promise<void>;
/** Write a string to the transport. Handles backpressure. */
write(data: string): Promise<void>;
/** Disconnect the transport. For stdio: kills the process. For socket: closes the connection. */
disconnect(): void;
/** Whether the transport is currently connected and writable. */
readonly connected: boolean;
}
+2 -1
View File
@@ -4,8 +4,9 @@ import * as fs from 'fs';
import * as childProcess from 'child_process'; import * as childProcess from 'child_process';
import * as events from 'events'; import * as events from 'events';
import * as url from 'url'; import * as url from 'url';
import * as net from 'net';
export { path, fs, childProcess, events, url }; export { path, fs, childProcess, events, url, net };
// @push.rocks scope // @push.rocks scope
import * as smartpath from '@push.rocks/smartpath'; import * as smartpath from '@push.rocks/smartpath';
+5 -1
View File
@@ -4,7 +4,11 @@
"module": "NodeNext", "module": "NodeNext",
"moduleResolution": "NodeNext", "moduleResolution": "NodeNext",
"esModuleInterop": true, "esModuleInterop": true,
"verbatimModuleSyntax": true "verbatimModuleSyntax": true,
"noImplicitAny": true,
"types": [
"node"
]
}, },
"exclude": [ "exclude": [
"dist_*/**/*.d.ts" "dist_*/**/*.d.ts"