8 Commits

Author SHA1 Message Date
44770bf820 2.2.1
Some checks failed
Default (tags) / security (push) Successful in 27s
Default (tags) / test (push) Failing after 3m50s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2025-08-29 08:49:04 +00:00
6c77ca1e4c fix(tests): Remove redundant manual topic handlers from tests and rely on server built-in pub/sub 2025-08-29 08:49:04 +00:00
350b3f1359 2.2.0
Some checks failed
Default (tags) / security (push) Successful in 42s
Default (tags) / test (push) Failing after 3m50s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2025-08-29 08:48:38 +00:00
fa53dcfc4f feat(ipcclient): Add clientOnly mode to prevent clients from auto-starting servers and improve registration/reconnect behavior 2025-08-29 08:48:38 +00:00
fd3fc7518b 2.1.3 2025-08-28 20:12:40 +00:00
1b462e3a35 fix(classes.ipcchannel): Normalize heartbeatThrowOnTimeout option parsing and allow registering heartbeatTimeout via IpcChannel.on 2025-08-28 20:12:40 +00:00
4ed42945fc 2.1.2 2025-08-26 12:32:28 +00:00
a0638b5364 fix(core): Improve heartbeat handling and transport routing; forward heartbeat timeout events; include clientId routing and probe improvements 2025-08-26 12:32:28 +00:00
11 changed files with 356 additions and 83 deletions

View File

@@ -1,5 +1,39 @@
# Changelog # Changelog
## 2025-08-29 - 2.2.1 - fix(tests)
Remove redundant manual topic handlers from tests and rely on server built-in pub/sub
- Removed manual server.onMessage('__subscribe__') and server.onMessage('__publish__') handlers from test/test.ts
- Tests now rely on the server's built-in publish/subscribe behavior: clients publish directly and subscribers receive messages
- Test code simplified without changing public API or runtime behavior
## 2025-08-29 - 2.2.0 - feat(ipcclient)
Add clientOnly mode to prevent clients from auto-starting servers and improve registration/reconnect behavior
- Introduce a clientOnly option on transports and clients, and support SMARTIPC_CLIENT_ONLY=1 env override to prevent a client from auto-starting a server when connect() encounters ECONNREFUSED/ENOENT.
- Update UnixSocketTransport/TcpTransport connect behavior: if clientOnly (or env override) is enabled, reject connect with a descriptive error instead of starting a server (preserves backward compatibility when disabled).
- Make SmartIpc.waitForServer use clientOnly probing to avoid accidental server creation during readiness checks.
- Refactor IpcClient registration flow: extract attemptRegistrationInternal, set didRegisterOnce flag, and automatically re-register on reconnects when previously registered.
- Add and update tests to cover clientOnly behavior, SMARTIPC_CLIENT_ONLY env enforcement, temporary socket paths and automatic cleanup, and other reliability improvements.
- Update README with a new 'Client-Only Mode' section documenting the option, env override, and examples.
## 2025-08-28 - 2.1.3 - fix(classes.ipcchannel)
Normalize heartbeatThrowOnTimeout option parsing and allow registering 'heartbeatTimeout' via IpcChannel.on
- Normalize heartbeatThrowOnTimeout to boolean (accepts 'true'/'false' strings and other truthy/falsey values) to be defensive for JS consumers
- Expose 'heartbeatTimeout' as a special channel event so handlers registered via IpcChannel.on('heartbeatTimeout', ...) will be called
## 2025-08-26 - 2.1.2 - fix(core)
Improve heartbeat handling and transport routing; forward heartbeat timeout events; include clientId routing and probe improvements
- IpcChannel: add heartbeatInitialGracePeriod handling — delay heartbeat timeout checks until the grace period elapses and use a minimum check interval (>= 1000ms)
- IpcChannel: add heartbeatGraceTimer and ensure stopHeartbeat clears the grace timer to avoid repeated events
- IpcChannel / Client / Server: forward heartbeatTimeout events instead of only throwing when configured (heartbeatThrowOnTimeout = false) so consumers can handle timeouts via events
- IpcClient: include clientId in registration request headers to enable proper routing on the server/transport side
- UnixSocketTransport: track socket <-> clientId mappings, clean them up on socket close, and update mappings when __register__ or messages containing clientId are received
- UnixSocketTransport: route messages to a specific client when headers.clientId is present (fallback to broadcasting when no target is found), and emit both clientMessage and message for parsed client messages
- ts/index.waitForServer: use SmartIpc.createClient for probing, shorten probe register timeout, and use a slightly longer retry delay between probes for stability
## 2025-08-25 - 2.1.1 - fix(readme) ## 2025-08-25 - 2.1.1 - fix(readme)
Update README: expand docs, examples, server readiness, heartbeat, and testing utilities Update README: expand docs, examples, server readiness, heartbeat, and testing utilities
@@ -65,3 +99,10 @@ Initial release and a series of patch fixes to core components.
- 1.0.1: initial release. - 1.0.1: initial release.
- 1.0.2 → 1.0.7: a sequence of small core fixes and maintenance updates (repeated "fix(core): update" commits). - 1.0.2 → 1.0.7: a sequence of small core fixes and maintenance updates (repeated "fix(core): update" commits).
## 2025-08-29 - 2.1.4 - feat(transports)
Add client-only mode to prevent unintended server auto-start in Unix/NamedPipe transports; safer probing
- Add `clientOnly?: boolean` to transport options; when true (or `SMARTIPC_CLIENT_ONLY=1`), a client will fail fast on `ECONNREFUSED`/`ENOENT` instead of auto-starting a server.
- Update `SmartIpc.waitForServer()` to probe with `clientOnly: true` to avoid races during readiness checks.
- Extend tests to cover option and env override; update core test to use unique socket path and auto-cleanup.
- Docs: add README section for client-only mode.

