import * as plugins from './smartipc.plugins.js';
import { IpcTransport, createTransport } from './classes.transports.js';
import type { IIpcMessageEnvelope, IIpcTransportOptions } from './classes.transports.js';

/**
 * Options for IPC channel
 */
export interface IIpcChannelOptions extends IIpcTransportOptions {
  /** Enable automatic reconnection */
  autoReconnect?: boolean;
  /** Initial reconnect delay in ms */
  reconnectDelay?: number;
  /** Maximum reconnect delay in ms */
  maxReconnectDelay?: number;
  /** Reconnect delay multiplier */
  reconnectMultiplier?: number;
  /** Maximum number of reconnect attempts */
  maxReconnectAttempts?: number;
  /** Enable heartbeat */
  heartbeat?: boolean;
  /** Heartbeat interval in ms */
  heartbeatInterval?: number;
  /** Heartbeat timeout in ms */
  heartbeatTimeout?: number;
  /** Initial grace period before heartbeat timeout in ms */
  heartbeatInitialGracePeriodMs?: number;
  /** Throw on heartbeat timeout (default: true, set false to emit event instead) */
  heartbeatThrowOnTimeout?: boolean;
  /** Maximum concurrent streams (incoming/outgoing) */
  maxConcurrentStreams?: number;
}

/**
 * Request/Response tracking: the settle callbacks of an in-flight request
 * plus the timer that fails it when no response arrives in time.
 */
interface IPendingRequest<T = any> {
  resolve: (value: T) => void;
  reject: (error: Error) => void;
  timer?: NodeJS.Timeout;
}

/**
 * Event names that are delivered through the underlying EventEmitter.
 * Every other name passed to `IpcChannel.on` is treated as a message type.
 */
const CHANNEL_EVENTS: ReadonlySet<string> = new Set([
  'message',
  'connect',
  'disconnect',
  'error',
  'reconnecting',
  'drain',
  'heartbeatTimeout',
  'clientDisconnected',
  'stream',
]);

/**
 * IPC Channel with connection management, auto-reconnect, and typed messaging.
 *
 * The class-level type parameters only provide the defaults for `request()`;
 * both default to `any`, so a plain `IpcChannel` reference keeps working.
 */
