diff --git a/changelog.md b/changelog.md index 16f7f13..8c4c2ca 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,16 @@ # Changelog +## 2026-02-26 - 1.3.0 - feat(transport) +introduce transport abstraction and socket-mode support for RustBridge + +- Add IRustTransport interface and two transport implementations: StdioTransport (spawns child process and uses stdin/stdout) and SocketTransport (connects to Unix socket / Windows named pipe). +- Refactor RustBridge to use a transport abstraction (connectWithTransport) and add connect(socketPath) to attach to an existing daemon via socket. +- Introduce LineScanner: a buffer-based newline scanner used by both transports to handle large/newline-delimited messages and avoid OOMs. +- Add socket connection options (autoReconnect, reconnectBaseDelayMs, reconnectMaxDelayMs, maxReconnectAttempts) and implement auto-reconnect/backoff behavior in SocketTransport. +- Implement backpressure-aware write semantics and proper disconnect/cleanup for both transports; RustBridge.kill() now disconnects the transport instead of directly managing processes. +- Add tests and tooling: socket transport tests, line scanner tests, and a mock-socket-server.mjs helper script for testing socket mode. +- Export new symbols (StdioTransport, SocketTransport, LineScanner) and update plugins to expose net; update interfaces to export transport types. + ## 2026-02-12 - 1.2.1 - fix(rust-binary-locator) auto-fix missing execute permission for located Rust binaries diff --git a/test/helpers/mock-socket-server.mjs b/test/helpers/mock-socket-server.mjs new file mode 100644 index 0000000..68fc3dd --- /dev/null +++ b/test/helpers/mock-socket-server.mjs @@ -0,0 +1,124 @@ +#!/usr/bin/env node + +/** + * Mock "Rust daemon" for testing the SocketTransport and RustBridge socket mode. + * Creates a Unix socket server, accepts connections, and speaks the same + * JSON-over-newline IPC protocol as mock-rust-binary.mjs. + * + * Usage: node mock-socket-server.mjs + * Signals readiness by writing a JSON line to stdout: {"socketPath":"...","ready":true} + */ + +import * as net from 'net'; +import * as fs from 'fs'; + +const socketPath = process.argv[2]; +if (!socketPath) { + process.stderr.write('Usage: mock-socket-server.mjs \n'); + process.exit(1); +} + +// Remove stale socket file +try { fs.unlinkSync(socketPath); } catch { /* ignore */ } + +/** + * Backpressure-aware write to a socket. + */ +function writeResponse(conn, data) { + const json = JSON.stringify(data) + '\n'; + if (!conn.write(json)) { + conn.once('drain', () => {}); + } +} + +function handleLine(line, conn) { + let request; + try { + request = JSON.parse(line); + } catch { + return; + } + + const { id, method, params } = request; + + if (method === 'echo') { + writeResponse(conn, { id, success: true, result: params }); + } else if (method === 'largeEcho') { + writeResponse(conn, { id, success: true, result: params }); + } else if (method === 'error') { + writeResponse(conn, { id, success: false, error: 'Test error message' }); + } else if (method === 'emitEvent') { + writeResponse(conn, { event: params.eventName, data: params.eventData }); + writeResponse(conn, { id, success: true, result: null }); + } else if (method === 'slow') { + setTimeout(() => { + writeResponse(conn, { id, success: true, result: { delayed: true } }); + }, 100); + } else if (method === 'streamEcho') { + const count = params.count || 0; + let sent = 0; + const interval = setInterval(() => { + if (sent < count) { + writeResponse(conn, { id, stream: true, data: { index: sent, value: `chunk_${sent}` } }); + sent++; + } else { + clearInterval(interval); + writeResponse(conn, { id, success: true, result: { totalChunks: count } }); + } + }, 10); + } else if (method === 'streamError') { + writeResponse(conn, { id, stream: true, data: { index: 0, value: 'before_error' } }); + setTimeout(() => { + writeResponse(conn, { id, success: false, error: 'Stream error after chunk' }); + }, 20); + } else if (method === 'streamEmpty') { + writeResponse(conn, { id, success: true, result: { totalChunks: 0 } }); + } else if (method === 'exit') { + writeResponse(conn, { id, success: true, result: null }); + // In socket mode, 'exit' just closes this connection, not the server + setTimeout(() => conn.end(), 50); + } else { + writeResponse(conn, { id, success: false, error: `Unknown method: ${method}` }); + } +} + +const server = net.createServer((conn) => { + // Send ready event on each new connection + writeResponse(conn, { event: 'ready', data: { version: '1.0.0' } }); + + // Buffer-based newline scanner for incoming data + let buffer = Buffer.alloc(0); + conn.on('data', (chunk) => { + buffer = Buffer.concat([buffer, chunk]); + let idx; + while ((idx = buffer.indexOf(0x0A)) !== -1) { + const lineBuffer = buffer.subarray(0, idx); + buffer = buffer.subarray(idx + 1); + const line = lineBuffer.toString('utf8').trim(); + if (line) { + handleLine(line, conn); + } + } + }); + + conn.on('error', () => { /* ignore client errors */ }); +}); + +server.listen(socketPath, () => { + // Signal to parent that the server is ready + process.stdout.write(JSON.stringify({ socketPath, ready: true }) + '\n'); +}); + +// Handle SIGTERM gracefully +process.on('SIGTERM', () => { + server.close(); + try { fs.unlinkSync(socketPath); } catch { /* ignore */ } + process.exit(0); +}); + +// Handle SIGINT +process.on('SIGINT', () => { + server.close(); + try { fs.unlinkSync(socketPath); } catch { /* ignore */ } + process.exit(0); +}); diff --git a/test/test.linescanner.node.ts b/test/test.linescanner.node.ts new file mode 100644 index 0000000..048d977 --- /dev/null +++ b/test/test.linescanner.node.ts @@ -0,0 +1,89 @@ +import { expect, tap } from '@git.zone/tstest/tapbundle'; +import { LineScanner } from '../ts/classes.linescanner.js'; + +const noopLogger = { log() {} }; + +tap.test('should parse a single complete line', async () => { + const scanner = new LineScanner(1024 * 1024, noopLogger); + const lines: string[] = []; + scanner.push(Buffer.from('{"hello":"world"}\n'), (line) => lines.push(line)); + expect(lines.length).toEqual(1); + expect(lines[0]).toEqual('{"hello":"world"}'); +}); + +tap.test('should parse multiple lines in one chunk', async () => { + const scanner = new LineScanner(1024 * 1024, noopLogger); + const lines: string[] = []; + scanner.push(Buffer.from('{"a":1}\n{"b":2}\n{"c":3}\n'), (line) => lines.push(line)); + expect(lines.length).toEqual(3); + expect(lines[0]).toEqual('{"a":1}'); + expect(lines[1]).toEqual('{"b":2}'); + expect(lines[2]).toEqual('{"c":3}'); +}); + +tap.test('should handle a line split across chunks', async () => { + const scanner = new LineScanner(1024 * 1024, noopLogger); + const lines: string[] = []; + scanner.push(Buffer.from('{"hel'), (line) => lines.push(line)); + expect(lines.length).toEqual(0); + scanner.push(Buffer.from('lo":"world"}\n'), (line) => lines.push(line)); + expect(lines.length).toEqual(1); + expect(lines[0]).toEqual('{"hello":"world"}'); +}); + +tap.test('should drop oversized lines', async () => { + const scanner = new LineScanner(100, noopLogger); + const lines: string[] = []; + // Line is 200 chars + newline, exceeds maxPayloadSize of 100 + const oversized = 'x'.repeat(200) + '\n'; + scanner.push(Buffer.from(oversized), (line) => lines.push(line)); + expect(lines.length).toEqual(0); +}); + +tap.test('should clear buffer on OOM (no newline, exceeds max)', async () => { + const scanner = new LineScanner(100, noopLogger); + const lines: string[] = []; + // Push 200 bytes without any newline — exceeds maxPayloadSize + scanner.push(Buffer.from('x'.repeat(200)), (line) => lines.push(line)); + expect(lines.length).toEqual(0); + // After clearing, should work normally again + scanner.push(Buffer.from('{"ok":true}\n'), (line) => lines.push(line)); + expect(lines.length).toEqual(1); + expect(lines[0]).toEqual('{"ok":true}'); +}); + +tap.test('should skip empty lines', async () => { + const scanner = new LineScanner(1024 * 1024, noopLogger); + const lines: string[] = []; + scanner.push(Buffer.from('\n\n{"a":1}\n\n'), (line) => lines.push(line)); + expect(lines.length).toEqual(1); + expect(lines[0]).toEqual('{"a":1}'); +}); + +tap.test('should handle mixed complete and partial lines', async () => { + const scanner = new LineScanner(1024 * 1024, noopLogger); + const lines: string[] = []; + // First chunk: one complete line + start of second line + scanner.push(Buffer.from('{"first":1}\n{"sec'), (line) => lines.push(line)); + expect(lines.length).toEqual(1); + // Second chunk: end of second line + complete third line + scanner.push(Buffer.from('ond":2}\n{"third":3}\n'), (line) => lines.push(line)); + expect(lines.length).toEqual(3); + expect(lines[1]).toEqual('{"second":2}'); + expect(lines[2]).toEqual('{"third":3}'); +}); + +tap.test('clear should reset the buffer', async () => { + const scanner = new LineScanner(1024 * 1024, noopLogger); + const lines: string[] = []; + // Push partial data + scanner.push(Buffer.from('{"partial":'), (line) => lines.push(line)); + // Clear + scanner.clear(); + // Now push a complete new line — old partial should not affect it + scanner.push(Buffer.from('{"fresh":true}\n'), (line) => lines.push(line)); + expect(lines.length).toEqual(1); + expect(lines[0]).toEqual('{"fresh":true}'); +}); + +export default tap.start(); diff --git a/test/test.sockettransport.node.ts b/test/test.sockettransport.node.ts new file mode 100644 index 0000000..1059353 --- /dev/null +++ b/test/test.sockettransport.node.ts @@ -0,0 +1,312 @@ +import { expect, tap } from '@git.zone/tstest/tapbundle'; +import * as path from 'path'; +import * as fs from 'fs'; +import * as childProcess from 'child_process'; +import * as os from 'os'; +import { RustBridge } from '../ts/classes.rustbridge.js'; +import type { ICommandDefinition } from '../ts/interfaces/index.js'; + +const testDir = path.resolve(path.dirname(new URL(import.meta.url).pathname)); +const mockServerPath = path.join(testDir, 'helpers/mock-socket-server.mjs'); + +// Define the command types for our mock binary +type TMockCommands = { + echo: { params: Record; result: Record }; + largeEcho: { params: Record; result: Record }; + error: { params: {}; result: never }; + emitEvent: { params: { eventName: string; eventData: any }; result: null }; + slow: { params: {}; result: { delayed: boolean } }; + exit: { params: {}; result: null }; + streamEcho: { params: { count: number }; chunk: { index: number; value: string }; result: { totalChunks: number } }; + streamError: { params: {}; chunk: { index: number; value: string }; result: never }; + streamEmpty: { params: {}; chunk: never; result: { totalChunks: number } }; +}; + +/** + * Start the mock socket server and return the socket path. + * Returns { proc, socketPath }. + */ +async function startMockServer(testName: string): Promise<{ proc: childProcess.ChildProcess; socketPath: string }> { + const socketPath = path.join(os.tmpdir(), `smartrust-test-${Date.now()}-${testName}.sock`); + + const proc = childProcess.spawn('node', [mockServerPath, socketPath], { + stdio: ['pipe', 'pipe', 'pipe'], + }); + + // Wait for the server to signal readiness via stdout + return new Promise((resolve, reject) => { + let stdoutData = ''; + const timeout = setTimeout(() => { + proc.kill(); + reject(new Error('Mock server did not start within 5s')); + }, 5000); + + proc.stdout!.on('data', (data: Buffer) => { + stdoutData += data.toString(); + const lines = stdoutData.split('\n'); + for (const line of lines) { + if (line.trim()) { + try { + const parsed = JSON.parse(line.trim()); + if (parsed.ready) { + clearTimeout(timeout); + resolve({ proc, socketPath: parsed.socketPath }); + return; + } + } catch { /* not JSON yet */ } + } + } + }); + + proc.on('error', (err) => { + clearTimeout(timeout); + reject(err); + }); + + proc.on('exit', (code) => { + clearTimeout(timeout); + reject(new Error(`Mock server exited with code ${code}`)); + }); + }); +} + +function stopMockServer(proc: childProcess.ChildProcess, socketPath: string) { + try { proc.kill('SIGTERM'); } catch { /* ignore */ } + try { fs.unlinkSync(socketPath); } catch { /* ignore */ } +} + +// === Socket Transport Tests via RustBridge.connect() === + +tap.test('socket: should connect and receive ready event', async () => { + const { proc, socketPath } = await startMockServer('connect-ready'); + try { + const bridge = new RustBridge({ + binaryName: 'mock-daemon', + readyTimeoutMs: 5000, + }); + + const result = await bridge.connect(socketPath); + expect(result).toBeTrue(); + expect(bridge.running).toBeTrue(); + + bridge.kill(); + expect(bridge.running).toBeFalse(); + } finally { + stopMockServer(proc, socketPath); + } +}); + +tap.test('socket: should send command and receive response', async () => { + const { proc, socketPath } = await startMockServer('send-command'); + try { + const bridge = new RustBridge({ + binaryName: 'mock-daemon', + readyTimeoutMs: 5000, + }); + + await bridge.connect(socketPath); + + const result = await bridge.sendCommand('echo', { hello: 'world', num: 42 }); + expect(result).toEqual({ hello: 'world', num: 42 }); + + bridge.kill(); + } finally { + stopMockServer(proc, socketPath); + } +}); + +tap.test('socket: should handle error responses', async () => { + const { proc, socketPath } = await startMockServer('error-response'); + try { + const bridge = new RustBridge({ + binaryName: 'mock-daemon', + readyTimeoutMs: 5000, + }); + + await bridge.connect(socketPath); + + let threw = false; + try { + await bridge.sendCommand('error', {}); + } catch (err: any) { + threw = true; + expect(err.message).toInclude('Test error message'); + } + expect(threw).toBeTrue(); + + bridge.kill(); + } finally { + stopMockServer(proc, socketPath); + } +}); + +tap.test('socket: should receive custom events', async () => { + const { proc, socketPath } = await startMockServer('custom-events'); + try { + const bridge = new RustBridge({ + binaryName: 'mock-daemon', + readyTimeoutMs: 5000, + }); + + await bridge.connect(socketPath); + + const eventPromise = new Promise((resolve) => { + bridge.once('management:testEvent', (data) => resolve(data)); + }); + + await bridge.sendCommand('emitEvent', { + eventName: 'testEvent', + eventData: { key: 'value' }, + }); + + const eventData = await eventPromise; + expect(eventData).toEqual({ key: 'value' }); + + bridge.kill(); + } finally { + stopMockServer(proc, socketPath); + } +}); + +tap.test('socket: should handle multiple concurrent commands', async () => { + const { proc, socketPath } = await startMockServer('concurrent'); + try { + const bridge = new RustBridge({ + binaryName: 'mock-daemon', + readyTimeoutMs: 5000, + }); + + await bridge.connect(socketPath); + + const results = await Promise.all([ + bridge.sendCommand('echo', { id: 1 }), + bridge.sendCommand('echo', { id: 2 }), + bridge.sendCommand('echo', { id: 3 }), + ]); + + expect(results[0]).toEqual({ id: 1 }); + expect(results[1]).toEqual({ id: 2 }); + expect(results[2]).toEqual({ id: 3 }); + + bridge.kill(); + } finally { + stopMockServer(proc, socketPath); + } +}); + +tap.test('socket: should handle 1MB payload round-trip', async () => { + const { proc, socketPath } = await startMockServer('large-payload'); + try { + const bridge = new RustBridge({ + binaryName: 'mock-daemon', + readyTimeoutMs: 5000, + requestTimeoutMs: 30000, + }); + + await bridge.connect(socketPath); + + const largeString = 'x'.repeat(1024 * 1024); + const result = await bridge.sendCommand('largeEcho', { data: largeString }); + expect(result.data).toEqual(largeString); + expect(result.data.length).toEqual(1024 * 1024); + + bridge.kill(); + } finally { + stopMockServer(proc, socketPath); + } +}); + +tap.test('socket: should disconnect without killing the daemon', async () => { + const { proc, socketPath } = await startMockServer('no-kill-daemon'); + try { + // First connection + const bridge1 = new RustBridge({ + binaryName: 'mock-daemon', + readyTimeoutMs: 5000, + }); + await bridge1.connect(socketPath); + expect(bridge1.running).toBeTrue(); + + // Disconnect + bridge1.kill(); + expect(bridge1.running).toBeFalse(); + + // Second connection — daemon should still be alive + const bridge2 = new RustBridge({ + binaryName: 'mock-daemon', + readyTimeoutMs: 5000, + }); + const result = await bridge2.connect(socketPath); + expect(result).toBeTrue(); + expect(bridge2.running).toBeTrue(); + + // Verify the daemon is functional + const echoResult = await bridge2.sendCommand('echo', { reconnected: true }); + expect(echoResult).toEqual({ reconnected: true }); + + bridge2.kill(); + } finally { + stopMockServer(proc, socketPath); + } +}); + +tap.test('socket: should stream responses via socket', async () => { + const { proc, socketPath } = await startMockServer('streaming'); + try { + const bridge = new RustBridge({ + binaryName: 'mock-daemon', + readyTimeoutMs: 5000, + requestTimeoutMs: 10000, + }); + + await bridge.connect(socketPath); + + const stream = bridge.sendCommandStreaming('streamEcho', { count: 5 }); + const chunks: Array<{ index: number; value: string }> = []; + + for await (const chunk of stream) { + chunks.push(chunk); + } + + expect(chunks.length).toEqual(5); + for (let i = 0; i < 5; i++) { + expect(chunks[i].index).toEqual(i); + expect(chunks[i].value).toEqual(`chunk_${i}`); + } + + const result = await stream.result; + expect(result.totalChunks).toEqual(5); + + bridge.kill(); + } finally { + stopMockServer(proc, socketPath); + } +}); + +tap.test('socket: should return false when socket path does not exist', async () => { + const bridge = new RustBridge({ + binaryName: 'mock-daemon', + readyTimeoutMs: 3000, + }); + + const result = await bridge.connect('/tmp/nonexistent-smartrust-test.sock'); + expect(result).toBeFalse(); + expect(bridge.running).toBeFalse(); +}); + +tap.test('socket: should throw when sending command while not connected', async () => { + const bridge = new RustBridge({ + binaryName: 'mock-daemon', + }); + + let threw = false; + try { + await bridge.sendCommand('echo', {}); + } catch (err: any) { + threw = true; + expect(err.message).toInclude('not running'); + } + expect(threw).toBeTrue(); +}); + +export default tap.start(); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index cf246af..8dfdf52 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartrust', - version: '1.2.1', + version: '1.3.0', description: 'a bridge between JS engines and rust' } diff --git a/ts/classes.linescanner.ts b/ts/classes.linescanner.ts new file mode 100644 index 0000000..843378d --- /dev/null +++ b/ts/classes.linescanner.ts @@ -0,0 +1,51 @@ +import type { IRustBridgeLogger } from './interfaces/index.js'; + +/** + * Buffer-based newline scanner for streaming binary data. + * Accumulates chunks and emits complete lines via callback. + * Used by both StdioTransport and SocketTransport. + */ +export class LineScanner { + private buffer: Buffer = Buffer.alloc(0); + private maxPayloadSize: number; + private logger: IRustBridgeLogger; + + constructor(maxPayloadSize: number, logger: IRustBridgeLogger) { + this.maxPayloadSize = maxPayloadSize; + this.logger = logger; + } + + /** + * Feed a chunk of data. Calls onLine for each complete newline-terminated line found. + */ + public push(chunk: Buffer, onLine: (line: string) => void): void { + this.buffer = Buffer.concat([this.buffer, chunk]); + + let newlineIndex: number; + while ((newlineIndex = this.buffer.indexOf(0x0A)) !== -1) { + const lineBuffer = this.buffer.subarray(0, newlineIndex); + this.buffer = this.buffer.subarray(newlineIndex + 1); + + if (lineBuffer.length > this.maxPayloadSize) { + this.logger.log('error', `Inbound message exceeds maxPayloadSize (${lineBuffer.length} bytes), dropping`); + continue; + } + + const line = lineBuffer.toString('utf8').trim(); + if (line) { + onLine(line); + } + } + + // Prevent OOM if sender never sends newline + if (this.buffer.length > this.maxPayloadSize) { + this.logger.log('error', `Buffer exceeded maxPayloadSize (${this.buffer.length} bytes) without newline, clearing`); + this.buffer = Buffer.alloc(0); + } + } + + /** Reset the internal buffer. */ + public clear(): void { + this.buffer = Buffer.alloc(0); + } +} diff --git a/ts/classes.rustbridge.ts b/ts/classes.rustbridge.ts index 1537898..44d3651 100644 --- a/ts/classes.rustbridge.ts +++ b/ts/classes.rustbridge.ts @@ -1,15 +1,19 @@ import * as plugins from './plugins.js'; import { RustBinaryLocator } from './classes.rustbinarylocator.js'; import { StreamingResponse } from './classes.streamingresponse.js'; +import { StdioTransport } from './classes.stdiotransport.js'; +import { SocketTransport } from './classes.sockettransport.js'; import type { IRustBridgeOptions, IRustBridgeLogger, + ISocketConnectOptions, TCommandMap, IManagementRequest, IManagementResponse, IManagementEvent, TStreamingCommandKeys, TExtractChunk, + IRustTransport, } from './interfaces/index.js'; const defaultLogger: IRustBridgeLogger = { @@ -18,7 +22,8 @@ const defaultLogger: IRustBridgeLogger = { /** * Generic bridge between TypeScript and a Rust binary. - * Communicates via JSON-over-stdin/stdout IPC protocol. + * Communicates via JSON-over-stdin/stdout IPC protocol (stdio mode) + * or JSON-over-Unix-socket/named-pipe (socket mode). * * @typeParam TCommands - Map of command names to their param/result types */ @@ -26,9 +31,7 @@ export class RustBridge extends plu private locator: RustBinaryLocator; private options: Required> & IRustBridgeOptions; private logger: IRustBridgeLogger; - private childProcess: plugins.childProcess.ChildProcess | null = null; - private stdoutBuffer: Buffer = Buffer.alloc(0); - private stderrRemainder: string = ''; + private transport: IRustTransport | null = null; private pendingRequests = new Map void; reject: (error: Error) => void; @@ -63,71 +66,93 @@ export class RustBridge extends plu return false; } + const transport = new StdioTransport({ + binaryPath: this.binaryPath, + cliArgs: this.options.cliArgs, + env: this.options.env, + maxPayloadSize: this.options.maxPayloadSize, + logger: this.logger, + }); + + return this.connectWithTransport(transport); + } + + /** + * Connect to an already-running Rust daemon via Unix socket or named pipe. + * Returns true if the connection was established and the daemon signaled readiness. + * + * @param socketPath - Path to Unix socket or Windows named pipe + * @param socketOptions - Optional socket connection options (reconnect, etc.) + */ + public async connect(socketPath: string, socketOptions?: ISocketConnectOptions): Promise { + const transport = new SocketTransport({ + socketPath, + maxPayloadSize: this.options.maxPayloadSize, + logger: this.logger, + autoReconnect: socketOptions?.autoReconnect, + reconnectBaseDelayMs: socketOptions?.reconnectBaseDelayMs, + reconnectMaxDelayMs: socketOptions?.reconnectMaxDelayMs, + maxReconnectAttempts: socketOptions?.maxReconnectAttempts, + }); + + return this.connectWithTransport(transport); + } + + /** + * Internal: wire up any transport and wait for the ready handshake. + */ + private connectWithTransport(transport: IRustTransport): Promise { return new Promise((resolve) => { try { - const env = this.options.env - ? { ...process.env, ...this.options.env } - : { ...process.env }; + this.transport = transport; - this.childProcess = plugins.childProcess.spawn(this.binaryPath!, this.options.cliArgs, { - stdio: ['pipe', 'pipe', 'pipe'], - env, + // Wire transport events + transport.on('line', (line: string) => this.handleLine(line)); + + transport.on('stderr', (line: string) => { + this.logger.log('debug', `[${this.options.binaryName}] ${line}`); + this.emit('stderr', line); }); - // Handle stderr with cross-chunk buffering - this.childProcess.stderr?.on('data', (data: Buffer) => { - this.stderrRemainder += data.toString(); - const lines = this.stderrRemainder.split('\n'); - // Keep the last element (incomplete line) as remainder - this.stderrRemainder = lines.pop()!; - for (const line of lines) { - const trimmed = line.trim(); - if (trimmed) { - this.logger.log('debug', `[${this.options.binaryName}] ${trimmed}`); - this.emit('stderr', trimmed); - } - } - }); - - // Handle stdout via Buffer-based newline scanner - this.childProcess.stdout!.on('data', (chunk: Buffer) => { - this.handleStdoutChunk(chunk); - }); - - // Handle process exit - this.childProcess.on('exit', (code, signal) => { - this.logger.log('info', `Process exited (code=${code}, signal=${signal})`); - // Flush any remaining stderr - if (this.stderrRemainder.trim()) { - this.logger.log('debug', `[${this.options.binaryName}] ${this.stderrRemainder.trim()}`); - this.emit('stderr', this.stderrRemainder.trim()); - } + transport.on('close', (...args: any[]) => { + this.logger.log('info', `Transport closed`); this.cleanup(); - this.emit('exit', code, signal); + this.emit('exit', ...args); }); - this.childProcess.on('error', (err) => { - this.logger.log('error', `Process error: ${err.message}`); + transport.on('error', (err: Error) => { + this.logger.log('error', `Transport error: ${err.message}`); this.cleanup(); resolve(false); }); - // Wait for the ready event - const readyTimeout = setTimeout(() => { - this.logger.log('error', `Process did not send ready event within ${this.options.readyTimeoutMs}ms`); - this.kill(); - resolve(false); - }, this.options.readyTimeoutMs); + transport.on('reconnected', () => { + this.logger.log('info', 'Transport reconnected, waiting for ready event'); + this.emit('reconnected'); + }); - this.once(`management:${this.options.readyEventName}`, () => { - clearTimeout(readyTimeout); - this.isRunning = true; - this.logger.log('info', `Bridge connected to ${this.options.binaryName}`); - this.emit('ready'); - resolve(true); + // Connect the transport + transport.connect().then(() => { + // Wait for the ready event from the protocol layer + const readyTimeout = setTimeout(() => { + this.logger.log('error', `Process did not send ready event within ${this.options.readyTimeoutMs}ms`); + this.kill(); + resolve(false); + }, this.options.readyTimeoutMs); + + this.once(`management:${this.options.readyEventName}`, () => { + clearTimeout(readyTimeout); + this.isRunning = true; + this.logger.log('info', `Bridge connected to ${this.options.binaryName}`); + this.emit('ready'); + resolve(true); + }); + }).catch((err: Error) => { + this.logger.log('error', `Transport connect failed: ${err.message}`); + resolve(false); }); } catch (err: any) { - this.logger.log('error', `Failed to spawn: ${err.message}`); + this.logger.log('error', `Failed to connect: ${err.message}`); resolve(false); } }); @@ -140,7 +165,7 @@ export class RustBridge extends plu method: K, params: TCommands[K]['params'], ): Promise { - if (!this.childProcess || !this.isRunning) { + if (!this.transport?.connected || !this.isRunning) { throw new Error(`${this.options.binaryName} bridge is not running`); } @@ -164,10 +189,10 @@ export class RustBridge extends plu this.pendingRequests.set(id, { resolve, reject, timer }); - this.writeToStdin(json + '\n').catch((err) => { + this.transport!.write(json + '\n').catch((err) => { clearTimeout(timer); this.pendingRequests.delete(id); - reject(new Error(`Failed to write to stdin: ${err.message}`)); + reject(new Error(`Failed to write to transport: ${err.message}`)); }); }); } @@ -183,7 +208,7 @@ export class RustBridge extends plu ): StreamingResponse, TCommands[K]['result']> { const streaming = new StreamingResponse, TCommands[K]['result']>(); - if (!this.childProcess || !this.isRunning) { + if (!this.transport?.connected || !this.isRunning) { streaming.fail(new Error(`${this.options.binaryName} bridge is not running`)); return streaming; } @@ -213,28 +238,26 @@ export class RustBridge extends plu streaming, }); - this.writeToStdin(json + '\n').catch((err) => { + this.transport!.write(json + '\n').catch((err) => { clearTimeout(timer); this.pendingRequests.delete(id); - streaming.fail(new Error(`Failed to write to stdin: ${err.message}`)); + streaming.fail(new Error(`Failed to write to transport: ${err.message}`)); }); return streaming; } /** - * Kill the Rust process and clean up all resources. + * Kill the connection and clean up all resources. + * For stdio: kills the child process (SIGTERM, then SIGKILL). + * For socket: closes the socket connection (does NOT kill the daemon). */ public kill(): void { - if (this.childProcess) { - const proc = this.childProcess; - this.childProcess = null; + if (this.transport) { + const transport = this.transport; + this.transport = null; this.isRunning = false; - // Clear buffers - this.stdoutBuffer = Buffer.alloc(0); - this.stderrRemainder = ''; - // Reject pending requests for (const [, pending] of this.pendingRequests) { clearTimeout(pending.timer); @@ -242,27 +265,8 @@ export class RustBridge extends plu } this.pendingRequests.clear(); - // Remove all listeners - proc.removeAllListeners(); - proc.stdout?.removeAllListeners(); - proc.stderr?.removeAllListeners(); - proc.stdin?.removeAllListeners(); - - // Kill the process - try { proc.kill('SIGTERM'); } catch { /* already dead */ } - - // Destroy stdio pipes - try { proc.stdin?.destroy(); } catch { /* ignore */ } - try { proc.stdout?.destroy(); } catch { /* ignore */ } - try { proc.stderr?.destroy(); } catch { /* ignore */ } - - // Unref so Node doesn't wait - try { proc.unref(); } catch { /* ignore */ } - - // Force kill after 5 seconds - setTimeout(() => { - try { proc.kill('SIGKILL'); } catch { /* already dead */ } - }, 5000).unref(); + transport.removeAllListeners(); + transport.disconnect(); } } @@ -273,62 +277,6 @@ export class RustBridge extends plu return this.isRunning; } - /** - * Buffer-based newline scanner for stdout chunks. - * Replaces readline to handle large payloads without buffering entire lines in a separate abstraction. - */ - private handleStdoutChunk(chunk: Buffer): void { - this.stdoutBuffer = Buffer.concat([this.stdoutBuffer, chunk]); - - let newlineIndex: number; - while ((newlineIndex = this.stdoutBuffer.indexOf(0x0A)) !== -1) { - const lineBuffer = this.stdoutBuffer.subarray(0, newlineIndex); - this.stdoutBuffer = this.stdoutBuffer.subarray(newlineIndex + 1); - - if (lineBuffer.length > this.options.maxPayloadSize) { - this.logger.log('error', `Inbound message exceeds maxPayloadSize (${lineBuffer.length} bytes), dropping`); - continue; - } - - const line = lineBuffer.toString('utf8').trim(); - this.handleLine(line); - } - - // If accumulated buffer exceeds maxPayloadSize (sender never sends newline), clear to prevent OOM - if (this.stdoutBuffer.length > this.options.maxPayloadSize) { - this.logger.log('error', `Stdout buffer exceeded maxPayloadSize (${this.stdoutBuffer.length} bytes) without newline, clearing`); - this.stdoutBuffer = Buffer.alloc(0); - } - } - - /** - * Write data to stdin with backpressure support. - * Waits for drain if the internal buffer is full. - */ - private writeToStdin(data: string): Promise { - return new Promise((resolve, reject) => { - if (!this.childProcess?.stdin) { - reject(new Error('stdin not available')); - return; - } - - const canContinue = this.childProcess.stdin.write(data, 'utf8', (err) => { - if (err) { - reject(err); - } - }); - - if (canContinue) { - resolve(); - } else { - // Wait for drain before resolving - this.childProcess.stdin.once('drain', () => { - resolve(); - }); - } - }); - } - private handleLine(line: string): void { if (!line) return; @@ -381,9 +329,7 @@ export class RustBridge extends plu private cleanup(): void { this.isRunning = false; - this.childProcess = null; - this.stdoutBuffer = Buffer.alloc(0); - this.stderrRemainder = ''; + this.transport = null; // Reject all pending requests for (const [, pending] of this.pendingRequests) { diff --git a/ts/classes.sockettransport.ts b/ts/classes.sockettransport.ts new file mode 100644 index 0000000..8281f67 --- /dev/null +++ b/ts/classes.sockettransport.ts @@ -0,0 +1,187 @@ +import * as plugins from './plugins.js'; +import { LineScanner } from './classes.linescanner.js'; +import type { IRustBridgeLogger } from './interfaces/index.js'; +import type { IRustTransport } from './interfaces/transport.js'; + +export interface ISocketTransportOptions { + /** Path to Unix socket (Linux/macOS) or named pipe path (Windows) */ + socketPath: string; + /** Maximum inbound message size in bytes */ + maxPayloadSize: number; + /** Logger instance */ + logger: IRustBridgeLogger; + /** Enable auto-reconnect on unexpected disconnect (default: false) */ + autoReconnect?: boolean; + /** Initial delay between reconnect attempts in ms (default: 100) */ + reconnectBaseDelayMs?: number; + /** Maximum delay between reconnect attempts in ms (default: 30000) */ + reconnectMaxDelayMs?: number; + /** Maximum number of reconnect attempts before giving up (default: 10) */ + maxReconnectAttempts?: number; +} + +interface IResolvedSocketTransportOptions { + socketPath: string; + maxPayloadSize: number; + logger: IRustBridgeLogger; + autoReconnect: boolean; + reconnectBaseDelayMs: number; + reconnectMaxDelayMs: number; + maxReconnectAttempts: number; +} + +/** + * Transport that connects to an already-running process via Unix socket or Windows named pipe. + * The JSON-over-newline protocol is identical to stdio; only the transport changes. + */ +export class SocketTransport extends plugins.events.EventEmitter implements IRustTransport { + private options: IResolvedSocketTransportOptions; + private socket: plugins.net.Socket | null = null; + private lineScanner: LineScanner; + private _connected: boolean = false; + private intentionalDisconnect: boolean = false; + private reconnectAttempts: number = 0; + private reconnectTimer: ReturnType | null = null; + + constructor(options: ISocketTransportOptions) { + super(); + this.options = { + autoReconnect: false, + reconnectBaseDelayMs: 100, + reconnectMaxDelayMs: 30000, + maxReconnectAttempts: 10, + ...options, + }; + this.lineScanner = new LineScanner(options.maxPayloadSize, options.logger); + } + + public get connected(): boolean { + return this._connected; + } + + /** + * Connect to the socket. Resolves when the TCP/Unix connection is established. + */ + public async connect(): Promise { + this.intentionalDisconnect = false; + this.reconnectAttempts = 0; + return this.doConnect(); + } + + private doConnect(): Promise { + return new Promise((resolve, reject) => { + let settled = false; + + this.socket = plugins.net.connect({ path: this.options.socketPath }); + + this.socket.on('connect', () => { + if (!settled) { + settled = true; + this._connected = true; + this.reconnectAttempts = 0; + resolve(); + } + }); + + this.socket.on('data', (chunk: Buffer) => { + this.lineScanner.push(chunk, (line) => { + this.emit('line', line); + }); + }); + + this.socket.on('close', () => { + const wasConnected = this._connected; + this._connected = false; + this.lineScanner.clear(); + + if (!this.intentionalDisconnect && wasConnected && this.options.autoReconnect) { + this.attemptReconnect(); + } + this.emit('close'); + }); + + this.socket.on('error', (err: Error) => { + this._connected = false; + if (!settled) { + settled = true; + reject(err); + } else if (!this.intentionalDisconnect) { + this.emit('error', err); + } + }); + }); + } + + private attemptReconnect(): void { + if (this.reconnectAttempts >= this.options.maxReconnectAttempts) { + this.options.logger.log('error', `Max reconnect attempts (${this.options.maxReconnectAttempts}) reached`); + this.emit('reconnect_failed'); + return; + } + + const delay = Math.min( + this.options.reconnectBaseDelayMs * Math.pow(2, this.reconnectAttempts), + this.options.reconnectMaxDelayMs, + ); + this.reconnectAttempts++; + + this.options.logger.log('info', `Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts}/${this.options.maxReconnectAttempts})`); + + this.reconnectTimer = setTimeout(async () => { + this.reconnectTimer = null; + try { + await this.doConnect(); + this.emit('reconnected'); + } catch { + // doConnect rejected — the 'close' handler on the new socket will trigger another attempt + } + }, delay); + } + + /** + * Write data to the socket with backpressure support. + */ + public async write(data: string): Promise { + return new Promise((resolve, reject) => { + if (!this.socket || !this._connected) { + reject(new Error('Socket not connected')); + return; + } + + const canContinue = this.socket.write(data, 'utf8', (err) => { + if (err) { + reject(err); + } + }); + + if (canContinue) { + resolve(); + } else { + this.socket.once('drain', () => { + resolve(); + }); + } + }); + } + + /** + * Close the socket connection. Does NOT kill the remote daemon. + */ + public disconnect(): void { + this.intentionalDisconnect = true; + + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = null; + } + + if (this.socket) { + const sock = this.socket; + this.socket = null; + this._connected = false; + this.lineScanner.clear(); + sock.removeAllListeners(); + sock.destroy(); + } + } +} diff --git a/ts/classes.stdiotransport.ts b/ts/classes.stdiotransport.ts new file mode 100644 index 0000000..1abfbda --- /dev/null +++ b/ts/classes.stdiotransport.ts @@ -0,0 +1,149 @@ +import * as plugins from './plugins.js'; +import { LineScanner } from './classes.linescanner.js'; +import type { IRustBridgeLogger } from './interfaces/index.js'; +import type { IRustTransport } from './interfaces/transport.js'; + +export interface IStdioTransportOptions { + binaryPath: string; + cliArgs: string[]; + env?: Record; + maxPayloadSize: number; + logger: IRustBridgeLogger; +} + +/** + * Transport that spawns a child process and communicates via stdin/stdout. + * Extracted from the original RustBridge process management logic. + */ +export class StdioTransport extends plugins.events.EventEmitter implements IRustTransport { + private options: IStdioTransportOptions; + private childProcess: plugins.childProcess.ChildProcess | null = null; + private lineScanner: LineScanner; + private stderrRemainder: string = ''; + private _connected: boolean = false; + + constructor(options: IStdioTransportOptions) { + super(); + this.options = options; + this.lineScanner = new LineScanner(options.maxPayloadSize, options.logger); + } + + public get connected(): boolean { + return this._connected; + } + + /** + * Spawn the child process. Resolves when the process is running (not necessarily ready). + */ + public async connect(): Promise { + const env = this.options.env + ? { ...process.env, ...this.options.env } + : { ...process.env }; + + this.childProcess = plugins.childProcess.spawn( + this.options.binaryPath, + this.options.cliArgs, + { stdio: ['pipe', 'pipe', 'pipe'], env }, + ); + + // Handle stderr with cross-chunk buffering + this.childProcess.stderr?.on('data', (data: Buffer) => { + this.stderrRemainder += data.toString(); + const lines = this.stderrRemainder.split('\n'); + this.stderrRemainder = lines.pop()!; + for (const line of lines) { + const trimmed = line.trim(); + if (trimmed) { + this.emit('stderr', trimmed); + } + } + }); + + // Handle stdout via LineScanner + this.childProcess.stdout!.on('data', (chunk: Buffer) => { + this.lineScanner.push(chunk, (line) => { + this.emit('line', line); + }); + }); + + // Handle process exit + this.childProcess.on('exit', (code: number | null, signal: string | null) => { + // Flush remaining stderr + if (this.stderrRemainder.trim()) { + this.emit('stderr', this.stderrRemainder.trim()); + } + this._connected = false; + this.lineScanner.clear(); + this.stderrRemainder = ''; + this.emit('close', code, signal); + }); + + this.childProcess.on('error', (err: Error) => { + this._connected = false; + this.emit('error', err); + }); + + this._connected = true; + } + + /** + * Write data to stdin with backpressure support. + */ + public async write(data: string): Promise { + return new Promise((resolve, reject) => { + if (!this.childProcess?.stdin) { + reject(new Error('stdin not available')); + return; + } + + const canContinue = this.childProcess.stdin.write(data, 'utf8', (err) => { + if (err) { + reject(err); + } + }); + + if (canContinue) { + resolve(); + } else { + this.childProcess.stdin.once('drain', () => { + resolve(); + }); + } + }); + } + + /** + * Kill the child process. Sends SIGTERM, then SIGKILL after 5s. + */ + public disconnect(): void { + if (!this.childProcess) return; + + const proc = this.childProcess; + this.childProcess = null; + this._connected = false; + this.lineScanner.clear(); + this.stderrRemainder = ''; + + // Remove all listeners + proc.removeAllListeners(); + proc.stdout?.removeAllListeners(); + proc.stderr?.removeAllListeners(); + proc.stdin?.removeAllListeners(); + + // Kill the process + try { proc.kill('SIGTERM'); } catch { /* already dead */ } + + // Destroy stdio pipes + try { proc.stdin?.destroy(); } catch { /* ignore */ } + try { proc.stdout?.destroy(); } catch { /* ignore */ } + try { proc.stderr?.destroy(); } catch { /* ignore */ } + + // Unref so Node doesn't wait + try { proc.unref(); } catch { /* ignore */ } + + // Force kill after 5 seconds + setTimeout(() => { + try { proc.kill('SIGKILL'); } catch { /* already dead */ } + }, 5000).unref(); + } +} diff --git a/ts/index.ts b/ts/index.ts index 385e4b6..608d058 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -1,4 +1,7 @@ export { RustBridge } from './classes.rustbridge.js'; export { RustBinaryLocator } from './classes.rustbinarylocator.js'; export { StreamingResponse } from './classes.streamingresponse.js'; +export { StdioTransport } from './classes.stdiotransport.js'; +export { SocketTransport } from './classes.sockettransport.js'; +export { LineScanner } from './classes.linescanner.js'; export * from './interfaces/index.js'; diff --git a/ts/interfaces/config.ts b/ts/interfaces/config.ts index 6e6ba5d..70d893a 100644 --- a/ts/interfaces/config.ts +++ b/ts/interfaces/config.ts @@ -45,3 +45,17 @@ export interface IRustBridgeOptions extends IBinaryLocatorOptions { * Resets on each chunk received. */ streamTimeoutMs?: number; } + +/** + * Options for connecting to an already-running daemon via Unix socket or named pipe. + */ +export interface ISocketConnectOptions { + /** Enable auto-reconnect on unexpected disconnect (default: false) */ + autoReconnect?: boolean; + /** Initial delay between reconnect attempts in ms (default: 100) */ + reconnectBaseDelayMs?: number; + /** Maximum delay between reconnect attempts in ms (default: 30000) */ + reconnectMaxDelayMs?: number; + /** Maximum number of reconnect attempts before giving up (default: 10) */ + maxReconnectAttempts?: number; +} diff --git a/ts/interfaces/index.ts b/ts/interfaces/index.ts index 992e6c8..362cca1 100644 --- a/ts/interfaces/index.ts +++ b/ts/interfaces/index.ts @@ -1,2 +1,3 @@ export * from './ipc.js'; export * from './config.js'; +export * from './transport.js'; diff --git a/ts/interfaces/transport.ts b/ts/interfaces/transport.ts new file mode 100644 index 0000000..3e0e697 --- /dev/null +++ b/ts/interfaces/transport.ts @@ -0,0 +1,26 @@ +import type * as plugins from '../plugins.js'; + +/** + * Abstract transport for communicating with a Rust process. + * Both stdio and socket transports implement this interface. + * + * Events emitted: + * - 'line' (line: string) — a complete newline-terminated JSON line received + * - 'error' (err: Error) — transport-level error + * - 'close' (...args: any[]) — transport has closed/disconnected + * - 'stderr' (line: string) — stderr output (stdio transport only) + * - 'reconnected' () — transport reconnected after unexpected disconnect (socket only) + */ +export interface IRustTransport extends plugins.events.EventEmitter { + /** Connect the transport (spawn process or connect to socket). Resolves when I/O channel is open. */ + connect(): Promise; + + /** Write a string to the transport. Handles backpressure. */ + write(data: string): Promise; + + /** Disconnect the transport. For stdio: kills the process. For socket: closes the connection. */ + disconnect(): void; + + /** Whether the transport is currently connected and writable. */ + readonly connected: boolean; +} diff --git a/ts/plugins.ts b/ts/plugins.ts index 722734b..c8055e5 100644 --- a/ts/plugins.ts +++ b/ts/plugins.ts @@ -4,8 +4,9 @@ import * as fs from 'fs'; import * as childProcess from 'child_process'; import * as events from 'events'; import * as url from 'url'; +import * as net from 'net'; -export { path, fs, childProcess, events, url }; +export { path, fs, childProcess, events, url, net }; // @push.rocks scope import * as smartpath from '@push.rocks/smartpath';