// Source: smartipc/ts/classes.ipcserver.ts (TypeScript, 555 lines, 16 KiB)

import * as plugins from './smartipc.plugins.js';
import { IpcChannel } from './classes.ipcchannel.js';
import type { IIpcChannelOptions } from './classes.ipcchannel.js';
/**
* Options accepted by `IpcServer.start()`.
*/
export interface IServerStartOptions {
/**
* When to consider server ready (default: 'socket-bound').
*
* With 'accepting', `start()` waits a further 10 ms after the primary channel
* has connected before it marks the server ready and emits 'ready'. With any
* other value (including omitted) the server is marked ready as soon as the
* channel has connected.
*/
readyWhen?: 'socket-bound' | 'accepting';
}
/**
* Construction options for `IpcServer`.
*
* Inherits every channel option except the reconnect-related ones; `start()`
* spreads these options into the `IpcChannel` it creates and forces
* `autoReconnect: false`.
*/
export interface IIpcServerOptions extends Omit<IIpcChannelOptions, 'autoReconnect' | 'reconnectDelay' | 'maxReconnectDelay' | 'reconnectMultiplier' | 'maxReconnectAttempts'> {
/**
* Maximum number of client connections. The constructor defaults this to
* `Infinity`; registrations are refused once the limit is reached.
*/
maxClients?: number;
/**
* Client idle timeout in ms. The constructor defaults this to 0, which
* disables the idle check. When positive, clients whose last activity is
* older than this value are disconnected; the check runs every
* `clientIdleTimeout / 2` ms.
*/
clientIdleTimeout?: number;
/**
* Automatically cleanup stale socket file on start (default: false).
* NOTE(review): not read in this file — it is only forwarded to the
* IpcChannel via the options spread; presumably applied there, confirm.
*/
autoCleanupSocketFile?: boolean;
/**
* Socket file permissions mode (e.g. 0o600).
* NOTE(review): not read in this file — it is only forwarded to the
* IpcChannel via the options spread; presumably applied there, confirm.
*/
socketMode?: number;
}
/**
* Client connection information.
*
* One record is kept per registered client id in the server's client map.
*/
interface IClientConnection {
/** Client id supplied in the registration payload; also the key in the client map. */
id: string;
/**
* Channel used to talk to this client. Registrations handled by `start()`
* store the server's primary channel here, so clients registered that way
* share one channel object.
*/
channel: IpcChannel;
/** `Date.now()` timestamp (ms) taken when the client registered. */
connectedAt: number;
/** `Date.now()` timestamp (ms) of registration or of the most recent message attributed to this client. */
lastActivity: number;
/** Metadata object from the registration payload, if one was sent. */
metadata?: Record<string, any>;
}
/**
 * Structural view of an inbound channel message, limited to the fields this
 * server actually reads.
 */
interface IServerInboundMessage {
  id?: string;
  type: string;
  payload?: any;
  headers?: Record<string, any>;
}

/**
 * IPC Server for handling multiple client connections.
 *
 * Events emitted by this class:
 * - `start`, `ready`, `stop` — lifecycle.
 * - `clientConnect` (clientId, metadata) — a client registered.
 * - `clientDisconnect` (clientId) — a client was removed from the registry.
 * - `message` (message, clientId) — every non pub/sub message received.
 * - `error` (error, clientId | 'server') — channel, delivery or handler failures.
 * - `heartbeatTimeout` (error, clientId | 'server') — forwarded from a channel.
 */
export class IpcServer extends plugins.EventEmitter {
  private options: IIpcServerOptions;
  private clients = new Map<string, IClientConnection>();
  private messageHandlers = new Map<string, (payload: any, clientId: string) => any | Promise<any>>();
  private primaryChannel?: IpcChannel;
  private isRunning = false;
  private isReady = false;
  /** `Date.now()` at the moment `start()` finished connecting; undefined while stopped. */
  private startedAt?: number;
  private clientIdleCheckTimer?: ReturnType<typeof setInterval>;
  // Pub/sub tracking
  private topicIndex = new Map<string, Set<string>>(); // topic -> clientIds
  private clientTopics = new Map<string, Set<string>>(); // clientId -> topics

