BREAKING CHANGE(core): Refactor core IPC: replace node-ipc with native transports and add IpcChannel / IpcServer / IpcClient with heartbeat, reconnection, request/response and pub/sub. Update tests and documentation.

This commit is contained in:
2025-08-24 16:39:09 +00:00
parent 234aab74d6
commit 4a1096a0ab
12 changed files with 3003 additions and 227 deletions

8
ts/00_commitinfo_data.ts Normal file
View File

@@ -0,0 +1,8 @@
/**
* autocreated commitinfo by @push.rocks/commitinfo
*/
export const commitinfo = {
name: '@push.rocks/smartipc',
version: '2.0.0',
description: 'A library for node inter process communication, providing an easy-to-use API for IPC.'
}

472
ts/classes.ipcchannel.ts Normal file
View File

@@ -0,0 +1,472 @@
import * as plugins from './smartipc.plugins.js';
import { IpcTransport, createTransport } from './classes.transports.js';
import type { IIpcMessageEnvelope, IIpcTransportOptions } from './classes.transports.js';
/**
* Options for IPC channel
*/
export interface IIpcChannelOptions extends IIpcTransportOptions {
/** Enable automatic reconnection */
autoReconnect?: boolean;
/** Initial reconnect delay in ms */
reconnectDelay?: number;
/** Maximum reconnect delay in ms */
maxReconnectDelay?: number;
/** Reconnect delay multiplier */
reconnectMultiplier?: number;
/** Maximum number of reconnect attempts */
maxReconnectAttempts?: number;
/** Enable heartbeat */
heartbeat?: boolean;
/** Heartbeat interval in ms */
heartbeatInterval?: number;
/** Heartbeat timeout in ms */
heartbeatTimeout?: number;
}
/**
* Request/Response tracking
*/
interface IPendingRequest<T = any> {
resolve: (value: T) => void;
reject: (error: Error) => void;
timer?: NodeJS.Timeout;
}
/**
* IPC Channel with connection management, auto-reconnect, and typed messaging
*/
export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEmitter {
private transport: IpcTransport;
private options: IIpcChannelOptions;
private pendingRequests = new Map<string, IPendingRequest>();
private messageHandlers = new Map<string, (payload: any) => any | Promise<any>>();
private reconnectAttempts = 0;
private reconnectTimer?: NodeJS.Timeout;
private heartbeatTimer?: NodeJS.Timeout;
private heartbeatCheckTimer?: NodeJS.Timeout;
private lastHeartbeat: number = Date.now();
private isReconnecting = false;
private isClosing = false;
// Metrics
private metrics = {
messagesSent: 0,
messagesReceived: 0,
bytesSent: 0,
bytesReceived: 0,
reconnects: 0,
heartbeatTimeouts: 0,
errors: 0,
requestTimeouts: 0,
connectedAt: 0
};
constructor(options: IIpcChannelOptions) {
super();
this.options = {
autoReconnect: true,
reconnectDelay: 1000,
maxReconnectDelay: 30000,
reconnectMultiplier: 1.5,
maxReconnectAttempts: Infinity,
heartbeat: true,
heartbeatInterval: 5000,
heartbeatTimeout: 10000,
...options
};
this.transport = createTransport(this.options);
this.setupTransportHandlers();
}
/**
* Setup transport event handlers
*/
private setupTransportHandlers(): void {
this.transport.on('connect', () => {
this.reconnectAttempts = 0;
this.isReconnecting = false;
this.metrics.connectedAt = Date.now();
this.startHeartbeat();
this.emit('connect');
});
this.transport.on('disconnect', (reason) => {
this.stopHeartbeat();
this.clearPendingRequests(new Error(`Disconnected: ${reason || 'Unknown reason'}`));
this.emit('disconnect', reason);
if (this.options.autoReconnect && !this.isClosing) {
this.scheduleReconnect();
}
});
this.transport.on('error', (error) => {
this.emit('error', error);
});
this.transport.on('message', (message: IIpcMessageEnvelope) => {
this.handleMessage(message);
});
this.transport.on('drain', () => {
this.emit('drain');
});
}
/**
* Connect the channel
*/
public async connect(): Promise<void> {
if (this.transport.isConnected()) {
return;
}
try {
await this.transport.connect();
} catch (error) {
this.emit('error', error);
if (this.options.autoReconnect && !this.isClosing) {
this.scheduleReconnect();
} else {
throw error;
}
}
}
/**
* Disconnect the channel
*/
public async disconnect(): Promise<void> {
this.isClosing = true;
this.stopHeartbeat();
this.cancelReconnect();
this.clearPendingRequests(new Error('Channel closed'));
await this.transport.disconnect();
}
/**
* Schedule a reconnection attempt
*/
private scheduleReconnect(): void {
if (this.isReconnecting || this.isClosing) {
return;
}
if (this.options.maxReconnectAttempts !== Infinity &&
this.reconnectAttempts >= this.options.maxReconnectAttempts) {
this.emit('error', new Error('Maximum reconnection attempts reached'));
return;
}
this.isReconnecting = true;
this.reconnectAttempts++;
// Calculate delay with exponential backoff and jitter
const baseDelay = Math.min(
this.options.reconnectDelay! * Math.pow(this.options.reconnectMultiplier!, this.reconnectAttempts - 1),
this.options.maxReconnectDelay!
);
const jitter = Math.random() * 0.1 * baseDelay; // 10% jitter
const delay = baseDelay + jitter;
this.emit('reconnecting', { attempt: this.reconnectAttempts, delay });
this.reconnectTimer = setTimeout(async () => {
try {
await this.transport.connect();
} catch (error) {
// Connection failed, will be rescheduled by disconnect handler
}
}, delay);
}
/**
* Cancel scheduled reconnection
*/
private cancelReconnect(): void {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = undefined;
}
this.isReconnecting = false;
}
/**
* Start heartbeat mechanism
*/
private startHeartbeat(): void {
if (!this.options.heartbeat) {
return;
}
this.stopHeartbeat();
this.lastHeartbeat = Date.now();
// Send heartbeat messages
this.heartbeatTimer = setInterval(() => {
this.sendMessage('__heartbeat__', { timestamp: Date.now() }).catch(() => {
// Ignore heartbeat send errors
});
}, this.options.heartbeatInterval!);
// Check for heartbeat timeout
this.heartbeatCheckTimer = setInterval(() => {
const timeSinceLastHeartbeat = Date.now() - this.lastHeartbeat;
if (timeSinceLastHeartbeat > this.options.heartbeatTimeout!) {
this.emit('error', new Error('Heartbeat timeout'));
this.transport.disconnect().catch(() => {});
}
}, this.options.heartbeatTimeout! / 2);
}
/**
* Stop heartbeat mechanism
*/
private stopHeartbeat(): void {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = undefined;
}
if (this.heartbeatCheckTimer) {
clearInterval(this.heartbeatCheckTimer);
this.heartbeatCheckTimer = undefined;
}
}
/**
* Handle incoming messages
*/
private handleMessage(message: IIpcMessageEnvelope): void {
// Track metrics
this.metrics.messagesReceived++;
this.metrics.bytesReceived += JSON.stringify(message).length;
// Handle heartbeat and send response
if (message.type === '__heartbeat__') {
this.lastHeartbeat = Date.now();
// Reply so the sender also observes liveness
this.transport.send({
id: plugins.crypto.randomUUID(),
type: '__heartbeat_response__',
correlationId: message.id,
timestamp: Date.now(),
payload: { timestamp: Date.now() },
headers: message.headers?.clientId ? { clientId: message.headers.clientId } : undefined
}).catch(() => {});
return;
}
// Handle heartbeat response
if (message.type === '__heartbeat_response__') {
this.lastHeartbeat = Date.now();
return;
}
// Handle request/response
if (message.correlationId && this.pendingRequests.has(message.correlationId)) {
const pending = this.pendingRequests.get(message.correlationId)!;
this.pendingRequests.delete(message.correlationId);
if (pending.timer) {
clearTimeout(pending.timer);
}
if (message.headers?.error) {
pending.reject(new Error(message.headers.error));
} else {
pending.resolve(message.payload);
}
return;
}
// Handle regular messages
if (this.messageHandlers.has(message.type)) {
const handler = this.messageHandlers.get(message.type)!;
// If message expects a response
if (message.headers?.requiresResponse && message.id) {
Promise.resolve()
.then(() => handler(message.payload))
.then((result) => {
const response: IIpcMessageEnvelope = {
id: plugins.crypto.randomUUID(),
type: `${message.type}_response`,
correlationId: message.id,
timestamp: Date.now(),
payload: result,
headers: message.headers?.clientId ? { clientId: message.headers.clientId } : undefined
};
return this.transport.send(response);
})
.catch((error: any) => {
const response: IIpcMessageEnvelope = {
id: plugins.crypto.randomUUID(),
type: `${message.type}_response`,
correlationId: message.id,
timestamp: Date.now(),
payload: null,
headers: {
error: error.message,
...(message.headers?.clientId ? { clientId: message.headers.clientId } : {})
}
};
return this.transport.send(response);
});
} else {
// Fire and forget
try {
handler(message.payload);
} catch (error) {
this.emit('error', error);
}
}
} else {
// Emit unhandled message
this.emit('message', message);
}
}
/**
* Send a message without expecting a response
*/
public async sendMessage(type: string, payload: any, headers?: Record<string, any>): Promise<void> {
// Extract correlationId from headers and place it at top level
const { correlationId, ...restHeaders } = headers ?? {};
const message: IIpcMessageEnvelope = {
id: plugins.crypto.randomUUID(),
type,
timestamp: Date.now(),
payload,
...(correlationId ? { correlationId } : {}),
headers: Object.keys(restHeaders).length ? restHeaders : undefined
};
const success = await this.transport.send(message);
if (!success) {
this.metrics.errors++;
throw new Error('Failed to send message');
}
// Track metrics
this.metrics.messagesSent++;
this.metrics.bytesSent += JSON.stringify(message).length;
}
/**
* Send a request and wait for response
*/
public async request<TReq = TRequest, TRes = TResponse>(
type: string,
payload: TReq,
options?: { timeout?: number; headers?: Record<string, any> }
): Promise<TRes> {
const messageId = plugins.crypto.randomUUID();
const timeout = options?.timeout || 30000;
const message: IIpcMessageEnvelope<TReq> = {
id: messageId,
type,
timestamp: Date.now(),
payload,
headers: {
...options?.headers,
requiresResponse: true
}
};
return new Promise<TRes>((resolve, reject) => {
// Setup timeout
const timer = setTimeout(() => {
this.pendingRequests.delete(messageId);
reject(new Error(`Request timeout for ${type}`));
}, timeout);
// Store pending request
this.pendingRequests.set(messageId, { resolve, reject, timer });
// Send message with better error handling
this.transport.send(message)
.then((success) => {
if (!success) {
this.pendingRequests.delete(messageId);
clearTimeout(timer);
reject(new Error('Failed to send message'));
}
})
.catch((error) => {
this.pendingRequests.delete(messageId);
clearTimeout(timer);
reject(error);
});
});
}
/**
* Register a message handler
*/
public on(event: string, handler: (payload: any) => any | Promise<any>): this {
if (event === 'message' || event === 'connect' || event === 'disconnect' || event === 'error' || event === 'reconnecting' || event === 'drain') {
// Special handling for channel events
super.on(event, handler);
} else {
// Register as message type handler
this.messageHandlers.set(event, handler);
}
return this;
}
/**
* Clear all pending requests
*/
private clearPendingRequests(error: Error): void {
for (const [id, pending] of this.pendingRequests) {
if (pending.timer) {
clearTimeout(pending.timer);
}
pending.reject(error);
}
this.pendingRequests.clear();
}
/**
* Check if channel is connected
*/
public isConnected(): boolean {
return this.transport.isConnected();
}
/**
* Get channel statistics
*/
public getStats(): {
connected: boolean;
reconnectAttempts: number;
pendingRequests: number;
isReconnecting: boolean;
metrics: {
messagesSent: number;
messagesReceived: number;
bytesSent: number;
bytesReceived: number;
reconnects: number;
heartbeatTimeouts: number;
errors: number;
requestTimeouts: number;
uptime?: number;
};
} {
return {
connected: this.transport.isConnected(),
reconnectAttempts: this.reconnectAttempts,
pendingRequests: this.pendingRequests.size,
isReconnecting: this.isReconnecting,
metrics: {
...this.metrics,
uptime: this.metrics.connectedAt ? Date.now() - this.metrics.connectedAt : undefined
}
};
}
}

