diff --git a/changelog.md b/changelog.md index 76a1f22..fdb2667 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,16 @@ # Changelog +## 2025-08-26 - 2.1.2 - fix(core) +Improve heartbeat handling and transport routing; forward heartbeat timeout events; include clientId routing and probe improvements + +- IpcChannel: add heartbeatInitialGracePeriod handling — delay heartbeat timeout checks until the grace period elapses and use a minimum check interval (>= 1000ms) +- IpcChannel: add heartbeatGraceTimer and ensure stopHeartbeat clears the grace timer to avoid repeated events +- IpcChannel / Client / Server: forward heartbeatTimeout events instead of only throwing when configured (heartbeatThrowOnTimeout = false) so consumers can handle timeouts via events +- IpcClient: include clientId in registration request headers to enable proper routing on the server/transport side +- UnixSocketTransport: track socket <-> clientId mappings, clean them up on socket close, and update mappings when __register__ or messages containing clientId are received +- UnixSocketTransport: route messages to a specific client when headers.clientId is present (fallback to broadcasting when no target is found), and emit both clientMessage and message for parsed client messages +- ts/index.waitForServer: use SmartIpc.createClient for probing, shorten probe register timeout, and use a slightly longer retry delay between probes for stability + ## 2025-08-25 - 2.1.1 - fix(readme) Update README: expand docs, examples, server readiness, heartbeat, and testing utilities diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 05d21d0..c504b35 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartipc', - version: '2.1.1', + version: '2.1.2', description: 'A library for node inter process communication, providing an easy-to-use API for IPC.' } diff --git a/ts/classes.ipcchannel.ts b/ts/classes.ipcchannel.ts index 0477f0b..d620877 100644 --- a/ts/classes.ipcchannel.ts +++ b/ts/classes.ipcchannel.ts @@ -49,6 +49,7 @@ export class IpcChannel extends plugins.EventEm private reconnectTimer?: NodeJS.Timeout; private heartbeatTimer?: NodeJS.Timeout; private heartbeatCheckTimer?: NodeJS.Timeout; + private heartbeatGraceTimer?: NodeJS.Timeout; private lastHeartbeat: number = Date.now(); private connectionStartTime: number = Date.now(); private isReconnecting = false; @@ -217,16 +218,27 @@ export class IpcChannel extends plugins.EventEm }); }, this.options.heartbeatInterval!); + // Delay starting the check until after the grace period + const gracePeriod = this.options.heartbeatInitialGracePeriodMs || 0; + + if (gracePeriod > 0) { + // Use a timer to delay the first check + this.heartbeatGraceTimer = setTimeout(() => { + this.startHeartbeatCheck(); + }, gracePeriod); + } else { + // No grace period, start checking immediately + this.startHeartbeatCheck(); + } + } + + /** + * Start heartbeat timeout checking (separated for grace period handling) + */ + private startHeartbeatCheck(): void { // Check for heartbeat timeout this.heartbeatCheckTimer = setInterval(() => { const timeSinceLastHeartbeat = Date.now() - this.lastHeartbeat; - const timeSinceConnection = Date.now() - this.connectionStartTime; - const gracePeriod = this.options.heartbeatInitialGracePeriodMs || 0; - - // Skip timeout check during initial grace period - if (timeSinceConnection < gracePeriod) { - return; - } if (timeSinceLastHeartbeat > this.options.heartbeatTimeout!) { const error = new Error('Heartbeat timeout'); @@ -238,9 +250,11 @@ export class IpcChannel extends plugins.EventEm } else { // Emit heartbeatTimeout event instead of error this.emit('heartbeatTimeout', error); + // Clear timers to avoid repeated events + this.stopHeartbeat(); } } - }, this.options.heartbeatTimeout! / 2); + }, Math.max(1000, Math.floor(this.options.heartbeatTimeout! / 2))); } /** @@ -256,6 +270,11 @@ export class IpcChannel extends plugins.EventEm clearInterval(this.heartbeatCheckTimer); this.heartbeatCheckTimer = undefined; } + + if (this.heartbeatGraceTimer) { + clearTimeout(this.heartbeatGraceTimer); + this.heartbeatGraceTimer = undefined; + } } /** diff --git a/ts/classes.ipcclient.ts b/ts/classes.ipcclient.ts index 74741f3..4d535d4 100644 --- a/ts/classes.ipcclient.ts +++ b/ts/classes.ipcclient.ts @@ -75,7 +75,10 @@ export class IpcClient extends plugins.EventEmitter { clientId: this.clientId, metadata: this.options.metadata }, - { timeout: registerTimeoutMs } + { + timeout: registerTimeoutMs, + headers: { clientId: this.clientId } // Include clientId in headers for proper routing + } ); if (!response.success) { @@ -194,10 +197,20 @@ export class IpcClient extends plugins.EventEmitter { this.emit('disconnect', reason); }); - this.channel.on('error', (error) => { + this.channel.on('error', (error: any) => { + // If heartbeat timeout and configured not to throw, convert to heartbeatTimeout event + if (error && error.message === 'Heartbeat timeout' && this.options.heartbeatThrowOnTimeout === false) { + this.emit('heartbeatTimeout', error); + return; + } this.emit('error', error); }); + this.channel.on('heartbeatTimeout', (error) => { + // Forward heartbeatTimeout event (when heartbeatThrowOnTimeout is false) + this.emit('heartbeatTimeout', error); + }); + this.channel.on('reconnecting', (info) => { this.emit('reconnecting', info); }); diff --git a/ts/classes.ipcserver.ts b/ts/classes.ipcserver.ts index de3e8b3..f42df1f 100644 --- a/ts/classes.ipcserver.ts +++ b/ts/classes.ipcserver.ts @@ -200,6 +200,11 @@ export class IpcServer extends plugins.EventEmitter { this.emit('error', error, 'server'); }); + this.primaryChannel.on('heartbeatTimeout', (error) => { + // Forward heartbeatTimeout event (when heartbeatThrowOnTimeout is false) + this.emit('heartbeatTimeout', error, 'server'); + }); + // Connect the primary channel (will start as server) await this.primaryChannel.connect(); @@ -339,6 +344,19 @@ export class IpcServer extends plugins.EventEmitter { } this.emit('error', error, actualClientId); }); + + channel.on('heartbeatTimeout', (error) => { + // Find the actual client ID for this channel + let actualClientId = clientId; + for (const [id, client] of this.clients) { + if (client.channel === channel) { + actualClientId = id; + break; + } + } + // Forward heartbeatTimeout event (when heartbeatThrowOnTimeout is false) + this.emit('heartbeatTimeout', error, actualClientId); + }); } /** diff --git a/ts/classes.transports.ts b/ts/classes.transports.ts index 5adec6d..2c7eec5 100644 --- a/ts/classes.transports.ts +++ b/ts/classes.transports.ts @@ -169,6 +169,8 @@ export class UnixSocketTransport extends IpcTransport { private socket: plugins.net.Socket | null = null; private server: plugins.net.Server | null = null; private clients: Set = new Set(); + private socketToClientId = new WeakMap(); + private clientIdToSocket = new Map(); /** * Connect as client or start as server @@ -239,6 +241,12 @@ export class UnixSocketTransport extends IpcTransport { socket.on('close', () => { this.clients.delete(socket); + // Clean up clientId mappings + const clientId = this.socketToClientId.get(socket); + if (clientId && this.clientIdToSocket.get(clientId) === socket) { + this.clientIdToSocket.delete(clientId); + } + this.socketToClientId.delete(socket); this.emit('clientDisconnected', socket); }); @@ -307,7 +315,18 @@ export class UnixSocketTransport extends IpcTransport { // Parse and emit the message with socket reference try { const message = JSON.parse(messageData.toString('utf8')) as IIpcMessageEnvelope; + + // Update clientId mapping + const clientId = message.headers?.clientId ?? + (message.type === '__register__' ? (message.payload as any)?.clientId : undefined); + if (clientId) { + this.socketToClientId.set(socket, clientId); + this.clientIdToSocket.set(clientId, socket); + } + + // Emit both events so IpcChannel can process it this.emit('clientMessage', message, socket); + this.emit('message', message); } catch (error: any) { this.emit('error', new Error(`Failed to parse message: ${error.message}`)); } @@ -415,27 +434,54 @@ export class UnixSocketTransport extends IpcTransport { } }); } else if (this.server && this.clients.size > 0) { - // Server mode - broadcast to all clients - const promises: Promise[] = []; + // Server mode - route by clientId if present, otherwise broadcast + const targetClientId = message.headers?.clientId; - for (const client of this.clients) { - promises.push(new Promise((resolve) => { - const success = client.write(frame, (error) => { - if (error) { - resolve(false); - } else { - resolve(true); + if (targetClientId && this.clientIdToSocket.has(targetClientId)) { + // Send to specific client + const targetSocket = this.clientIdToSocket.get(targetClientId)!; + if (targetSocket && !targetSocket.destroyed) { + return new Promise((resolve) => { + const success = targetSocket.write(frame, (error) => { + if (error) { + resolve(false); + } else { + resolve(true); + } + }); + + if (!success) { + targetSocket.once('drain', () => resolve(true)); } }); + } else { + // Socket is destroyed, remove from mappings + this.clientIdToSocket.delete(targetClientId); + return false; + } + } else { + // Broadcast to all clients (fallback for messages without specific target) + const promises: Promise[] = []; + + for (const client of this.clients) { + promises.push(new Promise((resolve) => { + const success = client.write(frame, (error) => { + if (error) { + resolve(false); + } else { + resolve(true); + } + }); - if (!success) { - client.once('drain', () => resolve(true)); - } - })); + if (!success) { + client.once('drain', () => resolve(true)); + } + })); + } + + const results = await Promise.all(promises); + return results.every(r => r); } - - const results = await Promise.all(promises); - return results.every(r => r); } return false; diff --git a/ts/index.ts b/ts/index.ts index 0191393..633e21c 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -29,20 +29,27 @@ export class SmartIpc { while (Date.now() - startTime < timeout) { try { - // Try to connect as a temporary client - const testClient = new IpcClient({ - id: `test-probe-${Date.now()}`, + // Create a temporary client with proper options + const testClient = SmartIpc.createClient({ + id: 'test-probe', socketPath: options.socketPath, - autoReconnect: false, - heartbeat: false + clientId: `probe-${process.pid}-${Date.now()}`, + heartbeat: false, + connectRetry: { + enabled: false // Don't retry, we're handling retries here + }, + registerTimeoutMs: 2000 // Short timeout for quick probing }); + // Try to connect and register with the server await testClient.connect(); + + // Success! Clean up and return await testClient.disconnect(); - return; // Server is ready + return; } catch (error) { // Server not ready yet, wait and retry - await new Promise(resolve => setTimeout(resolve, 100)); + await new Promise(resolve => setTimeout(resolve, 200)); } }