4 Commits

8 changed files with 168 additions and 36 deletions

View File

@@ -1,5 +1,22 @@
# Changelog # Changelog
## 2025-08-28 - 2.1.3 - fix(classes.ipcchannel)
Normalize heartbeatThrowOnTimeout option parsing and allow registering 'heartbeatTimeout' via IpcChannel.on
- Normalize heartbeatThrowOnTimeout to boolean (accepts 'true'/'false' strings and other truthy/falsey values) to be defensive for JS consumers
- Expose 'heartbeatTimeout' as a special channel event so handlers registered via IpcChannel.on('heartbeatTimeout', ...) will be called
## 2025-08-26 - 2.1.2 - fix(core)
Improve heartbeat handling and transport routing; forward heartbeat timeout events; include clientId routing and probe improvements
- IpcChannel: add heartbeatInitialGracePeriod handling — delay heartbeat timeout checks until the grace period elapses and use a minimum check interval (>= 1000ms)
- IpcChannel: add heartbeatGraceTimer and ensure stopHeartbeat clears the grace timer to avoid repeated events
- IpcChannel / Client / Server: forward heartbeatTimeout events instead of only throwing when configured (heartbeatThrowOnTimeout = false) so consumers can handle timeouts via events
- IpcClient: include clientId in registration request headers to enable proper routing on the server/transport side
- UnixSocketTransport: track socket <-> clientId mappings, clean them up on socket close, and update mappings when __register__ or messages containing clientId are received
- UnixSocketTransport: route messages to a specific client when headers.clientId is present (fallback to broadcasting when no target is found), and emit both clientMessage and message for parsed client messages
- ts/index.waitForServer: use SmartIpc.createClient for probing, shorten probe register timeout, and use a slightly longer retry delay between probes for stability
## 2025-08-25 - 2.1.1 - fix(readme) ## 2025-08-25 - 2.1.1 - fix(readme)
Update README: expand docs, examples, server readiness, heartbeat, and testing utilities Update README: expand docs, examples, server readiness, heartbeat, and testing utilities

View File