232
ts/classes.ipcclient.ts Normal file
View File

@@ -0,0 +1,232 @@
import * as plugins from './smartipc.plugins.js';
import { IpcChannel } from './classes.ipcchannel.js';
import type { IIpcChannelOptions } from './classes.ipcchannel.js';
/**
* Options for IPC Client
*/
export interface IIpcClientOptions extends IIpcChannelOptions {
/** Client identifier */
clientId?: string;
/** Client metadata */
metadata?: Record<string, any>;
}
/**
* IPC Client for connecting to an IPC server
*/
export class IpcClient extends plugins.EventEmitter {
private options: IIpcClientOptions;
private channel: IpcChannel;
private messageHandlers = new Map<string, (payload: any) => any | Promise<any>>();
private isConnected = false;
private clientId: string;
constructor(options: IIpcClientOptions) {
super();
this.options = options;
this.clientId = options.clientId || plugins.crypto.randomUUID();
// Create the channel
this.channel = new IpcChannel(this.options);
this.setupChannelHandlers();
}
/**
* Connect to the server
*/
public async connect(): Promise<void> {
if (this.isConnected) {
return;
}
// Connect the channel
await this.channel.connect();
// Register with the server
try {
const response = await this.channel.request<any, any>(
'__register__',
{
clientId: this.clientId,
metadata: this.options.metadata
},
{ timeout: 5000 }
);
if (!response.success) {
throw new Error(response.error || 'Registration failed');
}
this.isConnected = true;
this.emit('connect');
} catch (error) {
await this.channel.disconnect();
throw new Error(`Failed to register with server: ${error.message}`);
}
}
/**
* Disconnect from the server
*/
public async disconnect(): Promise<void> {
if (!this.isConnected) {
return;
}
this.isConnected = false;
await this.channel.disconnect();
this.emit('disconnect');
}
/**
* Setup channel event handlers
*/
private setupChannelHandlers(): void {
// Forward channel events
this.channel.on('connect', () => {
// Don't emit connect here, wait for successful registration
});
this.channel.on('disconnect', (reason) => {
this.isConnected = false;
this.emit('disconnect', reason);
});
this.channel.on('error', (error) => {
this.emit('error', error);
});
this.channel.on('reconnecting', (info) => {
this.emit('reconnecting', info);
});
// Handle messages
this.channel.on('message', (message) => {
// Check if we have a handler for this message type
if (this.messageHandlers.has(message.type)) {
const handler = this.messageHandlers.get(message.type)!;
// If message expects a response
if (message.headers?.requiresResponse && message.id) {
Promise.resolve()
.then(() => handler(message.payload))
.then((result) => {
return this.channel.sendMessage(
`${message.type}_response`,
result,
{ correlationId: message.id }
);
})
.catch((error) => {
return this.channel.sendMessage(
`${message.type}_response`,
null,
{ correlationId: message.id, error: error.message }
);
});
} else {
// Fire and forget
handler(message.payload);
}
} else {
// Emit unhandled message
this.emit('message', message);
}
});
}
/**
* Register a message handler
*/
public onMessage(type: string, handler: (payload: any) => any | Promise<any>): void {
this.messageHandlers.set(type, handler);
}
/**
* Send a message to the server
*/
public async sendMessage(type: string, payload: any, headers?: Record<string, any>): Promise<void> {
if (!this.isConnected) {
throw new Error('Client is not connected');
}
// Always include clientId in headers
await this.channel.sendMessage(type, payload, {
...headers,
clientId: this.clientId
});
}
/**
* Send a request to the server and wait for response
*/
public async request<TReq = any, TRes = any>(
type: string,
payload: TReq,
options?: { timeout?: number; headers?: Record<string, any> }
): Promise<TRes> {
if (!this.isConnected) {
throw new Error('Client is not connected');
}
// Always include clientId in headers
return this.channel.request<TReq, TRes>(type, payload, {
...options,
headers: {
...options?.headers,
clientId: this.clientId
}
});
}
/**
* Subscribe to a topic (pub/sub pattern)
*/
public async subscribe(topic: string, handler: (payload: any) => void): Promise<void> {
// Register local handler
this.messageHandlers.set(`topic:${topic}`, handler);
// Notify server about subscription
await this.sendMessage('__subscribe__', { topic });
}
/**
* Unsubscribe from a topic
*/
public async unsubscribe(topic: string): Promise<void> {
// Remove local handler
this.messageHandlers.delete(`topic:${topic}`);
// Notify server about unsubscription
await this.sendMessage('__unsubscribe__', { topic });
}
/**
* Publish to a topic
*/
public async publish(topic: string, payload: any): Promise<void> {
await this.sendMessage('__publish__', { topic, payload });
}
/**
* Get client ID
*/
public getClientId(): string {
return this.clientId;
}
/**
* Check if client is connected
*/
public getIsConnected(): boolean {
return this.isConnected && this.channel.isConnected();
}
/**
* Get client statistics
*/
public getStats(): any {
return this.channel.getStats();
}
}

