Files
smartrust/ts/classes.sockettransport.ts

188 lines
5.4 KiB
TypeScript

import * as plugins from './plugins.js';
import { LineScanner } from './classes.linescanner.js';
import type { IRustBridgeLogger } from './interfaces/index.js';
import type { IRustTransport } from './interfaces/transport.js';
export interface ISocketTransportOptions {
/** Path to Unix socket (Linux/macOS) or named pipe path (Windows) */
socketPath: string;
/** Maximum inbound message size in bytes */
maxPayloadSize: number;
/** Logger instance */
logger: IRustBridgeLogger;
/** Enable auto-reconnect on unexpected disconnect (default: false) */
autoReconnect?: boolean;
/** Initial delay between reconnect attempts in ms (default: 100) */
reconnectBaseDelayMs?: number;
/** Maximum delay between reconnect attempts in ms (default: 30000) */
reconnectMaxDelayMs?: number;
/** Maximum number of reconnect attempts before giving up (default: 10) */
maxReconnectAttempts?: number;
}
interface IResolvedSocketTransportOptions {
socketPath: string;
maxPayloadSize: number;
logger: IRustBridgeLogger;
autoReconnect: boolean;
reconnectBaseDelayMs: number;
reconnectMaxDelayMs: number;
maxReconnectAttempts: number;
}
/**
* Transport that connects to an already-running process via Unix socket or Windows named pipe.
* The JSON-over-newline protocol is identical to stdio; only the transport changes.
*/
export class SocketTransport extends plugins.events.EventEmitter implements IRustTransport {
private options: IResolvedSocketTransportOptions;
private socket: plugins.net.Socket | null = null;
private lineScanner: LineScanner;
private _connected: boolean = false;
private intentionalDisconnect: boolean = false;
private reconnectAttempts: number = 0;
private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
constructor(options: ISocketTransportOptions) {
super();
this.options = {
autoReconnect: false,
reconnectBaseDelayMs: 100,
reconnectMaxDelayMs: 30000,
maxReconnectAttempts: 10,
...options,
};
this.lineScanner = new LineScanner(options.maxPayloadSize, options.logger);
}
public get connected(): boolean {
return this._connected;
}
/**
* Connect to the socket. Resolves when the TCP/Unix connection is established.
*/
public async connect(): Promise<void> {
this.intentionalDisconnect = false;
this.reconnectAttempts = 0;
return this.doConnect();
}
private doConnect(): Promise<void> {
return new Promise<void>((resolve, reject) => {
let settled = false;
this.socket = plugins.net.connect({ path: this.options.socketPath });
this.socket.on('connect', () => {
if (!settled) {
settled = true;
this._connected = true;
this.reconnectAttempts = 0;
resolve();
}
});
this.socket.on('data', (chunk: Buffer) => {
this.lineScanner.push(chunk, (line) => {
this.emit('line', line);
});
});
this.socket.on('close', () => {
const wasConnected = this._connected;
this._connected = false;
this.lineScanner.clear();
if (!this.intentionalDisconnect && wasConnected && this.options.autoReconnect) {
this.attemptReconnect();
}
this.emit('close');
});
this.socket.on('error', (err: Error) => {
this._connected = false;
if (!settled) {
settled = true;
reject(err);
} else if (!this.intentionalDisconnect) {
this.emit('error', err);
}
});
});
}
private attemptReconnect(): void {
if (this.reconnectAttempts >= this.options.maxReconnectAttempts) {
this.options.logger.log('error', `Max reconnect attempts (${this.options.maxReconnectAttempts}) reached`);
this.emit('reconnect_failed');
return;
}
const delay = Math.min(
this.options.reconnectBaseDelayMs * Math.pow(2, this.reconnectAttempts),
this.options.reconnectMaxDelayMs,
);
this.reconnectAttempts++;
this.options.logger.log('info', `Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts}/${this.options.maxReconnectAttempts})`);
this.reconnectTimer = setTimeout(async () => {
this.reconnectTimer = null;
try {
await this.doConnect();
this.emit('reconnected');
} catch {
// doConnect rejected — the 'close' handler on the new socket will trigger another attempt
}
}, delay);
}
/**
* Write data to the socket with backpressure support.
*/
public async write(data: string): Promise<void> {
return new Promise<void>((resolve, reject) => {
if (!this.socket || !this._connected) {
reject(new Error('Socket not connected'));
return;
}
const canContinue = this.socket.write(data, 'utf8', (err) => {
if (err) {
reject(err);
}
});
if (canContinue) {
resolve();
} else {
this.socket.once('drain', () => {
resolve();
});
}
});
}
/**
* Close the socket connection. Does NOT kill the remote daemon.
*/
public disconnect(): void {
this.intentionalDisconnect = true;
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
if (this.socket) {
const sock = this.socket;
this.socket = null;
this._connected = false;
this.lineScanner.clear();
sock.removeAllListeners();
sock.destroy();
}
}
}