// Listing metadata: 494 lines, 14 KiB, TypeScript
import * as plugins from './smartipc.plugins.js';
|
|
import { IpcTransport, createTransport } from './classes.transports.js';
|
|
import type { IIpcMessageEnvelope, IIpcTransportOptions } from './classes.transports.js';
|
|
|
|
/**
 * Options for IPC channel.
 *
 * Adds reconnect and heartbeat tuning on top of the transport options.
 * Every field is optional; the defaults quoted below are the values the
 * IpcChannel constructor merges in when a field is not supplied.
 */
export interface IIpcChannelOptions extends IIpcTransportOptions {
  /** Enable automatic reconnection. Constructor default: true. */
  autoReconnect?: boolean;

  /** Initial reconnect delay in ms. Constructor default: 1000. */
  reconnectDelay?: number;

  /** Maximum reconnect delay in ms (cap for the backoff). Constructor default: 30000. */
  maxReconnectDelay?: number;

  /** Reconnect delay multiplier applied per attempt. Constructor default: 1.5. */
  reconnectMultiplier?: number;

  /** Maximum number of reconnect attempts. Constructor default: Infinity. */
  maxReconnectAttempts?: number;

  /** Enable heartbeat. Constructor default: true. */
  heartbeat?: boolean;

  /** Heartbeat interval in ms. Constructor default: 5000. */
  heartbeatInterval?: number;

  /** Heartbeat timeout in ms. Constructor default: 10000. */
  heartbeatTimeout?: number;

  /** Initial grace period before heartbeat timeout in ms (treated as 0 when unset). */
  heartbeatInitialGracePeriodMs?: number;

  /** Throw on heartbeat timeout (default: true, set false to emit event instead) */
  heartbeatThrowOnTimeout?: boolean;
}
|
|
|
|
/**
 * Request/Response tracking.
 *
 * One entry per in-flight request, keyed by the request's message id in the
 * channel's pending-request map.
 */
interface IPendingRequest<T = any> {
  /** Settles the caller's promise with the response payload. */
  resolve: (value: T) => void;

  /** Fails the caller's promise (error response, timeout, send failure, disconnect). */
  reject: (error: Error) => void;

  /** Timeout timer for the request; cleared when the request settles first. */
  timer?: NodeJS.Timeout;
}
|
|
|
|
/**
 * IPC Channel with connection management, auto-reconnect, and typed messaging.
 *
 * Wraps the transport returned by `createTransport` and layers on top of it:
 * - reconnect scheduling with exponential backoff and jitter,
 * - a heartbeat sender plus a periodic liveness check,
 * - request/response correlation with per-request timeouts,
 * - per-message-type handlers registered through `on()`.
 *
 * Channel events (delivered through the EventEmitter base class): `connect`,
 * `disconnect`, `error`, `reconnecting`, `drain`, `heartbeatTimeout`, and
 * `message` for incoming messages whose type has no registered handler.
 * Any other name passed to `on()` is treated as a message type.
 */