  /**
   * @param options Server options. `maxClients` defaults to `Infinity` and
   *   `clientIdleTimeout` to 0 (no timeout); an explicit `undefined` for either
   *   also falls back to the default.
   */
  constructor(options: IIpcServerOptions) {
    super();
    this.options = {
      ...options,
      maxClients: options?.maxClients ?? Infinity,
      clientIdleTimeout: options?.clientIdleTimeout ?? 0 // 0 means no timeout
    };
  }

  /**
   * Start the server.
   *
   * Creates the primary channel, wires its handlers, connects it and then
   * emits `start` followed by `ready`. Calling this while the server is
   * already running is a no-op.
   *
   * @param options Controls when the server reports itself ready.
   * @throws Whatever the channel's `connect()` rejects with; in that case the
   *   half-initialised channel reference is dropped again.
   */
  public async start(options: IServerStartOptions = {}): Promise<void> {
    if (this.isRunning) {
      return;
    }

    // Create primary channel for initial connections
    const channel = new IpcChannel({
      ...this.options,
      autoReconnect: false // Server doesn't auto-reconnect
    });
    this.primaryChannel = channel;

    // The value returned from the __register__ handler is the registration result.
    channel.on('__register__', async (payload: { clientId?: unknown; metadata?: Record<string, any> }) =>
      this.handleRegistration(channel, payload)
    );
    channel.on('message', (message) => {
      this.handleChannelMessage(channel, message);
    });
    channel.on('disconnect', () => {
      // Ignore late events from a channel that has already been replaced.
      if (this.primaryChannel === channel) {
        this.removeAllClients();
      }
    });
    channel.on('error', (error) => {
      this.emit('error', error, 'server');
    });
    channel.on('heartbeatTimeout', (error) => {
      // Forward heartbeatTimeout event (when heartbeatThrowOnTimeout is false)
      this.emit('heartbeatTimeout', error, 'server');
    });

    // Connect the primary channel (will start as server)
    try {
      await channel.connect();
    } catch (error: unknown) {
      // Do not keep a dead channel around; getStats() keys off its presence.
      if (this.primaryChannel === channel) {
        this.primaryChannel = undefined;
      }
      throw error;
    }

    this.isRunning = true;
    this.startedAt = Date.now();
    this.startClientIdleCheck();
    this.emit('start');

    if (options?.readyWhen === 'accepting') {
      // Wait a bit to ensure handlers are fully set up
      await new Promise<void>((resolve) => setTimeout(resolve, 10));
      // stop() may have run during the wait; do not report ready in that case.
      if (!this.isRunning || this.primaryChannel !== channel) {
        return;
      }
    }
    this.isReady = true;
    this.emit('ready');
  }

  /**
   * Stop the server.
   *
   * Disconnects every distinct client channel once, disconnects the primary
   * channel, removes all clients (emitting `clientDisconnect` for each), clears
   * all topic subscriptions and emits `stop`. A no-op when not running.
   *
   * @throws The primary channel's disconnect error, after cleanup has completed.
   */
  public async stop(): Promise<void> {
    if (!this.isRunning) {
      return;
    }
    this.isRunning = false;
    this.isReady = false;
    this.stopClientIdleCheck();

    const primary = this.primaryChannel;

    // Clients registered through start() all share the primary channel, so
    // collect the distinct other channels and disconnect each exactly once.
    const dedicatedChannels = new Set<IpcChannel>();
    for (const client of this.clients.values()) {
      if (client.channel !== primary) {
        dedicatedChannels.add(client.channel);
      }
    }
    await Promise.all(
      Array.from(dedicatedChannels, (dedicated) => dedicated.disconnect().catch(() => {})) // Ignore disconnect errors
    );

    let primaryFailure: { error: unknown } | undefined;
    if (primary) {
      try {
        await primary.disconnect();
      } catch (error: unknown) {
        primaryFailure = { error };
      }
    }
    if (this.primaryChannel === primary) {
      this.primaryChannel = undefined;
    }
    this.startedAt = undefined;

    // Anything the channels' own disconnect events did not already remove.
    this.removeAllClients();
    this.topicIndex.clear();
    this.clientTopics.clear();

    this.emit('stop');
    if (primaryFailure) {
      throw primaryFailure.error;
    }
  }