View File

@@ -1,6 +1,6 @@
{ {
"name": "@push.rocks/smartipc", "name": "@push.rocks/smartipc",
"version": "2.1.1", "version": "2.2.1",
"private": false, "private": false,
"description": "A library for node inter process communication, providing an easy-to-use API for IPC.", "description": "A library for node inter process communication, providing an easy-to-use API for IPC.",
"exports": { "exports": {

View File

@@ -238,6 +238,33 @@ await client.connect({
}); });
``` ```
### 🛑 Client-Only Mode (No Auto-Start)
In some setups (CLI + long-running daemon), you want clients to fail fast when no server is available, rather than implicitly becoming the server. Enable client-only mode to prevent the “client becomes server” fallback for Unix domain sockets and Windows named pipes.
```typescript
// Strict client that never auto-starts a server on connect failure
const client = SmartIpc.createClient({
id: 'my-service',
socketPath: '/tmp/my-service.sock',
clientId: 'my-cli',
clientOnly: true, // NEW: disable auto-start fallback
connectRetry: { enabled: false } // optional: fail fast
});
try {
await client.connect();
} catch (err) {
// With clientOnly: true, errors become descriptive
// e.g. "Server not available (ENOENT); clientOnly prevents auto-start"
console.error(err.message);
}
```
- Default: `clientOnly` is `false` to preserve backward compatibility.
- Env override: set `SMARTIPC_CLIENT_ONLY=1` to enforce client-only behavior without code changes.
- Note: `SmartIpc.waitForServer()` internally uses `clientOnly: true` for safe probing.
### 💓 Graceful Heartbeat Monitoring ### 💓 Graceful Heartbeat Monitoring
Keep connections alive without crashing on timeouts: Keep connections alive without crashing on timeouts:

View File

@@ -192,6 +192,61 @@ tap.test('Client retry should work with delayed server', async () => {
await server.stop(); await server.stop();
}); });
// Test 7: clientOnly prevents client from auto-starting a server
tap.test('clientOnly should prevent auto-start and fail fast', async () => {
const uniqueSocketPath = path.join(os.tmpdir(), `smartipc-clientonly-${Date.now()}.sock`);
const client = smartipc.SmartIpc.createClient({
id: 'clientonly-test',
socketPath: uniqueSocketPath,
clientId: 'co-client-1',
clientOnly: true,
connectRetry: { enabled: false }
});
let failed = false;
try {
await client.connect();
} catch (err: any) {
failed = true;
expect(err.message).toContain('clientOnly prevents auto-start');
}
expect(failed).toBeTrue();
// Ensure no server-side socket was created
expect(fs.existsSync(uniqueSocketPath)).toBeFalse();
});
// Test 8: env SMARTIPC_CLIENT_ONLY enforces clientOnly behavior
tap.test('SMARTIPC_CLIENT_ONLY=1 should enforce clientOnly', async () => {
const uniqueSocketPath = path.join(os.tmpdir(), `smartipc-clientonly-env-${Date.now()}.sock`);
const prev = process.env.SMARTIPC_CLIENT_ONLY;
process.env.SMARTIPC_CLIENT_ONLY = '1';
const client = smartipc.SmartIpc.createClient({
id: 'clientonly-test-env',
socketPath: uniqueSocketPath,
clientId: 'co-client-2',
connectRetry: { enabled: false }
});
let failed = false;
try {
await client.connect();
} catch (err: any) {
failed = true;
expect(err.message).toContain('clientOnly prevents auto-start');
}
expect(failed).toBeTrue();
expect(fs.existsSync(uniqueSocketPath)).toBeFalse();
// restore env
if (prev === undefined) {
delete process.env.SMARTIPC_CLIENT_ONLY;
} else {
process.env.SMARTIPC_CLIENT_ONLY = prev;
}
});
// Cleanup // Cleanup
tap.test('Cleanup test socket', async () => { tap.test('Cleanup test socket', async () => {
try { try {

View File

@@ -2,6 +2,10 @@ import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as smartipc from '../ts/index.js'; import * as smartipc from '../ts/index.js';
import * as smartdelay from '@push.rocks/smartdelay'; import * as smartdelay from '@push.rocks/smartdelay';
import * as smartpromise from '@push.rocks/smartpromise'; import * as smartpromise from '@push.rocks/smartpromise';
import * as path from 'path';
import * as os from 'os';
const testSocketPath = path.join(os.tmpdir(), `test-smartipc-${Date.now()}.sock`);
let server: smartipc.IpcServer; let server: smartipc.IpcServer;
let client1: smartipc.IpcClient; let client1: smartipc.IpcClient;
@@ -11,12 +15,13 @@ let client2: smartipc.IpcClient;
tap.test('should create and start an IPC server', async () => { tap.test('should create and start an IPC server', async () => {
server = smartipc.SmartIpc.createServer({ server = smartipc.SmartIpc.createServer({
id: 'test-server', id: 'test-server',
socketPath: '/tmp/test-smartipc.sock', socketPath: testSocketPath,
autoCleanupSocketFile: true,
heartbeat: true, heartbeat: true,
heartbeatInterval: 2000 heartbeatInterval: 2000
}); });
await server.start(); await server.start({ readyWhen: 'accepting' });
expect(server.getStats().isRunning).toBeTrue(); expect(server.getStats().isRunning).toBeTrue();
}); });
@@ -24,11 +29,12 @@ tap.test('should create and start an IPC server', async () => {
tap.test('should create and connect a client', async () => { tap.test('should create and connect a client', async () => {
client1 = smartipc.SmartIpc.createClient({ client1 = smartipc.SmartIpc.createClient({
id: 'test-server', id: 'test-server',
socketPath: '/tmp/test-smartipc.sock', socketPath: testSocketPath,
clientId: 'client-1', clientId: 'client-1',
metadata: { name: 'Test Client 1' }, metadata: { name: 'Test Client 1' },
autoReconnect: true, autoReconnect: true,
heartbeat: true heartbeat: true,
clientOnly: true
}); });
await client1.connect(); await client1.connect();
@@ -76,9 +82,10 @@ tap.test('should handle request/response pattern', async () => {
tap.test('should handle multiple clients', async () => { tap.test('should handle multiple clients', async () => {
client2 = smartipc.SmartIpc.createClient({ client2 = smartipc.SmartIpc.createClient({
id: 'test-server', id: 'test-server',
socketPath: '/tmp/test-smartipc.sock', socketPath: testSocketPath,
clientId: 'client-2', clientId: 'client-2',
metadata: { name: 'Test Client 2' } metadata: { name: 'Test Client 2' },
clientOnly: true
}); });
await client2.connect(); await client2.connect();
@@ -154,17 +161,6 @@ tap.test('should handle pub/sub pattern', async () => {
messageReceived.resolve(); messageReceived.resolve();
}); });
// Server handles the subscription
server.onMessage('__subscribe__', async (payload, clientId) => {
expect(payload.topic).toEqual('news');
});
// Server handles publishing
server.onMessage('__publish__', async (payload, clientId) => {
// Broadcast to all subscribers of the topic
await server.broadcast(`topic:${payload.topic}`, payload.payload);
});
// Client 2 publishes to the topic // Client 2 publishes to the topic
await client2.publish('news', { headline: 'Breaking news!' }); await client2.publish('news', { headline: 'Breaking news!' });

View File

@@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartipc', name: '@push.rocks/smartipc',
version: '2.1.1', version: '2.2.1',
description: 'A library for node inter process communication, providing an easy-to-use API for IPC.' description: 'A library for node inter process communication, providing an easy-to-use API for IPC.'
} }

View File

@@ -49,6 +49,7 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
private reconnectTimer?: NodeJS.Timeout; private reconnectTimer?: NodeJS.Timeout;
private heartbeatTimer?: NodeJS.Timeout; private heartbeatTimer?: NodeJS.Timeout;
private heartbeatCheckTimer?: NodeJS.Timeout; private heartbeatCheckTimer?: NodeJS.Timeout;
private heartbeatGraceTimer?: NodeJS.Timeout;
private lastHeartbeat: number = Date.now(); private lastHeartbeat: number = Date.now();
private connectionStartTime: number = Date.now(); private connectionStartTime: number = Date.now();
private isReconnecting = false; private isReconnecting = false;
@@ -81,6 +82,18 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
...options ...options
}; };
// Normalize heartbeatThrowOnTimeout to boolean (defensive for JS consumers)
const throwOnTimeout = (this.options as any).heartbeatThrowOnTimeout;
if (throwOnTimeout !== undefined) {
if (throwOnTimeout === 'false') {
this.options.heartbeatThrowOnTimeout = false;
} else if (throwOnTimeout === 'true') {
this.options.heartbeatThrowOnTimeout = true;
} else if (typeof throwOnTimeout !== 'boolean') {
this.options.heartbeatThrowOnTimeout = Boolean(throwOnTimeout);
}
}
this.transport = createTransport(this.options); this.transport = createTransport(this.options);
this.setupTransportHandlers(); this.setupTransportHandlers();
} }
@@ -217,16 +230,27 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
}); });
}, this.options.heartbeatInterval!); }, this.options.heartbeatInterval!);
// Delay starting the check until after the grace period
const gracePeriod = this.options.heartbeatInitialGracePeriodMs || 0;
if (gracePeriod > 0) {
// Use a timer to delay the first check
this.heartbeatGraceTimer = setTimeout(() => {
this.startHeartbeatCheck();
}, gracePeriod);
} else {
// No grace period, start checking immediately
this.startHeartbeatCheck();
}
}
/**
* Start heartbeat timeout checking (separated for grace period handling)
*/
private startHeartbeatCheck(): void {
// Check for heartbeat timeout // Check for heartbeat timeout
this.heartbeatCheckTimer = setInterval(() => { this.heartbeatCheckTimer = setInterval(() => {
const timeSinceLastHeartbeat = Date.now() - this.lastHeartbeat; const timeSinceLastHeartbeat = Date.now() - this.lastHeartbeat;
const timeSinceConnection = Date.now() - this.connectionStartTime;
const gracePeriod = this.options.heartbeatInitialGracePeriodMs || 0;
// Skip timeout check during initial grace period
if (timeSinceConnection < gracePeriod) {
return;
}
if (timeSinceLastHeartbeat > this.options.heartbeatTimeout!) { if (timeSinceLastHeartbeat > this.options.heartbeatTimeout!) {
const error = new Error('Heartbeat timeout'); const error = new Error('Heartbeat timeout');
@@ -238,9 +262,11 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
} else { } else {
// Emit heartbeatTimeout event instead of error // Emit heartbeatTimeout event instead of error
this.emit('heartbeatTimeout', error); this.emit('heartbeatTimeout', error);
// Clear timers to avoid repeated events
this.stopHeartbeat();
} }
} }
}, this.options.heartbeatTimeout! / 2); }, Math.max(1000, Math.floor(this.options.heartbeatTimeout! / 2)));
} }
/** /**
@@ -256,6 +282,11 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
clearInterval(this.heartbeatCheckTimer); clearInterval(this.heartbeatCheckTimer);
this.heartbeatCheckTimer = undefined; this.heartbeatCheckTimer = undefined;
} }
if (this.heartbeatGraceTimer) {
clearTimeout(this.heartbeatGraceTimer);
this.heartbeatGraceTimer = undefined;
}
} }
/** /**
@@ -430,7 +461,7 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
* Register a message handler * Register a message handler
*/ */
public on(event: string, handler: (payload: any) => any | Promise<any>): this { public on(event: string, handler: (payload: any) => any | Promise<any>): this {
if (event === 'message' || event === 'connect' || event === 'disconnect' || event === 'error' || event === 'reconnecting' || event === 'drain') { if (event === 'message' || event === 'connect' || event === 'disconnect' || event === 'error' || event === 'reconnecting' || event === 'drain' || event === 'heartbeatTimeout') {
// Special handling for channel events // Special handling for channel events
super.on(event, handler); super.on(event, handler);
} else { } else {

View File

@@ -45,6 +45,7 @@ export class IpcClient extends plugins.EventEmitter {
private messageHandlers = new Map<string, (payload: any) => any | Promise<any>>(); private messageHandlers = new Map<string, (payload: any) => any | Promise<any>>();
private isConnected = false; private isConnected = false;
private clientId: string; private clientId: string;
private didRegisterOnce = false;
constructor(options: IIpcClientOptions) { constructor(options: IIpcClientOptions) {
super(); super();
@@ -66,27 +67,7 @@ export class IpcClient extends plugins.EventEmitter {
// Helper function to attempt registration // Helper function to attempt registration
const attemptRegistration = async (): Promise<void> => { const attemptRegistration = async (): Promise<void> => {
const registerTimeoutMs = this.options.registerTimeoutMs || 5000; await this.attemptRegistrationInternal();
try {
const response = await this.channel.request<any, any>(
'__register__',
{
clientId: this.clientId,
metadata: this.options.metadata
},
{ timeout: registerTimeoutMs }
);
if (!response.success) {
throw new Error(response.error || 'Registration failed');
}
this.isConnected = true;
this.emit('connect');
} catch (error) {
throw new Error(`Failed to register with server: ${error.message}`);
}
}; };
// Helper function to attempt connection with retry // Helper function to attempt connection with retry
@@ -167,6 +148,38 @@ export class IpcClient extends plugins.EventEmitter {
} }
} }
/**
* Attempt to register this client over the current channel connection.
* Sets connection flags and emits 'connect' on success.
*/
private async attemptRegistrationInternal(): Promise<void> {
const registerTimeoutMs = this.options.registerTimeoutMs || 5000;
try {
const response = await this.channel.request<any, any>(
'__register__',
{
clientId: this.clientId,
metadata: this.options.metadata
},
{
timeout: registerTimeoutMs,
headers: { clientId: this.clientId }
}
);
if (!response.success) {
throw new Error(response.error || 'Registration failed');
}
this.isConnected = true;
this.didRegisterOnce = true;
this.emit('connect');
} catch (error: any) {
throw new Error(`Failed to register with server: ${error.message}`);
}
}
/** /**
* Disconnect from the server * Disconnect from the server
*/ */
@@ -185,8 +198,16 @@ export class IpcClient extends plugins.EventEmitter {
*/ */
private setupChannelHandlers(): void { private setupChannelHandlers(): void {
// Forward channel events // Forward channel events
this.channel.on('connect', () => { this.channel.on('connect', async () => {
// Don't emit connect here, wait for successful registration // On reconnects, re-register automatically when we had connected before
if (this.didRegisterOnce && !this.isConnected) {
try {
await this.attemptRegistrationInternal();
} catch (error) {
this.emit('error', error);
}
}
// For initial connect(), registration is handled explicitly there
}); });
this.channel.on('disconnect', (reason) => { this.channel.on('disconnect', (reason) => {
@@ -194,10 +215,20 @@ export class IpcClient extends plugins.EventEmitter {
this.emit('disconnect', reason); this.emit('disconnect', reason);
}); });
this.channel.on('error', (error) => { this.channel.on('error', (error: any) => {
// If heartbeat timeout and configured not to throw, convert to heartbeatTimeout event
if (error && error.message === 'Heartbeat timeout' && this.options.heartbeatThrowOnTimeout === false) {
this.emit('heartbeatTimeout', error);
return;
}
this.emit('error', error); this.emit('error', error);
}); });
this.channel.on('heartbeatTimeout', (error) => {
// Forward heartbeatTimeout event (when heartbeatThrowOnTimeout is false)
this.emit('heartbeatTimeout', error);
});
this.channel.on('reconnecting', (info) => { this.channel.on('reconnecting', (info) => {
this.emit('reconnecting', info); this.emit('reconnecting', info);
}); });

View File

@@ -200,6 +200,11 @@ export class IpcServer extends plugins.EventEmitter {
this.emit('error', error, 'server'); this.emit('error', error, 'server');
}); });
this.primaryChannel.on('heartbeatTimeout', (error) => {
// Forward heartbeatTimeout event (when heartbeatThrowOnTimeout is false)
this.emit('heartbeatTimeout', error, 'server');
});
// Connect the primary channel (will start as server) // Connect the primary channel (will start as server)
await this.primaryChannel.connect(); await this.primaryChannel.connect();
@@ -339,6 +344,19 @@ export class IpcServer extends plugins.EventEmitter {
} }
this.emit('error', error, actualClientId); this.emit('error', error, actualClientId);
}); });
channel.on('heartbeatTimeout', (error) => {
// Find the actual client ID for this channel
let actualClientId = clientId;
for (const [id, client] of this.clients) {
if (client.channel === channel) {
actualClientId = id;
break;
}
}
// Forward heartbeatTimeout event (when heartbeatThrowOnTimeout is false)
this.emit('heartbeatTimeout', error, actualClientId);
});
} }
/** /**

View File

@@ -18,6 +18,12 @@ export interface IIpcMessageEnvelope<T = any> {
export interface IIpcTransportOptions { export interface IIpcTransportOptions {
/** Unique identifier for this transport */ /** Unique identifier for this transport */
id: string; id: string;
/**
* When true, a client transport will NOT auto-start a server when connect()
* encounters ECONNREFUSED/ENOENT. Useful for strict client/daemon setups.
* Default: false. Can also be overridden by env SMARTIPC_CLIENT_ONLY=1.
*/
clientOnly?: boolean;
/** Socket path for Unix domain sockets or pipe name for Windows */ /** Socket path for Unix domain sockets or pipe name for Windows */
socketPath?: string; socketPath?: string;
/** TCP host for network transport */ /** TCP host for network transport */
@@ -169,6 +175,8 @@ export class UnixSocketTransport extends IpcTransport {
private socket: plugins.net.Socket | null = null; private socket: plugins.net.Socket | null = null;
private server: plugins.net.Server | null = null; private server: plugins.net.Server | null = null;
private clients: Set<plugins.net.Socket> = new Set(); private clients: Set<plugins.net.Socket> = new Set();
private socketToClientId = new WeakMap<plugins.net.Socket, string>();
private clientIdToSocket = new Map<string, plugins.net.Socket>();
/** /**
* Connect as client or start as server * Connect as client or start as server
@@ -193,7 +201,21 @@ export class UnixSocketTransport extends IpcTransport {
this.socket.on('error', (error: any) => { this.socket.on('error', (error: any) => {
if (error.code === 'ECONNREFUSED' || error.code === 'ENOENT') { if (error.code === 'ECONNREFUSED' || error.code === 'ENOENT') {
// No server exists, we should become the server // Determine if we must NOT auto-start server
const envVal = process.env.SMARTIPC_CLIENT_ONLY;
const envClientOnly = !!envVal && (envVal === '1' || envVal === 'true' || envVal === 'TRUE');
const clientOnly = this.options.clientOnly === true || envClientOnly;
if (clientOnly) {
// Reject instead of starting a server to avoid races
const reason = error.code || 'UNKNOWN';
const err = new Error(`Server not available (${reason}); clientOnly prevents auto-start`);
(err as any).code = reason;
reject(err);
return;
}
// No server exists and clientOnly is false: become the server (back-compat)
this.socket = null; this.socket = null;
this.startServer(socketPath).then(resolve).catch(reject); this.startServer(socketPath).then(resolve).catch(reject);
} else { } else {
@@ -239,6 +261,12 @@ export class UnixSocketTransport extends IpcTransport {
socket.on('close', () => { socket.on('close', () => {
this.clients.delete(socket); this.clients.delete(socket);
// Clean up clientId mappings
const clientId = this.socketToClientId.get(socket);
if (clientId && this.clientIdToSocket.get(clientId) === socket) {
this.clientIdToSocket.delete(clientId);
}
this.socketToClientId.delete(socket);
this.emit('clientDisconnected', socket); this.emit('clientDisconnected', socket);
}); });
@@ -307,7 +335,18 @@ export class UnixSocketTransport extends IpcTransport {
// Parse and emit the message with socket reference // Parse and emit the message with socket reference
try { try {
const message = JSON.parse(messageData.toString('utf8')) as IIpcMessageEnvelope; const message = JSON.parse(messageData.toString('utf8')) as IIpcMessageEnvelope;
// Update clientId mapping
const clientId = message.headers?.clientId ??
(message.type === '__register__' ? (message.payload as any)?.clientId : undefined);
if (clientId) {
this.socketToClientId.set(socket, clientId);
this.clientIdToSocket.set(clientId, socket);
}
// Emit both events so IpcChannel can process it
this.emit('clientMessage', message, socket); this.emit('clientMessage', message, socket);
this.emit('message', message);
} catch (error: any) { } catch (error: any) {
this.emit('error', new Error(`Failed to parse message: ${error.message}`)); this.emit('error', new Error(`Failed to parse message: ${error.message}`));
} }
@@ -415,27 +454,54 @@ export class UnixSocketTransport extends IpcTransport {
} }
}); });
} else if (this.server && this.clients.size > 0) { } else if (this.server && this.clients.size > 0) {
// Server mode - broadcast to all clients // Server mode - route by clientId if present, otherwise broadcast
const promises: Promise<boolean>[] = []; const targetClientId = message.headers?.clientId;
for (const client of this.clients) { if (targetClientId && this.clientIdToSocket.has(targetClientId)) {
promises.push(new Promise((resolve) => { // Send to specific client
const success = client.write(frame, (error) => { const targetSocket = this.clientIdToSocket.get(targetClientId)!;
if (error) { if (targetSocket && !targetSocket.destroyed) {
resolve(false); return new Promise((resolve) => {
} else { const success = targetSocket.write(frame, (error) => {
resolve(true); if (error) {
resolve(false);
} else {
resolve(true);
}
});
if (!success) {
targetSocket.once('drain', () => resolve(true));
} }
}); });
} else {
// Socket is destroyed, remove from mappings
this.clientIdToSocket.delete(targetClientId);
return false;
}
} else {
// Broadcast to all clients (fallback for messages without specific target)
const promises: Promise<boolean>[] = [];
if (!success) { for (const client of this.clients) {
client.once('drain', () => resolve(true)); promises.push(new Promise((resolve) => {
} const success = client.write(frame, (error) => {
})); if (error) {
resolve(false);
} else {
resolve(true);
}
});
if (!success) {
client.once('drain', () => resolve(true));
}
}));
}
const results = await Promise.all(promises);
return results.every(r => r);
} }
const results = await Promise.all(promises);
return results.every(r => r);
} }
return false; return false;

View File

@@ -29,20 +29,28 @@ export class SmartIpc {
while (Date.now() - startTime < timeout) { while (Date.now() - startTime < timeout) {
try { try {
// Try to connect as a temporary client // Create a temporary client with proper options
const testClient = new IpcClient({ const testClient = SmartIpc.createClient({
id: `test-probe-${Date.now()}`, id: 'test-probe',
socketPath: options.socketPath, socketPath: options.socketPath,
autoReconnect: false, clientId: `probe-${process.pid}-${Date.now()}`,
heartbeat: false heartbeat: false,
clientOnly: true,
connectRetry: {
enabled: false // Don't retry, we're handling retries here
},
registerTimeoutMs: 2000 // Short timeout for quick probing
}); });
// Try to connect and register with the server
await testClient.connect(); await testClient.connect();
// Success! Clean up and return
await testClient.disconnect(); await testClient.disconnect();
return; // Server is ready return;
} catch (error) { } catch (error) {
// Server not ready yet, wait and retry // Server not ready yet, wait and retry
await new Promise(resolve => setTimeout(resolve, 100)); await new Promise(resolve => setTimeout(resolve, 200));
} }
} }