fix(core): Improve heartbeat handling and transport routing; forward heartbeat timeout events; include clientId routing and probe improvements

This commit is contained in:
2025-08-26 12:32:28 +00:00
parent 32f3c63fca
commit a0638b5364
7 changed files with 148 additions and 34 deletions

View File

@@ -1,5 +1,16 @@
# Changelog # Changelog
## 2025-08-26 - 2.1.2 - fix(core)
Improve heartbeat handling and transport routing; forward heartbeat timeout events; include clientId routing and probe improvements
- IpcChannel: add heartbeatInitialGracePeriod handling — delay heartbeat timeout checks until the grace period elapses and use a minimum check interval (>= 1000ms)
- IpcChannel: add heartbeatGraceTimer and ensure stopHeartbeat clears the grace timer to avoid repeated events
- IpcChannel / Client / Server: forward heartbeatTimeout events instead of only throwing when configured (heartbeatThrowOnTimeout = false) so consumers can handle timeouts via events
- IpcClient: include clientId in registration request headers to enable proper routing on the server/transport side
- UnixSocketTransport: track socket <-> clientId mappings, clean them up on socket close, and update mappings when __register__ or messages containing clientId are received
- UnixSocketTransport: route messages to a specific client when headers.clientId is present (fallback to broadcasting when no target is found), and emit both clientMessage and message for parsed client messages
- ts/index.waitForServer: use SmartIpc.createClient for probing, shorten probe register timeout, and use a slightly longer retry delay between probes for stability
## 2025-08-25 - 2.1.1 - fix(readme) ## 2025-08-25 - 2.1.1 - fix(readme)
Update README: expand docs, examples, server readiness, heartbeat, and testing utilities Update README: expand docs, examples, server readiness, heartbeat, and testing utilities

View File

@@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartipc', name: '@push.rocks/smartipc',
version: '2.1.1', version: '2.1.2',
description: 'A library for node inter process communication, providing an easy-to-use API for IPC.' description: 'A library for node inter process communication, providing an easy-to-use API for IPC.'
} }

View File

