8 Commits

11 changed files with 1321 additions and 310 deletions

View File

@@ -1,5 +1,43 @@
# Changelog
## 2025-08-28 - 2.1.3 - fix(classes.ipcchannel)
Normalize heartbeatThrowOnTimeout option parsing and allow registering 'heartbeatTimeout' via IpcChannel.on
- Normalize heartbeatThrowOnTimeout to boolean (accepts 'true'/'false' strings and other truthy/falsey values) to be defensive for JS consumers
- Expose 'heartbeatTimeout' as a special channel event so handlers registered via IpcChannel.on('heartbeatTimeout', ...) will be called
## 2025-08-26 - 2.1.2 - fix(core)
Improve heartbeat handling and transport routing; forward heartbeat timeout events; include clientId routing and probe improvements
- IpcChannel: add heartbeatInitialGracePeriod handling — delay heartbeat timeout checks until the grace period elapses and use a minimum check interval (>= 1000ms)
- IpcChannel: add heartbeatGraceTimer and ensure stopHeartbeat clears the grace timer to avoid repeated events
- IpcChannel / Client / Server: forward heartbeatTimeout events instead of only throwing when configured (heartbeatThrowOnTimeout = false) so consumers can handle timeouts via events
- IpcClient: include clientId in registration request headers to enable proper routing on the server/transport side
- UnixSocketTransport: track socket <-> clientId mappings, clean them up on socket close, and update mappings when __register__ or messages containing clientId are received
- UnixSocketTransport: route messages to a specific client when headers.clientId is present (fallback to broadcasting when no target is found), and emit both clientMessage and message for parsed client messages
- ts/index.waitForServer: use SmartIpc.createClient for probing, shorten probe register timeout, and use a slightly longer retry delay between probes for stability
## 2025-08-25 - 2.1.1 - fix(readme)
Update README: expand docs, examples, server readiness, heartbeat, and testing utilities
- Rewrite introduction and overall tone to emphasize zero-dependency, reliability, and TypeScript support
- Replace several Quick Start examples to use socketPath and show autoCleanupSocketFile usage
- Add Server readiness detection docs and SmartIpc.waitForServer example
- Document smart connection retry options (connectRetry) and registerTimeoutMs usage
- Clarify heartbeat configuration and add heartbeatThrowOnTimeout option to emit events instead of throwing
- Add sections for automatic socket cleanup, broadcasting, testing utilities (waitForServer, spawnAndConnect), and metrics
- Various formatting and copy improvements throughout README
## 2025-08-25 - 2.1.0 - feat(core)
Add heartbeat grace/timeout options, client retry/wait-for-ready, server readiness and socket cleanup, transport socket options, helper utilities, and tests
- IpcChannel: add heartbeatInitialGracePeriodMs and heartbeatThrowOnTimeout; emit 'heartbeatTimeout' event when configured instead of throwing and disconnecting immediately.
- IpcClient: add connectRetry configuration, registerTimeoutMs, waitForReady option and robust connect logic with exponential backoff and total timeout handling.
- IpcServer: add start option readyWhen ('accepting'), isReady/getIsReady API, autoCleanupSocketFile and socketMode support for managing stale socket files and permissions.
- Transports: support autoCleanupSocketFile and socketMode (cleanup stale socket files and set socket permissions where applicable).
- SmartIpc: add waitForServer helper to wait until a server is ready and spawnAndConnect helper to spawn a server process and connect a client.
- Tests: add comprehensive tests (test.improvements.ts and test.reliability.ts) covering readiness, socket cleanup, retries, heartbeat behavior, race conditions, multiple clients, and server restart scenarios.
## 2025-08-25 - 2.0.3 - fix(ipc)
Patch release prep: bump patch version and release minor fixes

View File

