188 lines
5.4 KiB
TypeScript
188 lines
5.4 KiB
TypeScript
|
|
import * as plugins from './plugins.js';
|
||
|
|
import { LineScanner } from './classes.linescanner.js';
|
||
|
|
import type { IRustBridgeLogger } from './interfaces/index.js';
|
||
|
|
import type { IRustTransport } from './interfaces/transport.js';
|
||
|
|
|
||
|
|
export interface ISocketTransportOptions {
  /** Filesystem path of the Unix socket (Linux/macOS), or the named pipe path on Windows. */
  socketPath: string;

  /** Upper bound, in bytes, for a single inbound message. */
  maxPayloadSize: number;

  /** Logger the transport reports to. */
  logger: IRustBridgeLogger;

  /** When true, reconnect automatically after an unexpected disconnect. Defaults to false. */
  autoReconnect?: boolean;

  /** Delay in ms before the first reconnect attempt. Defaults to 100. */
  reconnectBaseDelayMs?: number;

  /** Ceiling in ms for the delay between reconnect attempts. Defaults to 30000. */
  reconnectMaxDelayMs?: number;

  /** Number of reconnect attempts made before giving up. Defaults to 10. */
  maxReconnectAttempts?: number;
}
|
||
|
|
|
||
|
|
/**
 * Internal shape of the transport options once defaults have been applied:
 * the same fields as the public options, with every optional field required.
 */
interface IResolvedSocketTransportOptions {
  // Fields the caller must always supply.
  socketPath: string;
  maxPayloadSize: number;
  logger: IRustBridgeLogger;

  // Reconnect tuning; always populated after construction.
  autoReconnect: boolean;
  reconnectBaseDelayMs: number;
  reconnectMaxDelayMs: number;
  maxReconnectAttempts: number;
}
|
||
|
|
|
||
|
|
/**
 * Transport that connects to an already-running process via Unix socket or Windows named pipe.
 * The JSON-over-newline protocol is identical to stdio; only the transport changes.
 *
 * Events emitted by this class:
 * - `'line'` (line): one complete line produced by the line scanner from inbound data.
 * - `'close'`: the underlying socket closed (not emitted for an explicit `disconnect()`).
 * - `'error'` (err): a socket error that occurred after the connection attempt settled.
 * - `'reconnected'`: an automatic reconnect attempt succeeded.
 * - `'reconnect_failed'`: the configured maximum number of reconnect attempts was reached.
 */
export class SocketTransport extends plugins.events.EventEmitter implements IRustTransport {
  private options: IResolvedSocketTransportOptions;
  private socket: plugins.net.Socket | null = null;
  private lineScanner: LineScanner;
  private _connected: boolean = false;
  /** Set by `disconnect()` so that the resulting teardown never triggers a reconnect. */
  private intentionalDisconnect: boolean = false;
  /** Number of reconnect attempts scheduled since the last successful connect. */
  private reconnectAttempts: number = 0;
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;

  /**
   * @param options Transport configuration; omitted (or `undefined`) reconnect settings
   *                fall back to their documented defaults.
   */
  constructor(options: ISocketTransportOptions) {
    super();
    // Apply defaults with `??` rather than relying on spread order, so that an
    // explicitly passed `undefined` cannot overwrite a default at runtime.
    this.options = {
      ...options,
      autoReconnect: options.autoReconnect ?? false,
      reconnectBaseDelayMs: options.reconnectBaseDelayMs ?? 100,
      reconnectMaxDelayMs: options.reconnectMaxDelayMs ?? 30000,
      maxReconnectAttempts: options.maxReconnectAttempts ?? 10,
    };
    this.lineScanner = new LineScanner(options.maxPayloadSize, options.logger);
  }

  /** Whether the socket is currently connected. */
  public get connected(): boolean {
    return this._connected;
  }

  /**
   * Connect to the socket. Resolves when the connection is established and
   * rejects with the socket error if the attempt fails.
   */
  public async connect(): Promise<void> {
    this.intentionalDisconnect = false;
    this.reconnectAttempts = 0;
    return this.doConnect();
  }

  /**
   * Open a new socket and wire up its event handlers.
   * Shared by `connect()` and the automatic reconnect path.
   */
  private doConnect(): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      // `settled` guards the promise; `established` records whether THIS socket
      // ever reached the 'connect' state. The latter must be tracked per socket:
      // the 'error' handler clears `this._connected` before 'close' fires, so the
      // shared flag cannot tell a dropped connection from a failed attempt.
      let settled = false;
      let established = false;

