import * as plugins from './smartipc.plugins.js';

/**
 * Message envelope structure for all IPC messages
 */
export interface IIpcMessageEnvelope<T = any> {
  id: string;
  type: string;
  correlationId?: string;
  timestamp: number;
  payload: T;
  headers?: Record<string, any>;
}

/**
 * Transport configuration options
 */
export interface IIpcTransportOptions {
  /** Unique identifier for this transport */
  id: string;
  /**
   * When true, a client transport will NOT auto-start a server when connect()
   * encounters ECONNREFUSED/ENOENT. Useful for strict client/daemon setups.
   * Default: false. Can also be overridden by env SMARTIPC_CLIENT_ONLY=1.
   */
  clientOnly?: boolean;
  /** Socket path for Unix domain sockets or pipe name for Windows */
  socketPath?: string;
  /** TCP host for network transport */
  host?: string;
  /** TCP port for network transport */
  port?: number;
  /** Enable message encryption */
  encryption?: boolean;
  /** Authentication token */
  authToken?: string;
  /** Socket timeout in ms */
  timeout?: number;
  /**
   * Disable Nagle's algorithm via `setNoDelay(true)`. Applied unless this is
   * explicitly set to `false`.
   */
  noDelay?: boolean;
  /** Maximum message size in bytes (default: 8MB) */
  maxMessageSize?: number;
  /** Automatically cleanup stale socket file on start (default: false) */
  autoCleanupSocketFile?: boolean;
  /** Socket file permissions mode (e.g. 0o600) */
  socketMode?: number;
}

/**
 * Connection state events
 */
export interface IIpcTransportEvents {
  connect: () => void;
  disconnect: (reason?: string) => void;
  error: (error: Error) => void;
  message: (message: IIpcMessageEnvelope) => void;
  drain: () => void;
}

/** Size of the big-endian length prefix that precedes every frame. */
const LENGTH_PREFIX_BYTES = 4;

/** Fallback for `maxMessageSize` when the option is unset (8MB). */
const DEFAULT_MAX_MESSAGE_SIZE = 8 * 1024 * 1024;

/**
 * Framing state for one byte stream: the bytes received but not yet consumed,
 * and the payload length announced by the most recent prefix (or null while
 * the next prefix is still awaited).
 */
interface IFrameState {
  buffer: Buffer;
  pendingLength: number | null;
}

/** Turn a caught value into a message string without assuming it is an Error. */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * Append `chunk` to `state` and hand every complete length-prefixed payload to
 * `onPayload`, in order.
 *
 * If a prefix announces more than `maxSize` bytes, the state is reset,
 * `onOversize` is called with the announced length, and processing stops.
 * The state is updated before each callback runs, so a throwing callback
 * cannot cause the same bytes to be parsed twice.
 */
function consumeFrames(
  state: IFrameState,
  chunk: Buffer,
  maxSize: number,
  onPayload: (payload: Buffer) => void,
  onOversize: (announcedLength: number) => void,
): void {
  state.buffer = Buffer.concat([state.buffer, chunk]);

  while (state.buffer.length > 0) {
    let expected = state.pendingLength;

    if (expected === null) {
      if (state.buffer.length < LENGTH_PREFIX_BYTES) {
        // Not enough data for the length prefix yet
        break;
      }
      expected = state.buffer.readUInt32BE(0);
      if (expected > maxSize) {
        state.buffer = Buffer.alloc(0);
        state.pendingLength = null;
        onOversize(expected);
        return;
      }
      state.pendingLength = expected;
      state.buffer = state.buffer.subarray(LENGTH_PREFIX_BYTES);
    }

    if (state.buffer.length < expected) {
      // Not enough data for the complete message yet
      break;
    }

    const payload = state.buffer.subarray(0, expected);
    state.buffer = state.buffer.subarray(expected);
    state.pendingLength = null;
    onPayload(payload);
  }
}