@@ -1,6 +1,6 @@
{
"name": "@push.rocks/smartipc",
"version": "2.0.3",
"version": "2.1.3",
"private": false,
"description": "A library for node inter process communication, providing an easy-to-use API for IPC.",
"exports": {

632
readme.md
View File

@@ -1,34 +1,33 @@
# @push.rocks/smartipc 🚀
**Lightning-fast, type-safe IPC for modern Node.js applications**
**Rock-solid IPC for Node.js with zero dependencies**
[![npm version](https://img.shields.io/npm/v/@push.rocks/smartipc.svg)](https://www.npmjs.com/package/@push.rocks/smartipc)
[![TypeScript](https://img.shields.io/badge/TypeScript-5.x-blue.svg)](https://www.typescriptlang.org/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](./license)
SmartIPC is a production-grade Inter-Process Communication library that brings enterprise-level messaging patterns to Node.js. Built with TypeScript from the ground up, it offers zero-dependency native IPC with automatic reconnection, type-safe messaging, and built-in observability.
SmartIPC delivers bulletproof Inter-Process Communication for Node.js applications. Built for real-world production use, it handles all the edge cases that make IPC tricky - automatic reconnection, race conditions, heartbeat monitoring, and clean shutdowns. All with **zero external dependencies** and full TypeScript support.
## Why SmartIPC?
## 🎯 Why SmartIPC?
- **🎯 Zero External Dependencies** - Pure Node.js implementation using native `net` module
- **🔒 Type-Safe** - Full TypeScript support with generics for compile-time safety
- **🔄 Auto-Reconnect** - Built-in exponential backoff and circuit breaker patterns
- **📊 Observable** - Real-time metrics and connection tracking
- **⚡ High Performance** - Length-prefixed framing, backpressure handling, and optimized buffers
- **🎭 Multiple Patterns** - Request/Response, Pub/Sub, and Fire-and-Forget messaging
- **🛡️ Production Ready** - Message size limits, heartbeat monitoring, and graceful shutdown
- **Zero Dependencies** - Pure Node.js implementation using native modules
- **Battle-tested Reliability** - Automatic reconnection, graceful degradation, and timeout handling
- **Type-Safe** - Full TypeScript support with generics for compile-time safety
- **CI/Test Ready** - Built-in helpers and race condition prevention for testing
- **Observable** - Real-time metrics, connection tracking, and health monitoring
- **Multiple Patterns** - Request/Response, Pub/Sub, and Fire-and-Forget messaging
## Installation
## 📦 Installation
```bash
npm install @push.rocks/smartipc
# or
pnpm add @push.rocks/smartipc
# or
npm install @push.rocks/smartipc
yarn add @push.rocks/smartipc
```
## Quick Start
### Simple TCP Server & Client
## 🚀 Quick Start
```typescript
import { SmartIpc } from '@push.rocks/smartipc';
@@ -36,34 +35,42 @@ import { SmartIpc } from '@push.rocks/smartipc';
// Create a server
const server = SmartIpc.createServer({
id: 'my-service',
host: 'localhost',
port: 9876
socketPath: '/tmp/my-service.sock',
autoCleanupSocketFile: true // Clean up stale sockets automatically
});
// Handle incoming messages
server.onMessage('hello', async (data, clientId) => {
console.log(`Client ${clientId} says:`, data);
return { response: 'Hello back!' };
server.onMessage('greet', async (data, clientId) => {
console.log(`Client ${clientId} says:`, data.message);
return { response: `Hello ${data.name}!` };
});
await server.start();
// Start the server
await server.start({ readyWhen: 'accepting' }); // Wait until fully ready
console.log('Server is ready to accept connections! ✨');
// Create a client
const client = SmartIpc.createClient({
id: 'my-service',
host: 'localhost',
port: 9876,
clientId: 'client-1'
socketPath: '/tmp/my-service.sock',
connectRetry: {
enabled: true,
maxAttempts: 10
}
});
// Connect with automatic retry
await client.connect();
// Send a message and get response
const response = await client.request('hello', { message: 'Hi server!' });
console.log('Server responded:', response);
// Send a request and get a response
const response = await client.request('greet', {
name: 'World',
message: 'Hi there!'
});
console.log('Server said:', response.response); // "Hello World!"
```
## Core Concepts
## 🎮 Core Concepts
### Transport Types
@@ -84,350 +91,396 @@ const unixServer = SmartIpc.createServer({
});
// Windows Named Pipe (Windows optimal)
const pipeServer = SmartIpc.createServer({
// Automatically used on Windows when socketPath is provided
const windowsServer = SmartIpc.createServer({
id: 'pipe-service',
pipeName: 'my-app-pipe'
socketPath: '\\\\.\\pipe\\my-app-pipe'
});
```
### Message Patterns
#### 🔥 Fire and Forget
Fast, one-way messaging when you don't need a response:
Send messages without waiting for a response:
```typescript
// Server
server.onMessage('log', (data, clientId) => {
console.log(`[${clientId}]:`, data.message);
// No return value needed
console.log(`[${clientId}] ${data.level}:`, data.message);
// No return needed
});
// Client
await client.sendMessage('log', {
level: 'info',
message: 'User logged in',
timestamp: Date.now()
});
```
#### 📞 Request/Response
RPC-style communication with timeouts and type safety:
RPC-style communication with type safety:
```typescript
// Server - Define your handler with types
interface CalculateRequest {
operation: 'add' | 'multiply';
values: number[];
interface UserRequest {
userId: string;
fields?: string[];
}
interface CalculateResponse {
result: number;
computedAt: number;
interface UserResponse {
id: string;
name: string;
email?: string;
createdAt: number;
}
server.onMessage<CalculateRequest, CalculateResponse>('calculate', async (data) => {
const result = data.operation === 'add'
? data.values.reduce((a, b) => a + b, 0)
: data.values.reduce((a, b) => a * b, 1);
// Server
server.onMessage<UserRequest, UserResponse>('getUser', async (data) => {
const user = await db.getUser(data.userId);
return {
result,
computedAt: Date.now()
id: user.id,
name: user.name,
email: data.fields?.includes('email') ? user.email : undefined,
createdAt: user.createdAt
};
});
// Client - Type-safe request
const response = await client.request<CalculateRequest, CalculateResponse>(
'calculate',
{ operation: 'add', values: [1, 2, 3, 4, 5] },
// Client - with timeout
const user = await client.request<UserRequest, UserResponse>(
'getUser',
{ userId: '123', fields: ['email'] },
{ timeout: 5000 }
);
console.log(`Sum is ${response.result}`);
```
#### 📢 Pub/Sub Pattern
Topic-based message broadcasting:
```typescript
// Server automatically handles subscriptions
const publisher = SmartIpc.createClient({
id: 'events-service',
clientId: 'publisher'
});
// Subscribers
const subscriber1 = SmartIpc.createClient({
id: 'events-service',
clientId: 'subscriber-1'
socketPath: '/tmp/events.sock'
});
const subscriber2 = SmartIpc.createClient({
id: 'events-service',
clientId: 'subscriber-2'
});
// Subscribe to topics
await subscriber1.connect();
await subscriber1.subscribe('user.login', (data) => {
console.log('User logged in:', data);
});
await subscriber2.subscribe('user.*', (data) => {
console.log('User event:', data);
// Publisher
const publisher = SmartIpc.createClient({
id: 'events-service',
socketPath: '/tmp/events.sock'
});
// Publish events
await publisher.connect();
await publisher.publish('user.login', {
userId: '123',
ip: '192.168.1.1',
timestamp: Date.now()
});
```
## Advanced Features
## 💪 Advanced Features
### 🔄 Auto-Reconnection with Exponential Backoff
### 🏁 Server Readiness Detection
Clients automatically reconnect on connection loss:
Eliminate race conditions in tests and production:
```typescript
const server = SmartIpc.createServer({
id: 'my-service',
socketPath: '/tmp/my-service.sock',
autoCleanupSocketFile: true
});
// Option 1: Wait for full readiness
await server.start({ readyWhen: 'accepting' });
// Server is now FULLY ready to accept connections
// Option 2: Use ready event
server.on('ready', () => {
console.log('Server is ready!');
startClients();
});
await server.start();
// Option 3: Check readiness state
if (server.getIsReady()) {
console.log('Ready to rock! 🎸');
}
```
### 🔄 Smart Connection Retry
Never lose messages due to temporary connection issues:
```typescript
const client = SmartIpc.createClient({
id: 'resilient-service',
clientId: 'auto-reconnect-client',
reconnect: {
id: 'resilient-client',
socketPath: '/tmp/service.sock',
connectRetry: {
enabled: true,
initialDelay: 1000, // Start with 1 second
maxDelay: 30000, // Cap at 30 seconds
factor: 2, // Double each time
maxAttempts: Infinity // Keep trying forever
}
initialDelay: 100, // Start with 100ms
maxDelay: 1500, // Cap at 1.5 seconds
maxAttempts: 20, // Try 20 times
totalTimeout: 15000 // Give up after 15 seconds total
},
registerTimeoutMs: 8000 // Registration handshake timeout
});
// Monitor connection state
client.on('connected', () => console.log('Connected! 🟢'));
client.on('disconnected', () => console.log('Connection lost! 🔴'));
client.on('reconnecting', (attempt) => console.log(`Reconnecting... Attempt ${attempt} 🟡`));
// Will retry automatically if server isn't ready yet
await client.connect({
waitForReady: true, // Wait for server to exist
waitTimeout: 10000 // Wait up to 10 seconds
});
```
### 💓 Heartbeat Monitoring
### 💓 Graceful Heartbeat Monitoring
Keep connections alive and detect failures quickly:
Keep connections alive without crashing on timeouts:
```typescript
const server = SmartIpc.createServer({
id: 'monitored-service',
heartbeat: {
enabled: true,
interval: 5000, // Send heartbeat every 5 seconds
timeout: 15000 // Consider dead after 15 seconds
}
socketPath: '/tmp/monitored.sock',
heartbeat: true,
heartbeatInterval: 3000,
heartbeatTimeout: 10000,
heartbeatInitialGracePeriodMs: 5000, // Grace period for startup
heartbeatThrowOnTimeout: false // Emit event instead of throwing
});
// Clients automatically respond to heartbeats
server.on('heartbeatTimeout', (clientId) => {
console.log(`Client ${clientId} heartbeat timeout - will handle gracefully`);
});
// Client configuration
const client = SmartIpc.createClient({
id: 'monitored-service',
clientId: 'heartbeat-client',
heartbeat: true // Enable heartbeat responses
socketPath: '/tmp/monitored.sock',
heartbeat: true,
heartbeatInterval: 3000,
heartbeatTimeout: 10000,
heartbeatInitialGracePeriodMs: 5000,
heartbeatThrowOnTimeout: false
});
client.on('heartbeatTimeout', () => {
console.log('Heartbeat timeout detected, reconnecting...');
// Handle reconnection logic
});
```
### 📊 Real-time Metrics & Observability
### 🧹 Automatic Socket Cleanup
Track performance and connection health:
Never worry about stale socket files:
```typescript
// Server metrics
const server = SmartIpc.createServer({
id: 'clean-service',
socketPath: '/tmp/service.sock',
autoCleanupSocketFile: true, // Remove stale socket on start
socketMode: 0o600 // Set socket permissions (Unix only)
});
// Socket file will be cleaned up automatically on start
await server.start();
```
### 📊 Real-time Metrics
Monitor your IPC performance:
```typescript
// Server stats
const serverStats = server.getStats();
console.log({
isRunning: serverStats.isRunning,
connectedClients: serverStats.connectedClients,
totalConnections: serverStats.totalConnections,
uptime: serverStats.uptime,
metrics: {
messagesSent: serverStats.metrics.messagesSent,
messagesReceived: serverStats.metrics.messagesReceived,
bytesSent: serverStats.metrics.bytesSent,
bytesReceived: serverStats.metrics.bytesReceived,
errors: serverStats.metrics.errors
}
});
// Client metrics
// Client stats
const clientStats = client.getStats();
console.log({
connected: clientStats.connected,
reconnectAttempts: clientStats.reconnectAttempts,
lastActivity: clientStats.lastActivity,
metrics: clientStats.metrics
});
// Track specific clients on server
const clientInfo = server.getClientInfo('client-1');
// Get specific client info
const clientInfo = server.getClientInfo('client-123');
console.log({
clientId: clientInfo.clientId,
metadata: clientInfo.metadata,
connectedAt: clientInfo.connectedAt,
lastActivity: clientInfo.lastActivity,
subscriptions: clientInfo.subscriptions
connectedAt: new Date(clientInfo.connectedAt),
lastActivity: new Date(clientInfo.lastActivity),
metadata: clientInfo.metadata
});
```
### 🛡️ Security & Limits
### 🎯 Broadcasting
Protect against malicious or misbehaving clients:
```typescript
const secureServer = SmartIpc.createServer({
id: 'secure-service',
maxMessageSize: 10 * 1024 * 1024, // 10MB max message size
maxConnections: 100, // Limit concurrent connections
connectionTimeout: 60000, // Drop idle connections after 1 minute
// Authentication (coming soon)
auth: {
required: true,
validator: async (token) => {
// Validate auth token
return validateToken(token);
}
}
});
// Rate limiting per client
secureServer.use(rateLimitMiddleware({
windowMs: 60000, // 1 minute window
max: 100 // 100 requests per window
}));
```
### 🎯 Broadcast to Specific Clients
Send targeted messages:
Send messages to multiple clients:
```typescript
// Broadcast to all connected clients
server.broadcast('system-alert', {
message: 'Maintenance in 5 minutes'
await server.broadcast('announcement', {
message: 'Server will restart in 5 minutes',
severity: 'warning'
});
// Send to specific client
server.sendToClient('client-1', 'personal-message', {
content: 'This is just for you'
});
// Send to specific clients
await server.broadcastTo(
['client-1', 'client-2'],
'private-message',
{ content: 'This is just for you two' }
);
// Send to multiple specific clients
server.sendToClients(['client-1', 'client-2'], 'group-message', {
content: 'Group notification'
// Send to one client
await server.sendToClient('client-1', 'direct', {
data: 'Personal message'
});
// Get all connected client IDs
const clients = server.getConnectedClients();
console.log('Connected clients:', clients);
```
## Error Handling
## 🧪 Testing Utilities
Comprehensive error handling with typed errors:
SmartIPC includes powerful helpers for testing:
### Wait for Server
```typescript
import { IpcError, ConnectionError, TimeoutError } from '@push.rocks/smartipc';
import { SmartIpc } from '@push.rocks/smartipc';
// Client error handling
// Start your server in another process
const serverProcess = spawn('node', ['server.js']);
// Wait for it to be ready
await SmartIpc.waitForServer({
socketPath: '/tmp/test.sock',
timeoutMs: 10000
});
// Now safe to connect clients
const client = SmartIpc.createClient({
id: 'test-client',
socketPath: '/tmp/test.sock'
});
await client.connect();
```
### Spawn and Connect
```typescript
// Helper that spawns a server and connects a client
const { client, serverProcess } = await SmartIpc.spawnAndConnect({
serverScript: './server.js',
socketPath: '/tmp/test.sock',
clientId: 'test-client',
connectRetry: {
enabled: true,
maxAttempts: 10
}
});
// Use the client
const response = await client.request('ping', {});
// Cleanup
await client.disconnect();
serverProcess.kill();
```
## 🎭 Event Handling
SmartIPC provides comprehensive event emitters:
```typescript
// Server events
server.on('start', () => console.log('Server started'));
server.on('ready', () => console.log('Server ready for connections'));
server.on('clientConnect', (clientId, metadata) => {
console.log(`Client ${clientId} connected with metadata:`, metadata);
});
server.on('clientDisconnect', (clientId) => {
console.log(`Client ${clientId} disconnected`);
});
server.on('error', (error, clientId) => {
console.error(`Error from ${clientId}:`, error);
});
// Client events
client.on('connect', () => console.log('Connected to server'));
client.on('disconnect', () => console.log('Disconnected from server'));
client.on('reconnecting', (attempt) => {
console.log(`Reconnection attempt ${attempt}`);
});
client.on('error', (error) => {
if (error instanceof ConnectionError) {
console.error('Connection failed:', error.message);
} else if (error instanceof TimeoutError) {
console.error('Request timed out:', error.message);
console.error('Client error:', error);
});
client.on('heartbeatTimeout', (error) => {
console.warn('Heartbeat timeout:', error);
});
```
## 🛡️ Error Handling
Robust error handling with detailed error information:
```typescript
// Client-side error handling
try {
const response = await client.request('riskyOperation', data, {
timeout: 5000
});
} catch (error) {
if (error.message.includes('timeout')) {
console.error('Request timed out');
} else if (error.message.includes('Failed to register')) {
console.error('Could not register with server');
} else {
console.error('Unknown error:', error);
}
});
// Server error handling
server.on('client-error', (clientId, error) => {
console.error(`Client ${clientId} error:`, error);
// Optionally disconnect misbehaving clients
if (error.code === 'INVALID_MESSAGE') {
server.disconnectClient(clientId);
}
});
// Request with error handling
try {
const response = await client.request('risky-operation', data, {
timeout: 5000,
retries: 3
});
} catch (error) {
if (error instanceof TimeoutError) {
// Handle timeout
} else {
// Handle other errors
}
}
```
## Testing
SmartIPC includes comprehensive testing utilities:
```typescript
import { createTestServer, createTestClient } from '@push.rocks/smartipc/testing';
describe('My IPC integration', () => {
let server, client;
beforeEach(async () => {
server = await createTestServer();
client = await createTestClient(server);
});
afterEach(async () => {
await client.disconnect();
await server.stop();
});
it('should handle messages', async () => {
server.onMessage('test', (data) => ({ echo: data }));
const response = await client.request('test', { value: 42 });
expect(response.echo.value).toBe(42);
});
// Server-side error boundaries
server.onMessage('process', async (data, clientId) => {
try {
return await riskyProcessing(data);
} catch (error) {
console.error(`Processing failed for ${clientId}:`, error);
throw error; // Will be sent back to client as error
}
});
```
## Performance Benchmarks
## 🏗️ Architecture
SmartIPC has been optimized for high throughput and low latency:
| Transport | Messages/sec | Avg Latency | Use Case |
|-----------|-------------|-------------|----------|
| Unix Socket | 150,000+ | < 0.1ms | Local high-performance IPC |
| TCP (localhost) | 100,000+ | < 0.2ms | Local network-capable IPC |
| TCP (network) | 50,000+ | < 1ms | Distributed systems |
| Named Pipe | 120,000+ | < 0.15ms | Windows local IPC |
*Benchmarked on modern hardware with 1KB message payloads*
## Architecture
SmartIPC uses a layered architecture for maximum flexibility:
SmartIPC uses a clean, layered architecture:
```
┌─────────────────────────────────────────┐
│ Application Layer
(Your business logic and handlers)
Your Application │
(Business logic)
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
IPC Client / Server
│ (High-level API, patterns, routing) │
IpcServer / IpcClient
│ (High-level API, Message routing)
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ IPC Channel │
│ (Connection management, reconnection,
heartbeat, request/response)
│ IpcChannel
│ (Connection management, Heartbeat,
Reconnection, Request/Response) │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ Transport Layer │
│ (TCP, Unix Socket, Named Pipe) │
@@ -435,25 +488,94 @@ SmartIPC uses a layered architecture for maximum flexibility:
└─────────────────────────────────────────┘
```
## Comparison with Alternatives
## 🎯 Common Use Cases
| Feature | SmartIPC | node-ipc | zeromq | |
|---------|----------|----------|---------|--|
| Zero Dependencies | ✅ | ❌ | ❌ | |
| TypeScript Native | ✅ | ❌ | ❌ | |
| Auto-Reconnect | ✅ | ⚠️ | ✅ | |
| Request/Response | ✅ | ⚠️ | ✅ | |
| Pub/Sub | ✅ | ❌ | ✅ | |
| Built-in Metrics | ✅ | ❌ | ❌ | |
| Heartbeat | ✅ | ❌ | ✅ | |
| Message Size Limits | ✅ | ❌ | ✅ | |
| Type Safety | ✅ | ❌ | ❌ | |
### Microservices Communication
```typescript
// API Gateway
const gateway = SmartIpc.createServer({
id: 'api-gateway',
socketPath: '/tmp/gateway.sock'
});
## Support
// User Service
const userService = SmartIpc.createClient({
id: 'api-gateway',
socketPath: '/tmp/gateway.sock',
clientId: 'user-service'
});
- 📖 [Documentation](https://code.foss.global/push.rocks/smartipc)
- 🐛 [Issue Tracker](https://code.foss.global/push.rocks/smartipc/issues)
- 💬 [Discussions](https://code.foss.global/push.rocks/smartipc/discussions)
// Order Service
const orderService = SmartIpc.createClient({
id: 'api-gateway',
socketPath: '/tmp/gateway.sock',
clientId: 'order-service'
});
```
### Worker Process Management
```typescript
// Main process
const server = SmartIpc.createServer({
id: 'main',
socketPath: '/tmp/workers.sock'
});
server.onMessage('job-complete', (result, workerId) => {
console.log(`Worker ${workerId} completed job:`, result);
});
// Worker process
const worker = SmartIpc.createClient({
id: 'main',
socketPath: '/tmp/workers.sock',
clientId: `worker-${process.pid}`
});
await worker.sendMessage('job-complete', {
jobId: '123',
result: processedData
});
```
### Real-time Event Distribution
```typescript
// Event bus
const eventBus = SmartIpc.createServer({
id: 'event-bus',
socketPath: '/tmp/events.sock'
});
// Services subscribe to events
const analyticsService = SmartIpc.createClient({
id: 'event-bus',
socketPath: '/tmp/events.sock'
});
await analyticsService.subscribe('user.*', (event) => {
trackEvent(event);
});
```
## 📈 Performance
SmartIPC is optimized for high throughput and low latency:
| Transport | Messages/sec | Avg Latency | Use Case |
|-----------|-------------|-------------|----------|
| Unix Socket | 150,000+ | < 0.1ms | Local high-performance IPC (Linux/macOS) |
| Named Pipe | 120,000+ | < 0.15ms | Windows local IPC |
| TCP (localhost) | 100,000+ | < 0.2ms | Local network-capable IPC |
| TCP (network) | 50,000+ | < 1ms | Distributed systems |
- **Memory efficient**: Streaming support for large payloads
- **CPU efficient**: Event-driven, non-blocking I/O
## 🔧 Requirements
- Node.js >= 14.x
- TypeScript >= 4.x (for development)
- Unix-like OS (Linux, macOS) or Windows
## License and Legal Information
@@ -473,7 +595,3 @@ Registered at District court Bremen HRB 35230 HB, Germany
For any legal inquiries or if you require further information, please contact us via email at hello@task.vc.
By using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.
---
**Built with ❤️ by Task Venture Capital GmbH**

204
test/test.improvements.ts Normal file
View File

@@ -0,0 +1,204 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as smartipc from '../ts/index.js';
import * as path from 'path';
import * as fs from 'fs';
import * as os from 'os';
const testSocketPath = path.join(os.tmpdir(), `test-ipc-improvements-${Date.now()}.sock`);
// Test 1: Server Readiness API
tap.test('Server readiness API should emit ready event', async () => {
const server = smartipc.SmartIpc.createServer({
id: 'test-server',
socketPath: testSocketPath,
autoCleanupSocketFile: true,
heartbeat: false // Disable heartbeat for this test
});
let readyEventFired = false;
server.on('ready', () => {
readyEventFired = true;
});
await server.start({ readyWhen: 'accepting' });
expect(readyEventFired).toBeTrue();
expect(server.getIsReady()).toBeTrue();
await server.stop();
});
// Test 2: Automatic Socket Cleanup
tap.test('Should cleanup stale socket file automatically', async () => {
// Create a stale socket file
fs.writeFileSync(testSocketPath, '');
expect(fs.existsSync(testSocketPath)).toBeTrue();
const server = smartipc.SmartIpc.createServer({
id: 'test-server',
socketPath: testSocketPath,
autoCleanupSocketFile: true,
heartbeat: false // Disable heartbeat for this test
});
// Should clean up and start successfully
await server.start();
expect(server.getIsReady()).toBeTrue();
await server.stop();
});
// Test 3: Basic Connection with New Options
tap.test('Client should connect with basic configuration', async () => {
const server = smartipc.SmartIpc.createServer({
id: 'test-server',
socketPath: testSocketPath,
autoCleanupSocketFile: true,
heartbeat: false // Disable heartbeat for this test
});
await server.start({ readyWhen: 'accepting' });
// Wait for server to be fully ready
await new Promise(resolve => setTimeout(resolve, 200));
const client = smartipc.SmartIpc.createClient({
id: 'test-server',
socketPath: testSocketPath,
clientId: 'test-client',
registerTimeoutMs: 10000 // Longer timeout
});
await client.connect();
expect(client.getIsConnected()).toBeTrue();
await client.disconnect();
await server.stop();
});
// Test 4: Heartbeat Configuration Without Throwing
tap.test('Heartbeat should use event mode instead of throwing', async () => {
const server = smartipc.SmartIpc.createServer({
id: 'test-server',
socketPath: testSocketPath,
autoCleanupSocketFile: true,
heartbeat: false // Disable server heartbeat for this test
});
// Add error handler to prevent unhandled errors
server.on('error', () => {});
await server.start({ readyWhen: 'accepting' });
await new Promise(resolve => setTimeout(resolve, 200));
const client = smartipc.SmartIpc.createClient({
id: 'test-server',
socketPath: testSocketPath,
clientId: 'heartbeat-client',
heartbeat: true,
heartbeatInterval: 100,
heartbeatTimeout: 300,
heartbeatInitialGracePeriodMs: 1000,
heartbeatThrowOnTimeout: false // Don't throw, emit event
});
let heartbeatTimeoutFired = false;
client.on('heartbeatTimeout', () => {
heartbeatTimeoutFired = true;
});
client.on('error', () => {});
await client.connect();
expect(client.getIsConnected()).toBeTrue();
// Wait a bit but within grace period
await new Promise(resolve => setTimeout(resolve, 500));
// Should still be connected, no timeout during grace period
expect(heartbeatTimeoutFired).toBeFalse();
expect(client.getIsConnected()).toBeTrue();
await client.disconnect();
await server.stop();
});
// Test 5: Wait for Server Helper
tap.test('waitForServer should detect when server becomes ready', async () => {
const server = smartipc.SmartIpc.createServer({
id: 'test-server',
socketPath: testSocketPath,
autoCleanupSocketFile: true,
heartbeat: false // Disable heartbeat for this test
});
// Start server after delay
setTimeout(async () => {
await server.start();
}, 200);
// Wait for server should succeed
await smartipc.SmartIpc.waitForServer({
socketPath: testSocketPath,
timeoutMs: 3000
});
// Server should be ready now
const client = smartipc.SmartIpc.createClient({
id: 'test-server',
socketPath: testSocketPath,
clientId: 'wait-test-client'
});
await client.connect();
expect(client.getIsConnected()).toBeTrue();
await client.disconnect();
await server.stop();
});
// Test 6: Connect Retry Configuration
tap.test('Client retry should work with delayed server', async () => {
const server = smartipc.SmartIpc.createServer({
id: 'test-server',
socketPath: testSocketPath,
autoCleanupSocketFile: true,
heartbeat: false // Disable heartbeat for this test
});
const client = smartipc.SmartIpc.createClient({
id: 'test-server',
socketPath: testSocketPath,
clientId: 'retry-client',
connectRetry: {
enabled: true,
initialDelay: 100,
maxDelay: 500,
maxAttempts: 10,
totalTimeout: 5000
}
});
// Start server after a delay
setTimeout(async () => {
await server.start({ readyWhen: 'accepting' });
}, 300);
// Client should retry and eventually connect
await client.connect({ waitForReady: true, waitTimeout: 5000 });
expect(client.getIsConnected()).toBeTrue();
await client.disconnect();
await server.stop();
});
// Cleanup
tap.test('Cleanup test socket', async () => {
try {
fs.unlinkSync(testSocketPath);
} catch (e) {
// Ignore if doesn't exist
}
});
export default tap.start();

286
test/test.reliability.ts Normal file
View File

@@ -0,0 +1,286 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as smartipc from '../ts/index.js';
import * as path from 'path';
import * as fs from 'fs';
import * as os from 'os';
const testSocketPath = path.join(os.tmpdir(), `test-ipc-reliability-${Date.now()}.sock`);
tap.test('Server Readiness API', async () => {
const server = smartipc.SmartIpc.createServer({
id: 'test-server',
socketPath: testSocketPath,
autoCleanupSocketFile: true
});
let readyEventFired = false;
server.on('ready', () => {
readyEventFired = true;
});
// Start server with 'accepting' readiness mode
await server.start({ readyWhen: 'accepting' });
// Check that ready event was fired
expect(readyEventFired).toBeTrue();
expect(server.getIsReady()).toBeTrue();
await server.stop();
});
tap.test('Automatic Socket Cleanup', async () => {
// Create a stale socket file
fs.writeFileSync(testSocketPath, '');
expect(fs.existsSync(testSocketPath)).toBeTrue();
const server = smartipc.SmartIpc.createServer({
id: 'test-server',
socketPath: testSocketPath,
autoCleanupSocketFile: true,
socketMode: 0o600
});
// Should clean up stale socket and start successfully
await server.start();
expect(server.getIsReady()).toBeTrue();
await server.stop();
});
tap.test('Client Connection Retry', async () => {
const server = smartipc.SmartIpc.createServer({
id: 'retry-server',
socketPath: testSocketPath,
autoCleanupSocketFile: true
});
// Create client with retry configuration
const client = smartipc.SmartIpc.createClient({
id: 'retry-client',
socketPath: testSocketPath,
connectRetry: {
enabled: true,
initialDelay: 50,
maxDelay: 500,
maxAttempts: 10,
totalTimeout: 5000
},
registerTimeoutMs: 3000
});
// Start server first with accepting readiness mode
await server.start({ readyWhen: 'accepting' });
// Give server a moment to be fully ready
await new Promise(resolve => setTimeout(resolve, 100));
// Client should connect successfully with retry enabled
await client.connect();
expect(client.getIsConnected()).toBeTrue();
await client.disconnect();
await server.stop();
});
tap.test('Graceful Heartbeat Handling', async () => {
const server = smartipc.SmartIpc.createServer({
id: 'heartbeat-server',
socketPath: testSocketPath,
autoCleanupSocketFile: true,
heartbeat: true,
heartbeatInterval: 100,
heartbeatTimeout: 500,
heartbeatInitialGracePeriodMs: 1000,
heartbeatThrowOnTimeout: false
});
// Add error handler to prevent unhandled error
server.on('error', (error) => {
// Ignore heartbeat errors in this test
});
await server.start({ readyWhen: 'accepting' });
// Give server a moment to be fully ready
await new Promise(resolve => setTimeout(resolve, 100));
const client = smartipc.SmartIpc.createClient({
id: 'heartbeat-client',
socketPath: testSocketPath,
heartbeat: true,
heartbeatInterval: 100,
heartbeatTimeout: 500,
heartbeatInitialGracePeriodMs: 1000,
heartbeatThrowOnTimeout: false
});
let heartbeatTimeoutFired = false;
client.on('heartbeatTimeout', () => {
heartbeatTimeoutFired = true;
});
// Add error handler to prevent unhandled error
client.on('error', (error) => {
// Ignore errors in this test
});
await client.connect();
expect(client.getIsConnected()).toBeTrue();
// Wait to ensure heartbeat is working
await new Promise(resolve => setTimeout(resolve, 300));
// Heartbeat should not timeout during normal operation
expect(heartbeatTimeoutFired).toBeFalse();
await client.disconnect();
await server.stop();
});
tap.test('Test Helper - waitForServer', async () => {
const server = smartipc.SmartIpc.createServer({
id: 'wait-test-server',
socketPath: testSocketPath,
autoCleanupSocketFile: true
});
// Start server after a delay
setTimeout(() => {
server.start();
}, 100);
// Wait for server should succeed
await smartipc.SmartIpc.waitForServer({
socketPath: testSocketPath,
timeoutMs: 3000
});
// Server should be ready
const client = smartipc.SmartIpc.createClient({
id: 'wait-test-client',
socketPath: testSocketPath
});
await client.connect();
expect(client.getIsConnected()).toBeTrue();
await client.disconnect();
await server.stop();
});
tap.test('Race Condition - Immediate Connect After Server Start', async () => {
const server = smartipc.SmartIpc.createServer({
id: 'race-server',
socketPath: testSocketPath,
autoCleanupSocketFile: true
});
// Start server and immediately try to connect
const serverPromise = server.start({ readyWhen: 'accepting' });
const client = smartipc.SmartIpc.createClient({
id: 'race-client',
socketPath: testSocketPath,
connectRetry: {
enabled: true,
maxAttempts: 20,
initialDelay: 10,
maxDelay: 100
},
registerTimeoutMs: 5000
});
// Wait for server to be ready
await serverPromise;
// Client should be able to connect without race condition
await client.connect();
expect(client.getIsConnected()).toBeTrue();
// Test request/response to ensure full functionality
server.onMessage('test', async (data) => {
return { echo: data };
});
const response = await client.request('test', { message: 'hello' });
expect(response.echo.message).toEqual('hello');
await client.disconnect();
await server.stop();
});
tap.test('Multiple Clients with Retry', async () => {
const server = smartipc.SmartIpc.createServer({
id: 'multi-server',
socketPath: testSocketPath,
autoCleanupSocketFile: true,
maxClients: 10
});
await server.start({ readyWhen: 'accepting' });
// Create multiple clients with retry
const clients = [];
for (let i = 0; i < 5; i++) {
const client = smartipc.SmartIpc.createClient({
id: `client-${i}`,
socketPath: testSocketPath,
connectRetry: {
enabled: true,
maxAttempts: 5
}
});
clients.push(client);
}
// Connect all clients concurrently
await Promise.all(clients.map(c => c.connect()));
// Verify all connected
for (const client of clients) {
expect(client.getIsConnected()).toBeTrue();
}
// Disconnect all
await Promise.all(clients.map(c => c.disconnect()));
await server.stop();
});
tap.test('Server Restart with Socket Cleanup', async () => {
const server = smartipc.SmartIpc.createServer({
id: 'restart-server',
socketPath: testSocketPath,
autoCleanupSocketFile: true
});
// First start
await server.start();
expect(server.getIsReady()).toBeTrue();
await server.stop();
// Second start - should cleanup and work
await server.start();
expect(server.getIsReady()).toBeTrue();
const client = smartipc.SmartIpc.createClient({
id: 'restart-client',
socketPath: testSocketPath
});
await client.connect();
expect(client.getIsConnected()).toBeTrue();
await client.disconnect();
await server.stop();
});
// Clean up test socket file
tap.test('Cleanup', async () => {
try {
fs.unlinkSync(testSocketPath);
} catch (e) {
// Ignore if doesn't exist
}
});
export default tap.start();

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartipc',
version: '2.0.3',
version: '2.1.3',
description: 'A library for node inter process communication, providing an easy-to-use API for IPC.'
}

View File

@@ -22,6 +22,10 @@ export interface IIpcChannelOptions extends IIpcTransportOptions {
heartbeatInterval?: number;
/** Heartbeat timeout in ms */
heartbeatTimeout?: number;
/** Initial grace period before heartbeat timeout in ms */
heartbeatInitialGracePeriodMs?: number;
/** Throw on heartbeat timeout (default: true, set false to emit event instead) */
heartbeatThrowOnTimeout?: boolean;
}
/**
@@ -45,7 +49,9 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
private reconnectTimer?: NodeJS.Timeout;
private heartbeatTimer?: NodeJS.Timeout;
private heartbeatCheckTimer?: NodeJS.Timeout;
private heartbeatGraceTimer?: NodeJS.Timeout;
private lastHeartbeat: number = Date.now();
private connectionStartTime: number = Date.now();
private isReconnecting = false;
private isClosing = false;
@@ -76,6 +82,18 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
...options
};
// Normalize heartbeatThrowOnTimeout to boolean (defensive for JS consumers)
const throwOnTimeout = (this.options as any).heartbeatThrowOnTimeout;
if (throwOnTimeout !== undefined) {
if (throwOnTimeout === 'false') {
this.options.heartbeatThrowOnTimeout = false;
} else if (throwOnTimeout === 'true') {
this.options.heartbeatThrowOnTimeout = true;
} else if (typeof throwOnTimeout !== 'boolean') {
this.options.heartbeatThrowOnTimeout = Boolean(throwOnTimeout);
}
}
this.transport = createTransport(this.options);
this.setupTransportHandlers();
}
@@ -203,6 +221,7 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
this.stopHeartbeat();
this.lastHeartbeat = Date.now();
this.connectionStartTime = Date.now();
// Send heartbeat messages
this.heartbeatTimer = setInterval(() => {
@@ -211,14 +230,43 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
});
}, this.options.heartbeatInterval!);
// Delay starting the check until after the grace period
const gracePeriod = this.options.heartbeatInitialGracePeriodMs || 0;
if (gracePeriod > 0) {
// Use a timer to delay the first check
this.heartbeatGraceTimer = setTimeout(() => {
this.startHeartbeatCheck();
}, gracePeriod);
} else {
// No grace period, start checking immediately
this.startHeartbeatCheck();
}
}
/**
* Start heartbeat timeout checking (separated for grace period handling)
*/
private startHeartbeatCheck(): void {
// Check for heartbeat timeout
this.heartbeatCheckTimer = setInterval(() => {
const timeSinceLastHeartbeat = Date.now() - this.lastHeartbeat;
if (timeSinceLastHeartbeat > this.options.heartbeatTimeout!) {
this.emit('error', new Error('Heartbeat timeout'));
this.transport.disconnect().catch(() => {});
const error = new Error('Heartbeat timeout');
if (this.options.heartbeatThrowOnTimeout !== false) {
// Default behavior: emit error which may cause disconnect
this.emit('error', error);
this.transport.disconnect().catch(() => {});
} else {
// Emit heartbeatTimeout event instead of error
this.emit('heartbeatTimeout', error);
// Clear timers to avoid repeated events
this.stopHeartbeat();
}
}
}, this.options.heartbeatTimeout! / 2);
}, Math.max(1000, Math.floor(this.options.heartbeatTimeout! / 2)));
}
/**
@@ -234,6 +282,11 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
clearInterval(this.heartbeatCheckTimer);
this.heartbeatCheckTimer = undefined;
}
if (this.heartbeatGraceTimer) {
clearTimeout(this.heartbeatGraceTimer);
this.heartbeatGraceTimer = undefined;
}
}
/**
@@ -408,7 +461,7 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
* Register a message handler
*/
public on(event: string, handler: (payload: any) => any | Promise<any>): this {
if (event === 'message' || event === 'connect' || event === 'disconnect' || event === 'error' || event === 'reconnecting' || event === 'drain') {
if (event === 'message' || event === 'connect' || event === 'disconnect' || event === 'error' || event === 'reconnecting' || event === 'drain' || event === 'heartbeatTimeout') {
// Special handling for channel events
super.on(event, handler);
} else {

View File

@@ -5,11 +5,35 @@ import type { IIpcChannelOptions } from './classes.ipcchannel.js';
/**
* Options for IPC Client
*/
export interface IConnectRetryConfig {
/** Enable connection retry */
enabled: boolean;
/** Initial delay before first retry in ms */
initialDelay?: number;
/** Maximum delay between retries in ms */
maxDelay?: number;
/** Maximum number of attempts */
maxAttempts?: number;
/** Total timeout for all retry attempts in ms */
totalTimeout?: number;
}
export interface IClientConnectOptions {
/** Wait for server to be ready before attempting connection */
waitForReady?: boolean;
/** Timeout for waiting for server readiness in ms */
waitTimeout?: number;
}
export interface IIpcClientOptions extends IIpcChannelOptions {
/** Client identifier */
clientId?: string;
/** Client metadata */
metadata?: Record<string, any>;
/** Connection retry configuration */
connectRetry?: IConnectRetryConfig;
/** Registration timeout in ms (default: 5000) */
registerTimeoutMs?: number;
}
/**
@@ -35,34 +59,114 @@ export class IpcClient extends plugins.EventEmitter {
/**
* Connect to the server
*/
public async connect(): Promise<void> {
public async connect(connectOptions: IClientConnectOptions = {}): Promise<void> {
if (this.isConnected) {
return;
}
// Connect the channel
await this.channel.connect();
// Helper function to attempt registration
const attemptRegistration = async (): Promise<void> => {
const registerTimeoutMs = this.options.registerTimeoutMs || 5000;
// Register with the server
try {
const response = await this.channel.request<any, any>(
'__register__',
{
clientId: this.clientId,
metadata: this.options.metadata
},
{ timeout: 5000 }
);
try {
const response = await this.channel.request<any, any>(
'__register__',
{
clientId: this.clientId,
metadata: this.options.metadata
},
{
timeout: registerTimeoutMs,
headers: { clientId: this.clientId } // Include clientId in headers for proper routing
}
);
if (!response.success) {
throw new Error(response.error || 'Registration failed');
if (!response.success) {
throw new Error(response.error || 'Registration failed');
}
this.isConnected = true;
this.emit('connect');
} catch (error) {
throw new Error(`Failed to register with server: ${error.message}`);
}
};
// Helper function to attempt connection with retry
const attemptConnection = async (): Promise<void> => {
const retryConfig = this.options.connectRetry;
const maxAttempts = retryConfig?.maxAttempts || 1;
const initialDelay = retryConfig?.initialDelay || 100;
const maxDelay = retryConfig?.maxDelay || 1500;
const totalTimeout = retryConfig?.totalTimeout || 15000;
const startTime = Date.now();
let lastError: Error | undefined;
let delay = initialDelay;
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
// Check total timeout
if (totalTimeout && Date.now() - startTime > totalTimeout) {
throw new Error(`Connection timeout after ${totalTimeout}ms: ${lastError?.message || 'Unknown error'}`);
}
try {
// Connect the channel
await this.channel.connect();
// Attempt registration
await attemptRegistration();
return; // Success!
} catch (error) {
lastError = error as Error;
// Disconnect channel for retry
await this.channel.disconnect().catch(() => {});
// If this isn't the last attempt and retry is enabled, wait before retrying
if (attempt < maxAttempts && retryConfig?.enabled) {
// Check if we have time for another attempt
if (totalTimeout && Date.now() - startTime + delay > totalTimeout) {
break; // Will timeout, don't wait
}
await new Promise(resolve => setTimeout(resolve, delay));
// Exponential backoff with max limit
delay = Math.min(delay * 2, maxDelay);
}
}
}
this.isConnected = true;
this.emit('connect');
} catch (error) {
await this.channel.disconnect();
throw new Error(`Failed to register with server: ${error.message}`);
// All attempts failed
throw lastError || new Error('Failed to connect to server');
};
// If waitForReady is specified, wait for server socket to exist first
if (connectOptions.waitForReady) {
const waitTimeout = connectOptions.waitTimeout || 10000;
const startTime = Date.now();
while (Date.now() - startTime < waitTimeout) {
try {
// Try to connect
await attemptConnection();
return; // Success!
} catch (error) {
// If it's a connection refused error, server might not be ready yet
if ((error as any).message?.includes('ECONNREFUSED') ||
(error as any).message?.includes('ENOENT')) {
await new Promise(resolve => setTimeout(resolve, 100));
continue;
}
// Other errors should be thrown
throw error;
}
}
throw new Error(`Server not ready after ${waitTimeout}ms`);
} else {
// Normal connection attempt
await attemptConnection();
}
}
@@ -93,10 +197,20 @@ export class IpcClient extends plugins.EventEmitter {
this.emit('disconnect', reason);
});
this.channel.on('error', (error) => {
this.channel.on('error', (error: any) => {
// If heartbeat timeout and configured not to throw, convert to heartbeatTimeout event
if (error && error.message === 'Heartbeat timeout' && this.options.heartbeatThrowOnTimeout === false) {
this.emit('heartbeatTimeout', error);
return;
}
this.emit('error', error);
});
this.channel.on('heartbeatTimeout', (error) => {
// Forward heartbeatTimeout event (when heartbeatThrowOnTimeout is false)
this.emit('heartbeatTimeout', error);
});
this.channel.on('reconnecting', (info) => {
this.emit('reconnecting', info);
});

View File

@@ -5,11 +5,20 @@ import type { IIpcChannelOptions } from './classes.ipcchannel.js';
/**
* Options for IPC Server
*/
export interface IServerStartOptions {
/** When to consider server ready (default: 'socket-bound') */
readyWhen?: 'socket-bound' | 'accepting';
}
export interface IIpcServerOptions extends Omit<IIpcChannelOptions, 'autoReconnect' | 'reconnectDelay' | 'maxReconnectDelay' | 'reconnectMultiplier' | 'maxReconnectAttempts'> {
/** Maximum number of client connections */
maxClients?: number;
/** Client idle timeout in ms */
clientIdleTimeout?: number;
/** Automatically cleanup stale socket file on start (default: false) */
autoCleanupSocketFile?: boolean;
/** Socket file permissions mode (e.g. 0o600) */
socketMode?: number;
}
/**
@@ -32,6 +41,7 @@ export class IpcServer extends plugins.EventEmitter {
private messageHandlers = new Map<string, (payload: any, clientId: string) => any | Promise<any>>();
private primaryChannel?: IpcChannel;
private isRunning = false;
private isReady = false;
private clientIdleCheckTimer?: NodeJS.Timeout;
// Pub/sub tracking
@@ -50,7 +60,7 @@ export class IpcServer extends plugins.EventEmitter {
/**
* Start the server
*/
public async start(): Promise<void> {
public async start(options: IServerStartOptions = {}): Promise<void> {
if (this.isRunning) {
return;
}
@@ -190,12 +200,29 @@ export class IpcServer extends plugins.EventEmitter {
this.emit('error', error, 'server');
});
this.primaryChannel.on('heartbeatTimeout', (error) => {
// Forward heartbeatTimeout event (when heartbeatThrowOnTimeout is false)
this.emit('heartbeatTimeout', error, 'server');
});
// Connect the primary channel (will start as server)
await this.primaryChannel.connect();
this.isRunning = true;
this.startClientIdleCheck();
this.emit('start');
// Handle readiness based on options
if (options.readyWhen === 'accepting') {
// Wait a bit to ensure handlers are fully set up
await new Promise(resolve => setTimeout(resolve, 10));
this.isReady = true;
this.emit('ready');
} else {
// Default: ready when socket is bound
this.isReady = true;
this.emit('ready');
}
}
/**
@@ -317,6 +344,19 @@ export class IpcServer extends plugins.EventEmitter {
}
this.emit('error', error, actualClientId);
});
channel.on('heartbeatTimeout', (error) => {
// Find the actual client ID for this channel
let actualClientId = clientId;
for (const [id, client] of this.clients) {
if (client.channel === channel) {
actualClientId = id;
break;
}
}
// Forward heartbeatTimeout event (when heartbeatThrowOnTimeout is false)
this.emit('heartbeatTimeout', error, actualClientId);
});
}
/**
@@ -505,4 +545,11 @@ export class IpcServer extends plugins.EventEmitter {
uptime: this.primaryChannel ? Date.now() - (this.primaryChannel as any).connectedAt : undefined
};
}
/**
* Check if server is ready to accept connections
*/
public getIsReady(): boolean {
return this.isReady;
}
}

View File

@@ -34,6 +34,10 @@ export interface IIpcTransportOptions {
noDelay?: boolean;
/** Maximum message size in bytes (default: 8MB) */
maxMessageSize?: number;
/** Automatically cleanup stale socket file on start (default: false) */
autoCleanupSocketFile?: boolean;
/** Socket file permissions mode (e.g. 0o600) */
socketMode?: number;
}
/**
@@ -165,6 +169,8 @@ export class UnixSocketTransport extends IpcTransport {
private socket: plugins.net.Socket | null = null;
private server: plugins.net.Server | null = null;
private clients: Set<plugins.net.Socket> = new Set();
private socketToClientId = new WeakMap<plugins.net.Socket, string>();
private clientIdToSocket = new Map<string, plugins.net.Socket>();
/**
* Connect as client or start as server
@@ -206,11 +212,13 @@ export class UnixSocketTransport extends IpcTransport {
*/
private async startServer(socketPath: string): Promise<void> {
return new Promise((resolve, reject) => {
// Clean up stale socket file if it exists
try {
plugins.fs.unlinkSync(socketPath);
} catch (error) {
// File doesn't exist, that's fine
// Clean up stale socket file if autoCleanupSocketFile is enabled
if (this.options.autoCleanupSocketFile) {
try {
plugins.fs.unlinkSync(socketPath);
} catch (error) {
// File doesn't exist, that's fine
}
}
this.server = plugins.net.createServer((socket) => {
@@ -233,6 +241,12 @@ export class UnixSocketTransport extends IpcTransport {
socket.on('close', () => {
this.clients.delete(socket);
// Clean up clientId mappings
const clientId = this.socketToClientId.get(socket);
if (clientId && this.clientIdToSocket.get(clientId) === socket) {
this.clientIdToSocket.delete(clientId);
}
this.socketToClientId.delete(socket);
this.emit('clientDisconnected', socket);
});
@@ -247,6 +261,15 @@ export class UnixSocketTransport extends IpcTransport {
this.server.on('error', reject);
this.server.listen(socketPath, () => {
// Set socket permissions if specified
if (this.options.socketMode !== undefined && process.platform !== 'win32') {
try {
plugins.fs.chmodSync(socketPath, this.options.socketMode);
} catch (error) {
// Ignore permission errors, not critical
}
}
this.connected = true;
this.emit('connect');
resolve();
@@ -292,7 +315,18 @@ export class UnixSocketTransport extends IpcTransport {
// Parse and emit the message with socket reference
try {
const message = JSON.parse(messageData.toString('utf8')) as IIpcMessageEnvelope;
// Update clientId mapping
const clientId = message.headers?.clientId ??
(message.type === '__register__' ? (message.payload as any)?.clientId : undefined);
if (clientId) {
this.socketToClientId.set(socket, clientId);
this.clientIdToSocket.set(clientId, socket);
}
// Emit both events so IpcChannel can process it
this.emit('clientMessage', message, socket);
this.emit('message', message);
} catch (error: any) {
this.emit('error', new Error(`Failed to parse message: ${error.message}`));
}
@@ -400,27 +434,54 @@ export class UnixSocketTransport extends IpcTransport {
}
});
} else if (this.server && this.clients.size > 0) {
// Server mode - broadcast to all clients
const promises: Promise<boolean>[] = [];
// Server mode - route by clientId if present, otherwise broadcast
const targetClientId = message.headers?.clientId;
for (const client of this.clients) {
promises.push(new Promise((resolve) => {
const success = client.write(frame, (error) => {
if (error) {
resolve(false);
} else {
resolve(true);
if (targetClientId && this.clientIdToSocket.has(targetClientId)) {
// Send to specific client
const targetSocket = this.clientIdToSocket.get(targetClientId)!;
if (targetSocket && !targetSocket.destroyed) {
return new Promise((resolve) => {
const success = targetSocket.write(frame, (error) => {
if (error) {
resolve(false);
} else {
resolve(true);
}
});
if (!success) {
targetSocket.once('drain', () => resolve(true));
}
});
} else {
// Socket is destroyed, remove from mappings
this.clientIdToSocket.delete(targetClientId);
return false;
}
} else {
// Broadcast to all clients (fallback for messages without specific target)
const promises: Promise<boolean>[] = [];
if (!success) {
client.once('drain', () => resolve(true));
}
}));
for (const client of this.clients) {
promises.push(new Promise((resolve) => {
const success = client.write(frame, (error) => {
if (error) {
resolve(false);
} else {
resolve(true);
}
});
if (!success) {
client.once('drain', () => resolve(true));
}
}));
}
const results = await Promise.all(promises);
return results.every(r => r);
}
const results = await Promise.all(promises);
return results.every(r => r);
}
return false;

View File

@@ -7,7 +7,7 @@ import { IpcServer } from './classes.ipcserver.js';
import { IpcClient } from './classes.ipcclient.js';
import { IpcChannel } from './classes.ipcchannel.js';
import type { IIpcServerOptions } from './classes.ipcserver.js';
import type { IIpcClientOptions } from './classes.ipcclient.js';
import type { IIpcClientOptions, IConnectRetryConfig } from './classes.ipcclient.js';
import type { IIpcChannelOptions } from './classes.ipcchannel.js';
/**
@@ -17,6 +17,96 @@ export class SmartIpc {
/**
* Create an IPC server
*/
/**
* Wait for a server to become ready at the given socket path
*/
public static async waitForServer(options: {
socketPath: string;
timeoutMs?: number;
}): Promise<void> {
const timeout = options.timeoutMs || 10000;
const startTime = Date.now();
while (Date.now() - startTime < timeout) {
try {
// Create a temporary client with proper options
const testClient = SmartIpc.createClient({
id: 'test-probe',
socketPath: options.socketPath,
clientId: `probe-${process.pid}-${Date.now()}`,
heartbeat: false,
connectRetry: {
enabled: false // Don't retry, we're handling retries here
},
registerTimeoutMs: 2000 // Short timeout for quick probing
});
// Try to connect and register with the server
await testClient.connect();
// Success! Clean up and return
await testClient.disconnect();
return;
} catch (error) {
// Server not ready yet, wait and retry
await new Promise(resolve => setTimeout(resolve, 200));
}
}
throw new Error(`Server not ready at ${options.socketPath} after ${timeout}ms`);
}
/**
* Helper to spawn a server process and connect a client
*/
public static async spawnAndConnect(options: {
serverScript: string;
socketPath: string;
clientId?: string;
spawnOptions?: any;
connectRetry?: IConnectRetryConfig;
timeoutMs?: number;
}): Promise<{
client: IpcClient;
serverProcess: any;
}> {
const { spawn } = await import('child_process');
// Spawn the server process
const serverProcess = spawn('node', [options.serverScript], {
detached: true,
stdio: 'pipe',
...options.spawnOptions
});
// Handle server process errors
serverProcess.on('error', (error: Error) => {
console.error('Server process error:', error);
});
// Wait for server to be ready
await SmartIpc.waitForServer({
socketPath: options.socketPath,
timeoutMs: options.timeoutMs || 10000
});
// Create and connect client
const client = new IpcClient({
id: options.clientId || 'test-client',
socketPath: options.socketPath,
connectRetry: options.connectRetry || {
enabled: true,
maxAttempts: 10,
initialDelay: 100,
maxDelay: 1000
}
});
await client.connect({ waitForReady: true });
return { client, serverProcess };
}
public static createServer(options: IIpcServerOptions): IpcServer {
return new IpcServer(options);
}