export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEmitter {
  private transport: IpcTransport;
  private options: IIpcChannelOptions;
  private pendingRequests = new Map<string, IPendingRequest>();
  private messageHandlers = new Map<string, (payload: any) => any | Promise<any>>();
  private reconnectAttempts = 0;
  private reconnectTimer?: NodeJS.Timeout;
  private heartbeatTimer?: NodeJS.Timeout;
  private heartbeatCheckTimer?: NodeJS.Timeout;
  private heartbeatGraceTimer?: NodeJS.Timeout;
  private lastHeartbeat: number = Date.now();
  private connectionStartTime: number = Date.now();
  private isReconnecting = false;
  private isClosing = false;

  // Streaming state
  private incomingStreams = new Map<string, plugins.stream.PassThrough>();
  private incomingStreamMeta = new Map<string, Record<string, any>>();
  private outgoingStreams = new Map<string, { cancelled: boolean; abort?: () => void }>();
  private activeIncomingStreams = 0;
  private activeOutgoingStreams = 0;

  // Metrics
  private metrics = {
    messagesSent: 0,
    messagesReceived: 0,
    bytesSent: 0,
    bytesReceived: 0,
    reconnects: 0,
    heartbeatTimeouts: 0,
    errors: 0,
    requestTimeouts: 0,
    connectedAt: 0
  };

  /**
   * Merge the caller's options over the defaults, create the transport and
   * subscribe to its events. No connection is opened here; call `connect()`.
   */
  constructor(options: IIpcChannelOptions) {
    super();
    this.options = {
      autoReconnect: true,
      reconnectDelay: 1000,
      maxReconnectDelay: 30000,
      reconnectMultiplier: 1.5,
      maxReconnectAttempts: Infinity,
      heartbeat: true,
      heartbeatInterval: 5000,
      heartbeatTimeout: 10000,
      maxConcurrentStreams: 32,
      ...options
    };

    // Normalize heartbeatThrowOnTimeout to boolean (defensive for JS consumers)
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const throwOnTimeout = (this.options as any).heartbeatThrowOnTimeout;
    if (throwOnTimeout !== undefined) {
      if (throwOnTimeout === 'false') {
        this.options.heartbeatThrowOnTimeout = false;
      } else if (throwOnTimeout === 'true') {
        this.options.heartbeatThrowOnTimeout = true;
      } else if (typeof throwOnTimeout !== 'boolean') {
        this.options.heartbeatThrowOnTimeout = Boolean(throwOnTimeout);
      }
    }

    this.transport = createTransport(this.options);
    this.setupTransportHandlers();
  }

  /**
   * Setup transport event handlers
   */
  private setupTransportHandlers(): void {
    this.transport.on('connect', () => {
      // A connect that follows at least one scheduled attempt is a reconnect.
      if (this.reconnectAttempts > 0) {
        this.metrics.reconnects++;
      }
      this.reconnectAttempts = 0;
      this.isReconnecting = false;
      this.metrics.connectedAt = Date.now();
      this.startHeartbeat();
      this.emit('connect');
    });

    this.transport.on('disconnect', (reason) => {
      this.stopHeartbeat();
      this.clearPendingRequests(new Error(`Disconnected: ${reason || 'Unknown reason'}`));
      this.emit('disconnect', reason);

      if (this.options.autoReconnect && !this.isClosing) {
        this.scheduleReconnect();
      }
    });

    this.transport.on('error', (error) => {
      this.emit('error', error);
    });

    this.transport.on('message', (message: IIpcMessageEnvelope) => {
      this.handleMessage(message);
    });

    // Forward per-client disconnects from transports that support multi-client servers
    // We re-emit a 'clientDisconnected' event with the clientId if known so higher layers can act.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    (this.transport as any).on?.('clientDisconnected', (_socket: any, clientId?: string) => {
      this.emit('clientDisconnected', clientId);
    });

    this.transport.on('drain', () => {
      this.emit('drain');
    });
  }

  /**
   * Connect the channel.
   *
   * Resolves immediately when the transport is already connected. When the
   * attempt fails an 'error' event is emitted; with `autoReconnect` enabled a
   * reconnect is scheduled and the promise resolves, otherwise the error is
   * rethrown.
   */
  public async connect(): Promise<void> {
    if (this.transport.isConnected()) {
      return;
    }

    // An explicit connect() after disconnect() re-opens the channel; without
    // this reset auto-reconnect would stay disabled for the rest of its life.
    this.isClosing = false;

    try {
      await this.transport.connect();
    } catch (error) {
      this.emit('error', error);
      if (this.options.autoReconnect && !this.isClosing) {
        this.scheduleReconnect();
      } else {
        throw error;
      }
    }
  }

  /**
   * Disconnect the channel: stops the heartbeat, cancels any scheduled
   * reconnect, rejects all pending requests and closes the transport.
   */
  public async disconnect(): Promise<void> {
    this.isClosing = true;
    this.stopHeartbeat();
    this.cancelReconnect();
    this.clearPendingRequests(new Error('Channel closed'));
    await this.transport.disconnect();
  }

  /**
   * Schedule a reconnection attempt with exponential backoff and jitter.
   * Does nothing while another attempt is pending or the channel is closing.
   */
  private scheduleReconnect(): void {
    if (this.isReconnecting || this.isClosing) {
      return;
    }

    const maxAttempts = this.options.maxReconnectAttempts ?? Infinity;
    if (this.reconnectAttempts >= maxAttempts) {
      this.emit('error', new Error('Maximum reconnection attempts reached'));
      return;
    }

    this.isReconnecting = true;
    this.reconnectAttempts++;

    // Calculate delay with exponential backoff and jitter
    const baseDelay = Math.min(
      this.options.reconnectDelay! * Math.pow(this.options.reconnectMultiplier!, this.reconnectAttempts - 1),
      this.options.maxReconnectDelay!
    );
    const jitter = Math.random() * 0.1 * baseDelay; // 10% jitter
    const delay = baseDelay + jitter;

    this.emit('reconnecting', { attempt: this.reconnectAttempts, delay });

    this.reconnectTimer = setTimeout(async () => {
      this.reconnectTimer = undefined;
      try {
        await this.transport.connect();
      } catch (error) {
        // The attempt failed, so the 'connect' handler never cleared
        // isReconnecting. Clear it here and queue the next attempt; otherwise
        // scheduleReconnect() would return early forever. The flag is already
        // false when the reconnect was cancelled meanwhile, in which case
        // nothing must be rescheduled.
        if (this.isReconnecting) {
          this.isReconnecting = false;
          this.scheduleReconnect();
        }
      }
    }, delay);
  }

  /**
   * Cancel scheduled reconnection
   */
  private cancelReconnect(): void {
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = undefined;
    }
    this.isReconnecting = false;
  }

  /**
   * Start heartbeat mechanism: periodically send `__heartbeat__` messages and,
   * after the optional grace period, begin checking for a heartbeat timeout.
   */
  private startHeartbeat(): void {
    if (!this.options.heartbeat) {
      return;
    }

    this.stopHeartbeat();
    this.lastHeartbeat = Date.now();
    this.connectionStartTime = Date.now();

    // Send heartbeat messages
    this.heartbeatTimer = setInterval(() => {
      this.sendMessage('__heartbeat__', { timestamp: Date.now() }).catch(() => {
        // Ignore heartbeat send errors
      });
    }, this.options.heartbeatInterval!);

    // Delay starting the check until after the grace period
    const gracePeriod = this.options.heartbeatInitialGracePeriodMs || 0;

    if (gracePeriod > 0) {
      // Use a timer to delay the first check
      this.heartbeatGraceTimer = setTimeout(() => {
        this.startHeartbeatCheck();
      }, gracePeriod);
    } else {
      // No grace period, start checking immediately
      this.startHeartbeatCheck();
    }
  }

  /**
   * Start heartbeat timeout checking (separated for grace period handling)
   */
  private startHeartbeatCheck(): void {
    // Check for heartbeat timeout
    this.heartbeatCheckTimer = setInterval(() => {
      const timeSinceLastHeartbeat = Date.now() - this.lastHeartbeat;
      if (timeSinceLastHeartbeat > this.options.heartbeatTimeout!) {
        this.metrics.heartbeatTimeouts++;
        const error = new Error('Heartbeat timeout');

        if (this.options.heartbeatThrowOnTimeout !== false) {
          // Default behavior: emit error which may cause disconnect
          this.emit('error', error);
          this.transport.disconnect().catch(() => {});
        } else {
          // Emit heartbeatTimeout event instead of error
          this.emit('heartbeatTimeout', error);
          // Clear timers to avoid repeated events
          this.stopHeartbeat();
        }
      }
    }, Math.max(1000, Math.floor(this.options.heartbeatTimeout! / 2)));
  }

  /**
   * Stop heartbeat mechanism
   */
  private stopHeartbeat(): void {
    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = undefined;
    }
    if (this.heartbeatCheckTimer) {
      clearInterval(this.heartbeatCheckTimer);
      this.heartbeatCheckTimer = undefined;
    }
    if (this.heartbeatGraceTimer) {
      clearTimeout(this.heartbeatGraceTimer);
      this.heartbeatGraceTimer = undefined;
    }
  }

  /**
   * Size of the JSON-serialized envelope in UTF-8 bytes, used for the byte
   * counters in the metrics.
   */
  private measureBytes(message: IIpcMessageEnvelope): number {
    return Buffer.byteLength(JSON.stringify(message), 'utf8');
  }

  /**
   * Headers that carry the sender's clientId (when the incoming message had
   * one) so that a reply can be attributed to the same client.
   */
  private routingHeaders(message: IIpcMessageEnvelope) {
    return message.headers?.clientId ? { clientId: message.headers.clientId } : undefined;
  }

  /**
   * Forget an incoming stream and free its concurrency slot.
   * Returns the stream so the caller can end or destroy it, or `undefined`
   * when the id is not tracked (in which case no counter is touched).
   */
  private releaseIncomingStream(streamId: string): plugins.stream.PassThrough | undefined {
    const pass = this.incomingStreams.get(streamId);
    if (!pass) {
      return undefined;
    }
    this.incomingStreams.delete(streamId);
    this.incomingStreamMeta.delete(streamId);
    this.activeIncomingStreams = Math.max(0, this.activeIncomingStreams - 1);
    return pass;
  }

  /**
   * Handle incoming messages: heartbeats first, then streaming control
   * frames, then responses to pending requests, then registered handlers.
   */
  private handleMessage(message: IIpcMessageEnvelope): void {
    // Track metrics
    this.metrics.messagesReceived++;
    this.metrics.bytesReceived += this.measureBytes(message);

    // Handle heartbeat and send response
    if (message.type === '__heartbeat__') {
      this.lastHeartbeat = Date.now();
      // Reply so the sender also observes liveness
      this.transport.send({
        id: plugins.crypto.randomUUID(),
        type: '__heartbeat_response__',
        correlationId: message.id,
        timestamp: Date.now(),
        payload: { timestamp: Date.now() },
        headers: this.routingHeaders(message)
      }).catch(() => {});
      return;
    }

    // Handle heartbeat response
    if (message.type === '__heartbeat_response__') {
      this.lastHeartbeat = Date.now();
      return;
    }

    if (this.handleStreamControl(message)) {
      return;
    }

    if (this.settlePendingRequest(message)) {
      return;
    }

    this.dispatchToHandler(message);
  }

  /**
   * Process a `__stream_*__` control message.
   * @returns true when the message was a streaming control message.
   */
  private handleStreamControl(message: IIpcMessageEnvelope): boolean {
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const payload = message.payload as any;
    const streamId = payload?.streamId as string;

    switch (message.type) {
      case '__stream_init__': {
        const meta = payload?.meta as Record<string, any> | undefined;
        if (typeof streamId === 'string' && streamId.length) {
          // Enforce max concurrent incoming streams
          if (this.activeIncomingStreams >= (this.options.maxConcurrentStreams || Infinity)) {
            const response: IIpcMessageEnvelope = {
              id: plugins.crypto.randomUUID(),
              type: '__stream_error__',
              timestamp: Date.now(),
              payload: { streamId, error: 'Max concurrent streams exceeded' },
              headers: this.routingHeaders(message)
            };
            this.transport.send(response).catch(() => {});
            return true;
          }
          const pass = new plugins.stream.PassThrough();
          this.incomingStreams.set(streamId, pass);
          if (meta) this.incomingStreamMeta.set(streamId, meta);
          this.activeIncomingStreams++;
          // Emit a high-level stream event as ('stream', info, readable)
          const eventPayload = {
            streamId,
            meta: meta || {},
            headers: message.headers || {},
            clientId: message.headers?.clientId,
          };
          this.emit('stream', eventPayload, pass);
        }
        return true;
      }

      case '__stream_chunk__': {
        const chunkB64 = payload?.chunk as string;
        const pass = this.incomingStreams.get(streamId);
        if (pass && typeof chunkB64 === 'string') {
          try {
            pass.write(Buffer.from(chunkB64, 'base64'));
          } catch (e) {
            // Decoding or writing failed: destroy the stream and free its
            // slot so the failure does not count against the limit forever.
            this.releaseIncomingStream(streamId);
            pass.destroy(e as Error);
          }
        }
        return true;
      }

      case '__stream_end__': {
        this.releaseIncomingStream(streamId)?.end();
        return true;
      }

      case '__stream_error__': {
        const errMsg = payload?.error as string;
        this.releaseIncomingStream(streamId)?.destroy(new Error(errMsg || 'stream error'));
        return true;
      }

      case '__stream_cancel__': {
        // Cancel outgoing stream with same id if present
        const ctrl = this.outgoingStreams.get(streamId);
        if (ctrl) {
          ctrl.cancelled = true;
          try { ctrl.abort?.(); } catch {}
          this.outgoingStreams.delete(streamId);
          this.activeOutgoingStreams = Math.max(0, this.activeOutgoingStreams - 1);
        }
        // Also cancel any incoming stream if tracked
        const pass = this.releaseIncomingStream(streamId);
        if (pass) {
          try { pass.destroy(new Error('stream cancelled')); } catch {}
        }
        return true;
      }

      default:
        return false;
    }
  }

  /**
   * Resolve or reject the pending request this message correlates to.
   * @returns true when the message was the response to a pending request.
   */
  private settlePendingRequest(message: IIpcMessageEnvelope): boolean {
    if (!message.correlationId) {
      return false;
    }
    const pending = this.pendingRequests.get(message.correlationId);
    if (!pending) {
      return false;
    }

    this.pendingRequests.delete(message.correlationId);
    if (pending.timer) {
      clearTimeout(pending.timer);
    }

    if (message.headers?.error) {
      pending.reject(new Error(message.headers.error));
    } else {
      pending.resolve(message.payload);
    }
    return true;
  }

  /**
   * Hand a regular message to its registered handler, or emit it as an
   * unhandled 'message' event when no handler exists for its type.
   */
  private dispatchToHandler(message: IIpcMessageEnvelope): void {
    const handler = this.messageHandlers.get(message.type);
    if (!handler) {
      // Emit unhandled message
      this.emit('message', message);
      return;
    }

    // If message expects a response
    if (message.headers?.requiresResponse && message.id) {
      Promise.resolve()
        .then(() => handler(message.payload))
        .then((result) => {
          const response: IIpcMessageEnvelope = {
            id: plugins.crypto.randomUUID(),
            type: `${message.type}_response`,
            correlationId: message.id,
            timestamp: Date.now(),
            payload: result,
            headers: this.routingHeaders(message)
          };
          return this.transport.send(response);
        })
        .catch((error: unknown) => {
          // The requester only rejects when headers.error is truthy, so a
          // non-Error or message-less rejection still needs a non-empty text.
          const reason = (error instanceof Error ? error.message : String(error)) || 'Unknown error';
          const response: IIpcMessageEnvelope = {
            id: plugins.crypto.randomUUID(),
            type: `${message.type}_response`,
            correlationId: message.id,
            timestamp: Date.now(),
            payload: null,
            headers: { error: reason, ...(this.routingHeaders(message) ?? {}) }
          };
          return this.transport.send(response);
        })
        .catch(() => {
          // The error response itself could not be delivered; the requester
          // will run into its own timeout.
        });
      return;
    }

    // Fire and forget
    try {
      const outcome = handler(message.payload);
      // Surface rejections of async handlers the same way as sync throws.
      if (outcome && typeof outcome.then === 'function') {
        outcome.then(undefined, (error: unknown) => this.emit('error', error));
      }
    } catch (error) {
      this.emit('error', error);
    }
  }

  /**
   * Send a message without expecting a response.
   *
   * A `correlationId` entry in `headers` is moved to the top level of the
   * envelope; the remaining headers are sent as-is.
   *
   * @throws Error('Failed to send message') when the transport reports failure.
   */
  public async sendMessage(type: string, payload: any, headers?: Record<string, any>): Promise<void> {
    // Extract correlationId from headers and place it at top level
    const { correlationId, ...restHeaders } = headers ?? {};
    const message: IIpcMessageEnvelope = {
      id: plugins.crypto.randomUUID(),
      type,
      timestamp: Date.now(),
      payload,
      ...(correlationId ? { correlationId } : {}),
      headers: Object.keys(restHeaders).length ? restHeaders : undefined
    };

    const success = await this.transport.send(message);
    if (!success) {
      this.metrics.errors++;
      throw new Error('Failed to send message');
    }

    // Track metrics
    this.metrics.messagesSent++;
    this.metrics.bytesSent += this.measureBytes(message);
  }

  /**
   * Send a request and wait for response.
   *
   * @param type message type; the receiving side answers via its handler for this type
   * @param payload request payload
   * @param options `timeout` in ms (default 30000) and extra `headers`
   * @returns the payload of the correlated response
   * @throws when sending fails, the response carries an error header, the
   *         timeout elapses, or the channel disconnects while waiting
   */
  public async request<TReq = TRequest, TRes = TResponse>(
    type: string,
    payload: TReq,
    options?: { timeout?: number; headers?: Record<string, any> }
  ): Promise<TRes> {
    const messageId = plugins.crypto.randomUUID();
    const timeout = options?.timeout || 30000;

    const message: IIpcMessageEnvelope = {
      id: messageId,
      type,
      timestamp: Date.now(),
      payload,
      headers: {
        ...options?.headers,
        requiresResponse: true
      }
    };

    return new Promise<TRes>((resolve, reject) => {
      // Setup timeout
      const timer = setTimeout(() => {
        this.pendingRequests.delete(messageId);
        this.metrics.requestTimeouts++;
        reject(new Error(`Request timeout for ${type}`));
      }, timeout);

      // Store pending request before sending so a fast response is matched
      this.pendingRequests.set(messageId, { resolve, reject, timer });

      const abandon = (error: unknown): void => {
        this.pendingRequests.delete(messageId);
        clearTimeout(timer);
        this.metrics.errors++;
        reject(error);
      };

      this.transport.send(message)
        .then((success) => {
          if (!success) {
            abandon(new Error('Failed to send message'));
            return;
          }
          // Same accounting as sendMessage()
          this.metrics.messagesSent++;
          this.metrics.bytesSent += this.measureBytes(message);
        })
        .catch(abandon);
    });
  }

  /**
   * Register a listener.
   *
   * Channel lifecycle events ('message', 'connect', 'disconnect', 'error',
   * 'reconnecting', 'drain', 'heartbeatTimeout', 'clientDisconnected',
   * 'stream') go to the EventEmitter. Any other name registers the handler
   * for that message type, replacing a previously registered one.
   */
  public on(event: string, handler: (payload: any) => any | Promise<any>): this {
    if (CHANNEL_EVENTS.has(event)) {
      // Special handling for channel events
      super.on(event, handler);
    } else {
      // Register as message type handler
      this.messageHandlers.set(event, handler);
    }
    return this;
  }

  /**
   * Reject every pending request with `error` and clear their timers.
   */
  private clearPendingRequests(error: Error): void {
    for (const pending of this.pendingRequests.values()) {
      if (pending.timer) {
        clearTimeout(pending.timer);
      }
      pending.reject(error);
    }
    this.pendingRequests.clear();
  }

  /**
   * Check if channel is connected
   */
  public isConnected(): boolean {
    return this.transport.isConnected();
  }

  /**
   * Get channel statistics. `metrics.uptime` is the time in ms since the last
   * successful connect, or undefined when the channel never connected.
   */
  public getStats(): {
    connected: boolean;
    reconnectAttempts: number;
    pendingRequests: number;
    isReconnecting: boolean;
    metrics: {
      messagesSent: number;
      messagesReceived: number;
      bytesSent: number;
      bytesReceived: number;
      reconnects: number;
      heartbeatTimeouts: number;
      errors: number;
      requestTimeouts: number;
      uptime?: number;
    };
  } {
    return {
      connected: this.transport.isConnected(),
      reconnectAttempts: this.reconnectAttempts,
      pendingRequests: this.pendingRequests.size,
      isReconnecting: this.isReconnecting,
      metrics: {
        ...this.metrics,
        uptime: this.metrics.connectedAt ? Date.now() - this.metrics.connectedAt : undefined
      }
    };
  }
}