/**
 * Write one frame to `socket`.
 *
 * Resolves `true` once the write callback reports success, or on the next
 * 'drain' event when `write()` signalled backpressure. Resolves `false` when
 * the write callback reports an error, after passing that error to `onError`
 * if one was supplied. Never rejects.
 */
function writeFrame(
  socket: plugins.net.Socket,
  frame: Buffer,
  onError?: (error: Error) => void,
): Promise<boolean> {
  return new Promise<boolean>((resolve) => {
    const accepted = socket.write(frame, (error) => {
      if (error) {
        onError?.(error);
        resolve(false);
      } else {
        resolve(true);
      }
    });
    // Handle backpressure
    if (!accepted) {
      socket.once('drain', () => resolve(true));
    }
  });
}

/**
 * Abstract base class for IPC transports
 */
export abstract class IpcTransport extends plugins.EventEmitter {
  protected options: IIpcTransportOptions;
  protected connected: boolean = false;
  protected messageBuffer: Buffer = Buffer.alloc(0);
  protected currentMessageLength: number | null = null;

  constructor(options: IIpcTransportOptions) {
    super();
    this.options = options;
  }

  /**
   * Connect the transport
   */
  abstract connect(): Promise<void>;

  /**
   * Disconnect the transport
   */
  abstract disconnect(): Promise<void>;

  /**
   * Send a message through the transport
   */
  abstract send(message: IIpcMessageEnvelope): Promise<boolean>;

  /**
   * Check if transport is connected
   */
  public isConnected(): boolean {
    return this.connected;
  }

  /**
   * Parse incoming data with length-prefixed framing.
   *
   * Bytes accumulate in `messageBuffer`; each complete frame is JSON-parsed
   * and emitted as 'message'. A prefix larger than `maxMessageSize` emits
   * 'error' and discards everything buffered so far. A payload that fails to
   * parse (or a 'message' listener that throws) emits 'error' and parsing
   * continues with the next frame.
   */
  protected parseIncomingData(data: Buffer): void {
    this.messageBuffer = Buffer.concat([this.messageBuffer, data]);

    while (this.messageBuffer.length > 0) {
      if (this.currentMessageLength === null) {
        if (this.messageBuffer.length < LENGTH_PREFIX_BYTES) {
          // Not enough data for the length prefix yet
          break;
        }

        // Read the length prefix (4 bytes, big endian)
        this.currentMessageLength = this.messageBuffer.readUInt32BE(0);

        const maxSize = this.options.maxMessageSize || DEFAULT_MAX_MESSAGE_SIZE;
        if (this.currentMessageLength > maxSize) {
          this.emit(
            'error',
            new Error(`Message size ${this.currentMessageLength} exceeds maximum ${maxSize}`),
          );
          // Reset state to recover
          this.messageBuffer = Buffer.alloc(0);
          this.currentMessageLength = null;
          return;
        }

        this.messageBuffer = this.messageBuffer.subarray(LENGTH_PREFIX_BYTES);
      }

      const expected = this.currentMessageLength;
      if (this.messageBuffer.length < expected) {
        // Not enough data for the complete message yet
        break;
      }

      const messageData = this.messageBuffer.subarray(0, expected);
      this.messageBuffer = this.messageBuffer.subarray(expected);
      this.currentMessageLength = null;

      try {
        const message = JSON.parse(messageData.toString('utf8')) as IIpcMessageEnvelope;
        this.emit('message', message);
      } catch (error: unknown) {
        this.emit('error', new Error(`Failed to parse message: ${describeError(error)}`));
      }
    }
  }

  /**
   * Frame a message with length prefix: 4-byte big-endian byte length followed
   * by the UTF-8 JSON encoding of the envelope.
   */
  protected frameMessage(message: IIpcMessageEnvelope): Buffer {
    const body = Buffer.from(JSON.stringify(message), 'utf8');
    const prefix = Buffer.allocUnsafe(LENGTH_PREFIX_BYTES);
    prefix.writeUInt32BE(body.length, 0);
    return Buffer.concat([prefix, body]);
  }

