import * as plugins from './smartipc.plugins.js';
import { IpcChannel } from './classes.ipcchannel.js';
import type { IIpcChannelOptions } from './classes.ipcchannel.js';

/**
 * Retry behaviour used by {@link IpcClient.connect} while establishing the
 * channel connection and registering with the server.
 */
export interface IConnectRetryConfig {
  /**
   * Enable connection retry. When this is not truthy a single attempt is
   * made, regardless of `maxAttempts`.
   */
  enabled: boolean;
  /** Initial delay before first retry in ms (default: 100) */
  initialDelay?: number;
  /** Maximum delay between retries in ms (default: 1500) */
  maxDelay?: number;
  /** Maximum number of attempts (default: 1) */
  maxAttempts?: number;
  /** Total timeout for all retry attempts in ms (default: 15000) */
  totalTimeout?: number;
}

/**
 * Per-call options for {@link IpcClient.connect}.
 */
export interface IClientConnectOptions {
  /**
   * Keep repeating the connection attempt while it fails with an error whose
   * message contains `ECONNREFUSED` or `ENOENT`.
   */
  waitForReady?: boolean;
  /** Timeout for waiting for server readiness in ms (default: 10000) */
  waitTimeout?: number;
}

/**
 * Options for IPC Client
 */
export interface IIpcClientOptions extends IIpcChannelOptions {
  /** Client identifier (a random UUID is generated when omitted) */
  clientId?: string;
  /** Client metadata, sent to the server in the registration request */
  metadata?: Record<string, any>;
  /** Connection retry configuration */
  connectRetry?: IConnectRetryConfig;
  /** Registration timeout in ms (default: 5000) */
  registerTimeoutMs?: number;
}

/**
 * Extract a human-readable message from an arbitrary thrown value.
 * Uses a string `message` property when present, otherwise `String(value)`,
 * so that `null`, `undefined` and primitive throws do not cause a TypeError.
 */
function describeError(error: unknown): string {
  const message = (error as { message?: unknown } | null | undefined)?.message;
  return typeof message === 'string' ? message : String(error);
}

/**
 * Whether an error message indicates that the server endpoint is not
 * reachable yet (`ECONNREFUSED` or `ENOENT` appears in the message).
 */
function isServerNotReadyError(error: unknown): boolean {
  const message = (error as { message?: unknown } | null | undefined)?.message;
  return (
    typeof message === 'string' &&
    (message.includes('ECONNREFUSED') || message.includes('ENOENT'))
  );
}

/**
 * IPC Client for connecting to an IPC server
 *
 * Events emitted by this class: `connect`, `disconnect`, `error`,
 * `heartbeatTimeout`, `reconnecting`, and `message` (for incoming messages
 * that have no registered handler).
 */
export class IpcClient extends plugins.EventEmitter {
  private options: IIpcClientOptions;
  private channel: IpcChannel;
  /** Handlers keyed by message type; topic subscriptions use `topic:<name>`. */
  private messageHandlers = new Map<string, (payload: any) => any | Promise<any>>();
  /** True once registration succeeded and until a disconnect is observed. */
  private isConnected = false;
  private clientId: string;
  /** Set after the first successful registration; enables auto re-registration. */
  private didRegisterOnce = false;

  constructor(options: IIpcClientOptions) {
    super();
    this.options = options;
    this.clientId = options.clientId || plugins.crypto.randomUUID();

    // Create the channel
    this.channel = new IpcChannel(this.options);
    this.setupChannelHandlers();
  }

  /**
   * Connect to the server and register this client.
   *
   * Returns immediately when the client is already connected. With
   * `waitForReady`, the whole connection attempt (including its own retries)
   * is repeated every 100 ms while it fails with an `ECONNREFUSED`/`ENOENT`
   * error; the `waitTimeout` deadline is only checked between attempts.
   *
   * @param connectOptions - Optional readiness-wait settings.
   * @throws Error when connecting or registering fails, or when the server
   *   did not become ready within `waitTimeout`.
   */
  public async connect(connectOptions: IClientConnectOptions = {}): Promise<void> {
    if (this.isConnected) {
      return;
    }

    if (!connectOptions.waitForReady) {
      // Normal connection attempt
      await this.connectWithRetry();
      return;
    }

    const waitTimeout = connectOptions.waitTimeout || 10000;
    const startTime = Date.now();

    while (Date.now() - startTime < waitTimeout) {
      try {
        await this.connectWithRetry();
        return; // Success!
      } catch (error) {
        // Only "server not there yet" errors are worth waiting for;
        // anything else is reported to the caller right away.
        if (!isServerNotReadyError(error)) {
          throw error;
        }
        await new Promise<void>((resolve) => setTimeout(resolve, 100));
      }
    }

    throw new Error(`Server not ready after ${waitTimeout}ms`);
  }