/**
 * Streaming helpers
 */
export interface IStreamSendOptions {
  headers?: Record<string, any>;
  chunkSize?: number; // bytes, default 64k
  streamId?: string;
  meta?: Record<string, any>;
}

export type ReadableLike = NodeJS.ReadableStream | plugins.stream.Readable;

// Extend IpcChannel with streaming methods (implemented on the prototype below)
export interface IpcChannel<TRequest = any, TResponse = any> {
  sendStream(readable: ReadableLike, options?: IStreamSendOptions): Promise<void>;
  cancelOutgoingStream(streamId: string, headers?: Record<string, any>): Promise<void>;
  cancelIncomingStream(streamId: string, headers?: Record<string, any>): Promise<void>;
}

/**
 * Send the contents of a readable as a sequence of `__stream_init__`,
 * base64 `__stream_chunk__` and `__stream_end__` messages.
 *
 * Resolves after `__stream_end__` was sent. Rejects when the concurrency
 * limit is reached, a message cannot be sent, or the readable errors (which
 * includes cancellation, since cancelling destroys the readable).
 */
IpcChannel.prototype.sendStream = async function (
  this: IpcChannel,
  readable: ReadableLike,
  options?: IStreamSendOptions
): Promise<void> {
  // The streaming helpers live outside the class body and need its private state.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const self: any = this;
  const streamId = options?.streamId || (plugins.crypto.randomUUID ? plugins.crypto.randomUUID() : `${Date.now()}-${Math.random()}`);
  const headers = options?.headers || {};
  const chunkSize = Math.max(1024, Math.min(options?.chunkSize || 64 * 1024, self.options.maxMessageSize || 8 * 1024 * 1024));

  // Enforce max concurrent outgoing streams (reserve a slot synchronously)
  if (self.activeOutgoingStreams >= (self.options.maxConcurrentStreams || Infinity)) {
    throw new Error('Max concurrent streams exceeded');
  }
  self.activeOutgoingStreams++;

  // Keep a direct reference to the control record: cancel paths remove it
  // from the map right after flagging it, so a lookup by id would miss it.
  const ctrl = {
    cancelled: false,
    abort: () => {
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      try { (readable as any).destroy?.(new Error('stream cancelled')); } catch {}
    }
  };
  self.outgoingStreams.set(streamId, ctrl);

  // Idempotent: a cancel path may already have removed the record and
  // decremented the counter, in which case nothing is left to release.
  const releaseSlot = (): void => {
    if (self.outgoingStreams.get(streamId) === ctrl) {
      self.outgoingStreams.delete(streamId);
      self.activeOutgoingStreams = Math.max(0, self.activeOutgoingStreams - 1);
    }
  };

  try {
    // Send init after reserving slot
    await self.sendMessage('__stream_init__', { streamId, meta: options?.meta || {} }, headers);
  } catch (e) {
    releaseSlot();
    throw e;
  }

  const readChunkAndSend = async (buf: Buffer): Promise<void> => {
    // Slice into chunkSize frames if needed
    for (let offset = 0; offset < buf.length; offset += chunkSize) {
      if (ctrl.cancelled) {
        return;
      }
      const slice = buf.subarray(offset, Math.min(offset + chunkSize, buf.length));
      await self.sendMessage('__stream_chunk__', { streamId, chunk: slice.toString('base64') }, headers);
    }
  };

  await new Promise<void>((resolve, reject) => {
    let sending: Promise<void> = Promise.resolve();

    const succeed = (): void => {
      releaseSlot();
      resolve();
    };
    const fail = (err: unknown): void => {
      releaseSlot();
      reject(err);
    };

    readable.on('data', (chunk: any) => {
      const buf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
      // Ensure sequential sending to avoid write races
      sending = sending.then(() => readChunkAndSend(buf));
      sending.catch(fail);
    });

    readable.on('end', async () => {
      try {
        await sending;
        await self.sendMessage('__stream_end__', { streamId }, headers);
        succeed();
      } catch (e) {
        fail(e);
      }
    });

    readable.on('error', async (err: Error) => {
      try {
        await sending.catch(() => {});
        await self.sendMessage('__stream_error__', { streamId, error: err.message }, headers);
      } catch {
        // Notifying the peer is best-effort; the stream error is what the
        // caller gets either way.
      } finally {
        fail(err);
      }
    });

    // In case the stream is already ended ('end' will not fire again)
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    if ((readable as any).readableEnded) {
      (async () => {
        await self.sendMessage('__stream_end__', { streamId }, headers);
        succeed();
      })().catch(fail);
    }
  });
};