      const socket = plugins.net.connect({ path: this.options.socketPath });
      this.socket = socket;

      socket.on('connect', () => {
        established = true;
        if (!settled) {
          settled = true;
          this._connected = true;
          this.reconnectAttempts = 0;
          resolve();
        }
      });

      socket.on('data', (chunk: Buffer) => {
        this.lineScanner.push(chunk, (line) => {
          this.emit('line', line);
        });
      });

      socket.on('close', () => {
        this._connected = false;
        this.lineScanner.clear();

        // Only a connection that was actually established and then lost counts as
        // an unexpected disconnect. Failed reconnect attempts are retried by
        // attemptReconnect() itself, which avoids scheduling two timers.
        if (!this.intentionalDisconnect && established && this.options.autoReconnect) {
          this.attemptReconnect();
        }
        this.emit('close');
      });

      socket.on('error', (err: Error) => {
        this._connected = false;
        if (!settled) {
          settled = true;
          reject(err);
        } else if (!this.intentionalDisconnect) {
          this.emit('error', err);
        }
      });
    });
  }

  /**
   * Schedule the next reconnect attempt using exponential backoff, capped at
   * `reconnectMaxDelayMs`. Emits `'reconnect_failed'` once `maxReconnectAttempts`
   * attempts have been made, and `'reconnected'` when an attempt succeeds.
   */
  private attemptReconnect(): void {
    if (this.reconnectAttempts >= this.options.maxReconnectAttempts) {
      this.options.logger.log('error', `Max reconnect attempts (${this.options.maxReconnectAttempts}) reached`);
      this.emit('reconnect_failed');
      return;
    }

    const delay = Math.min(
      this.options.reconnectBaseDelayMs * Math.pow(2, this.reconnectAttempts),
      this.options.reconnectMaxDelayMs,
    );
    this.reconnectAttempts++;

    this.options.logger.log('info', `Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts}/${this.options.maxReconnectAttempts})`);

    this.reconnectTimer = setTimeout(async () => {
      this.reconnectTimer = null;
      let established = false;
      try {
        await this.doConnect();
        established = true;
        this.emit('reconnected');
      } catch (err) {
        // If the connection was established, the exception came from a
        // 'reconnected' listener; it is swallowed and must not trigger a retry.
        if (established || this.intentionalDisconnect) {
          return;
        }
        // The attempt itself failed. The new socket's 'close' handler does not
        // retry (that socket was never established), so schedule the next
        // attempt here; attemptReconnect() enforces the attempt limit.
        const reason = err instanceof Error ? err.message : String(err);
        this.options.logger.log('error', `Reconnect attempt failed: ${reason}`);
        this.attemptReconnect();
      }
    }, delay);
  }

  /**
   * Write data to the socket with backpressure support.
   *
   * Resolves immediately when the socket accepts the data without signalling
   * backpressure. Otherwise resolves once the socket drains or this chunk has
   * been flushed, whichever happens first. Rejects if the socket is not
   * connected or if the write fails before the promise has resolved.
   *
   * @param data Text to send, encoded as UTF-8.
   */
  public async write(data: string): Promise<void> {
    const socket = this.socket;
    if (!socket || !this._connected) {
      throw new Error('Socket not connected');
    }

    return new Promise<void>((resolve, reject) => {
      let settled = false;

      // Settle exactly once and always detach the 'drain' listener so that
      // failed or already-flushed writes do not accumulate listeners.
      const finish = (err?: Error | null): void => {
        if (settled) {
          return;
        }
        settled = true;
        socket.removeListener('drain', onDrain);
        if (err) {
          reject(err);
        } else {
          resolve();
        }
      };
      const onDrain = (): void => finish();

      const canContinue = socket.write(data, 'utf8', (err) => finish(err));

      if (canContinue) {
        finish();
      } else {
        // Under backpressure: wait for 'drain'. The write callback above also
        // settles the promise, so it cannot hang if 'drain' never arrives.
        socket.once('drain', onDrain);
      }
    });
  }

  /**
   * Close the socket connection. Does NOT kill the remote daemon.
   *
   * Cancels any pending reconnect timer. All listeners are removed from the
   * socket before it is destroyed, so this transport does not emit `'close'`
   * for an explicit disconnect.
   */
  public disconnect(): void {
    this.intentionalDisconnect = true;

    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }

    if (this.socket) {
      const sock = this.socket;
      this.socket = null;
      this._connected = false;
      this.lineScanner.clear();
      sock.removeAllListeners();
      sock.destroy();
    }
  }
}
|