  /**
   * Handle socket errors: emit 'error', mark the transport as not connected,
   * then emit 'disconnect' with the error message as the reason.
   */
  protected handleError(error: Error): void {
    this.emit('error', error);
    this.connected = false;
    this.emit('disconnect', error.message);
  }
}

/**
 * Unix domain socket transport for Linux/Mac
 */
export class UnixSocketTransport extends IpcTransport {
  private socket: plugins.net.Socket | null = null;
  private server: plugins.net.Server | null = null;
  private clients: Set<plugins.net.Socket> = new Set();
  private socketToClientId = new WeakMap<plugins.net.Socket, string>();
  private clientIdToSocket = new Map<string, plugins.net.Socket>();
  /** Framing state per accepted client socket (server mode only). */
  private clientFrameStates = new WeakMap<plugins.net.Socket, IFrameState>();

  /**
   * Connect as client or start as server.
   *
   * A client connection is attempted first. On ECONNREFUSED/ENOENT the
   * transport starts a server on the same path, unless `clientOnly` (option or
   * SMARTIPC_CLIENT_ONLY env) forbids it, in which case the promise rejects
   * with an error carrying the original `code`. Any other connection error
   * rejects unchanged.
   */
  public async connect(): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      const socketPath = this.getSocketPath();

      // Try to connect as client first
      const socket = new plugins.net.Socket();
      this.socket = socket;

      if (this.options.noDelay !== false) {
        socket.setNoDelay(true);
      }

      socket.on('connect', () => {
        this.connected = true;
        this.setupSocketHandlers(socket);
        this.emit('connect');
        resolve();
      });

      socket.on('error', (error: NodeJS.ErrnoException) => {
        const serverMissing = error.code === 'ECONNREFUSED' || error.code === 'ENOENT';
        if (!serverMissing) {
          reject(error);
          return;
        }

        // Determine if we must NOT auto-start server
        const envVal = process.env.SMARTIPC_CLIENT_ONLY;
        const envClientOnly = envVal === '1' || envVal === 'true' || envVal === 'TRUE';
        if (this.options.clientOnly === true || envClientOnly) {
          // Reject instead of starting a server to avoid races
          const reason = error.code || 'UNKNOWN';
          reject(
            Object.assign(
              new Error(`Server not available (${reason}); clientOnly prevents auto-start`),
              { code: reason },
            ),
          );
          return;
        }

        // No server exists and clientOnly is false: become the server (back-compat)
        this.socket = null;
        this.startServer(socketPath).then(resolve).catch(reject);
      });