/**
 * Cancel a stream this side is sending: flag and abort it locally, free its
 * slot, then send `__stream_cancel__` for that id.
 */
IpcChannel.prototype.cancelOutgoingStream = async function (
  this: IpcChannel,
  streamId: string,
  headers?: Record<string, any>
): Promise<void> {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const self: any = this;
  const ctrl = self.outgoingStreams.get(streamId);
  if (ctrl) {
    ctrl.cancelled = true;
    try { ctrl.abort?.(); } catch {}
    self.outgoingStreams.delete(streamId);
    self.activeOutgoingStreams = Math.max(0, self.activeOutgoingStreams - 1);
  }
  await self.sendMessage('__stream_cancel__', { streamId }, headers || {});
};

/**
 * Cancel a stream this side is receiving: destroy the local readable, free
 * its slot, then send `__stream_cancel__` for that id.
 */
IpcChannel.prototype.cancelIncomingStream = async function (
  this: IpcChannel,
  streamId: string,
  headers?: Record<string, any>
): Promise<void> {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const self: any = this;
  const pass = self.incomingStreams.get(streamId);
  if (pass) {
    try { pass.destroy(new Error('stream cancelled')); } catch {}
    self.incomingStreams.delete(streamId);
    self.incomingStreamMeta.delete(streamId);
    self.activeIncomingStreams = Math.max(0, self.activeIncomingStreams - 1);
  }
  await self.sendMessage('__stream_cancel__', { streamId }, headers || {});
};