// smartipc/ts/classes.ipcclient.ts
// TypeScript source (listed as 365 lines, 10 KiB in the repository viewer it was copied from).

import * as plugins from './smartipc.plugins.js';
import { IpcChannel } from './classes.ipcchannel.js';
import type { IIpcChannelOptions } from './classes.ipcchannel.js';
/**
 * Retry policy used by `IpcClient.connect()` while establishing the
 * connection and registering with the server.
 *
 * All numeric fields are optional; a missing (or zero) value falls back to
 * the default that `connect()` applies.
 */
export interface IConnectRetryConfig {
  /** Turns retrying on; waiting between attempts only happens when this is true. */
  enabled: boolean;

  /** Upper bound on the number of connection attempts (falls back to 1). */
  maxAttempts?: number;

  /** Wait before the first retry, in milliseconds (falls back to 100). */
  initialDelay?: number;

  /** Cap for the exponentially growing wait between retries, in milliseconds (falls back to 1500). */
  maxDelay?: number;

  /** Overall time budget across all attempts, in milliseconds (falls back to 15000). */
  totalTimeout?: number;
}
/**
 * Per-call options accepted by `IpcClient.connect()`.
 */
export interface IClientConnectOptions {
  /**
   * When true, `connect()` keeps retrying while the failure looks like the
   * server is not up yet (error message containing `ECONNREFUSED` or
   * `ENOENT`), pausing 100 ms between rounds.
   */
  waitForReady?: boolean;

  /** How long to keep waiting for the server, in milliseconds (falls back to 10000). */
  waitTimeout?: number;
}
/**
 * Construction options for `IpcClient`.
 *
 * Extends the channel options, so the same object is handed to the
 * underlying `IpcChannel` when the client is constructed.
 */
export interface IIpcClientOptions extends IIpcChannelOptions {
  /** Identifier sent to the server; a random UUID is generated when omitted. */
  clientId?: string;

  /** Arbitrary metadata included in the registration request. */
  metadata?: Record<string, any>;

  /** Retry policy for `connect()`. */
  connectRetry?: IConnectRetryConfig;

  /** How long to wait for the registration response, in milliseconds (default: 5000). */
  registerTimeoutMs?: number;
}
/**
 * IPC client for connecting to an IPC server.
 *
 * Wraps an `IpcChannel`, registers this client with the server through a
 * `__register__` request, and re-emits channel events.
 *
 * Events emitted by this class:
 * - `connect`          after a successful registration
 * - `disconnect`       from `disconnect()` and when the channel reports a disconnect
 * - `reconnecting`     forwarded from the channel
 * - `heartbeatTimeout` forwarded from the channel, or converted from a
 *                      `Heartbeat timeout` error when `heartbeatThrowOnTimeout` is false
 * - `error`            channel errors, failed automatic re-registration, and
 *                      failures inside message handlers
 * - `message`          incoming messages that have no registered handler
 */
export class IpcClient extends plugins.EventEmitter {
  private readonly options: IIpcClientOptions;
  private readonly channel: IpcChannel;
  private readonly messageHandlers = new Map<string, (payload: any) => any | Promise<any>>();
  private isConnected = false;
  private readonly clientId: string;
  /** True once any registration has succeeded; enables automatic re-registration on reconnect. */
  private didRegisterOnce = false;

  /**
   * Creates the client and its underlying channel. No connection is opened
   * until `connect()` is called.
   *
   * @param options Client options; also passed unchanged to the `IpcChannel` constructor.
   */
  constructor(options: IIpcClientOptions) {
    super();
    this.options = options;
    this.clientId = options.clientId || plugins.crypto.randomUUID();
    this.channel = new IpcChannel(this.options);
    this.setupChannelHandlers();
  }

  /**
   * Extracts a human-readable message from an arbitrary thrown value, so
   * non-Error throws (strings, null, plain objects) never crash error handling.
   */
  private static describeError(error: unknown): string {
    if (error instanceof Error) {
      return error.message;
    }
    if (typeof error === 'object' && error !== null) {
      const candidate = (error as { message?: unknown }).message;
      if (typeof candidate === 'string') {
        return candidate;
      }
    }
    return String(error);
  }

  /** Resolves after `ms` milliseconds. */
  private static sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  /**
   * Connect to the server and register this client.
   *
   * Returns immediately when the client is already connected. With
   * `waitForReady`, connection rounds are repeated every 100 ms for as long
   * as the failure message contains `ECONNREFUSED` or `ENOENT`, until
   * `waitTimeout` (default 10000 ms) elapses.
   *
   * @param connectOptions Optional per-call behavior, see `IClientConnectOptions`.
   * @throws Error when connecting or registering fails, or when the server
   *   is not ready within `waitTimeout`.
   */
  public async connect(connectOptions: IClientConnectOptions = {}): Promise<void> {
    if (this.isConnected) {
      return;
    }

    if (!connectOptions.waitForReady) {
      // Normal connection attempt
      await this.connectWithRetry();
      return;
    }

    const waitTimeout = connectOptions.waitTimeout || 10000;
    const startTime = Date.now();
    while (Date.now() - startTime < waitTimeout) {
      try {
        await this.connectWithRetry();
        return; // Success!
      } catch (error) {
        // Connection refused / missing socket path: server might not be ready yet
        const reason = IpcClient.describeError(error);
        if (reason.includes('ECONNREFUSED') || reason.includes('ENOENT')) {
          await IpcClient.sleep(100);
          continue;
        }
        // Any other failure is final
        throw error;
      }
    }
    throw new Error(`Server not ready after ${waitTimeout}ms`);
  }