  /**
   * Handle a `__register__` request arriving on `channel`.
   *
   * @returns `{ success: true, clientId }` on success, otherwise
   *   `{ success: false, error }`.
   */
  private handleRegistration(
    channel: IpcChannel,
    payload: { clientId?: unknown; metadata?: Record<string, any> } | null | undefined
  ): { success: boolean; clientId?: string; error?: string } {
    const clientId = payload?.clientId;
    if (typeof clientId !== 'string' || clientId.length === 0) {
      return { success: false, error: 'Invalid registration' };
    }

    // A client that re-registers does not take up an additional slot.
    const isKnownClient = this.clients.has(clientId);
    if (!isKnownClient && this.clients.size >= (this.options.maxClients ?? Infinity)) {
      return { success: false, error: 'Maximum number of clients reached' };
    }

    const metadata = payload?.metadata;
    const now = Date.now();
    this.clients.set(clientId, {
      id: clientId,
      channel,
      connectedAt: now,
      lastActivity: now,
      metadata
    });
    this.emit('clientConnect', clientId, metadata);
    return { success: true, clientId };
  }

  /**
   * Process one message received on the primary channel: refresh the sender's
   * activity timestamp, service pub/sub control messages, otherwise dispatch
   * to the registered handler and emit `message`.
   */
  private handleChannelMessage(channel: IpcChannel, message: IServerInboundMessage): void {
    // Extract client ID from message headers
    const rawClientId = message.headers?.clientId;
    const clientId = typeof rawClientId === 'string' && rawClientId.length > 0 ? rawClientId : 'unknown';

    const sender = this.clients.get(clientId);
    if (sender) {
      sender.lastActivity = Date.now();
    }

    switch (message.type) {
      case '__subscribe__':
        this.addSubscription(clientId, message.payload?.topic);
        return;
      case '__unsubscribe__':
        this.removeSubscription(clientId, message.payload?.topic);
        return;
      case '__publish__':
        this.publishToTopic(message.payload?.topic, message.payload?.payload);
        return;
      default:
        break;
    }

    const handler = this.messageHandlers.get(message.type);
    if (handler) {
      if (message.headers?.requiresResponse && message.id) {
        this.respondWithHandler(channel, handler, message, clientId);
      } else {
        // Fire and forget
        this.invokeDetached(handler, message.payload, clientId);
      }
    }

    // Emit raw message event
    this.emit('message', message, clientId);
  }

  /**
   * Record that `clientId` subscribes to `topic`. Ignored for invalid topics
   * and for senders that are not registered: they cannot be delivered to and
   * would never be cleaned up.
   */
  private addSubscription(clientId: string, topic: unknown): void {
    if (typeof topic !== 'string' || topic.length === 0) {
      return;
    }
    if (!this.clients.has(clientId)) {
      return;
    }
    let subscribers = this.topicIndex.get(topic);
    if (!subscribers) {
      subscribers = new Set<string>();
      this.topicIndex.set(topic, subscribers);
    }
    subscribers.add(clientId);

    let topics = this.clientTopics.get(clientId);
    if (!topics) {
      topics = new Set<string>();
      this.clientTopics.set(clientId, topics);
    }
    topics.add(topic);
  }

  /** Remove `clientId`'s subscription to `topic`, pruning empty index entries. */
  private removeSubscription(clientId: string, topic: unknown): void {
    if (typeof topic !== 'string') {
      return;
    }
    const subscribers = this.topicIndex.get(topic);
    if (subscribers) {
      subscribers.delete(clientId);
      if (subscribers.size === 0) {
        this.topicIndex.delete(topic);
      }
    }
    const topics = this.clientTopics.get(clientId);
    if (topics) {
      topics.delete(topic);
      if (topics.size === 0) {
        this.clientTopics.delete(clientId);
      }
    }
  }

