import * as plugins from './plugins.js';
import { LineScanner } from './classes.linescanner.js';
import type { IRustBridgeLogger } from './interfaces/index.js';
import type { IRustTransport } from './interfaces/transport.js';

/**
 * Caller-supplied configuration for {@link SocketTransport}.
 * Optional fields fall back to the defaults noted on each field.
 */
export interface ISocketTransportOptions {
  /** Path to Unix socket (Linux/macOS) or named pipe path (Windows) */
  socketPath: string;
  /** Maximum inbound message size in bytes */
  maxPayloadSize: number;
  /** Logger instance */
  logger: IRustBridgeLogger;
  /** Enable auto-reconnect on unexpected disconnect (default: false) */
  autoReconnect?: boolean;
  /** Initial delay between reconnect attempts in ms (default: 100) */
  reconnectBaseDelayMs?: number;
  /** Maximum delay between reconnect attempts in ms (default: 30000) */
  reconnectMaxDelayMs?: number;
  /** Maximum number of reconnect attempts before giving up (default: 10) */
  maxReconnectAttempts?: number;
}

/** {@link ISocketTransportOptions} after defaults were applied: every field is present. */
interface IResolvedSocketTransportOptions {
  socketPath: string;
  maxPayloadSize: number;
  logger: IRustBridgeLogger;
  autoReconnect: boolean;
  reconnectBaseDelayMs: number;
  reconnectMaxDelayMs: number;
  maxReconnectAttempts: number;
}

/**
 * Transport that connects to an already-running process via Unix socket or Windows named pipe.
 * The JSON-over-newline protocol is identical to stdio; only the transport changes.
 *
 * Events emitted by this class:
 * - `'line'` (line): one complete line produced by the line scanner from inbound data.
 * - `'close'`: the underlying socket closed (not emitted for an explicit `disconnect()`).
 * - `'error'` (err): a socket error after the connection attempt has settled.
 * - `'reconnected'`: an automatic reconnect attempt succeeded.
 * - `'reconnect_failed'`: the reconnect attempt limit was reached.
 */
export class SocketTransport extends plugins.events.EventEmitter implements IRustTransport {
  private options: IResolvedSocketTransportOptions;
  private socket: plugins.net.Socket | null = null;
  private lineScanner: LineScanner;
  private _connected: boolean = false;
  /** Set by `disconnect()` so that the resulting teardown never triggers a reconnect. */
  private intentionalDisconnect: boolean = false;
  /** Number of reconnect attempts made since the last successful connection. */
  private reconnectAttempts: number = 0;
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;

  /**
   * @param options Transport configuration; omitted (or `undefined`) optional
   *   fields are replaced by their documented defaults.
   */
  constructor(options: ISocketTransportOptions) {
    super();
    // Resolve field by field with `??` instead of spreading `options` over the
    // defaults: a spread lets an explicit `undefined` overwrite a default, which
    // would later produce a NaN backoff delay and an attempt limit that never trips.
    this.options = {
      socketPath: options.socketPath,
      maxPayloadSize: options.maxPayloadSize,
      logger: options.logger,
      autoReconnect: options.autoReconnect ?? false,
      reconnectBaseDelayMs: options.reconnectBaseDelayMs ?? 100,
      reconnectMaxDelayMs: options.reconnectMaxDelayMs ?? 30000,
      maxReconnectAttempts: options.maxReconnectAttempts ?? 10,
    };
    this.lineScanner = new LineScanner(options.maxPayloadSize, options.logger);
  }

  /** Whether the socket is currently connected. */
  public get connected(): boolean {
    return this._connected;
  }

  /**
   * Connect to the socket. Resolves when the connection is established and
   * rejects with the socket error if the attempt fails.
   */
  public async connect(): Promise<void> {
    this.intentionalDisconnect = false;
    this.reconnectAttempts = 0;
    // A reconnect timer left pending here would later open a second socket and
    // orphan the one created below.
    this.clearReconnectTimer();
    return this.doConnect();
  }

  /**
   * Open a new socket to `socketPath` and wire up its event handlers.
   * The returned promise settles exactly once: resolved on `'connect'`,
   * rejected on an `'error'` that arrives before the connection is established.
   */
  private doConnect(): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      let settled = false;
      // Per-socket record of "this socket did connect". Unlike `_connected` it is
      // not cleared by the 'error' handler, so the 'close' handler can still tell
      // a dropped connection (reconnect) from a failed attempt (do not reconnect here).
      let established = false;