  /**
   * Runs one round of connect + register, retrying according to
   * `options.connectRetry`.
   *
   * Retrying only happens when `connectRetry.enabled` is truthy; otherwise a
   * single attempt is made regardless of `maxAttempts`. Delays start at
   * `initialDelay` and double after each failure, capped at `maxDelay`.
   *
   * @throws The last connection/registration error, or a timeout error when
   *   `totalTimeout` has been exceeded before an attempt starts.
   */
  private async connectWithRetry(): Promise<void> {
    const retryConfig = this.options.connectRetry;
    const retryEnabled = Boolean(retryConfig?.enabled);
    // A disabled retry policy must not loop: without this, maxAttempts > 1
    // combined with enabled=false retried back-to-back with no delay.
    const maxAttempts = retryEnabled ? retryConfig?.maxAttempts || 1 : 1;
    const maxDelay = retryConfig?.maxDelay || 1500;
    // `||` keeps the existing contract: 0/undefined fall back to the default,
    // so totalTimeout is always a positive budget here.
    const totalTimeout = retryConfig?.totalTimeout || 15000;
    const startTime = Date.now();
    let lastError: unknown;
    let delay = retryConfig?.initialDelay || 100;

    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      // Check total timeout
      if (Date.now() - startTime > totalTimeout) {
        const reason =
          lastError === undefined ? '' : IpcClient.describeError(lastError);
        throw new Error(`Connection timeout after ${totalTimeout}ms: ${reason || 'Unknown error'}`);
      }

      try {
        await this.channel.connect();
        await this.attemptRegistrationInternal();
        return; // Success!
      } catch (error) {
        lastError = error;
        // Best-effort cleanup so the next attempt starts from a closed channel;
        // a failure to disconnect must not mask the original error.
        await this.channel.disconnect().catch(() => {});

        if (attempt < maxAttempts) {
          // Not enough budget left for the wait: stop and report the last error
          if (Date.now() - startTime + delay > totalTimeout) {
            break;
          }
          await IpcClient.sleep(delay);
          // Exponential backoff with max limit
          delay = Math.min(delay * 2, maxDelay);
        }
      }
    }