  /**
   * Send `payload` as a `topic:<topic>` message to every subscriber of `topic`.
   * Delivery is not awaited; per-subscriber failures are emitted as `error`.
   */
  private publishToTopic(topic: unknown, payload: any): void {
    if (typeof topic !== 'string') {
      return;
    }
    const subscribers = this.topicIndex.get(topic);
    if (!subscribers || subscribers.size === 0) {
      return;
    }
    const deliveries = Array.from(subscribers, (subscriberId) =>
      this.sendToClient(subscriberId, `topic:${topic}`, payload).catch((error) => {
        this.emit('error', error, subscriberId);
      })
    );
    // allSettled never rejects, so nothing here can surface as an unhandled rejection.
    void Promise.allSettled(deliveries);
  }

  /**
   * Run `handler` for a request that expects a reply and send the outcome back
   * on `channel` as `<type>_response`, correlated by the request id.
   */
  private respondWithHandler(
    channel: IpcChannel,
    handler: (payload: any, clientId: string) => any | Promise<any>,
    message: IServerInboundMessage,
    clientId: string
  ): void {
    const responseType = `${message.type}_response`;
    const correlationId = message.id;
    void Promise.resolve()
      .then(() => handler(message.payload, clientId))
      .then((result) => channel.sendMessage(responseType, result, { correlationId, clientId }))
      // Reached for handler failures and for failures sending the success reply;
      // either way the requester should get an answer rather than time out.
      .catch((error: unknown) =>
        channel.sendMessage(responseType, null, {
          correlationId,
          error: IpcServer.describeError(error),
          clientId
        })
      )
      // The error reply itself could not be sent.
      .catch((error: unknown) => {
        this.emit('error', error, clientId);
      });
  }

  /**
   * Invoke a fire-and-forget handler, routing both synchronous throws and
   * asynchronous rejections to the `error` event.
   */
  private invokeDetached(
    handler: (payload: any, clientId: string) => any | Promise<any>,
    payload: any,
    clientId: string
  ): void {
    try {
      const outcome = handler(payload, clientId);
      Promise.resolve(outcome).catch((error: unknown) => {
        this.emit('error', error, clientId);
      });
    } catch (error: unknown) {
      this.emit('error', error, clientId);
    }
  }

  /** Best-effort human readable text for an arbitrary thrown value. */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /** First registered client id whose connection uses `channel`, if any. */
  private findClientIdByChannel(channel: IpcChannel): string | undefined {
    for (const [id, client] of this.clients) {
      if (client.channel === channel) {
        return id;
      }
    }
    return undefined;
  }

  /**
   * Drop a client from the registry together with its topic subscriptions.
   * Emits `clientDisconnect` only if the client was actually registered, so
   * repeated removal is harmless.
   *
   * @returns Whether the client was registered.
   */
  private removeClient(clientId: string): boolean {
    const wasRegistered = this.clients.delete(clientId);
    this.cleanupClientSubscriptions(clientId);
    if (wasRegistered) {
      this.emit('clientDisconnect', clientId);
    }
    return wasRegistered;
  }

  /** Remove every registered client (see `removeClient`). */
  private removeAllClients(): void {
    for (const clientId of Array.from(this.clients.keys())) {
      this.removeClient(clientId);
    }
  }

