feat(rustbridge): add RustBridge and RustBinaryLocator with typed IPC interfaces, plugins, tests and mock runner; export from index; add npm registries
This commit is contained in:
256
ts/classes.rustbridge.ts
Normal file
256
ts/classes.rustbridge.ts
Normal file
@@ -0,0 +1,256 @@
|
||||
import * as plugins from './plugins.js';
|
||||
import { RustBinaryLocator } from './classes.rustbinarylocator.js';
|
||||
import type {
|
||||
IRustBridgeOptions,
|
||||
IRustBridgeLogger,
|
||||
TCommandMap,
|
||||
IManagementRequest,
|
||||
IManagementResponse,
|
||||
IManagementEvent,
|
||||
} from './interfaces/index.js';
|
||||
|
||||
const defaultLogger: IRustBridgeLogger = {
|
||||
log() {},
|
||||
};
|
||||
|
||||
/**
|
||||
* Generic bridge between TypeScript and a Rust binary.
|
||||
* Communicates via JSON-over-stdin/stdout IPC protocol.
|
||||
*
|
||||
* @typeParam TCommands - Map of command names to their param/result types
|
||||
*/
|
||||
export class RustBridge<TCommands extends TCommandMap = TCommandMap> extends plugins.events.EventEmitter {
|
||||
private locator: RustBinaryLocator;
|
||||
private options: Required<Pick<IRustBridgeOptions, 'cliArgs' | 'requestTimeoutMs' | 'readyTimeoutMs' | 'readyEventName'>> & IRustBridgeOptions;
|
||||
private logger: IRustBridgeLogger;
|
||||
private childProcess: plugins.childProcess.ChildProcess | null = null;
|
||||
private readlineInterface: plugins.readline.Interface | null = null;
|
||||
private pendingRequests = new Map<string, {
|
||||
resolve: (value: any) => void;
|
||||
reject: (error: Error) => void;
|
||||
timer: ReturnType<typeof setTimeout>;
|
||||
}>();
|
||||
private requestCounter = 0;
|
||||
private isRunning = false;
|
||||
private binaryPath: string | null = null;
|
||||
|
||||
constructor(options: IRustBridgeOptions) {
|
||||
super();
|
||||
this.logger = options.logger || defaultLogger;
|
||||
this.options = {
|
||||
cliArgs: ['--management'],
|
||||
requestTimeoutMs: 30000,
|
||||
readyTimeoutMs: 10000,
|
||||
readyEventName: 'ready',
|
||||
...options,
|
||||
};
|
||||
this.locator = new RustBinaryLocator(options, this.logger);
|
||||
}
|
||||
|
||||
/**
|
||||
* Spawn the Rust binary and wait for it to signal readiness.
|
||||
* Returns true if the binary was found and spawned successfully.
|
||||
*/
|
||||
public async spawn(): Promise<boolean> {
|
||||
this.binaryPath = await this.locator.findBinary();
|
||||
if (!this.binaryPath) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return new Promise<boolean>((resolve) => {
|
||||
try {
|
||||
const env = this.options.env
|
||||
? { ...process.env, ...this.options.env }
|
||||
: { ...process.env };
|
||||
|
||||
this.childProcess = plugins.childProcess.spawn(this.binaryPath!, this.options.cliArgs, {
|
||||
stdio: ['pipe', 'pipe', 'pipe'],
|
||||
env,
|
||||
});
|
||||
|
||||
// Handle stderr
|
||||
this.childProcess.stderr?.on('data', (data: Buffer) => {
|
||||
const lines = data.toString().split('\n').filter((l: string) => l.trim());
|
||||
for (const line of lines) {
|
||||
this.logger.log('debug', `[${this.options.binaryName}] ${line}`);
|
||||
this.emit('stderr', line);
|
||||
}
|
||||
});
|
||||
|
||||
// Handle stdout via readline for line-delimited JSON
|
||||
this.readlineInterface = plugins.readline.createInterface({ input: this.childProcess.stdout! });
|
||||
this.readlineInterface.on('line', (line: string) => {
|
||||
this.handleLine(line.trim());
|
||||
});
|
||||
|
||||
// Handle process exit
|
||||
this.childProcess.on('exit', (code, signal) => {
|
||||
this.logger.log('info', `Process exited (code=${code}, signal=${signal})`);
|
||||
this.cleanup();
|
||||
this.emit('exit', code, signal);
|
||||
});
|
||||
|
||||
this.childProcess.on('error', (err) => {
|
||||
this.logger.log('error', `Process error: ${err.message}`);
|
||||
this.cleanup();
|
||||
resolve(false);
|
||||
});
|
||||
|
||||
// Wait for the ready event
|
||||
const readyTimeout = setTimeout(() => {
|
||||
this.logger.log('error', `Process did not send ready event within ${this.options.readyTimeoutMs}ms`);
|
||||
this.kill();
|
||||
resolve(false);
|
||||
}, this.options.readyTimeoutMs);
|
||||
|
||||
this.once(`management:${this.options.readyEventName}`, () => {
|
||||
clearTimeout(readyTimeout);
|
||||
this.isRunning = true;
|
||||
this.logger.log('info', `Bridge connected to ${this.options.binaryName}`);
|
||||
this.emit('ready');
|
||||
resolve(true);
|
||||
});
|
||||
} catch (err: any) {
|
||||
this.logger.log('error', `Failed to spawn: ${err.message}`);
|
||||
resolve(false);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a typed command to the Rust process and wait for the response.
|
||||
*/
|
||||
public async sendCommand<K extends string & keyof TCommands>(
|
||||
method: K,
|
||||
params: TCommands[K]['params'],
|
||||
): Promise<TCommands[K]['result']> {
|
||||
if (!this.childProcess || !this.isRunning) {
|
||||
throw new Error(`${this.options.binaryName} bridge is not running`);
|
||||
}
|
||||
|
||||
const id = `req_${++this.requestCounter}`;
|
||||
const request: IManagementRequest = { id, method, params };
|
||||
|
||||
return new Promise<TCommands[K]['result']>((resolve, reject) => {
|
||||
const timer = setTimeout(() => {
|
||||
this.pendingRequests.delete(id);
|
||||
reject(new Error(`Command '${method}' timed out after ${this.options.requestTimeoutMs}ms`));
|
||||
}, this.options.requestTimeoutMs);
|
||||
|
||||
this.pendingRequests.set(id, { resolve, reject, timer });
|
||||
|
||||
const json = JSON.stringify(request) + '\n';
|
||||
this.childProcess!.stdin!.write(json, (err) => {
|
||||
if (err) {
|
||||
clearTimeout(timer);
|
||||
this.pendingRequests.delete(id);
|
||||
reject(new Error(`Failed to write to stdin: ${err.message}`));
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Kill the Rust process and clean up all resources.
|
||||
*/
|
||||
public kill(): void {
|
||||
if (this.childProcess) {
|
||||
const proc = this.childProcess;
|
||||
this.childProcess = null;
|
||||
this.isRunning = false;
|
||||
|
||||
// Close readline
|
||||
if (this.readlineInterface) {
|
||||
this.readlineInterface.close();
|
||||
this.readlineInterface = null;
|
||||
}
|
||||
|
||||
// Reject pending requests
|
||||
for (const [, pending] of this.pendingRequests) {
|
||||
clearTimeout(pending.timer);
|
||||
pending.reject(new Error(`${this.options.binaryName} process killed`));
|
||||
}
|
||||
this.pendingRequests.clear();
|
||||
|
||||
// Remove all listeners
|
||||
proc.removeAllListeners();
|
||||
proc.stdout?.removeAllListeners();
|
||||
proc.stderr?.removeAllListeners();
|
||||
proc.stdin?.removeAllListeners();
|
||||
|
||||
// Kill the process
|
||||
try { proc.kill('SIGTERM'); } catch { /* already dead */ }
|
||||
|
||||
// Destroy stdio pipes
|
||||
try { proc.stdin?.destroy(); } catch { /* ignore */ }
|
||||
try { proc.stdout?.destroy(); } catch { /* ignore */ }
|
||||
try { proc.stderr?.destroy(); } catch { /* ignore */ }
|
||||
|
||||
// Unref so Node doesn't wait
|
||||
try { proc.unref(); } catch { /* ignore */ }
|
||||
|
||||
// Force kill after 5 seconds
|
||||
setTimeout(() => {
|
||||
try { proc.kill('SIGKILL'); } catch { /* already dead */ }
|
||||
}, 5000).unref();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether the bridge is currently running.
|
||||
*/
|
||||
public get running(): boolean {
|
||||
return this.isRunning;
|
||||
}
|
||||
|
||||
private handleLine(line: string): void {
|
||||
if (!line) return;
|
||||
|
||||
let parsed: any;
|
||||
try {
|
||||
parsed = JSON.parse(line);
|
||||
} catch {
|
||||
this.logger.log('warn', `Non-JSON output: ${line}`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if it's an event (has 'event' field, no 'id')
|
||||
if ('event' in parsed && !('id' in parsed)) {
|
||||
const event = parsed as IManagementEvent;
|
||||
this.emit(`management:${event.event}`, event.data);
|
||||
return;
|
||||
}
|
||||
|
||||
// Otherwise it's a response (has 'id' field)
|
||||
if ('id' in parsed) {
|
||||
const response = parsed as IManagementResponse;
|
||||
const pending = this.pendingRequests.get(response.id);
|
||||
if (pending) {
|
||||
clearTimeout(pending.timer);
|
||||
this.pendingRequests.delete(response.id);
|
||||
if (response.success) {
|
||||
pending.resolve(response.result);
|
||||
} else {
|
||||
pending.reject(new Error(response.error || 'Unknown error from Rust process'));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private cleanup(): void {
|
||||
this.isRunning = false;
|
||||
this.childProcess = null;
|
||||
|
||||
if (this.readlineInterface) {
|
||||
this.readlineInterface.close();
|
||||
this.readlineInterface = null;
|
||||
}
|
||||
|
||||
// Reject all pending requests
|
||||
for (const [, pending] of this.pendingRequests) {
|
||||
clearTimeout(pending.timer);
|
||||
pending.reject(new Error(`${this.options.binaryName} process exited`));
|
||||
}
|
||||
this.pendingRequests.clear();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user