      const socket = plugins.net.connect({ path: this.options.socketPath });
      this.socket = socket;

      socket.on('connect', () => {
        if (!settled) {
          settled = true;
          established = true;
          this._connected = true;
          this.reconnectAttempts = 0;
          resolve();
        }
      });

      socket.on('data', (chunk: Buffer) => {
        this.lineScanner.push(chunk, (line) => {
          this.emit('line', line);
        });
      });

      socket.on('close', () => {
        this._connected = false;
        // Drop any partially received line; it cannot be completed by a new socket.
        this.lineScanner.clear();
        if (established && !this.intentionalDisconnect && this.options.autoReconnect) {
          this.attemptReconnect();
        }
        this.emit('close');
      });

      socket.on('error', (err: Error) => {
        this._connected = false;
        if (!settled) {
          settled = true;
          reject(err);
        } else if (!this.intentionalDisconnect) {
          this.emit('error', err);
        }
      });
    });
  }

  /**
   * Schedule the next reconnect attempt using exponential backoff:
   * `min(reconnectBaseDelayMs * 2^attempts, reconnectMaxDelayMs)`.
   * Emits `'reconnect_failed'` instead once `maxReconnectAttempts` is reached.
   */
  private attemptReconnect(): void {
    if (this.reconnectAttempts >= this.options.maxReconnectAttempts) {
      this.options.logger.log('error', `Max reconnect attempts (${this.options.maxReconnectAttempts}) reached`);
      this.emit('reconnect_failed');
      return;
    }

    const delay = Math.min(
      this.options.reconnectBaseDelayMs * 2 ** this.reconnectAttempts,
      this.options.reconnectMaxDelayMs,
    );
    this.reconnectAttempts++;
    this.options.logger.log('info', `Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts}/${this.options.maxReconnectAttempts})`);

    this.reconnectTimer = setTimeout(async () => {
      this.reconnectTimer = null;
      let reconnected = false;
      try {
        await this.doConnect();
        reconnected = true;
        this.emit('reconnected');
      } catch {
        // A socket that never connected does not trigger a reconnect from its
        // 'close' handler, so a failed attempt must schedule the next one here;
        // otherwise retrying stops after the first failure and
        // 'reconnect_failed' is never emitted.
        // `reconnected` guards against a throwing 'reconnected' listener being
        // mistaken for a failed connection.
        if (!reconnected && !this.intentionalDisconnect) {
          this.attemptReconnect();
        }
      }
    }, delay);
  }

  /**
   * Write data to the socket with backpressure support.
   *
   * Resolves immediately when the socket accepts the data without signalling
   * backpressure, otherwise once `'drain'` fires. Rejects if the socket is not
   * connected, if the write fails, or if the socket closes before draining.
   */
  public async write(data: string): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      const socket = this.socket;
      if (!socket || !this._connected) {
        reject(new Error('Socket not connected'));
        return;
      }

      const canContinue = socket.write(data, 'utf8', (err) => {
        if (err) {
          reject(err);
        }
      });

      if (canContinue) {
        resolve();
        return;
      }

      // Backpressure: wait for the buffer to drain, but do not wait forever if
      // the socket goes away first. Each listener removes the other so nothing
      // accumulates on a long-lived socket.
      const onDrain = (): void => {
        socket.off('close', onClose);
        resolve();
      };
      const onClose = (): void => {
        socket.off('drain', onDrain);
        reject(new Error('Socket closed before write was flushed'));
      };
      socket.once('drain', onDrain);
      socket.once('close', onClose);
    });
  }

  /**
   * Close the socket connection. Does NOT kill the remote daemon.
   * Cancels any pending reconnect and suppresses further reconnect attempts
   * until `connect()` is called again.
   */
  public disconnect(): void {
    this.intentionalDisconnect = true;
    this.clearReconnectTimer();
    if (this.socket) {
      const sock = this.socket;
      this.socket = null;
      this._connected = false;
      this.lineScanner.clear();
      sock.removeAllListeners();
      // With every listener removed, a late socket error would be an unhandled
      // 'error' event; keep a no-op listener for the teardown.
      sock.on('error', () => {});
      sock.destroy();
    }
  }

  /** Cancel the pending reconnect timer, if any. */
  private clearReconnectTimer(): void {
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }
}