import * as plugins from './smartipc.plugins.js'; import { IpcChannel } from './classes.ipcchannel.js'; import type { IIpcChannelOptions } from './classes.ipcchannel.js'; /** * Options for IPC Server */ export interface IServerStartOptions { /** When to consider server ready (default: 'socket-bound') */ readyWhen?: 'socket-bound' | 'accepting'; } export interface IIpcServerOptions extends Omit { /** Maximum number of client connections */ maxClients?: number; /** Client idle timeout in ms */ clientIdleTimeout?: number; /** Automatically cleanup stale socket file on start (default: false) */ autoCleanupSocketFile?: boolean; /** Socket file permissions mode (e.g. 0o600) */ socketMode?: number; } /** * Client connection information */ interface IClientConnection { id: string; channel: IpcChannel; connectedAt: number; lastActivity: number; metadata?: Record; } /** * IPC Server for handling multiple client connections */ export class IpcServer extends plugins.EventEmitter { private options: IIpcServerOptions; private clients = new Map(); private messageHandlers = new Map any | Promise>(); private primaryChannel?: IpcChannel; private isRunning = false; private isReady = false; private clientIdleCheckTimer?: NodeJS.Timeout; // Pub/sub tracking private topicIndex = new Map>(); // topic -> clientIds private clientTopics = new Map>(); // clientId -> topics constructor(options: IIpcServerOptions) { super(); this.options = { maxClients: Infinity, clientIdleTimeout: 0, // 0 means no timeout ...options }; } /** * Start the server */ public async start(options: IServerStartOptions = {}): Promise { if (this.isRunning) { return; } // Create primary channel for initial connections this.primaryChannel = new IpcChannel({ ...this.options, autoReconnect: false // Server doesn't auto-reconnect }); // Register the __register__ handler on the channel this.primaryChannel.on('__register__', async (payload: { clientId: string; metadata?: Record }) => { const clientId = payload.clientId; const metadata = payload.metadata; // Check max clients if (this.clients.size >= this.options.maxClients!) { return { success: false, error: 'Maximum number of clients reached' }; } // Create new client connection const clientConnection: IClientConnection = { id: clientId, channel: this.primaryChannel!, connectedAt: Date.now(), lastActivity: Date.now(), metadata: metadata }; this.clients.set(clientId, clientConnection); this.emit('clientConnect', clientId, metadata); return { success: true, clientId: clientId }; }); // Handle other messages this.primaryChannel.on('message', (message) => { // Extract client ID from message headers const clientId = message.headers?.clientId || 'unknown'; // Update last activity if (this.clients.has(clientId)) { this.clients.get(clientId)!.lastActivity = Date.now(); } // Handle pub/sub messages if (message.type === '__subscribe__') { const topic = message.payload?.topic; if (typeof topic === 'string' && topic.length) { let set = this.topicIndex.get(topic); if (!set) this.topicIndex.set(topic, (set = new Set())); set.add(clientId); let cset = this.clientTopics.get(clientId); if (!cset) this.clientTopics.set(clientId, (cset = new Set())); cset.add(topic); } return; } if (message.type === '__unsubscribe__') { const topic = message.payload?.topic; const set = this.topicIndex.get(topic); if (set) { set.delete(clientId); if (set.size === 0) this.topicIndex.delete(topic); } const cset = this.clientTopics.get(clientId); if (cset) { cset.delete(topic); if (cset.size === 0) this.clientTopics.delete(clientId); } return; } if (message.type === '__publish__') { const topic = message.payload?.topic; const payload = message.payload?.payload; const targets = this.topicIndex.get(topic); if (targets && targets.size) { // Send to subscribers const sends: Promise[] = []; for (const subClientId of targets) { sends.push( this.sendToClient(subClientId, `topic:${topic}`, payload) .catch(err => { this.emit('error', err, subClientId); }) ); } Promise.allSettled(sends).catch(() => {}); } return; } // Forward to registered handlers if (this.messageHandlers.has(message.type)) { const handler = this.messageHandlers.get(message.type)!; // If message expects a response if (message.headers?.requiresResponse && message.id) { Promise.resolve() .then(() => handler(message.payload, clientId)) .then((result) => { return this.primaryChannel!.sendMessage( `${message.type}_response`, result, { correlationId: message.id, clientId } ); }) .catch((error) => { return this.primaryChannel!.sendMessage( `${message.type}_response`, null, { correlationId: message.id, error: error.message, clientId } ); }); } else { // Fire and forget handler(message.payload, clientId); } } // Emit raw message event this.emit('message', message, clientId); }); // Setup primary channel handlers this.primaryChannel.on('disconnect', () => { // Server disconnected, clear all clients and subscriptions for (const [clientId] of this.clients) { this.cleanupClientSubscriptions(clientId); } this.clients.clear(); }); this.primaryChannel.on('error', (error) => { this.emit('error', error, 'server'); }); this.primaryChannel.on('heartbeatTimeout', (error) => { // Forward heartbeatTimeout event (when heartbeatThrowOnTimeout is false) this.emit('heartbeatTimeout', error, 'server'); }); // Connect the primary channel (will start as server) await this.primaryChannel.connect(); this.isRunning = true; this.startClientIdleCheck(); this.emit('start'); // Track individual client disconnects forwarded by the channel/transport this.primaryChannel.on('clientDisconnected', (clientId?: string) => { if (!clientId) return; // Clean up any topic subscriptions and client map entry this.cleanupClientSubscriptions(clientId); if (this.clients.has(clientId)) { this.clients.delete(clientId); this.emit('clientDisconnect', clientId); } }); // Handle readiness based on options if (options.readyWhen === 'accepting') { // Wait a bit to ensure handlers are fully set up await new Promise(resolve => setTimeout(resolve, 10)); this.isReady = true; this.emit('ready'); } else { // Default: ready when socket is bound this.isReady = true; this.emit('ready'); } } /** * Stop the server */ public async stop(): Promise { if (!this.isRunning) { return; } this.isRunning = false; this.stopClientIdleCheck(); // Disconnect all clients const disconnectPromises: Promise[] = []; for (const [clientId, client] of this.clients) { disconnectPromises.push( client.channel.disconnect() .then(() => { this.emit('clientDisconnect', clientId); }) .catch(() => {}) // Ignore disconnect errors ); } await Promise.all(disconnectPromises); this.clients.clear(); // Disconnect primary channel if (this.primaryChannel) { await this.primaryChannel.disconnect(); this.primaryChannel = undefined; } this.emit('stop'); } /** * Setup channel event handlers */ private setupChannelHandlers(channel: IpcChannel, clientId: string): void { // Handle client registration channel.on('__register__', async (payload: { clientId: string; metadata?: Record }) => { if (payload.clientId && payload.clientId !== clientId) { // New client registration const newClientId = payload.clientId; // Check max clients if (this.clients.size >= this.options.maxClients!) { throw new Error('Maximum number of clients reached'); } // Create new client connection const clientConnection: IClientConnection = { id: newClientId, channel: channel, connectedAt: Date.now(), lastActivity: Date.now(), metadata: payload.metadata }; this.clients.set(newClientId, clientConnection); this.emit('clientConnect', newClientId, payload.metadata); // Now messages from this channel should be associated with the new client ID clientId = newClientId; return { success: true, clientId: newClientId }; } return { success: false, error: 'Invalid registration' }; }); // Handle messages - pass the correct clientId channel.on('message', (message) => { // Try to find the actual client ID for this channel let actualClientId = clientId; for (const [id, client] of this.clients) { if (client.channel === channel) { actualClientId = id; break; } } // Update last activity if (actualClientId !== 'primary' && this.clients.has(actualClientId)) { this.clients.get(actualClientId)!.lastActivity = Date.now(); } // Forward to registered handlers if (this.messageHandlers.has(message.type)) { const handler = this.messageHandlers.get(message.type)!; handler(message.payload, actualClientId); } // Emit raw message event this.emit('message', message, actualClientId); }); // Handle disconnect channel.on('disconnect', () => { // Find and remove the actual client for (const [id, client] of this.clients) { if (client.channel === channel) { this.clients.delete(id); this.emit('clientDisconnect', id); break; } } }); // Handle errors channel.on('error', (error) => { // Find the actual client ID for this channel let actualClientId = clientId; for (const [id, client] of this.clients) { if (client.channel === channel) { actualClientId = id; break; } } this.emit('error', error, actualClientId); }); channel.on('heartbeatTimeout', (error) => { // Find the actual client ID for this channel let actualClientId = clientId; for (const [id, client] of this.clients) { if (client.channel === channel) { actualClientId = id; break; } } // Forward heartbeatTimeout event (when heartbeatThrowOnTimeout is false) this.emit('heartbeatTimeout', error, actualClientId); }); } /** * Register a message handler */ public onMessage(type: string, handler: (payload: any, clientId: string) => any | Promise): void { this.messageHandlers.set(type, handler); } /** * Send message to specific client */ public async sendToClient(clientId: string, type: string, payload: any, headers?: Record): Promise { const client = this.clients.get(clientId); if (!client) { throw new Error(`Client ${clientId} not found`); } // Ensure the target clientId is part of the headers so the transport // can route the message to the correct socket instead of broadcasting. const routedHeaders: Record | undefined = { ...(headers || {}), clientId, }; await client.channel.sendMessage(type, payload, routedHeaders); } /** * Send request to specific client and wait for response */ public async requestFromClient( clientId: string, type: string, payload: TReq, options?: { timeout?: number; headers?: Record } ): Promise { const client = this.clients.get(clientId); if (!client) { throw new Error(`Client ${clientId} not found`); } return client.channel.request(type, payload, options); } /** * Broadcast message to all clients */ public async broadcast(type: string, payload: any, headers?: Record): Promise { const promises: Promise[] = []; for (const [clientId] of this.clients) { promises.push( this.sendToClient(clientId, type, payload, headers).catch((error) => { this.emit('error', error, clientId); }) ); } await Promise.all(promises); } /** * Broadcast message to clients matching a filter */ public async broadcastTo( filter: (clientId: string, metadata?: Record) => boolean, type: string, payload: any, headers?: Record ): Promise { const promises: Promise[] = []; for (const [clientId, client] of this.clients) { if (filter(clientId, client.metadata)) { promises.push( this.sendToClient(clientId, type, payload, headers).catch((error) => { this.emit('error', error, clientId); }) ); } } await Promise.all(promises); } /** * Get connected client IDs */ public getClientIds(): string[] { return Array.from(this.clients.keys()); } /** * Get client information */ public getClientInfo(clientId: string): { id: string; connectedAt: number; lastActivity: number; metadata?: Record; } | undefined { const client = this.clients.get(clientId); if (!client) { return undefined; } return { id: client.id, connectedAt: client.connectedAt, lastActivity: client.lastActivity, metadata: client.metadata }; } /** * Disconnect a specific client */ public async disconnectClient(clientId: string): Promise { const client = this.clients.get(clientId); if (!client) { return; } await client.channel.disconnect(); this.clients.delete(clientId); this.cleanupClientSubscriptions(clientId); this.emit('clientDisconnect', clientId); } /** * Clean up topic subscriptions for a disconnected client */ private cleanupClientSubscriptions(clientId: string): void { const topics = this.clientTopics.get(clientId); if (topics) { for (const topic of topics) { const set = this.topicIndex.get(topic); if (set) { set.delete(clientId); if (set.size === 0) this.topicIndex.delete(topic); } } this.clientTopics.delete(clientId); } } /** * Start checking for idle clients */ private startClientIdleCheck(): void { if (!this.options.clientIdleTimeout || this.options.clientIdleTimeout <= 0) { return; } this.clientIdleCheckTimer = setInterval(() => { const now = Date.now(); const timeout = this.options.clientIdleTimeout!; for (const [clientId, client] of this.clients) { if (now - client.lastActivity > timeout) { this.disconnectClient(clientId).catch(() => {}); } } }, this.options.clientIdleTimeout / 2); } /** * Stop checking for idle clients */ private stopClientIdleCheck(): void { if (this.clientIdleCheckTimer) { clearInterval(this.clientIdleCheckTimer); this.clientIdleCheckTimer = undefined; } } /** * Get server statistics */ public getStats(): { isRunning: boolean; connectedClients: number; maxClients: number; uptime?: number; } { return { isRunning: this.isRunning, connectedClients: this.clients.size, maxClients: this.options.maxClients!, uptime: this.primaryChannel ? Date.now() - (this.primaryChannel as any).connectedAt : undefined }; } /** * Check if server is ready to accept connections */ public getIsReady(): boolean { return this.isReady; } }