export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEmitter {
  /** Defaults merged underneath the caller's options by the constructor. */
  private static readonly DEFAULT_OPTIONS = {
    autoReconnect: true,
    reconnectDelay: 1000,
    maxReconnectDelay: 30000,
    reconnectMultiplier: 1.5,
    maxReconnectAttempts: Infinity,
    heartbeat: true,
    heartbeatInterval: 5000,
    heartbeatTimeout: 10000
  };

  /** Names that `on()` forwards to the EventEmitter instead of treating as message types. */
  private static readonly CHANNEL_EVENTS: ReadonlySet<string> = new Set<string>([
    'message',
    'connect',
    'disconnect',
    'error',
    'reconnecting',
    'drain',
    'heartbeatTimeout'
  ]);

  private transport: IpcTransport;
  private options: IIpcChannelOptions;
  private pendingRequests = new Map<string, IPendingRequest>();
  private messageHandlers = new Map<string, (payload: any) => any | Promise<any>>();
  private reconnectAttempts = 0;
  private reconnectTimer?: NodeJS.Timeout;
  private heartbeatTimer?: NodeJS.Timeout;
  private heartbeatCheckTimer?: NodeJS.Timeout;
  private lastHeartbeat: number = Date.now();
  private connectionStartTime: number = Date.now();
  private isReconnecting = false;
  private isClosing = false;

  // Metrics
  private metrics = {
    messagesSent: 0,
    messagesReceived: 0,
    bytesSent: 0,
    bytesReceived: 0,
    reconnects: 0,
    heartbeatTimeouts: 0,
    errors: 0,
    requestTimeouts: 0,
    connectedAt: 0
  };

  /**
   * Create a channel and its underlying transport.
   *
   * @param options Channel and transport options; unspecified reconnect and
   *   heartbeat settings fall back to `DEFAULT_OPTIONS`.
   */
  constructor(options: IIpcChannelOptions) {
    super();
    this.options = {
      ...IpcChannel.DEFAULT_OPTIONS,
      ...options
    };

    this.transport = createTransport(this.options);
    this.setupTransportHandlers();
  }

  /**
   * Produce a non-empty, human-readable description of a thrown value.
   * Needed because the error text travels in a header that the receiving side
   * tests for truthiness: an empty string would make a failure look like success.
   */
  private static describeError(error: unknown): string {
    const candidate = (error as { message?: unknown } | null | undefined)?.message;
    if (typeof candidate === 'string' && candidate.length > 0) {
      return candidate;
    }
    return String(error) || 'Unknown error';
  }

  /**
   * Setup transport event handlers
   */
  private setupTransportHandlers(): void {
    this.transport.on('connect', () => {
      // A connect that arrives while a reconnect cycle is active is a reconnect.
      if (this.isReconnecting) {
        this.metrics.reconnects++;
      }
      this.reconnectAttempts = 0;
      this.isReconnecting = false;
      this.metrics.connectedAt = Date.now();
      this.startHeartbeat();
      this.emit('connect');
    });

    this.transport.on('disconnect', (reason) => {
      this.stopHeartbeat();
      // Nothing in flight can complete any more; fail it all before notifying listeners.
      this.clearPendingRequests(new Error(`Disconnected: ${reason || 'Unknown reason'}`));
      this.emit('disconnect', reason);

      if (this.options.autoReconnect && !this.isClosing) {
        this.scheduleReconnect();
      }
    });

    this.transport.on('error', (error) => {
      this.emit('error', error);
    });

    this.transport.on('message', (message: IIpcMessageEnvelope) => {
      this.handleMessage(message);
    });

    this.transport.on('drain', () => {
      this.emit('drain');
    });
  }

  /**
   * Connect the channel.
   *
   * No-op when the transport already reports itself connected. When the
   * connection attempt fails, an `error` event is emitted; with auto-reconnect
   * enabled a retry is scheduled and the promise resolves, otherwise the error
   * is rethrown to the caller.
   */
  public async connect(): Promise<void> {
    if (this.transport.isConnected()) {
      return;
    }

    // An explicit connect() supersedes an earlier disconnect(); without this
    // reset the channel would stay in "closing" mode and never auto-reconnect.
    this.isClosing = false;

    try {
      await this.transport.connect();
    } catch (error) {
      this.emit('error', error);
      if (this.options.autoReconnect && !this.isClosing) {
        this.scheduleReconnect();
      } else {
        throw error;
      }
    }
  }

  /**
   * Disconnect the channel.
   *
   * Marks the channel as closing (suppressing auto-reconnect), stops the
   * heartbeat, cancels any scheduled reconnect, rejects all pending requests
   * with "Channel closed", then disconnects the transport.
   */
  public async disconnect(): Promise<void> {
    this.isClosing = true;
    this.stopHeartbeat();
    this.cancelReconnect();
    this.clearPendingRequests(new Error('Channel closed'));
    await this.transport.disconnect();
  }

  /**
   * Schedule a reconnection attempt.
   *
   * Does nothing while another attempt is pending or the channel is closing.
   * Emits `error` once the configured attempt limit is reached, otherwise
   * emits `reconnecting` with the attempt number and the chosen delay.
   */
  private scheduleReconnect(): void {
    if (this.isReconnecting || this.isClosing) {
      return;
    }

    const defaults = IpcChannel.DEFAULT_OPTIONS;
    const attemptLimit = this.options.maxReconnectAttempts ?? defaults.maxReconnectAttempts;
    if (this.reconnectAttempts >= attemptLimit) {
      this.emit('error', new Error('Maximum reconnection attempts reached'));
      return;
    }

    this.isReconnecting = true;
    this.reconnectAttempts++;

    // Calculate delay with exponential backoff and jitter
    const initialDelay = this.options.reconnectDelay ?? defaults.reconnectDelay;
    const multiplier = this.options.reconnectMultiplier ?? defaults.reconnectMultiplier;
    const delayCap = this.options.maxReconnectDelay ?? defaults.maxReconnectDelay;
    const baseDelay = Math.min(initialDelay * Math.pow(multiplier, this.reconnectAttempts - 1), delayCap);
    const jitter = Math.random() * 0.1 * baseDelay; // 10% jitter
    const delay = baseDelay + jitter;

    this.emit('reconnecting', { attempt: this.reconnectAttempts, delay });

    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = undefined;
      this.transport.connect().catch(() => {
        // The attempt failed. Release the in-progress guard and queue the next
        // attempt ourselves: a failed connect is not guaranteed to produce a
        // 'disconnect' event, and even if it does, the guard would have made
        // that handler's scheduleReconnect() call return early.
        this.isReconnecting = false;
        this.scheduleReconnect();
      });
    }, delay);
  }

  /**
   * Cancel scheduled reconnection
   */
  private cancelReconnect(): void {
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = undefined;
    }
    this.isReconnecting = false;
  }

  /**
   * Start heartbeat mechanism.
   *
   * Runs two intervals: one sends `__heartbeat__` messages every
   * `heartbeatInterval` ms, the other checks every `heartbeatTimeout / 2` ms
   * whether any heartbeat traffic was seen within `heartbeatTimeout` ms.
   */
  private startHeartbeat(): void {
    if (!this.options.heartbeat) {
      return;
    }

    this.stopHeartbeat();

    const startedAt = Date.now();
    this.lastHeartbeat = startedAt;
    this.connectionStartTime = startedAt;

    const defaults = IpcChannel.DEFAULT_OPTIONS;
    const sendEveryMs = this.options.heartbeatInterval ?? defaults.heartbeatInterval;
    const timeoutMs = this.options.heartbeatTimeout ?? defaults.heartbeatTimeout;

    // Send heartbeat messages
    this.heartbeatTimer = setInterval(() => {
      this.sendMessage('__heartbeat__', { timestamp: Date.now() }).catch(() => {
        // Ignore heartbeat send errors; the timeout check below is what reacts
        // to a dead peer.
      });
    }, sendEveryMs);

    // Check for heartbeat timeout
    this.heartbeatCheckTimer = setInterval(() => {
      const now = Date.now();
      const gracePeriod = this.options.heartbeatInitialGracePeriodMs ?? 0;

      // Skip timeout check during initial grace period
      if (now - this.connectionStartTime < gracePeriod) {
        return;
      }

      if (now - this.lastHeartbeat <= timeoutMs) {
        return;
      }

      this.metrics.heartbeatTimeouts++;
      const error = new Error('Heartbeat timeout');

      if (this.options.heartbeatThrowOnTimeout !== false) {
        // Default behavior: emit error, then drop the transport.
        this.emit('error', error);
        this.transport.disconnect().catch(() => {});
      } else {
        // Emit heartbeatTimeout event instead of error
        this.emit('heartbeatTimeout', error);
      }
    }, timeoutMs / 2);
  }

  /**
   * Stop heartbeat mechanism
   */
  private stopHeartbeat(): void {
    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = undefined;
    }

    if (this.heartbeatCheckTimer) {
      clearInterval(this.heartbeatCheckTimer);
      this.heartbeatCheckTimer = undefined;
    }
  }

  /**
   * Handle incoming messages.
   *
   * Dispatch order: heartbeat, heartbeat response, response to a pending
   * request (matched by `correlationId`), registered message-type handler,
   * and finally the generic `message` event.
   */
  private handleMessage(message: IIpcMessageEnvelope): void {
    // Track metrics
    this.metrics.messagesReceived++;
    this.metrics.bytesReceived += JSON.stringify(message).length;

    // The request's clientId header, when present, is echoed on every reply.
    // NOTE(review): presumably lets a multi-client transport route the reply — confirm.
    const clientId = message.headers?.clientId;

    // Handle heartbeat and send response
    if (message.type === '__heartbeat__') {
      this.lastHeartbeat = Date.now();
      // Reply so the sender also observes liveness
      this.transport
        .send({
          id: plugins.crypto.randomUUID(),
          type: '__heartbeat_response__',
          correlationId: message.id,
          timestamp: Date.now(),
          payload: { timestamp: Date.now() },
          headers: clientId ? { clientId } : undefined
        })
        .catch(() => {});
      return;
    }

    // Handle heartbeat response
    if (message.type === '__heartbeat_response__') {
      this.lastHeartbeat = Date.now();
      return;
    }

    // Handle request/response
    const pending = message.correlationId ? this.pendingRequests.get(message.correlationId) : undefined;
    if (message.correlationId && pending) {
      this.pendingRequests.delete(message.correlationId);

      if (pending.timer) {
        clearTimeout(pending.timer);
      }

      if (message.headers?.error) {
        pending.reject(new Error(message.headers.error));
      } else {
        pending.resolve(message.payload);
      }
      return;
    }

    // Handle regular messages
    const handler = this.messageHandlers.get(message.type);
    if (!handler) {
      // Emit unhandled message
      this.emit('message', message);
      return;
    }

    // If message expects a response
    if (message.headers?.requiresResponse && message.id) {
      const responseType = `${message.type}_response`;

      Promise.resolve()
        .then(() => handler(message.payload))
        .then((result) => {
          const response: IIpcMessageEnvelope = {
            id: plugins.crypto.randomUUID(),
            type: responseType,
            correlationId: message.id,
            timestamp: Date.now(),
            payload: result,
            headers: clientId ? { clientId } : undefined
          };
          return this.transport.send(response);
        })
        .catch((error: any) => {
          const response: IIpcMessageEnvelope = {
            id: plugins.crypto.randomUUID(),
            type: responseType,
            correlationId: message.id,
            timestamp: Date.now(),
            payload: null,
            headers: {
              // Must be non-empty: the requester rejects only when this header is truthy.
              error: IpcChannel.describeError(error),
              ...(clientId ? { clientId } : {})
            }
          };
          return this.transport.send(response);
        })
        .catch((sendError: any) => {
          // Even the error response could not be sent; report it instead of
          // leaving an unhandled rejection.
          this.emit('error', sendError);
        });
      return;
    }

    // Fire and forget
    try {
      const outcome = handler(message.payload);
      // Async handlers fail by rejecting rather than throwing; surface that the
      // same way as a synchronous throw.
      if (outcome && typeof outcome.then === 'function') {
        Promise.resolve(outcome).catch((error) => {
          this.emit('error', error);
        });
      }
    } catch (error) {
      this.emit('error', error);
    }
  }

  /**
   * Send a message without expecting a response.
   *
   * @param type Message type.
   * @param payload Message payload.
   * @param headers Optional headers. A `correlationId` entry is lifted out of
   *   the headers and placed on the envelope itself.
   * @throws Error('Failed to send message') when the transport reports the send as unsuccessful.
   */
  public async sendMessage(type: string, payload: any, headers?: Record<string, any>): Promise<void> {
    // Extract correlationId from headers and place it at top level
    const { correlationId, ...restHeaders } = headers ?? {};
    const message: IIpcMessageEnvelope = {
      id: plugins.crypto.randomUUID(),
      type,
      timestamp: Date.now(),
      payload,
      ...(correlationId ? { correlationId } : {}),
      headers: Object.keys(restHeaders).length ? restHeaders : undefined
    };

    const success = await this.transport.send(message);
    if (!success) {
      this.metrics.errors++;
      throw new Error('Failed to send message');
    }

    // Track metrics
    this.metrics.messagesSent++;
    this.metrics.bytesSent += JSON.stringify(message).length;
  }

  /**
   * Send a request and wait for response.
   *
   * @param type Message type; the peer's handler for this type produces the response.
   * @param payload Request payload.
   * @param options `timeout` in ms (30000 when omitted or 0) and extra headers.
   * @returns The payload of the correlated response.
   * @throws Error when the request times out, cannot be sent, the peer answers
   *   with an error header, or the channel disconnects/closes first.
   */
  public async request<TReq = TRequest, TRes = TResponse>(
    type: string,
    payload: TReq,
    options?: { timeout?: number; headers?: Record<string, any> }
  ): Promise<TRes> {
    const messageId = plugins.crypto.randomUUID();
    const timeout = options?.timeout || 30000;

    const message: IIpcMessageEnvelope<TReq> = {
      id: messageId,
      type,
      timestamp: Date.now(),
      payload,
      headers: {
        ...options?.headers,
        requiresResponse: true
      }
    };

    return new Promise<TRes>((resolve, reject) => {
      // Setup timeout
      const timer = setTimeout(() => {
        this.pendingRequests.delete(messageId);
        this.metrics.requestTimeouts++;
        reject(new Error(`Request timeout for ${type}`));
      }, timeout);

      // Store pending request before sending so a fast response cannot be missed.
      this.pendingRequests.set(messageId, { resolve, reject, timer });

      // Shared cleanup for "the request never went out".
      const abandon = (error: any): void => {
        this.pendingRequests.delete(messageId);
        clearTimeout(timer);
        this.metrics.errors++;
        reject(error);
      };

      this.transport
        .send(message)
        .then((success) => {
          if (!success) {
            abandon(new Error('Failed to send message'));
            return;
          }
          // Same accounting as sendMessage().
          this.metrics.messagesSent++;
          this.metrics.bytesSent += JSON.stringify(message).length;
        })
        .catch(abandon);
    });
  }

  /**
   * Register a message handler.
   *
   * Channel event names (`message`, `connect`, `disconnect`, `error`,
   * `reconnecting`, `drain`, `heartbeatTimeout`) are registered as ordinary
   * EventEmitter listeners. Any other name is stored as the handler for that
   * message type, replacing a previously registered handler for the same type.
   */
  public on(event: string, handler: (payload: any) => any | Promise<any>): this {
    if (IpcChannel.CHANNEL_EVENTS.has(event)) {
      // Special handling for channel events
      super.on(event, handler);
    } else {
      // Register as message type handler
      this.messageHandlers.set(event, handler);
    }
    return this;
  }

  /**
   * Clear all pending requests, rejecting each one with the given error.
   */
  private clearPendingRequests(error: Error): void {
    for (const pending of this.pendingRequests.values()) {
      if (pending.timer) {
        clearTimeout(pending.timer);
      }
      pending.reject(error);
    }
    this.pendingRequests.clear();
  }

  /**
   * Check if channel is connected
   */
  public isConnected(): boolean {
    return this.transport.isConnected();
  }

  /**
   * Get channel statistics.
   *
   * `metrics.uptime` is the time in ms since the most recent `connect` event,
   * or undefined when the channel has never connected.
   */
  public getStats(): {
    connected: boolean;
    reconnectAttempts: number;
    pendingRequests: number;
    isReconnecting: boolean;
    metrics: {
      messagesSent: number;
      messagesReceived: number;
      bytesSent: number;
      bytesReceived: number;
      reconnects: number;
      heartbeatTimeouts: number;
      errors: number;
      requestTimeouts: number;
      uptime?: number;
    };
  } {
    const { connectedAt } = this.metrics;
    return {
      connected: this.transport.isConnected(),
      reconnectAttempts: this.reconnectAttempts,
      pendingRequests: this.pendingRequests.size,
      isReconnecting: this.isReconnecting,
      metrics: {
        ...this.metrics,
        uptime: connectedAt ? Date.now() - connectedAt : undefined
      }
    };
  }
}