508
ts/classes.ipcserver.ts Normal file
View File

@@ -0,0 +1,508 @@
import * as plugins from './smartipc.plugins.js';
import { IpcChannel } from './classes.ipcchannel.js';
import type { IIpcChannelOptions } from './classes.ipcchannel.js';
/**
* Options for IPC Server
*/
export interface IIpcServerOptions extends Omit<IIpcChannelOptions, 'autoReconnect' | 'reconnectDelay' | 'maxReconnectDelay' | 'reconnectMultiplier' | 'maxReconnectAttempts'> {
/** Maximum number of client connections */
maxClients?: number;
/** Client idle timeout in ms */
clientIdleTimeout?: number;
}
/**
* Client connection information
*/
interface IClientConnection {
id: string;
channel: IpcChannel;
connectedAt: number;
lastActivity: number;
metadata?: Record<string, any>;
}
/**
* IPC Server for handling multiple client connections
*/
export class IpcServer extends plugins.EventEmitter {
private options: IIpcServerOptions;
private clients = new Map<string, IClientConnection>();
private messageHandlers = new Map<string, (payload: any, clientId: string) => any | Promise<any>>();
private primaryChannel?: IpcChannel;
private isRunning = false;
private clientIdleCheckTimer?: NodeJS.Timeout;
// Pub/sub tracking
private topicIndex = new Map<string, Set<string>>(); // topic -> clientIds
private clientTopics = new Map<string, Set<string>>(); // clientId -> topics
constructor(options: IIpcServerOptions) {
super();
this.options = {
maxClients: Infinity,
clientIdleTimeout: 0, // 0 means no timeout
...options
};
}
/**
* Start the server
*/
public async start(): Promise<void> {
if (this.isRunning) {
return;
}
// Create primary channel for initial connections
this.primaryChannel = new IpcChannel({
...this.options,
autoReconnect: false // Server doesn't auto-reconnect
});
// Register the __register__ handler on the channel
this.primaryChannel.on('__register__', async (payload: { clientId: string; metadata?: Record<string, any> }) => {
const clientId = payload.clientId;
const metadata = payload.metadata;
// Check max clients
if (this.clients.size >= this.options.maxClients!) {
return { success: false, error: 'Maximum number of clients reached' };
}
// Create new client connection
const clientConnection: IClientConnection = {
id: clientId,
channel: this.primaryChannel!,
connectedAt: Date.now(),
lastActivity: Date.now(),
metadata: metadata
};
this.clients.set(clientId, clientConnection);
this.emit('clientConnect', clientId, metadata);
return { success: true, clientId: clientId };
});
// Handle other messages
this.primaryChannel.on('message', (message) => {
// Extract client ID from message headers
const clientId = message.headers?.clientId || 'unknown';
// Update last activity
if (this.clients.has(clientId)) {
this.clients.get(clientId)!.lastActivity = Date.now();
}
// Handle pub/sub messages
if (message.type === '__subscribe__') {
const topic = message.payload?.topic;
if (typeof topic === 'string' && topic.length) {
let set = this.topicIndex.get(topic);
if (!set) this.topicIndex.set(topic, (set = new Set()));
set.add(clientId);
let cset = this.clientTopics.get(clientId);
if (!cset) this.clientTopics.set(clientId, (cset = new Set()));
cset.add(topic);
}
return;
}
if (message.type === '__unsubscribe__') {
const topic = message.payload?.topic;
const set = this.topicIndex.get(topic);
if (set) {
set.delete(clientId);
if (set.size === 0) this.topicIndex.delete(topic);
}
const cset = this.clientTopics.get(clientId);
if (cset) {
cset.delete(topic);
if (cset.size === 0) this.clientTopics.delete(clientId);
}
return;
}
if (message.type === '__publish__') {
const topic = message.payload?.topic;
const payload = message.payload?.payload;
const targets = this.topicIndex.get(topic);
if (targets && targets.size) {
// Send to subscribers
const sends: Promise<void>[] = [];
for (const subClientId of targets) {
sends.push(
this.sendToClient(subClientId, `topic:${topic}`, payload)
.catch(err => {
this.emit('error', err, subClientId);
})
);
}
Promise.allSettled(sends).catch(() => {});
}
return;
}
// Forward to registered handlers
if (this.messageHandlers.has(message.type)) {
const handler = this.messageHandlers.get(message.type)!;
// If message expects a response
if (message.headers?.requiresResponse && message.id) {
Promise.resolve()
.then(() => handler(message.payload, clientId))
.then((result) => {
return this.primaryChannel!.sendMessage(
`${message.type}_response`,
result,
{ correlationId: message.id, clientId }
);
})
.catch((error) => {
return this.primaryChannel!.sendMessage(
`${message.type}_response`,
null,
{ correlationId: message.id, error: error.message, clientId }
);
});
} else {
// Fire and forget
handler(message.payload, clientId);
}
}
// Emit raw message event
this.emit('message', message, clientId);
});
// Setup primary channel handlers
this.primaryChannel.on('disconnect', () => {
// Server disconnected, clear all clients and subscriptions
for (const [clientId] of this.clients) {
this.cleanupClientSubscriptions(clientId);
}
this.clients.clear();
});
this.primaryChannel.on('error', (error) => {
this.emit('error', error, 'server');
});
// Connect the primary channel (will start as server)
await this.primaryChannel.connect();
this.isRunning = true;
this.startClientIdleCheck();
this.emit('start');
}
/**
* Stop the server
*/
public async stop(): Promise<void> {
if (!this.isRunning) {
return;
}
this.isRunning = false;
this.stopClientIdleCheck();
// Disconnect all clients
const disconnectPromises: Promise<void>[] = [];
for (const [clientId, client] of this.clients) {
disconnectPromises.push(
client.channel.disconnect()
.then(() => {
this.emit('clientDisconnect', clientId);
})
.catch(() => {}) // Ignore disconnect errors
);
}
await Promise.all(disconnectPromises);
this.clients.clear();
// Disconnect primary channel
if (this.primaryChannel) {
await this.primaryChannel.disconnect();
this.primaryChannel = undefined;
}
this.emit('stop');
}
/**
* Setup channel event handlers
*/
private setupChannelHandlers(channel: IpcChannel, clientId: string): void {
// Handle client registration
channel.on('__register__', async (payload: { clientId: string; metadata?: Record<string, any> }) => {
if (payload.clientId && payload.clientId !== clientId) {
// New client registration
const newClientId = payload.clientId;
// Check max clients
if (this.clients.size >= this.options.maxClients!) {
throw new Error('Maximum number of clients reached');
}
// Create new client connection
const clientConnection: IClientConnection = {
id: newClientId,
channel: channel,
connectedAt: Date.now(),
lastActivity: Date.now(),
metadata: payload.metadata
};
this.clients.set(newClientId, clientConnection);
this.emit('clientConnect', newClientId, payload.metadata);
// Now messages from this channel should be associated with the new client ID
clientId = newClientId;
return { success: true, clientId: newClientId };
}
return { success: false, error: 'Invalid registration' };
});
// Handle messages - pass the correct clientId
channel.on('message', (message) => {
// Try to find the actual client ID for this channel
let actualClientId = clientId;
for (const [id, client] of this.clients) {
if (client.channel === channel) {
actualClientId = id;
break;
}
}
// Update last activity
if (actualClientId !== 'primary' && this.clients.has(actualClientId)) {
this.clients.get(actualClientId)!.lastActivity = Date.now();
}
// Forward to registered handlers
if (this.messageHandlers.has(message.type)) {
const handler = this.messageHandlers.get(message.type)!;
handler(message.payload, actualClientId);
}
// Emit raw message event
this.emit('message', message, actualClientId);
});
// Handle disconnect
channel.on('disconnect', () => {
// Find and remove the actual client
for (const [id, client] of this.clients) {
if (client.channel === channel) {
this.clients.delete(id);
this.emit('clientDisconnect', id);
break;
}
}
});
// Handle errors
channel.on('error', (error) => {
// Find the actual client ID for this channel
let actualClientId = clientId;
for (const [id, client] of this.clients) {
if (client.channel === channel) {
actualClientId = id;
break;
}
}
this.emit('error', error, actualClientId);
});
}
/**
* Register a message handler
*/
public onMessage(type: string, handler: (payload: any, clientId: string) => any | Promise<any>): void {
this.messageHandlers.set(type, handler);
}
/**
* Send message to specific client
*/
public async sendToClient(clientId: string, type: string, payload: any, headers?: Record<string, any>): Promise<void> {
const client = this.clients.get(clientId);
if (!client) {
throw new Error(`Client ${clientId} not found`);
}
await client.channel.sendMessage(type, payload, headers);
}
/**
* Send request to specific client and wait for response
*/
public async requestFromClient<TReq = any, TRes = any>(
clientId: string,
type: string,
payload: TReq,
options?: { timeout?: number; headers?: Record<string, any> }
): Promise<TRes> {
const client = this.clients.get(clientId);
if (!client) {
throw new Error(`Client ${clientId} not found`);
}
return client.channel.request<TReq, TRes>(type, payload, options);
}
/**
* Broadcast message to all clients
*/
public async broadcast(type: string, payload: any, headers?: Record<string, any>): Promise<void> {
const promises: Promise<void>[] = [];
for (const [clientId, client] of this.clients) {
promises.push(
client.channel.sendMessage(type, payload, headers)
.catch((error) => {
this.emit('error', error, clientId);
})
);
}
await Promise.all(promises);
}
/**
* Broadcast message to clients matching a filter
*/
public async broadcastTo(
filter: (clientId: string, metadata?: Record<string, any>) => boolean,
type: string,
payload: any,
headers?: Record<string, any>
): Promise<void> {
const promises: Promise<void>[] = [];
for (const [clientId, client] of this.clients) {
if (filter(clientId, client.metadata)) {
promises.push(
client.channel.sendMessage(type, payload, headers)
.catch((error) => {
this.emit('error', error, clientId);
})
);
}
}
await Promise.all(promises);
}
/**
* Get connected client IDs
*/
public getClientIds(): string[] {
return Array.from(this.clients.keys());
}
/**
* Get client information
*/
public getClientInfo(clientId: string): {
id: string;
connectedAt: number;
lastActivity: number;
metadata?: Record<string, any>;
} | undefined {
const client = this.clients.get(clientId);
if (!client) {
return undefined;
}
return {
id: client.id,
connectedAt: client.connectedAt,
lastActivity: client.lastActivity,
metadata: client.metadata
};
}
/**
* Disconnect a specific client
*/
public async disconnectClient(clientId: string): Promise<void> {
const client = this.clients.get(clientId);
if (!client) {
return;
}
await client.channel.disconnect();
this.clients.delete(clientId);
this.cleanupClientSubscriptions(clientId);
this.emit('clientDisconnect', clientId);
}
/**
* Clean up topic subscriptions for a disconnected client
*/
private cleanupClientSubscriptions(clientId: string): void {
const topics = this.clientTopics.get(clientId);
if (topics) {
for (const topic of topics) {
const set = this.topicIndex.get(topic);
if (set) {
set.delete(clientId);
if (set.size === 0) this.topicIndex.delete(topic);
}
}
this.clientTopics.delete(clientId);
}
}
/**
* Start checking for idle clients
*/
private startClientIdleCheck(): void {
if (!this.options.clientIdleTimeout || this.options.clientIdleTimeout <= 0) {
return;
}
this.clientIdleCheckTimer = setInterval(() => {
const now = Date.now();
const timeout = this.options.clientIdleTimeout!;
for (const [clientId, client] of this.clients) {
if (now - client.lastActivity > timeout) {
this.disconnectClient(clientId).catch(() => {});
}
}
}, this.options.clientIdleTimeout / 2);
}
/**
* Stop checking for idle clients
*/
private stopClientIdleCheck(): void {
if (this.clientIdleCheckTimer) {
clearInterval(this.clientIdleCheckTimer);
this.clientIdleCheckTimer = undefined;
}
}
/**
* Get server statistics
*/
public getStats(): {
isRunning: boolean;
connectedClients: number;
maxClients: number;
uptime?: number;
} {
return {
isRunning: this.isRunning,
connectedClients: this.clients.size,
maxClients: this.options.maxClients!,
uptime: this.primaryChannel ? Date.now() - (this.primaryChannel as any).connectedAt : undefined
};
}
}