      socket.connect(socketPath);
    });
  }

  /**
   * Start as server: listen on `socketPath`, track every accepted socket in
   * `clients`, and re-emit per-client activity as 'clientConnected',
   * 'clientMessage'/'message', 'clientError', 'clientDisconnected' and 'drain'.
   */
  private async startServer(socketPath: string): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      // Clean up stale socket file if autoCleanupSocketFile is enabled
      if (this.options.autoCleanupSocketFile) {
        try {
          plugins.fs.unlinkSync(socketPath);
        } catch (error) {
          // Best effort: typically the file simply does not exist
        }
      }

      const server = plugins.net.createServer((socket) => {
        this.clients.add(socket);

        if (this.options.noDelay !== false) {
          socket.setNoDelay(true);
        }

        socket.on('data', (data) => {
          this.parseIncomingDataFromClient(data, socket);
        });

        socket.on('error', (error) => {
          this.emit('clientError', error, socket);
        });

        socket.on('close', () => {
          this.clients.delete(socket);

          // Drop the clientId mapping only if it still points at this socket;
          // the id may have been re-registered by a newer connection.
          const clientId = this.socketToClientId.get(socket);
          if (clientId && this.clientIdToSocket.get(clientId) === socket) {
            this.clientIdToSocket.delete(clientId);
          }
          this.socketToClientId.delete(socket);
          this.clientFrameStates.delete(socket);

          this.emit('clientDisconnected', socket);
        });

        socket.on('drain', () => {
          this.emit('drain');
        });

        this.emit('clientConnected', socket);
      });
      this.server = server;

      server.on('error', reject);

      server.listen(socketPath, () => {
        // Set socket permissions if specified
        if (this.options.socketMode !== undefined && process.platform !== 'win32') {
          try {
            plugins.fs.chmodSync(socketPath, this.options.socketMode);
          } catch (error) {
            // Ignore permission errors, not critical
          }
        }

        this.connected = true;
        this.emit('connect');
        resolve();
      });
    });
  }

  /**
   * Parse incoming data from a specific client socket.
   *
   * Framing state is kept per socket so concurrent clients cannot corrupt each
   * other's frames, and `maxMessageSize` is enforced exactly as in
   * `parseIncomingData`. For each decoded envelope the clientId (from
   * `headers.clientId`, or from the payload of a `__register__` message) is
   * recorded for routing, then 'clientMessage' and 'message' are emitted.
   */
  private parseIncomingDataFromClient(data: Buffer, socket: plugins.net.Socket): void {
    let state = this.clientFrameStates.get(socket);
    if (!state) {
      state = { buffer: Buffer.alloc(0), pendingLength: null };
      this.clientFrameStates.set(socket, state);
    }

    const maxSize = this.options.maxMessageSize || DEFAULT_MAX_MESSAGE_SIZE;

    consumeFrames(
      state,
      data,
      maxSize,
      (payload) => {
        try {
          const message = JSON.parse(payload.toString('utf8')) as IIpcMessageEnvelope;

          // Update clientId mapping
          const clientId =
            message.headers?.clientId ??
            (message.type === '__register__' ? (message.payload as any)?.clientId : undefined);
          if (clientId) {
            this.socketToClientId.set(socket, clientId);
            this.clientIdToSocket.set(clientId, socket);
          }

          // Emit both events so IpcChannel can process it
          this.emit('clientMessage', message, socket);
          this.emit('message', message);
        } catch (error: unknown) {
          this.emit('error', new Error(`Failed to parse message: ${describeError(error)}`));
        }
      },
      (announcedLength) => {
        this.emit(
          'error',
          new Error(`Message size ${announcedLength} exceeds maximum ${maxSize}`),
        );
      },
    );
  }

  /**
   * Setup socket event handlers for the client-mode socket
   */
  private setupSocketHandlers(socket: plugins.net.Socket): void {
    socket.on('data', (data) => {
      this.parseIncomingData(data);
    });

    socket.on('error', (error) => {
      this.handleError(error);
    });

    socket.on('close', () => {
      this.connected = false;
      this.emit('disconnect');
    });

    socket.on('drain', () => {
      this.emit('drain');
    });
  }

  /**
   * Disconnect the transport: destroy the client socket if any; in server mode
   * destroy all client sockets, close the server and remove the socket file.
   * Always ends by emitting 'disconnect'.
   */
  public async disconnect(): Promise<void> {
    if (this.socket) {
      this.socket.destroy();
      this.socket = null;
    }

    const server = this.server;
    if (server) {
      for (const client of this.clients) {
        client.destroy();
      }
      this.clients.clear();

      await new Promise<void>((resolve) => {
        server.close(() => resolve());
      });
      this.server = null;

      // Clean up socket file
      try {
        plugins.fs.unlinkSync(this.getSocketPath());
      } catch (error) {
        // Ignore cleanup errors
      }
    }

    this.connected = false;
    this.emit('disconnect');
  }

  /**
   * Send a message.
   *
   * Client mode writes to the server socket. Server mode routes to the client
   * registered under `headers.clientId` when there is one, and otherwise
   * broadcasts to every connected client.
   *
   * @returns `true` when every attempted write succeeded; `false` on a write
   * error, when the targeted client socket is already destroyed, or when there
   * is nothing to send to.
   */
  public async send(message: IIpcMessageEnvelope): Promise<boolean> {
    const frame = this.frameMessage(message);

    if (this.socket) {
      // Client mode
      return writeFrame(this.socket, frame, (error) => this.handleError(error));
    }

    if (!this.server || this.clients.size === 0) {
      return false;
    }

    // Server mode - route by clientId if present, otherwise broadcast
    const targetClientId = message.headers?.clientId;
    const targetSocket = targetClientId ? this.clientIdToSocket.get(targetClientId) : undefined;

    if (targetSocket) {
      if (targetSocket.destroyed) {
        // Socket is destroyed, remove from mappings
        this.clientIdToSocket.delete(targetClientId);
        return false;
      }
      return writeFrame(targetSocket, frame);
    }

    // Broadcast to all clients (fallback for messages without specific target)
    const results = await Promise.all(
      Array.from(this.clients, (client) => writeFrame(client, frame)),
    );
    return results.every((ok) => ok);
  }

  /**
   * Get the socket path: the explicit `socketPath` option when set, otherwise
   * a name derived from the transport id (a named pipe path on win32, a file
   * in the OS temp directory elsewhere).
   */
  private getSocketPath(): string {
    if (this.options.socketPath) {
      return this.options.socketPath;
    }

    const socketName = `smartipc-${this.options.id}.sock`;

    if (plugins.os.platform() === 'win32') {
      // Windows named pipe path
      return `\\\\.\\pipe\\${socketName}`;
    }

    // Unix domain socket path
    return plugins.path.join(plugins.os.tmpdir(), socketName);
  }
}