@@ -1,6 +1,6 @@
{ {
"name": "@push.rocks/smartipc", "name": "@push.rocks/smartipc",
"version": "2.1.1", "version": "2.1.3",
"private": false, "private": false,
"description": "A library for node inter process communication, providing an easy-to-use API for IPC.", "description": "A library for node inter process communication, providing an easy-to-use API for IPC.",
"exports": { "exports": {

View File

@@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartipc', name: '@push.rocks/smartipc',
version: '2.1.1', version: '2.1.3',
description: 'A library for node inter process communication, providing an easy-to-use API for IPC.' description: 'A library for node inter process communication, providing an easy-to-use API for IPC.'
} }

View File

@@ -49,6 +49,7 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
private reconnectTimer?: NodeJS.Timeout; private reconnectTimer?: NodeJS.Timeout;
private heartbeatTimer?: NodeJS.Timeout; private heartbeatTimer?: NodeJS.Timeout;
private heartbeatCheckTimer?: NodeJS.Timeout; private heartbeatCheckTimer?: NodeJS.Timeout;
private heartbeatGraceTimer?: NodeJS.Timeout;
private lastHeartbeat: number = Date.now(); private lastHeartbeat: number = Date.now();
private connectionStartTime: number = Date.now(); private connectionStartTime: number = Date.now();
private isReconnecting = false; private isReconnecting = false;
@@ -81,6 +82,18 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
...options ...options
}; };
// Normalize heartbeatThrowOnTimeout to boolean (defensive for JS consumers)
const throwOnTimeout = (this.options as any).heartbeatThrowOnTimeout;
if (throwOnTimeout !== undefined) {
if (throwOnTimeout === 'false') {
this.options.heartbeatThrowOnTimeout = false;
} else if (throwOnTimeout === 'true') {
this.options.heartbeatThrowOnTimeout = true;
} else if (typeof throwOnTimeout !== 'boolean') {
this.options.heartbeatThrowOnTimeout = Boolean(throwOnTimeout);
}
}
this.transport = createTransport(this.options); this.transport = createTransport(this.options);
this.setupTransportHandlers(); this.setupTransportHandlers();
} }
@@ -217,16 +230,27 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
}); });
}, this.options.heartbeatInterval!); }, this.options.heartbeatInterval!);
// Delay starting the check until after the grace period
const gracePeriod = this.options.heartbeatInitialGracePeriodMs || 0;
if (gracePeriod > 0) {
// Use a timer to delay the first check
this.heartbeatGraceTimer = setTimeout(() => {
this.startHeartbeatCheck();
}, gracePeriod);
} else {
// No grace period, start checking immediately
this.startHeartbeatCheck();
}
}
/**
* Start heartbeat timeout checking (separated for grace period handling)
*/
private startHeartbeatCheck(): void {
// Check for heartbeat timeout // Check for heartbeat timeout
this.heartbeatCheckTimer = setInterval(() => { this.heartbeatCheckTimer = setInterval(() => {
const timeSinceLastHeartbeat = Date.now() - this.lastHeartbeat; const timeSinceLastHeartbeat = Date.now() - this.lastHeartbeat;
const timeSinceConnection = Date.now() - this.connectionStartTime;
const gracePeriod = this.options.heartbeatInitialGracePeriodMs || 0;
// Skip timeout check during initial grace period
if (timeSinceConnection < gracePeriod) {
return;
}
if (timeSinceLastHeartbeat > this.options.heartbeatTimeout!) { if (timeSinceLastHeartbeat > this.options.heartbeatTimeout!) {
const error = new Error('Heartbeat timeout'); const error = new Error('Heartbeat timeout');
@@ -238,9 +262,11 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
} else { } else {
// Emit heartbeatTimeout event instead of error // Emit heartbeatTimeout event instead of error
this.emit('heartbeatTimeout', error); this.emit('heartbeatTimeout', error);
// Clear timers to avoid repeated events
this.stopHeartbeat();
} }
} }
}, this.options.heartbeatTimeout! / 2); }, Math.max(1000, Math.floor(this.options.heartbeatTimeout! / 2)));
} }
/** /**
@@ -256,6 +282,11 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
clearInterval(this.heartbeatCheckTimer); clearInterval(this.heartbeatCheckTimer);
this.heartbeatCheckTimer = undefined; this.heartbeatCheckTimer = undefined;
} }
if (this.heartbeatGraceTimer) {
clearTimeout(this.heartbeatGraceTimer);
this.heartbeatGraceTimer = undefined;
}
} }
/** /**
@@ -430,7 +461,7 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
* Register a message handler * Register a message handler
*/ */
public on(event: string, handler: (payload: any) => any | Promise<any>): this { public on(event: string, handler: (payload: any) => any | Promise<any>): this {
if (event === 'message' || event === 'connect' || event === 'disconnect' || event === 'error' || event === 'reconnecting' || event === 'drain') { if (event === 'message' || event === 'connect' || event === 'disconnect' || event === 'error' || event === 'reconnecting' || event === 'drain' || event === 'heartbeatTimeout') {
// Special handling for channel events // Special handling for channel events
super.on(event, handler); super.on(event, handler);
} else { } else {

View File

@@ -75,7 +75,10 @@ export class IpcClient extends plugins.EventEmitter {
clientId: this.clientId, clientId: this.clientId,
metadata: this.options.metadata metadata: this.options.metadata
}, },
{ timeout: registerTimeoutMs } {
timeout: registerTimeoutMs,
headers: { clientId: this.clientId } // Include clientId in headers for proper routing
}
); );
if (!response.success) { if (!response.success) {
@@ -194,10 +197,20 @@ export class IpcClient extends plugins.EventEmitter {
this.emit('disconnect', reason); this.emit('disconnect', reason);
}); });
this.channel.on('error', (error) => { this.channel.on('error', (error: any) => {
// If heartbeat timeout and configured not to throw, convert to heartbeatTimeout event
if (error && error.message === 'Heartbeat timeout' && this.options.heartbeatThrowOnTimeout === false) {
this.emit('heartbeatTimeout', error);
return;
}
this.emit('error', error); this.emit('error', error);
}); });
this.channel.on('heartbeatTimeout', (error) => {
// Forward heartbeatTimeout event (when heartbeatThrowOnTimeout is false)
this.emit('heartbeatTimeout', error);
});
this.channel.on('reconnecting', (info) => { this.channel.on('reconnecting', (info) => {
this.emit('reconnecting', info); this.emit('reconnecting', info);
}); });

View File

