fix(ipc): Propagate per-client disconnects, add proper routing for targeted messages, and remove unused node-ipc deps

This commit is contained in:
2025-08-29 17:02:50 +00:00
parent 44770bf820
commit 1c08df8e6a
7 changed files with 53 additions and 223 deletions

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartipc',
version: '2.2.1',
version: '2.2.2',
description: 'A library for node inter process communication, providing an easy-to-use API for IPC.'
}

View File

@@ -128,6 +128,13 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
this.handleMessage(message);
});
// Forward per-client disconnects from transports that support multi-client servers
// We re-emit a 'clientDisconnected' event with the clientId if known so higher layers can act.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(this.transport as any).on?.('clientDisconnected', (_socket: any, clientId?: string) => {
this.emit('clientDisconnected', clientId);
});
this.transport.on('drain', () => {
this.emit('drain');
});
@@ -461,7 +468,7 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
* Register a message handler
*/
public on(event: string, handler: (payload: any) => any | Promise<any>): this {
if (event === 'message' || event === 'connect' || event === 'disconnect' || event === 'error' || event === 'reconnecting' || event === 'drain' || event === 'heartbeatTimeout') {
if (event === 'message' || event === 'connect' || event === 'disconnect' || event === 'error' || event === 'reconnecting' || event === 'drain' || event === 'heartbeatTimeout' || event === 'clientDisconnected') {
// Special handling for channel events
super.on(event, handler);
} else {
@@ -522,4 +529,4 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
}
};
}
}
}

View File

@@ -211,6 +211,17 @@ export class IpcServer extends plugins.EventEmitter {
this.isRunning = true;
this.startClientIdleCheck();
this.emit('start');
// Track individual client disconnects forwarded by the channel/transport
this.primaryChannel.on('clientDisconnected', (clientId?: string) => {
if (!clientId) return;
// Clean up any topic subscriptions and client map entry
this.cleanupClientSubscriptions(clientId);
if (this.clients.has(clientId)) {
this.clients.delete(clientId);
this.emit('clientDisconnect', clientId);
}
});
// Handle readiness based on options
if (options.readyWhen === 'accepting') {
@@ -375,7 +386,14 @@ export class IpcServer extends plugins.EventEmitter {
throw new Error(`Client ${clientId} not found`);
}
await client.channel.sendMessage(type, payload, headers);
// Ensure the target clientId is part of the headers so the transport
// can route the message to the correct socket instead of broadcasting.
const routedHeaders: Record<string, any> | undefined = {
...(headers || {}),
clientId,
};
await client.channel.sendMessage(type, payload, routedHeaders);
}
/**
@@ -400,13 +418,12 @@ export class IpcServer extends plugins.EventEmitter {
*/
public async broadcast(type: string, payload: any, headers?: Record<string, any>): Promise<void> {
const promises: Promise<void>[] = [];
for (const [clientId, client] of this.clients) {
for (const [clientId] of this.clients) {
promises.push(
client.channel.sendMessage(type, payload, headers)
.catch((error) => {
this.emit('error', error, clientId);
})
this.sendToClient(clientId, type, payload, headers).catch((error) => {
this.emit('error', error, clientId);
})
);
}
@@ -423,14 +440,13 @@ export class IpcServer extends plugins.EventEmitter {
headers?: Record<string, any>
): Promise<void> {
const promises: Promise<void>[] = [];
for (const [clientId, client] of this.clients) {
if (filter(clientId, client.metadata)) {
promises.push(
client.channel.sendMessage(type, payload, headers)
.catch((error) => {
this.emit('error', error, clientId);
})
this.sendToClient(clientId, type, payload, headers).catch((error) => {
this.emit('error', error, clientId);
})
);
}
}
@@ -552,4 +568,4 @@ export class IpcServer extends plugins.EventEmitter {
public getIsReady(): boolean {
return this.isReady;
}
}
}

View File

@@ -267,7 +267,8 @@ export class UnixSocketTransport extends IpcTransport {
this.clientIdToSocket.delete(clientId);
}
this.socketToClientId.delete(socket);
this.emit('clientDisconnected', socket);
// Emit with clientId if known so higher layers can react
this.emit('clientDisconnected', socket, clientId);
});
socket.on('drain', () => {