  /**
   * Setup channel event handlers for a channel dedicated to a single client.
   *
   * NOTE(review): nothing in this class calls this method; `start()` wires the
   * primary channel itself. Kept for per-client channels.
   *
   * @param channel Channel to attach handlers to.
   * @param clientId Id used to attribute events until a registration on this
   *   channel supplies the real one.
   */
  private setupChannelHandlers(channel: IpcChannel, clientId: string): void {
    let fallbackClientId = clientId;
    // Prefer the id of the registered client that owns this channel.
    const resolveClientId = (): string => this.findClientIdByChannel(channel) ?? fallbackClientId;

    // Handle client registration
    channel.on('__register__', async (payload: { clientId: string; metadata?: Record<string, any> }) => {
      if (payload.clientId && payload.clientId !== fallbackClientId) {
        // New client registration
        const newClientId = payload.clientId;
        // Check max clients
        if (this.clients.size >= (this.options.maxClients ?? Infinity)) {
          throw new Error('Maximum number of clients reached');
        }
        const now = Date.now();
        this.clients.set(newClientId, {
          id: newClientId,
          channel,
          connectedAt: now,
          lastActivity: now,
          metadata: payload.metadata
        });
        this.emit('clientConnect', newClientId, payload.metadata);
        // Now messages from this channel should be associated with the new client ID
        fallbackClientId = newClientId;
        return { success: true, clientId: newClientId };
      }
      return { success: false, error: 'Invalid registration' };
    });

    // Handle messages - pass the correct clientId
    channel.on('message', (message) => {
      const actualClientId = resolveClientId();
      const sender = actualClientId !== 'primary' ? this.clients.get(actualClientId) : undefined;
      if (sender) {
        sender.lastActivity = Date.now();
      }
      const handler = this.messageHandlers.get(message.type);
      if (handler) {
        this.invokeDetached(handler, message.payload, actualClientId);
      }
      // Emit raw message event
      this.emit('message', message, actualClientId);
    });

    // Handle disconnect
    channel.on('disconnect', () => {
      const owner = this.findClientIdByChannel(channel);
      if (owner !== undefined) {
        this.removeClient(owner);
      }
    });

    // Handle errors
    channel.on('error', (error) => {
      this.emit('error', error, resolveClientId());
    });

    channel.on('heartbeatTimeout', (error) => {
      // Forward heartbeatTimeout event (when heartbeatThrowOnTimeout is false)
      this.emit('heartbeatTimeout', error, resolveClientId());
    });
  }

  /**
   * Register a message handler. A later registration for the same `type`
   * replaces the earlier one.
   *
   * @param type Message type to handle.
   * @param handler Receives the message payload and the sender's client id
   *   (`'unknown'` when the message carries none). For messages that require a
   *   response, the handler's (awaited) return value is sent back.
   */
  public onMessage(type: string, handler: (payload: any, clientId: string) => any | Promise<any>): void {
    this.messageHandlers.set(type, handler);
  }

  /**
   * Send message to specific client.
   *
   * The target id is always added to the outgoing headers as `clientId`, the
   * same way responses are tagged.
   * NOTE(review): presumably the channel uses this header to pick the
   * recipient on a shared channel — confirm against IpcChannel.
   *
   * @throws Error if `clientId` is not registered.
   */
  public async sendToClient(clientId: string, type: string, payload: any, headers?: Record<string, any>): Promise<void> {
    const client = this.clients.get(clientId);
    if (!client) {
      throw new Error(`Client ${clientId} not found`);
    }
    await client.channel.sendMessage(type, payload, { ...headers, clientId });
  }

  /**
   * Send request to specific client and wait for response.
   *
   * The target id is added to the request headers as `clientId` (see
   * `sendToClient`).
   *
   * @throws Error if `clientId` is not registered.
   */
  public async requestFromClient<TReq = any, TRes = any>(
    clientId: string,
    type: string,
    payload: TReq,
    options?: { timeout?: number; headers?: Record<string, any> }
  ): Promise<TRes> {
    const client = this.clients.get(clientId);
    if (!client) {
      throw new Error(`Client ${clientId} not found`);
    }
    return client.channel.request<TReq, TRes>(type, payload, {
      ...options,
      headers: { ...options?.headers, clientId }
    });
  }

  /**
   * Broadcast message to all clients. Per-client send failures are emitted as
   * `error` events rather than rejecting the broadcast.
   */
  public async broadcast(type: string, payload: any, headers?: Record<string, any>): Promise<void> {
    await this.broadcastTo(() => true, type, payload, headers);
  }

  /**
   * Broadcast message to clients matching a filter.
   *
   * @param filter Called with each client's id and metadata; the message is
   *   sent to clients for which it returns true.
   */
  public async broadcastTo(
    filter: (clientId: string, metadata?: Record<string, any>) => boolean,
    type: string,
    payload: any,
    headers?: Record<string, any>
  ): Promise<void> {
    const deliveries: Promise<void>[] = [];
    for (const [clientId, client] of this.clients) {
      if (!filter(clientId, client.metadata)) {
        continue;
      }
      deliveries.push(
        client.channel.sendMessage(type, payload, { ...headers, clientId })
          .catch((error) => {
            this.emit('error', error, clientId);
          })
      );
    }
    await Promise.all(deliveries);
  }