  /**
   * Connect the channel and register, retrying according to
   * `options.connectRetry`.
   *
   * Retries use exponential backoff (delay doubles, capped at `maxDelay`)
   * and stop early when the next wait would exceed `totalTimeout`.
   *
   * @throws Error the last failure, or a timeout error when `totalTimeout`
   *   elapsed before an attempt could start.
   */
  private async connectWithRetry(): Promise<void> {
    const retryConfig = this.options.connectRetry;
    const retryEnabled = Boolean(retryConfig?.enabled);
    // Without retry enabled there must be exactly one attempt; otherwise a
    // configured maxAttempts would be burned through with no delay at all.
    const maxAttempts = retryEnabled ? (retryConfig?.maxAttempts || 1) : 1;
    const initialDelay = retryConfig?.initialDelay || 100;
    const maxDelay = retryConfig?.maxDelay || 1500;
    const totalTimeout = retryConfig?.totalTimeout || 15000;

    const startTime = Date.now();
    let lastError: Error | undefined;
    let delay = initialDelay;

    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      // Check total timeout
      if (Date.now() - startTime > totalTimeout) {
        throw new Error(
          `Connection timeout after ${totalTimeout}ms: ${lastError?.message || 'Unknown error'}`
        );
      }

      try {
        await this.channel.connect();
        await this.attemptRegistrationInternal();
        return; // Success!
      } catch (error) {
        lastError = error as Error;

        // Best-effort cleanup so the next attempt starts from a closed channel
        await this.channel.disconnect().catch(() => {});

        if (attempt < maxAttempts) {
          // Check if we have time for another attempt
          if (Date.now() - startTime + delay > totalTimeout) {
            break; // Will timeout, don't wait
          }

          await new Promise<void>((resolve) => setTimeout(resolve, delay));
          // Exponential backoff with max limit
          delay = Math.min(delay * 2, maxDelay);
        }
      }
    }