@@ -49,6 +49,7 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
private reconnectTimer?: NodeJS.Timeout; private reconnectTimer?: NodeJS.Timeout;
private heartbeatTimer?: NodeJS.Timeout; private heartbeatTimer?: NodeJS.Timeout;
private heartbeatCheckTimer?: NodeJS.Timeout; private heartbeatCheckTimer?: NodeJS.Timeout;
private heartbeatGraceTimer?: NodeJS.Timeout;
private lastHeartbeat: number = Date.now(); private lastHeartbeat: number = Date.now();
private connectionStartTime: number = Date.now(); private connectionStartTime: number = Date.now();
private isReconnecting = false; private isReconnecting = false;
@@ -217,16 +218,27 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
}); });
}, this.options.heartbeatInterval!); }, this.options.heartbeatInterval!);
// Delay starting the check until after the grace period
const gracePeriod = this.options.heartbeatInitialGracePeriodMs || 0;
if (gracePeriod > 0) {
// Use a timer to delay the first check
this.heartbeatGraceTimer = setTimeout(() => {
this.startHeartbeatCheck();
}, gracePeriod);
} else {
// No grace period, start checking immediately
this.startHeartbeatCheck();
}
}
/**
* Start heartbeat timeout checking (separated for grace period handling)
*/
private startHeartbeatCheck(): void {
// Check for heartbeat timeout // Check for heartbeat timeout
this.heartbeatCheckTimer = setInterval(() => { this.heartbeatCheckTimer = setInterval(() => {
const timeSinceLastHeartbeat = Date.now() - this.lastHeartbeat; const timeSinceLastHeartbeat = Date.now() - this.lastHeartbeat;
const timeSinceConnection = Date.now() - this.connectionStartTime;
const gracePeriod = this.options.heartbeatInitialGracePeriodMs || 0;
// Skip timeout check during initial grace period
if (timeSinceConnection < gracePeriod) {
return;
}
if (timeSinceLastHeartbeat > this.options.heartbeatTimeout!) { if (timeSinceLastHeartbeat > this.options.heartbeatTimeout!) {
const error = new Error('Heartbeat timeout'); const error = new Error('Heartbeat timeout');
@@ -238,9 +250,11 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
} else { } else {
// Emit heartbeatTimeout event instead of error // Emit heartbeatTimeout event instead of error
this.emit('heartbeatTimeout', error); this.emit('heartbeatTimeout', error);
// Clear timers to avoid repeated events
this.stopHeartbeat();
} }
} }
}, this.options.heartbeatTimeout! / 2); }, Math.max(1000, Math.floor(this.options.heartbeatTimeout! / 2)));
} }
/** /**
@@ -256,6 +270,11 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
clearInterval(this.heartbeatCheckTimer); clearInterval(this.heartbeatCheckTimer);
this.heartbeatCheckTimer = undefined; this.heartbeatCheckTimer = undefined;
} }
if (this.heartbeatGraceTimer) {
clearTimeout(this.heartbeatGraceTimer);
this.heartbeatGraceTimer = undefined;
}
} }
/** /**

View File

@@ -75,7 +75,10 @@ export class IpcClient extends plugins.EventEmitter {
clientId: this.clientId, clientId: this.clientId,
metadata: this.options.metadata metadata: this.options.metadata
}, },
{ timeout: registerTimeoutMs } {
timeout: registerTimeoutMs,
headers: { clientId: this.clientId } // Include clientId in headers for proper routing
}
); );
if (!response.success) { if (!response.success) {
@@ -194,10 +197,20 @@ export class IpcClient extends plugins.EventEmitter {
this.emit('disconnect', reason); this.emit('disconnect', reason);
}); });
this.channel.on('error', (error) => { this.channel.on('error', (error: any) => {
// If heartbeat timeout and configured not to throw, convert to heartbeatTimeout event
if (error && error.message === 'Heartbeat timeout' && this.options.heartbeatThrowOnTimeout === false) {
this.emit('heartbeatTimeout', error);
return;
}
this.emit('error', error); this.emit('error', error);
}); });
this.channel.on('heartbeatTimeout', (error) => {
// Forward heartbeatTimeout event (when heartbeatThrowOnTimeout is false)
this.emit('heartbeatTimeout', error);
});
this.channel.on('reconnecting', (info) => { this.channel.on('reconnecting', (info) => {
this.emit('reconnecting', info); this.emit('reconnecting', info);
}); });

View File

@@ -200,6 +200,11 @@ export class IpcServer extends plugins.EventEmitter {
this.emit('error', error, 'server'); this.emit('error', error, 'server');
}); });
this.primaryChannel.on('heartbeatTimeout', (error) => {
// Forward heartbeatTimeout event (when heartbeatThrowOnTimeout is false)
this.emit('heartbeatTimeout', error, 'server');
});
// Connect the primary channel (will start as server) // Connect the primary channel (will start as server)
await this.primaryChannel.connect(); await this.primaryChannel.connect();
@@ -339,6 +344,19 @@ export class IpcServer extends plugins.EventEmitter {
} }
this.emit('error', error, actualClientId); this.emit('error', error, actualClientId);
}); });
channel.on('heartbeatTimeout', (error) => {
// Find the actual client ID for this channel
let actualClientId = clientId;
for (const [id, client] of this.clients) {
if (client.channel === channel) {
actualClientId = id;
break;
}
}
// Forward heartbeatTimeout event (when heartbeatThrowOnTimeout is false)
this.emit('heartbeatTimeout', error, actualClientId);
});
} }
/** /**

View File

@@ -169,6 +169,8 @@ export class UnixSocketTransport extends IpcTransport {
private socket: plugins.net.Socket | null = null; private socket: plugins.net.Socket | null = null;
private server: plugins.net.Server | null = null; private server: plugins.net.Server | null = null;
private clients: Set<plugins.net.Socket> = new Set(); private clients: Set<plugins.net.Socket> = new Set();
private socketToClientId = new WeakMap<plugins.net.Socket, string>();
private clientIdToSocket = new Map<string, plugins.net.Socket>();
/** /**
* Connect as client or start as server * Connect as client or start as server
@@ -239,6 +241,12 @@ export class UnixSocketTransport extends IpcTransport {
socket.on('close', () => { socket.on('close', () => {
this.clients.delete(socket); this.clients.delete(socket);
// Clean up clientId mappings
const clientId = this.socketToClientId.get(socket);
if (clientId && this.clientIdToSocket.get(clientId) === socket) {
this.clientIdToSocket.delete(clientId);
}
this.socketToClientId.delete(socket);
this.emit('clientDisconnected', socket); this.emit('clientDisconnected', socket);
}); });
@@ -307,7 +315,18 @@ export class UnixSocketTransport extends IpcTransport {
// Parse and emit the message with socket reference // Parse and emit the message with socket reference
try { try {
const message = JSON.parse(messageData.toString('utf8')) as IIpcMessageEnvelope; const message = JSON.parse(messageData.toString('utf8')) as IIpcMessageEnvelope;
// Update clientId mapping
const clientId = message.headers?.clientId ??
(message.type === '__register__' ? (message.payload as any)?.clientId : undefined);
if (clientId) {
this.socketToClientId.set(socket, clientId);
this.clientIdToSocket.set(clientId, socket);
}
// Emit both events so IpcChannel can process it
this.emit('clientMessage', message, socket); this.emit('clientMessage', message, socket);
this.emit('message', message);
} catch (error: any) { } catch (error: any) {
this.emit('error', new Error(`Failed to parse message: ${error.message}`)); this.emit('error', new Error(`Failed to parse message: ${error.message}`));
} }
@@ -415,27 +434,54 @@ export class UnixSocketTransport extends IpcTransport {
} }
}); });
} else if (this.server && this.clients.size > 0) { } else if (this.server && this.clients.size > 0) {
// Server mode - broadcast to all clients // Server mode - route by clientId if present, otherwise broadcast
const promises: Promise<boolean>[] = []; const targetClientId = message.headers?.clientId;
for (const client of this.clients) { if (targetClientId && this.clientIdToSocket.has(targetClientId)) {
promises.push(new Promise((resolve) => { // Send to specific client
const success = client.write(frame, (error) => { const targetSocket = this.clientIdToSocket.get(targetClientId)!;
if (error) { if (targetSocket && !targetSocket.destroyed) {
resolve(false); return new Promise((resolve) => {
} else { const success = targetSocket.write(frame, (error) => {
resolve(true); if (error) {
resolve(false);
} else {
resolve(true);
}
});
if (!success) {
targetSocket.once('drain', () => resolve(true));
} }
}); });
} else {
// Socket is destroyed, remove from mappings
this.clientIdToSocket.delete(targetClientId);
return false;
}
} else {
// Broadcast to all clients (fallback for messages without specific target)
const promises: Promise<boolean>[] = [];
for (const client of this.clients) {
promises.push(new Promise((resolve) => {
const success = client.write(frame, (error) => {
if (error) {
resolve(false);
} else {
resolve(true);
}
});
if (!success) { if (!success) {
client.once('drain', () => resolve(true)); client.once('drain', () => resolve(true));
} }
})); }));
}
const results = await Promise.all(promises);
return results.every(r => r);
} }
const results = await Promise.all(promises);
return results.every(r => r);
} }
return false; return false;

View File

@@ -29,20 +29,27 @@ export class SmartIpc {
while (Date.now() - startTime < timeout) { while (Date.now() - startTime < timeout) {
try { try {
// Try to connect as a temporary client // Create a temporary client with proper options
const testClient = new IpcClient({ const testClient = SmartIpc.createClient({
id: `test-probe-${Date.now()}`, id: 'test-probe',
socketPath: options.socketPath, socketPath: options.socketPath,
autoReconnect: false, clientId: `probe-${process.pid}-${Date.now()}`,
heartbeat: false heartbeat: false,
connectRetry: {
enabled: false // Don't retry, we're handling retries here
},
registerTimeoutMs: 2000 // Short timeout for quick probing
}); });
// Try to connect and register with the server
await testClient.connect(); await testClient.connect();
// Success! Clean up and return
await testClient.disconnect(); await testClient.disconnect();
return; // Server is ready return;
} catch (error) { } catch (error) {
// Server not ready yet, wait and retry // Server not ready yet, wait and retry
await new Promise(resolve => setTimeout(resolve, 100)); await new Promise(resolve => setTimeout(resolve, 200));
} }
} }