/**
 * Named pipe transport for Windows
 */
export class NamedPipeTransport extends UnixSocketTransport {
  // Named pipes on Windows use the same net module interface
  // The main difference is the path format, which is handled in getSocketPath()
  // Additional Windows-specific handling can be added here if needed
}

/**
 * TCP transport for network IPC
 */
export class TcpTransport extends IpcTransport {
  private socket: plugins.net.Socket | null = null;
  private server: plugins.net.Server | null = null;
  private clients: Set<plugins.net.Socket> = new Set();
  /** Framing state per accepted client socket (server mode only). */
  private clientFrameStates = new WeakMap<plugins.net.Socket, IFrameState>();

  /**
   * Connect as client or start as server.
   *
   * A client connection to `host:port` (defaults: localhost:8765) is attempted
   * first; on ECONNREFUSED the transport starts listening there instead. Any
   * other connection error rejects.
   */
  public async connect(): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      const host = this.options.host || 'localhost';
      const port = this.options.port || 8765;

      // Try to connect as client first
      const socket = new plugins.net.Socket();
      this.socket = socket;

      if (this.options.noDelay !== false) {
        socket.setNoDelay(true);
      }

      if (this.options.timeout) {
        socket.setTimeout(this.options.timeout);
      }

      socket.on('connect', () => {
        this.connected = true;
        this.setupSocketHandlers(socket);
        this.emit('connect');
        resolve();
      });

      socket.on('error', (error: NodeJS.ErrnoException) => {
        if (error.code !== 'ECONNREFUSED') {
          reject(error);
          return;
        }
        // No server exists, we should become the server
        this.socket = null;
        this.startServer(host, port).then(resolve).catch(reject);
      });