    // All attempts failed
    throw lastError || new Error('Failed to connect to server');
  }

  /**
   * Attempt to register this client over the current channel connection.
   * Sets connection flags and emits 'connect' on success.
   *
   * @throws Error prefixed with "Failed to register with server: " when the
   *   request fails or the server response has a falsy `success` field.
   */
  private async attemptRegistrationInternal(): Promise<void> {
    const registerTimeoutMs = this.options.registerTimeoutMs || 5000;

    try {
      const response = await this.channel.request(
        '__register__',
        { clientId: this.clientId, metadata: this.options.metadata },
        { timeout: registerTimeoutMs, headers: { clientId: this.clientId } }
      );

      if (!response.success) {
        throw new Error(response.error || 'Registration failed');
      }

      this.isConnected = true;
      this.didRegisterOnce = true;
      this.emit('connect');
    } catch (error: unknown) {
      throw new Error(`Failed to register with server: ${describeError(error)}`);
    }
  }

  /**
   * Disconnect from the server.
   * Does nothing when the client is not currently connected; otherwise
   * disconnects the channel and emits 'disconnect'.
   */
  public async disconnect(): Promise<void> {
    if (!this.isConnected) {
      return;
    }

    this.isConnected = false;
    await this.channel.disconnect();
    this.emit('disconnect');
  }

  /**
   * Setup channel event handlers: forwards channel lifecycle events,
   * re-registers after reconnects and dispatches incoming messages.
   */
  private setupChannelHandlers(): void {
    this.channel.on('connect', async () => {
      // On reconnects, re-register automatically when we had connected before.
      // For the initial connect(), registration is handled explicitly there.
      if (this.didRegisterOnce && !this.isConnected) {
        try {
          await this.attemptRegistrationInternal();
        } catch (error) {
          this.emit('error', error);
        }
      }
    });

    this.channel.on('disconnect', (reason) => {
      this.isConnected = false;
      this.emit('disconnect', reason);
    });

    this.channel.on('error', (error: any) => {
      // If heartbeat timeout and configured not to throw, convert to
      // heartbeatTimeout event
      if (
        error &&
        error.message === 'Heartbeat timeout' &&
        this.options.heartbeatThrowOnTimeout === false
      ) {
        this.emit('heartbeatTimeout', error);
        return;
      }
      this.emit('error', error);
    });

    this.channel.on('heartbeatTimeout', (error) => {
      // Forward heartbeatTimeout event (when heartbeatThrowOnTimeout is false)
      this.emit('heartbeatTimeout', error);
    });

    this.channel.on('reconnecting', (info) => {
      this.emit('reconnecting', info);
    });

    // Handle messages
    this.channel.on('message', (message) => {
      const handler = this.messageHandlers.get(message.type);

      if (!handler) {
        // Emit unhandled message
        this.emit('message', message);
        return;
      }

      if (message.headers?.requiresResponse && message.id) {
        // Request/response: reply with the handler result, or with an error
        // response when the handler (or sending its result) fails.
        Promise.resolve()
          .then(() => handler(message.payload))
          .then((result) => {
            return this.channel.sendMessage(`${message.type}_response`, result, {
              correlationId: message.id,
            });
          })
          .catch((error: unknown) => {
            return this.channel.sendMessage(`${message.type}_response`, null, {
              correlationId: message.id,
              error: describeError(error),
            });
          })
          .catch((sendError: unknown) => {
            // Even the error response could not be sent; surface it instead
            // of leaving an unhandled promise rejection behind.
            this.emit('error', sendError);
          });
        return;
      }

      // Fire and forget: the handler still runs synchronously, but failures
      // (sync throw or rejected promise) are reported via the 'error' event.
      try {
        Promise.resolve(handler(message.payload)).catch((error: unknown) => {
          this.emit('error', error);
        });
      } catch (error) {
        this.emit('error', error);
      }
    });
  }

  /**
   * Register a message handler.
   * A later registration for the same type replaces the earlier one.
   *
   * @param type - Message type to handle.
   * @param handler - Called with the message payload; for messages that
   *   require a response its (awaited) result is sent back.
   */
  public onMessage(type: string, handler: (payload: any) => any | Promise<any>): void {
    this.messageHandlers.set(type, handler);
  }

  /**
   * Send a message to the server.
   *
   * @param type - Message type.
   * @param payload - Message payload.
   * @param headers - Extra headers; `clientId` is always overwritten with
   *   this client's id.
   * @throws Error when the client is not connected.
   */
  public async sendMessage(
    type: string,
    payload: any,
    headers?: Record<string, any>
  ): Promise<void> {
    if (!this.isConnected) {
      throw new Error('Client is not connected');
    }

    // Always include clientId in headers
    await this.channel.sendMessage(type, payload, {
      ...headers,
      clientId: this.clientId,
    });
  }

  /**
   * Send a request to the server and wait for response.
   *
   * @param type - Request type.
   * @param payload - Request payload.
   * @param options - Optional timeout and headers; `clientId` is always
   *   overwritten with this client's id.
   * @returns The value the channel request resolves with.
   * @throws Error when the client is not connected.
   */
  public async request<TReq = any, TRes = any>(
    type: string,
    payload: TReq,
    options?: { timeout?: number; headers?: Record<string, any> }
  ): Promise<TRes> {
    if (!this.isConnected) {
      throw new Error('Client is not connected');
    }

    // Always include clientId in headers
    return this.channel.request(type, payload, {
      ...options,
      headers: {
        ...options?.headers,
        clientId: this.clientId,
      },
    });
  }

  /**
   * Subscribe to a topic (pub/sub pattern).
   * The handler is stored locally under `topic:<topic>` before the server is
   * notified with a `__subscribe__` message.
   */
  public async subscribe(topic: string, handler: (payload: any) => void): Promise<void> {
    // Register local handler
    this.messageHandlers.set(`topic:${topic}`, handler);

    // Notify server about subscription
    await this.sendMessage('__subscribe__', { topic });
  }

  /**
   * Unsubscribe from a topic.
   * Removes the local handler, then notifies the server with an
   * `__unsubscribe__` message.
   */
  public async unsubscribe(topic: string): Promise<void> {
    // Remove local handler
    this.messageHandlers.delete(`topic:${topic}`);

    // Notify server about unsubscription
    await this.sendMessage('__unsubscribe__', { topic });
  }

  /**
   * Publish to a topic by sending a `__publish__` message to the server.
   */
  public async publish(topic: string, payload: any): Promise<void> {
    await this.sendMessage('__publish__', { topic, payload });
  }

  /**
   * Get client ID
   */
  public getClientId(): string {
    return this.clientId;
  }

  /**
   * Check if client is connected (registered and channel connected)
   */
  public getIsConnected(): boolean {
    return this.isConnected && this.channel.isConnected();
  }

  /**
   * Get client statistics as reported by the underlying channel
   */
  public getStats(): any {
    return this.channel.getStats();
  }
}