    // All attempts failed
    throw lastError || new Error('Failed to connect to server');
  }

  /**
   * Attempt to register this client over the current channel connection.
   * Sets connection flags and emits 'connect' on success.
   *
   * @throws Error prefixed with `Failed to register with server:` when the
   *   request fails or the response does not report success.
   */
  private async attemptRegistrationInternal(): Promise<void> {
    const registerTimeoutMs = this.options.registerTimeoutMs || 5000;
    try {
      const response = await this.channel.request<any, any>(
        '__register__',
        {
          clientId: this.clientId,
          metadata: this.options.metadata
        },
        {
          timeout: registerTimeoutMs,
          headers: { clientId: this.clientId }
        }
      );
      if (!response.success) {
        throw new Error(response.error || 'Registration failed');
      }
      this.isConnected = true;
      this.didRegisterOnce = true;
      this.emit('connect');
    } catch (error: unknown) {
      throw new Error(`Failed to register with server: ${IpcClient.describeError(error)}`);
    }
  }

  /**
   * Disconnect from the server. Does nothing when the client is not
   * currently registered.
   *
   * NOTE(review): because of the early return, this does not stop a channel
   * that is disconnected but still trying to reconnect — confirm whether
   * IpcChannel needs an explicit stop in that state.
   */
  public async disconnect(): Promise<void> {
    if (!this.isConnected) {
      return;
    }
    this.isConnected = false;
    await this.channel.disconnect();
    this.emit('disconnect');
  }

  /**
   * Wires channel events to this client: re-registration on reconnect,
   * event forwarding, and dispatch of incoming messages to handlers.
   */
  private setupChannelHandlers(): void {
    this.channel.on('connect', async () => {
      // On reconnects, re-register automatically when we had connected before.
      // For the initial connect(), registration is handled explicitly there.
      if (this.didRegisterOnce && !this.isConnected) {
        try {
          await this.attemptRegistrationInternal();
        } catch (error) {
          this.emit('error', error);
        }
      }
    });

    this.channel.on('disconnect', (reason) => {
      this.isConnected = false;
      this.emit('disconnect', reason);
    });

    this.channel.on('error', (error: any) => {
      // If heartbeat timeout and configured not to throw, convert to heartbeatTimeout event
      if (error && error.message === 'Heartbeat timeout' && this.options.heartbeatThrowOnTimeout === false) {
        this.emit('heartbeatTimeout', error);
        return;
      }
      this.emit('error', error);
    });

    this.channel.on('heartbeatTimeout', (error) => {
      // Forward heartbeatTimeout event (when heartbeatThrowOnTimeout is false)
      this.emit('heartbeatTimeout', error);
    });

    this.channel.on('reconnecting', (info) => {
      this.emit('reconnecting', info);
    });

    this.channel.on('message', (message) => {
      const handler = this.messageHandlers.get(message.type);
      if (!handler) {
        // Emit unhandled message
        this.emit('message', message);
        return;
      }

      if (message.headers?.requiresResponse && message.id) {
        // Request/response: reply with the handler result, or with an error
        // response when the handler (or sending its result) fails.
        Promise.resolve()
          .then(() => handler(message.payload))
          .then((result) => {
            return this.channel.sendMessage(
              `${message.type}_response`,
              result,
              { correlationId: message.id }
            );
          })
          .catch((error) => {
            return this.channel.sendMessage(
              `${message.type}_response`,
              null,
              { correlationId: message.id, error: IpcClient.describeError(error) }
            );
          })
          .catch((sendError) => {
            // Even the error response could not be sent; surface it instead
            // of leaving an unhandled promise rejection.
            this.emit('error', sendError);
          });
        return;
      }

      // Fire and forget: still invoked synchronously, but failures are
      // reported through 'error' rather than escaping into the channel's
      // emitter (sync throw) or becoming an unhandled rejection (async).
      try {
        const outcome = handler(message.payload);
        if (outcome != null && typeof outcome.then === 'function') {
          Promise.resolve(outcome).catch((error) => {
            this.emit('error', error);
          });
        }
      } catch (error) {
        this.emit('error', error);
      }
    });
  }

  /**
   * Register a message handler for a message type, replacing any handler
   * previously registered for that type.
   *
   * @param type Message type to handle.
   * @param handler Called with the message payload; its (possibly async)
   *   result is sent back when the message requires a response.
   */
  public onMessage(type: string, handler: (payload: any) => any | Promise<any>): void {
    this.messageHandlers.set(type, handler);
  }

  /**
   * Send a message to the server. The client's id is always included in the
   * headers and overrides any `clientId` passed by the caller.
   *
   * @throws Error `Client is not connected` when called before registration.
   */
  public async sendMessage(type: string, payload: any, headers?: Record<string, any>): Promise<void> {
    if (!this.isConnected) {
      throw new Error('Client is not connected');
    }
    await this.channel.sendMessage(type, payload, {
      ...headers,
      clientId: this.clientId
    });
  }

  /**
   * Send a request to the server and wait for the response. The client's id
   * is always included in the headers.
   *
   * @param options Optional `timeout` and extra `headers`, passed to the channel.
   * @returns The response produced by `IpcChannel.request`.
   * @throws Error `Client is not connected` when called before registration.
   */
  public async request<TReq = any, TRes = any>(
    type: string,
    payload: TReq,
    options?: { timeout?: number; headers?: Record<string, any> }
  ): Promise<TRes> {
    if (!this.isConnected) {
      throw new Error('Client is not connected');
    }
    return this.channel.request<TReq, TRes>(type, payload, {
      ...options,
      headers: {
        ...options?.headers,
        clientId: this.clientId
      }
    });
  }

  /**
   * Subscribe to a topic (pub/sub pattern).
   *
   * The handler is stored locally under `topic:<topic>` and the server is
   * notified with a `__subscribe__` message. If notifying the server fails,
   * the local registration is rolled back so no handler is left behind for
   * a subscription the server never received.
   *
   * @throws Whatever `sendMessage` throws (e.g. when not connected).
   */
  public async subscribe(topic: string, handler: (payload: any) => void): Promise<void> {
    const key = `topic:${topic}`;
    const previousHandler = this.messageHandlers.get(key);
    this.messageHandlers.set(key, handler);
    try {
      await this.sendMessage('__subscribe__', { topic });
    } catch (error) {
      // Only undo our own registration; leave it alone if someone replaced it meanwhile.
      if (this.messageHandlers.get(key) === handler) {
        if (previousHandler) {
          this.messageHandlers.set(key, previousHandler);
        } else {
          this.messageHandlers.delete(key);
        }
      }
      throw error;
    }
  }

  /**
   * Unsubscribe from a topic: removes the local handler, then notifies the
   * server with an `__unsubscribe__` message.
   */
  public async unsubscribe(topic: string): Promise<void> {
    this.messageHandlers.delete(`topic:${topic}`);
    await this.sendMessage('__unsubscribe__', { topic });
  }

  /**
   * Publish a payload to a topic via a `__publish__` message.
   */
  public async publish(topic: string, payload: any): Promise<void> {
    await this.sendMessage('__publish__', { topic, payload });
  }

  /**
   * Get client ID
   */
  public getClientId(): string {
    return this.clientId;
  }

  /**
   * Check if client is connected: true only when this client is registered
   * and the channel also reports being connected.
   */
  public getIsConnected(): boolean {
    return this.isConnected && this.channel.isConnected();
  }

  /**
   * Get client statistics as reported by the underlying channel.
   */
  public getStats(): any {
    return this.channel.getStats();
  }
}