      socket.connect(port, host);
    });
  }

  /**
   * Start as server: listen on `host:port` and wire up every accepted socket.
   */
  private async startServer(host: string, port: number): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      const server = plugins.net.createServer((socket) => {
        this.clients.add(socket);

        if (this.options.noDelay !== false) {
          socket.setNoDelay(true);
        }

        if (this.options.timeout) {
          socket.setTimeout(this.options.timeout);
        }

        // Each client gets its own framing state; sharing the transport-wide
        // buffer would let chunks from different clients interleave.
        this.setupSocketHandlers(socket, (data) => this.parseIncomingDataFromClient(data, socket));

        socket.on('close', () => {
          this.clients.delete(socket);
          this.clientFrameStates.delete(socket);
        });
      });
      this.server = server;

      server.on('error', reject);

      server.listen(port, host, () => {
        this.connected = true;
        this.emit('connect');
        resolve();
      });
    });
  }

  /**
   * Parse incoming data from one accepted client socket using that socket's
   * own framing state, emitting 'message' per decoded envelope and 'error' for
   * oversized or unparseable frames (same messages as `parseIncomingData`).
   */
  private parseIncomingDataFromClient(data: Buffer, socket: plugins.net.Socket): void {
    let state = this.clientFrameStates.get(socket);
    if (!state) {
      state = { buffer: Buffer.alloc(0), pendingLength: null };
      this.clientFrameStates.set(socket, state);
    }

    const maxSize = this.options.maxMessageSize || DEFAULT_MAX_MESSAGE_SIZE;

    consumeFrames(
      state,
      data,
      maxSize,
      (payload) => {
        try {
          const message = JSON.parse(payload.toString('utf8')) as IIpcMessageEnvelope;
          this.emit('message', message);
        } catch (error: unknown) {
          this.emit('error', new Error(`Failed to parse message: ${describeError(error)}`));
        }
      },
      (announcedLength) => {
        this.emit(
          'error',
          new Error(`Message size ${announcedLength} exceeds maximum ${maxSize}`),
        );
      },
    );
  }

  /**
   * Setup socket event handlers.
   *
   * @param socket Socket to attach listeners to.
   * @param onData Handler for incoming chunks; defaults to the transport-wide
   * `parseIncomingData`, which is what the client-mode socket uses.
   */
  private setupSocketHandlers(
    socket: plugins.net.Socket,
    onData: (data: Buffer) => void = (data) => this.parseIncomingData(data),
  ): void {
    socket.on('data', (data) => {
      onData(data);
    });

    socket.on('error', (error) => {
      this.handleError(error);
    });

    // NOTE(review): in server mode this also fires when a single client
    // closes, marking the whole transport as disconnected - confirm intended.
    socket.on('close', () => {
      this.connected = false;
      this.emit('disconnect');
    });

    socket.on('timeout', () => {
      this.handleError(new Error('Socket timeout'));
      socket.destroy();
    });

    socket.on('drain', () => {
      this.emit('drain');
    });
  }

  /**
   * Disconnect the transport: destroy the client socket if any; in server mode
   * destroy all client sockets and close the server. Always ends by emitting
   * 'disconnect'.
   */
  public async disconnect(): Promise<void> {
    if (this.socket) {
      this.socket.destroy();
      this.socket = null;
    }

    const server = this.server;
    if (server) {
      for (const client of this.clients) {
        client.destroy();
      }
      this.clients.clear();

      await new Promise<void>((resolve) => {
        server.close(() => resolve());
      });
      this.server = null;
    }

    this.connected = false;
    this.emit('disconnect');
  }

  /**
   * Send a message. Client mode writes to the server socket; server mode
   * broadcasts to all connected clients.
   *
   * @returns `true` when every attempted write succeeded; `false` on a write
   * error or when there is nothing to send to.
   */
  public async send(message: IIpcMessageEnvelope): Promise<boolean> {
    const frame = this.frameMessage(message);

    if (this.socket) {
      // Client mode
      return writeFrame(this.socket, frame, (error) => this.handleError(error));
    }

    if (!this.server || this.clients.size === 0) {
      return false;
    }

    // Server mode - broadcast to all clients
    const results = await Promise.all(
      Array.from(this.clients, (client) => writeFrame(client, frame)),
    );
    return results.every((ok) => ok);
  }
}

/**
 * Factory function to create appropriate transport based on platform and options.
 *
 * Returns a TCP transport when `host` or `port` is set; otherwise a named pipe
 * transport on win32 and a Unix domain socket transport everywhere else.
 */
export function createTransport(options: IIpcTransportOptions): IpcTransport {
  // If TCP is explicitly requested
  if (options.host || options.port) {
    return new TcpTransport(options);
  }

  // Platform-specific default transport
  return plugins.os.platform() === 'win32'
    ? new NamedPipeTransport(options)
    : new UnixSocketTransport(options);
}