660
ts/classes.transports.ts Normal file
View File

@@ -0,0 +1,660 @@
import * as plugins from './smartipc.plugins.js';
/**
* Message envelope structure for all IPC messages
*/
export interface IIpcMessageEnvelope<T = any> {
id: string;
type: string;
correlationId?: string;
timestamp: number;
payload: T;
headers?: Record<string, any>;
}
/**
* Transport configuration options
*/
export interface IIpcTransportOptions {
/** Unique identifier for this transport */
id: string;
/** Socket path for Unix domain sockets or pipe name for Windows */
socketPath?: string;
/** TCP host for network transport */
host?: string;
/** TCP port for network transport */
port?: number;
/** Enable message encryption */
encryption?: boolean;
/** Authentication token */
authToken?: string;
/** Socket timeout in ms */
timeout?: number;
/** Enable TCP no delay (Nagle's algorithm) */
noDelay?: boolean;
/** Maximum message size in bytes (default: 8MB) */
maxMessageSize?: number;
}
/**
* Connection state events
*/
export interface IIpcTransportEvents {
connect: () => void;
disconnect: (reason?: string) => void;
error: (error: Error) => void;
message: (message: IIpcMessageEnvelope) => void;
drain: () => void;
}
/**
* Abstract base class for IPC transports
*/
export abstract class IpcTransport extends plugins.EventEmitter {
protected options: IIpcTransportOptions;
protected connected: boolean = false;
protected messageBuffer: Buffer = Buffer.alloc(0);
protected currentMessageLength: number | null = null;
constructor(options: IIpcTransportOptions) {
super();
this.options = options;
}
/**
* Connect the transport
*/
abstract connect(): Promise<void>;
/**
* Disconnect the transport
*/
abstract disconnect(): Promise<void>;
/**
* Send a message through the transport
*/
abstract send(message: IIpcMessageEnvelope): Promise<boolean>;
/**
* Check if transport is connected
*/
public isConnected(): boolean {
return this.connected;
}
/**
* Parse incoming data with length-prefixed framing
*/
protected parseIncomingData(data: Buffer): void {
// Append new data to buffer
this.messageBuffer = Buffer.concat([this.messageBuffer, data]);
while (this.messageBuffer.length > 0) {
// If we don't have a message length yet, try to read it
if (this.currentMessageLength === null) {
if (this.messageBuffer.length >= 4) {
// Read the length prefix (4 bytes, big endian)
this.currentMessageLength = this.messageBuffer.readUInt32BE(0);
// Check max message size
const maxSize = this.options.maxMessageSize || 8 * 1024 * 1024; // 8MB default
if (this.currentMessageLength > maxSize) {
this.emit('error', new Error(`Message size ${this.currentMessageLength} exceeds maximum ${maxSize}`));
// Reset state to recover
this.messageBuffer = Buffer.alloc(0);
this.currentMessageLength = null;
return;
}
this.messageBuffer = this.messageBuffer.slice(4);
} else {
// Not enough data for length prefix
break;
}
}
// If we have a message length, try to read the message
if (this.currentMessageLength !== null) {
if (this.messageBuffer.length >= this.currentMessageLength) {
// Extract the message
const messageData = this.messageBuffer.slice(0, this.currentMessageLength);
this.messageBuffer = this.messageBuffer.slice(this.currentMessageLength);
this.currentMessageLength = null;
// Parse and emit the message
try {
const message = JSON.parse(messageData.toString('utf8')) as IIpcMessageEnvelope;
this.emit('message', message);
} catch (error: any) {
this.emit('error', new Error(`Failed to parse message: ${error.message}`));
}
} else {
// Not enough data for the complete message
break;
}
}
}
}
/**
* Frame a message with length prefix
*/
protected frameMessage(message: IIpcMessageEnvelope): Buffer {
const messageStr = JSON.stringify(message);
const messageBuffer = Buffer.from(messageStr, 'utf8');
const lengthBuffer = Buffer.allocUnsafe(4);
lengthBuffer.writeUInt32BE(messageBuffer.length, 0);
return Buffer.concat([lengthBuffer, messageBuffer]);
}
/**
* Handle socket errors
*/
protected handleError(error: Error): void {
this.emit('error', error);
this.connected = false;
this.emit('disconnect', error.message);
}
}
/**
* Unix domain socket transport for Linux/Mac
*/
export class UnixSocketTransport extends IpcTransport {
private socket: plugins.net.Socket | null = null;
private server: plugins.net.Server | null = null;
private clients: Set<plugins.net.Socket> = new Set();
/**
* Connect as client or start as server
*/
public async connect(): Promise<void> {
return new Promise((resolve, reject) => {
const socketPath = this.getSocketPath();
// Try to connect as client first
this.socket = new plugins.net.Socket();
if (this.options.noDelay !== false) {
this.socket.setNoDelay(true);
}
this.socket.on('connect', () => {
this.connected = true;
this.setupSocketHandlers(this.socket!);
this.emit('connect');
resolve();
});
this.socket.on('error', (error: any) => {
if (error.code === 'ECONNREFUSED' || error.code === 'ENOENT') {
// No server exists, we should become the server
this.socket = null;
this.startServer(socketPath).then(resolve).catch(reject);
} else {
reject(error);
}
});
this.socket.connect(socketPath);
});
}
/**
* Start as server
*/
private async startServer(socketPath: string): Promise<void> {
return new Promise((resolve, reject) => {
// Clean up stale socket file if it exists
try {
plugins.fs.unlinkSync(socketPath);
} catch (error) {
// File doesn't exist, that's fine
}
this.server = plugins.net.createServer((socket) => {
// Each new connection gets added to clients
this.clients.add(socket);
if (this.options.noDelay !== false) {
socket.setNoDelay(true);
}
// Set up handlers for this client socket
socket.on('data', (data) => {
// Parse incoming data and emit with socket reference
this.parseIncomingDataFromClient(data, socket);
});
socket.on('error', (error) => {
this.emit('clientError', error, socket);
});
socket.on('close', () => {
this.clients.delete(socket);
this.emit('clientDisconnected', socket);
});
socket.on('drain', () => {
this.emit('drain');
});
// Emit new client connection
this.emit('clientConnected', socket);
});
this.server.on('error', reject);
this.server.listen(socketPath, () => {
this.connected = true;
this.emit('connect');
resolve();
});
});
}
/**
* Parse incoming data from a specific client socket
*/
private parseIncomingDataFromClient(data: Buffer, socket: plugins.net.Socket): void {
// We need to maintain separate buffers per client
// For now, just emit the raw message with the socket reference
const socketBuffers = this.clientBuffers || (this.clientBuffers = new WeakMap());
let buffer = socketBuffers.get(socket) || Buffer.alloc(0);
let currentLength = this.clientLengths?.get(socket) || null;
// Append new data to buffer
buffer = Buffer.concat([buffer, data]);
while (buffer.length > 0) {
// If we don't have a message length yet, try to read it
if (currentLength === null) {
if (buffer.length >= 4) {
// Read the length prefix (4 bytes, big endian)
currentLength = buffer.readUInt32BE(0);
buffer = buffer.slice(4);
} else {
// Not enough data for length prefix
break;
}
}
// If we have a message length, try to read the message
if (currentLength !== null) {
if (buffer.length >= currentLength) {
// Extract the message
const messageData = buffer.slice(0, currentLength);
buffer = buffer.slice(currentLength);
currentLength = null;
// Parse and emit the message with socket reference
try {
const message = JSON.parse(messageData.toString('utf8')) as IIpcMessageEnvelope;
this.emit('clientMessage', message, socket);
} catch (error: any) {
this.emit('error', new Error(`Failed to parse message: ${error.message}`));
}
} else {
// Not enough data for the complete message
break;
}
}
}
// Store the buffer and length for next time
socketBuffers.set(socket, buffer);
if (this.clientLengths) {
if (currentLength !== null) {
this.clientLengths.set(socket, currentLength);
} else {
this.clientLengths.delete(socket);
}
} else {
this.clientLengths = new WeakMap();
if (currentLength !== null) {
this.clientLengths.set(socket, currentLength);
}
}
}
private clientBuffers?: WeakMap<plugins.net.Socket, Buffer>;
private clientLengths?: WeakMap<plugins.net.Socket, number | null>;
/**
* Setup socket event handlers
*/
private setupSocketHandlers(socket: plugins.net.Socket): void {
socket.on('data', (data) => {
this.parseIncomingData(data);
});
socket.on('error', (error) => {
this.handleError(error);
});
socket.on('close', () => {
this.connected = false;
this.emit('disconnect');
});
socket.on('drain', () => {
this.emit('drain');
});
}
/**
* Disconnect the transport
*/
public async disconnect(): Promise<void> {
if (this.socket) {
this.socket.destroy();
this.socket = null;
}
if (this.server) {
for (const client of this.clients) {
client.destroy();
}
this.clients.clear();
await new Promise<void>((resolve) => {
this.server!.close(() => resolve());
});
this.server = null;
// Clean up socket file
try {
plugins.fs.unlinkSync(this.getSocketPath());
} catch (error) {
// Ignore cleanup errors
}
}
this.connected = false;
this.emit('disconnect');
}
/**
* Send a message
*/
public async send(message: IIpcMessageEnvelope): Promise<boolean> {
const frame = this.frameMessage(message);
if (this.socket) {
// Client mode
return new Promise((resolve) => {
const success = this.socket!.write(frame, (error) => {
if (error) {
this.handleError(error);
resolve(false);
} else {
resolve(true);
}
});
// Handle backpressure
if (!success) {
this.socket!.once('drain', () => resolve(true));
}
});
} else if (this.server && this.clients.size > 0) {
// Server mode - broadcast to all clients
const promises: Promise<boolean>[] = [];
for (const client of this.clients) {
promises.push(new Promise((resolve) => {
const success = client.write(frame, (error) => {
if (error) {
resolve(false);
} else {
resolve(true);
}
});
if (!success) {
client.once('drain', () => resolve(true));
}
}));
}
const results = await Promise.all(promises);
return results.every(r => r);
}
return false;
}
/**
* Get the socket path
*/
private getSocketPath(): string {
if (this.options.socketPath) {
return this.options.socketPath;
}
const platform = plugins.os.platform();
const tmpDir = plugins.os.tmpdir();
const socketName = `smartipc-${this.options.id}.sock`;
if (platform === 'win32') {
// Windows named pipe path
return `\\\\.\\pipe\\${socketName}`;
} else {
// Unix domain socket path
return plugins.path.join(tmpDir, socketName);
}
}
}
/**
* Named pipe transport for Windows
*/
export class NamedPipeTransport extends UnixSocketTransport {
// Named pipes on Windows use the same net module interface
// The main difference is the path format, which is handled in getSocketPath()
// Additional Windows-specific handling can be added here if needed
}
/**
* TCP transport for network IPC
*/
export class TcpTransport extends IpcTransport {
private socket: plugins.net.Socket | null = null;
private server: plugins.net.Server | null = null;
private clients: Set<plugins.net.Socket> = new Set();
/**
* Connect as client or start as server
*/
public async connect(): Promise<void> {
return new Promise((resolve, reject) => {
const host = this.options.host || 'localhost';
const port = this.options.port || 8765;
// Try to connect as client first
this.socket = new plugins.net.Socket();
if (this.options.noDelay !== false) {
this.socket.setNoDelay(true);
}
if (this.options.timeout) {
this.socket.setTimeout(this.options.timeout);
}
this.socket.on('connect', () => {
this.connected = true;
this.setupSocketHandlers(this.socket!);
this.emit('connect');
resolve();
});
this.socket.on('error', (error: any) => {
if (error.code === 'ECONNREFUSED') {
// No server exists, we should become the server
this.socket = null;
this.startServer(host, port).then(resolve).catch(reject);
} else {
reject(error);
}
});
this.socket.connect(port, host);
});
}
/**
* Start as server
*/
private async startServer(host: string, port: number): Promise<void> {
return new Promise((resolve, reject) => {
this.server = plugins.net.createServer((socket) => {
this.clients.add(socket);
if (this.options.noDelay !== false) {
socket.setNoDelay(true);
}
if (this.options.timeout) {
socket.setTimeout(this.options.timeout);
}
this.setupSocketHandlers(socket);
socket.on('close', () => {
this.clients.delete(socket);
});
});
this.server.on('error', reject);
this.server.listen(port, host, () => {
this.connected = true;
this.emit('connect');
resolve();
});
});
}
/**
* Setup socket event handlers
*/
private setupSocketHandlers(socket: plugins.net.Socket): void {
socket.on('data', (data) => {
this.parseIncomingData(data);
});
socket.on('error', (error) => {
this.handleError(error);
});
socket.on('close', () => {
this.connected = false;
this.emit('disconnect');
});
socket.on('timeout', () => {
this.handleError(new Error('Socket timeout'));
socket.destroy();
});
socket.on('drain', () => {
this.emit('drain');
});
}
/**
* Disconnect the transport
*/
public async disconnect(): Promise<void> {
if (this.socket) {
this.socket.destroy();
this.socket = null;
}
if (this.server) {
for (const client of this.clients) {
client.destroy();
}
this.clients.clear();
await new Promise<void>((resolve) => {
this.server!.close(() => resolve());
});
this.server = null;
}
this.connected = false;
this.emit('disconnect');
}
/**
* Send a message
*/
public async send(message: IIpcMessageEnvelope): Promise<boolean> {
const frame = this.frameMessage(message);
if (this.socket) {
// Client mode
return new Promise((resolve) => {
const success = this.socket!.write(frame, (error) => {
if (error) {
this.handleError(error);
resolve(false);
} else {
resolve(true);
}
});
// Handle backpressure
if (!success) {
this.socket!.once('drain', () => resolve(true));
}
});
} else if (this.server && this.clients.size > 0) {
// Server mode - broadcast to all clients
const promises: Promise<boolean>[] = [];
for (const client of this.clients) {
promises.push(new Promise((resolve) => {
const success = client.write(frame, (error) => {
if (error) {
resolve(false);
} else {
resolve(true);
}
});
if (!success) {
client.once('drain', () => resolve(true));
}
}));
}
const results = await Promise.all(promises);
return results.every(r => r);
}
return false;
}
}
/**
* Factory function to create appropriate transport based on platform and options
*/
export function createTransport(options: IIpcTransportOptions): IpcTransport {
// If TCP is explicitly requested
if (options.host || options.port) {
return new TcpTransport(options);
}
// Platform-specific default transport
const platform = plugins.os.platform();
if (platform === 'win32') {
return new NamedPipeTransport(options);
} else {
return new UnixSocketTransport(options);
}
}