  /**
   * Get connected client IDs
   */
  public getClientIds(): string[] {
    return Array.from(this.clients.keys());
  }

  /**
   * Get client information
   *
   * @returns A snapshot of the client's registration data, or `undefined` if
   *   the id is not registered.
   */
  public getClientInfo(clientId: string): {
    id: string;
    connectedAt: number;
    lastActivity: number;
    metadata?: Record<string, any>;
  } | undefined {
    const client = this.clients.get(clientId);
    if (!client) {
      return undefined;
    }
    return {
      id: client.id,
      connectedAt: client.connectedAt,
      lastActivity: client.lastActivity,
      metadata: client.metadata
    };
  }

  /**
   * Disconnect a specific client. A no-op for unknown ids.
   *
   * NOTE(review): clients registered through start() share the primary
   * channel, so this disconnects that shared channel — confirm that
   * IpcChannel.disconnect() is safe to call for a single client.
   */
  public async disconnectClient(clientId: string): Promise<void> {
    const client = this.clients.get(clientId);
    if (!client) {
      return;
    }
    await client.channel.disconnect();
    // The channel's own disconnect event may already have removed the client;
    // removeClient() then does nothing, so the event is not emitted twice.
    this.removeClient(clientId);
  }

  /**
   * Clean up topic subscriptions for a disconnected client
   */
  private cleanupClientSubscriptions(clientId: string): void {
    const topics = this.clientTopics.get(clientId);
    if (!topics) {
      return;
    }
    for (const topic of topics) {
      const subscribers = this.topicIndex.get(topic);
      if (subscribers) {
        subscribers.delete(clientId);
        if (subscribers.size === 0) {
          this.topicIndex.delete(topic);
        }
      }
    }
    this.clientTopics.delete(clientId);
  }

  /**
   * Start checking for idle clients. Does nothing when no positive
   * `clientIdleTimeout` is configured; replaces any timer already running.
   */
  private startClientIdleCheck(): void {
    this.stopClientIdleCheck();
    const timeout = this.options.clientIdleTimeout;
    if (!timeout || timeout <= 0) {
      return;
    }
    this.clientIdleCheckTimer = setInterval(() => {
      const now = Date.now();
      // Iterate a snapshot: disconnecting mutates the client map.
      for (const [clientId, client] of Array.from(this.clients)) {
        if (now - client.lastActivity > timeout) {
          this.disconnectClient(clientId).catch(() => {});
        }
      }
    }, timeout / 2);
  }

  /**
   * Stop checking for idle clients
   */
  private stopClientIdleCheck(): void {
    if (this.clientIdleCheckTimer) {
      clearInterval(this.clientIdleCheckTimer);
      this.clientIdleCheckTimer = undefined;
    }
  }

  /**
   * Get server statistics
   *
   * @returns `uptime` is in ms and is `undefined` while there is no primary
   *   channel or no start time is known yet.
   */
  public getStats(): {
    isRunning: boolean;
    connectedClients: number;
    maxClients: number;
    uptime?: number;
  } {
    let uptime: number | undefined;
    if (this.primaryChannel) {
      // The channel may expose its own connect timestamp; it is not part of the
      // type visible here, hence the untyped read and the fallback.
      const channelConnectedAt: unknown = (this.primaryChannel as any).connectedAt;
      const since =
        typeof channelConnectedAt === 'number' && Number.isFinite(channelConnectedAt)
          ? channelConnectedAt
          : this.startedAt;
      uptime = since !== undefined ? Date.now() - since : undefined;
    }
    return {
      isRunning: this.isRunning,
      connectedClients: this.clients.size,
      maxClients: this.options.maxClients ?? Infinity,
      uptime
    };
  }

  /**
   * Check if server is ready to accept connections
   */
  public getIsReady(): boolean {
    return this.isReady;
  }
}