@@ -200,6 +200,11 @@ export class IpcServer extends plugins.EventEmitter {
this.emit('error', error, 'server'); this.emit('error', error, 'server');
}); });
this.primaryChannel.on('heartbeatTimeout', (error) => {
// Forward heartbeatTimeout event (when heartbeatThrowOnTimeout is false)
this.emit('heartbeatTimeout', error, 'server');
});
// Connect the primary channel (will start as server) // Connect the primary channel (will start as server)
await this.primaryChannel.connect(); await this.primaryChannel.connect();
@@ -339,6 +344,19 @@ export class IpcServer extends plugins.EventEmitter {
} }
this.emit('error', error, actualClientId); this.emit('error', error, actualClientId);
}); });
channel.on('heartbeatTimeout', (error) => {
// Find the actual client ID for this channel
let actualClientId = clientId;
for (const [id, client] of this.clients) {
if (client.channel === channel) {
actualClientId = id;
break;
}
}
// Forward heartbeatTimeout event (when heartbeatThrowOnTimeout is false)
this.emit('heartbeatTimeout', error, actualClientId);
});
} }
/** /**

View File

@@ -169,6 +169,8 @@ export class UnixSocketTransport extends IpcTransport {
private socket: plugins.net.Socket | null = null; private socket: plugins.net.Socket | null = null;
private server: plugins.net.Server | null = null; private server: plugins.net.Server | null = null;
private clients: Set<plugins.net.Socket> = new Set(); private clients: Set<plugins.net.Socket> = new Set();
private socketToClientId = new WeakMap<plugins.net.Socket, string>();
private clientIdToSocket = new Map<string, plugins.net.Socket>();
/** /**
* Connect as client or start as server * Connect as client or start as server
@@ -239,6 +241,12 @@ export class UnixSocketTransport extends IpcTransport {
socket.on('close', () => { socket.on('close', () => {
this.clients.delete(socket); this.clients.delete(socket);
// Clean up clientId mappings
const clientId = this.socketToClientId.get(socket);
if (clientId && this.clientIdToSocket.get(clientId) === socket) {
this.clientIdToSocket.delete(clientId);
}
this.socketToClientId.delete(socket);
this.emit('clientDisconnected', socket); this.emit('clientDisconnected', socket);
}); });
@@ -307,7 +315,18 @@ export class UnixSocketTransport extends IpcTransport {
// Parse and emit the message with socket reference // Parse and emit the message with socket reference
try { try {
const message = JSON.parse(messageData.toString('utf8')) as IIpcMessageEnvelope; const message = JSON.parse(messageData.toString('utf8')) as IIpcMessageEnvelope;
// Update clientId mapping
const clientId = message.headers?.clientId ??
(message.type === '__register__' ? (message.payload as any)?.clientId : undefined);
if (clientId) {
this.socketToClientId.set(socket, clientId);
this.clientIdToSocket.set(clientId, socket);
}
// Emit both events so IpcChannel can process it
this.emit('clientMessage', message, socket); this.emit('clientMessage', message, socket);
this.emit('message', message);
} catch (error: any) { } catch (error: any) {
this.emit('error', new Error(`Failed to parse message: ${error.message}`)); this.emit('error', new Error(`Failed to parse message: ${error.message}`));
} }
@@ -415,27 +434,54 @@ export class UnixSocketTransport extends IpcTransport {
} }
}); });
} else if (this.server && this.clients.size > 0) { } else if (this.server && this.clients.size > 0) {
// Server mode - broadcast to all clients // Server mode - route by clientId if present, otherwise broadcast
const promises: Promise<boolean>[] = []; const targetClientId = message.headers?.clientId;
for (const client of this.clients) { if (targetClientId && this.clientIdToSocket.has(targetClientId)) {
promises.push(new Promise((resolve) => { // Send to specific client
const success = client.write(frame, (error) => { const targetSocket = this.clientIdToSocket.get(targetClientId)!;
if (error) { if (targetSocket && !targetSocket.destroyed) {
resolve(false); return new Promise((resolve) => {
} else { const success = targetSocket.write(frame, (error) => {
resolve(true); if (error) {
resolve(false);
} else {
resolve(true);
}
});
if (!success) {
targetSocket.once('drain', () => resolve(true));
} }
}); });
} else {
// Socket is destroyed, remove from mappings
this.clientIdToSocket.delete(targetClientId);
return false;
}
} else {
// Broadcast to all clients (fallback for messages without specific target)
const promises: Promise<boolean>[] = [];
if (!success) { for (const client of this.clients) {
client.once('drain', () => resolve(true)); promises.push(new Promise((resolve) => {
} const success = client.write(frame, (error) => {
})); if (error) {
resolve(false);
} else {
resolve(true);
}
});
if (!success) {
client.once('drain', () => resolve(true));
}
}));
}
const results = await Promise.all(promises);
return results.every(r => r);
} }
const results = await Promise.all(promises);
return results.every(r => r);
} }
return false; return false;

View File

@@ -29,20 +29,27 @@ export class SmartIpc {
while (Date.now() - startTime < timeout) { while (Date.now() - startTime < timeout) {
try { try {
// Try to connect as a temporary client // Create a temporary client with proper options
const testClient = new IpcClient({ const testClient = SmartIpc.createClient({
id: `test-probe-${Date.now()}`, id: 'test-probe',
socketPath: options.socketPath, socketPath: options.socketPath,
autoReconnect: false, clientId: `probe-${process.pid}-${Date.now()}`,
heartbeat: false heartbeat: false,
connectRetry: {
enabled: false // Don't retry, we're handling retries here
},
registerTimeoutMs: 2000 // Short timeout for quick probing
}); });
// Try to connect and register with the server
await testClient.connect(); await testClient.connect();
// Success! Clean up and return
await testClient.disconnect(); await testClient.disconnect();
return; // Server is ready return;
} catch (error) { } catch (error) {
// Server not ready yet, wait and retry // Server not ready yet, wait and retry
await new Promise(resolve => setTimeout(resolve, 100)); await new Promise(resolve => setTimeout(resolve, 200));
} }
} }