View File

@@ -1,103 +1,40 @@
import * as plugins from './smartipc.plugins.js';
import { EventEmitter } from 'events';
export * from './classes.transports.js';
export * from './classes.ipcchannel.js';
export * from './classes.ipcserver.js';
export * from './classes.ipcclient.js';
export interface ISmartIpcConstructorOptions {
type: 'server' | 'client';
/**
* the name of the message string
*/
ipcSpace: string;
}
export interface ISmartIpcHandlerPackage {
keyword: string;
handlerFunc: (dataArg: string) => void;
}
import { IpcServer } from './classes.ipcserver.js';
import { IpcClient } from './classes.ipcclient.js';
import { IpcChannel } from './classes.ipcchannel.js';
import type { IIpcServerOptions } from './classes.ipcserver.js';
import type { IIpcClientOptions } from './classes.ipcclient.js';
import type { IIpcChannelOptions } from './classes.ipcchannel.js';
/**
* Main SmartIpc class - Factory for creating IPC servers, clients, and channels
*/
export class SmartIpc {
public ipc = new plugins.nodeIpc.IPC();
public handlers: ISmartIpcHandlerPackage[] = [];
public options: ISmartIpcConstructorOptions;
constructor(optionsArg: ISmartIpcConstructorOptions) {
this.options = optionsArg;
/**
* Create an IPC server
*/
public static createServer(options: IIpcServerOptions): IpcServer {
return new IpcServer(options);
}
/**
* connect to the channel
* Create an IPC client
*/
public async start() {
const done = plugins.smartpromise.defer();
let ipcEventEmitter;
switch (this.options.type) {
case 'server':
this.ipc.config.id = this.options.ipcSpace;
this.ipc.serve(() => {
ipcEventEmitter = this.ipc.server;
done.resolve();
});
this.ipc.server.start();
await plugins.smartdelay.delayFor(1000);
await done.promise;
break;
case 'client':
this.ipc.connectTo(this.options.ipcSpace, () => {
ipcEventEmitter = this.ipc.of[this.options.ipcSpace];
done.resolve();
});
await done.promise;
break;
default:
throw new Error(
'type of ipc is not valid. Must be "server" or "client"',
);
}
for (const handler of this.handlers) {
ipcEventEmitter.on(handler.keyword, (dataArg) => {
handler.handlerFunc(dataArg);
});
}
public static createClient(options: IIpcClientOptions): IpcClient {
return new IpcClient(options);
}
/**
* should stop the server
* Create a raw IPC channel (for advanced use cases)
*/
public async stop() {
switch (this.options.type) {
case 'server':
this.ipc.server.stop();
break;
case 'client':
break;
}
}
/**
* regsiters a handler
*/
public registerHandler(handlerPackage: ISmartIpcHandlerPackage) {
this.handlers.push(handlerPackage);
}
/**
* sends a message
* @param payloadArg
*/
public sendMessage(messageIdentifierArg: string, payloadArg: string | any) {
let payload: string = null;
if (typeof payloadArg === 'string') {
payload = payloadArg;
} else {
payload = JSON.stringify(payloadArg);
}
switch (this.options.type) {
case 'server':
this.ipc.server.emit(messageIdentifierArg, payload);
break;
case 'client':
this.ipc.of[this.options.ipcSpace].emit(messageIdentifierArg, payload);
}
public static createChannel(options: IIpcChannelOptions): IpcChannel {
return new IpcChannel(options);
}
}
// Export the main class as default
export default SmartIpc;

View File

@@ -5,7 +5,12 @@ import * as smartrx from '@push.rocks/smartrx';
export { smartdelay, smartpromise, smartrx };
// third party scope
import * as nodeIpc from 'node-ipc';
// node built-in modules
import * as net from 'net';
import * as os from 'os';
import * as path from 'path';
import * as fs from 'fs';
import * as crypto from 'crypto';
import { EventEmitter } from 'events';
export { nodeIpc };
export { net, os, path, fs, crypto, EventEmitter };