feat(streaming): Add streaming support: chunked stream transfers, file send/receive, stream events and helpers

This commit is contained in:
2025-08-30 23:02:49 +00:00
parent 7ba064584b
commit 994b1d20fb
9 changed files with 678 additions and 11 deletions

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartipc',
version: '2.2.2',
version: '2.3.0',
description: 'A library for node inter process communication, providing an easy-to-use API for IPC.'
}

View File

@@ -26,6 +26,8 @@ export interface IIpcChannelOptions extends IIpcTransportOptions {
heartbeatInitialGracePeriodMs?: number;
/** Throw on heartbeat timeout (default: true, set false to emit event instead) */
heartbeatThrowOnTimeout?: boolean;
/** Maximum concurrent streams (incoming/outgoing) */
maxConcurrentStreams?: number;
}
/**
@@ -54,6 +56,12 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
private connectionStartTime: number = Date.now();
private isReconnecting = false;
private isClosing = false;
// Streaming state
private incomingStreams = new Map<string, plugins.stream.PassThrough>();
private incomingStreamMeta = new Map<string, Record<string, any>>();
private outgoingStreams = new Map<string, { cancelled: boolean; abort?: () => void }>();
private activeIncomingStreams = 0;
private activeOutgoingStreams = 0;
// Metrics
private metrics = {
@@ -79,6 +87,7 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
heartbeat: true,
heartbeatInterval: 5000,
heartbeatTimeout: 10000,
maxConcurrentStreams: 32,
...options
};
@@ -303,7 +312,7 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
// Track metrics
this.metrics.messagesReceived++;
this.metrics.bytesReceived += JSON.stringify(message).length;
// Handle heartbeat and send response
if (message.type === '__heartbeat__') {
this.lastHeartbeat = Date.now();
@@ -325,6 +334,105 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
return;
}
// Handle streaming control messages
if (message.type === '__stream_init__') {
const streamId = (message.payload as any)?.streamId as string;
const meta = (message.payload as any)?.meta as Record<string, any> | undefined;
if (typeof streamId === 'string' && streamId.length) {
// Enforce max concurrent incoming streams
if (this.activeIncomingStreams >= (this.options.maxConcurrentStreams || Infinity)) {
const response: IIpcMessageEnvelope = {
id: plugins.crypto.randomUUID(),
type: '__stream_error__',
timestamp: Date.now(),
payload: { streamId, error: 'Max concurrent streams exceeded' },
headers: message.headers?.clientId ? { clientId: message.headers.clientId } : undefined
};
this.transport.send(response).catch(() => {});
return;
}
const pass = new plugins.stream.PassThrough();
this.incomingStreams.set(streamId, pass);
if (meta) this.incomingStreamMeta.set(streamId, meta);
this.activeIncomingStreams++;
// Emit a high-level stream event
const headersClientId = message.headers?.clientId;
const eventPayload = {
streamId,
meta: meta || {},
headers: message.headers || {},
clientId: headersClientId,
};
// Emit as ('stream', info, readable)
this.emit('stream', eventPayload, pass);
}
return;
}
if (message.type === '__stream_chunk__') {
const streamId = (message.payload as any)?.streamId as string;
const chunkB64 = (message.payload as any)?.chunk as string;
const pass = this.incomingStreams.get(streamId);
if (pass && typeof chunkB64 === 'string') {
try {
const chunk = Buffer.from(chunkB64, 'base64');
pass.write(chunk);
} catch (e) {
// If decode fails, destroy stream
pass.destroy(e as Error);
this.incomingStreams.delete(streamId);
this.incomingStreamMeta.delete(streamId);
}
}
return;
}
if (message.type === '__stream_end__') {
const streamId = (message.payload as any)?.streamId as string;
const pass = this.incomingStreams.get(streamId);
if (pass) {
pass.end();
this.incomingStreams.delete(streamId);
this.incomingStreamMeta.delete(streamId);
this.activeIncomingStreams = Math.max(0, this.activeIncomingStreams - 1);
}
return;
}
if (message.type === '__stream_error__') {
const streamId = (message.payload as any)?.streamId as string;
const errMsg = (message.payload as any)?.error as string;
const pass = this.incomingStreams.get(streamId);
if (pass) {
pass.destroy(new Error(errMsg || 'stream error'));
this.incomingStreams.delete(streamId);
this.incomingStreamMeta.delete(streamId);
this.activeIncomingStreams = Math.max(0, this.activeIncomingStreams - 1);
}
return;
}
if (message.type === '__stream_cancel__') {
const streamId = (message.payload as any)?.streamId as string;
// Cancel outgoing stream with same id if present
const ctrl = this.outgoingStreams.get(streamId);
if (ctrl) {
ctrl.cancelled = true;
try { ctrl.abort?.(); } catch {}
this.outgoingStreams.delete(streamId);
this.activeOutgoingStreams = Math.max(0, this.activeOutgoingStreams - 1);
}
// Also cancel any incoming stream if tracked
const pass = this.incomingStreams.get(streamId);
if (pass) {
try { pass.destroy(new Error('stream cancelled')); } catch {}
this.incomingStreams.delete(streamId);
this.incomingStreamMeta.delete(streamId);
this.activeIncomingStreams = Math.max(0, this.activeIncomingStreams - 1);
}
return;
}
// Handle request/response
if (message.correlationId && this.pendingRequests.has(message.correlationId)) {
const pending = this.pendingRequests.get(message.correlationId)!;
@@ -468,7 +576,7 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
* Register a message handler
*/
public on(event: string, handler: (payload: any) => any | Promise<any>): this {
if (event === 'message' || event === 'connect' || event === 'disconnect' || event === 'error' || event === 'reconnecting' || event === 'drain' || event === 'heartbeatTimeout' || event === 'clientDisconnected') {
if (event === 'message' || event === 'connect' || event === 'disconnect' || event === 'error' || event === 'reconnecting' || event === 'drain' || event === 'heartbeatTimeout' || event === 'clientDisconnected' || event === 'stream') {
// Special handling for channel events
super.on(event, handler);
} else {
@@ -530,3 +638,129 @@ export class IpcChannel<TRequest = any, TResponse = any> extends plugins.EventEm
};
}
}
/**
* Streaming helpers
*/
export interface IStreamSendOptions {
headers?: Record<string, any>;
chunkSize?: number; // bytes, default 64k
streamId?: string;
meta?: Record<string, any>;
}
export type ReadableLike = NodeJS.ReadableStream | plugins.stream.Readable;
// Extend IpcChannel with a sendStream method
export interface IpcChannel<TRequest, TResponse> {
sendStream(readable: ReadableLike, options?: IStreamSendOptions): Promise<void>;
cancelOutgoingStream(streamId: string, headers?: Record<string, any>): Promise<void>;
cancelIncomingStream(streamId: string, headers?: Record<string, any>): Promise<void>;
}
IpcChannel.prototype.sendStream = async function(this: IpcChannel, readable: ReadableLike, options?: IStreamSendOptions): Promise<void> {
const streamId = options?.streamId || (plugins.crypto.randomUUID ? plugins.crypto.randomUUID() : `${Date.now()}-${Math.random()}`);
const headers = options?.headers || {};
const chunkSize = Math.max(1024, Math.min(options?.chunkSize || 64 * 1024, (this as any).options.maxMessageSize || 8 * 1024 * 1024));
const self: any = this;
// Enforce max concurrent outgoing streams (reserve a slot synchronously)
if (self.activeOutgoingStreams >= (self.options.maxConcurrentStreams || Infinity)) {
throw new Error('Max concurrent streams exceeded');
}
self.activeOutgoingStreams++;
self.outgoingStreams.set(streamId, {
cancelled: false,
abort: () => {
try { (readable as any).destroy?.(new Error('stream cancelled')); } catch {}
}
});
try {
// Send init after reserving slot
await (this as any).sendMessage('__stream_init__', { streamId, meta: options?.meta || {} }, headers);
} catch (e) {
self.outgoingStreams.delete(streamId);
self.activeOutgoingStreams = Math.max(0, self.activeOutgoingStreams - 1);
throw e;
}
const readChunkAndSend = async (buf: Buffer) => {
// Slice into chunkSize frames if needed
for (let offset = 0; offset < buf.length; offset += chunkSize) {
const ctrl = self.outgoingStreams.get(streamId);
if (ctrl?.cancelled) {
return;
}
const slice = buf.subarray(offset, Math.min(offset + chunkSize, buf.length));
const chunkB64 = slice.toString('base64');
await (this as any).sendMessage('__stream_chunk__', { streamId, chunk: chunkB64 }, headers);
}
};
await new Promise<void>((resolve, reject) => {
let sending = Promise.resolve();
readable.on('data', (chunk: any) => {
const buf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
// Ensure sequential sending to avoid write races
sending = sending.then(() => readChunkAndSend(buf));
sending.catch(reject);
});
readable.on('end', async () => {
try {
await sending;
await (this as any).sendMessage('__stream_end__', { streamId }, headers);
self.outgoingStreams.delete(streamId);
self.activeOutgoingStreams = Math.max(0, self.activeOutgoingStreams - 1);
resolve();
} catch (e) {
self.outgoingStreams.delete(streamId);
self.activeOutgoingStreams = Math.max(0, self.activeOutgoingStreams - 1);
reject(e);
}
});
readable.on('error', async (err: Error) => {
try {
await sending.catch(() => {});
await (this as any).sendMessage('__stream_error__', { streamId, error: err.message }, headers);
} finally {
self.outgoingStreams.delete(streamId);
self.activeOutgoingStreams = Math.max(0, self.activeOutgoingStreams - 1);
reject(err);
}
});
// In case the stream is already ended
const r = readable as any;
if (r.readableEnded) {
(async () => {
await (this as any).sendMessage('__stream_end__', { streamId }, headers);
self.outgoingStreams.delete(streamId);
self.activeOutgoingStreams = Math.max(0, self.activeOutgoingStreams - 1);
resolve();
})().catch(reject);
}
});
};
IpcChannel.prototype.cancelOutgoingStream = async function(this: IpcChannel, streamId: string, headers?: Record<string, any>): Promise<void> {
const self: any = this;
const ctrl = self.outgoingStreams.get(streamId);
if (ctrl) {
ctrl.cancelled = true;
try { ctrl.abort?.(); } catch {}
self.outgoingStreams.delete(streamId);
self.activeOutgoingStreams = Math.max(0, self.activeOutgoingStreams - 1);
}
await (this as any).sendMessage('__stream_cancel__', { streamId }, headers || {});
};
IpcChannel.prototype.cancelIncomingStream = async function(this: IpcChannel, streamId: string, headers?: Record<string, any>): Promise<void> {
const self: any = this;
const pass = self.incomingStreams.get(streamId);
if (pass) {
try { pass.destroy(new Error('stream cancelled')); } catch {}
self.incomingStreams.delete(streamId);
self.incomingStreamMeta.delete(streamId);
self.activeIncomingStreams = Math.max(0, self.activeIncomingStreams - 1);
}
await (this as any).sendMessage('__stream_cancel__', { streamId }, headers || {});
};

View File

@@ -122,25 +122,27 @@ export class IpcClient extends plugins.EventEmitter {
// If waitForReady is specified, wait for server socket to exist first
if (connectOptions.waitForReady) {
const waitTimeout = connectOptions.waitTimeout || 10000;
// For Unix domain sockets / named pipes: wait explicitly using helper that probes with clientOnly
if (this.options.socketPath) {
const { SmartIpc } = await import('./index.js');
await (SmartIpc as any).waitForServer({ socketPath: this.options.socketPath, timeoutMs: waitTimeout });
await attemptConnection();
return;
}
// Fallback (e.g., TCP): retry-connect loop
const startTime = Date.now();
while (Date.now() - startTime < waitTimeout) {
try {
// Try to connect
await attemptConnection();
return; // Success!
} catch (error) {
// If it's a connection refused error, server might not be ready yet
if ((error as any).message?.includes('ECONNREFUSED') ||
(error as any).message?.includes('ENOENT')) {
if ((error as any).message?.includes('ECONNREFUSED')) {
await new Promise(resolve => setTimeout(resolve, 100));
continue;
}
// Other errors should be thrown
throw error;
}
}
throw new Error(`Server not ready after ${waitTimeout}ms`);
} else {
// Normal connection attempt
@@ -233,6 +235,13 @@ export class IpcClient extends plugins.EventEmitter {
this.emit('reconnecting', info);
});
// Forward streaming events
// Emitted as ('stream', info, readable)
// info contains { streamId, meta, headers, clientId }
this.channel.on('stream', (info: any, readable: plugins.stream.Readable) => {
this.emit('stream', info, readable);
});
// Handle messages
this.channel.on('message', (message) => {
// Check if we have a handler for this message type
@@ -361,4 +370,40 @@ export class IpcClient extends plugins.EventEmitter {
public getStats(): any {
return this.channel.getStats();
}
/**
* Send a Node.js readable stream to the server
*/
public async sendStream(readable: plugins.stream.Readable | NodeJS.ReadableStream, options?: { headers?: Record<string, any>; chunkSize?: number; streamId?: string; meta?: Record<string, any> }): Promise<void> {
const headers = { ...(options?.headers || {}), clientId: this.clientId };
await (this as any).channel.sendStream(readable as any, { ...options, headers });
}
/**
* Send a file to the server via streaming
*/
public async sendFile(filePath: string, options?: { headers?: Record<string, any>; chunkSize?: number; streamId?: string; meta?: Record<string, any> }): Promise<void> {
const fs = plugins.fs;
const path = plugins.path;
const stat = fs.statSync(filePath);
const meta = {
...(options?.meta || {}),
type: 'file',
basename: path.basename(filePath),
size: stat.size,
mtimeMs: stat.mtimeMs
};
const rs = fs.createReadStream(filePath);
await this.sendStream(rs, { ...options, meta });
}
/** Cancel an outgoing stream by id */
public async cancelOutgoingStream(streamId: string): Promise<void> {
await (this as any).channel.cancelOutgoingStream(streamId, { clientId: this.clientId });
}
/** Cancel an incoming stream by id */
public async cancelIncomingStream(streamId: string): Promise<void> {
await (this as any).channel.cancelIncomingStream(streamId, { clientId: this.clientId });
}
}

View File

@@ -200,6 +200,12 @@ export class IpcServer extends plugins.EventEmitter {
this.emit('error', error, 'server');
});
// Forward streaming events to server level
this.primaryChannel.on('stream', (info: any, readable: plugins.stream.Readable) => {
// Emit ('stream', info, readable)
this.emit('stream', info, readable);
});
this.primaryChannel.on('heartbeatTimeout', (error) => {
// Forward heartbeatTimeout event (when heartbeatThrowOnTimeout is false)
this.emit('heartbeatTimeout', error, 'server');
@@ -396,6 +402,52 @@ export class IpcServer extends plugins.EventEmitter {
await client.channel.sendMessage(type, payload, routedHeaders);
}
/**
* Send a Node.js readable stream to a specific client
*/
public async sendStreamToClient(clientId: string, readable: plugins.stream.Readable | NodeJS.ReadableStream, options?: { headers?: Record<string, any>; chunkSize?: number; streamId?: string; meta?: Record<string, any> }): Promise<void> {
const client = this.clients.get(clientId);
if (!client) {
throw new Error(`Client ${clientId} not found`);
}
const headers = { ...(options?.headers || {}), clientId };
await (client.channel as any).sendStream(readable as any, { ...options, headers });
}
/**
* Send a file to a specific client via streaming
*/
public async sendFileToClient(clientId: string, filePath: string, options?: { headers?: Record<string, any>; chunkSize?: number; streamId?: string; meta?: Record<string, any> }): Promise<void> {
const client = this.clients.get(clientId);
if (!client) {
throw new Error(`Client ${clientId} not found`);
}
const fs = plugins.fs;
const path = plugins.path;
const stat = fs.statSync(filePath);
const meta = {
...(options?.meta || {}),
type: 'file',
basename: path.basename(filePath),
size: stat.size,
mtimeMs: stat.mtimeMs
};
const rs = fs.createReadStream(filePath);
await this.sendStreamToClient(clientId, rs, { ...options, meta });
}
/** Cancel a stream incoming from a client (server side) */
public async cancelIncomingStreamFromClient(clientId: string, streamId: string): Promise<void> {
if (!this.primaryChannel) return;
await (this.primaryChannel as any).cancelIncomingStream(streamId, { clientId });
}
/** Cancel a server->client outgoing stream */
public async cancelOutgoingStreamToClient(clientId: string, streamId: string): Promise<void> {
if (!this.primaryChannel) return;
await (this.primaryChannel as any).cancelOutgoingStream(streamId, { clientId });
}
/**
* Send request to specific client and wait for response
*/

View File

@@ -6,6 +6,7 @@ export * from './classes.ipcclient.js';
import { IpcServer } from './classes.ipcserver.js';
import { IpcClient } from './classes.ipcclient.js';
import { IpcChannel } from './classes.ipcchannel.js';
import { stream as nodeStream, fs as nodeFs, path as nodePath } from './smartipc.plugins.js';
import type { IIpcServerOptions } from './classes.ipcserver.js';
import type { IIpcClientOptions, IConnectRetryConfig } from './classes.ipcclient.js';
import type { IIpcChannelOptions } from './classes.ipcchannel.js';
@@ -129,3 +130,21 @@ export class SmartIpc {
// Export the main class as default
export default SmartIpc;
/**
* Helper: pipe an incoming SmartIPC readable stream to a file path.
* Ensures directory exists; resolves on finish.
*/
export async function pipeStreamToFile(readable: NodeJS.ReadableStream, filePath: string): Promise<void> {
// Ensure directory exists
try {
nodeFs.mkdirSync(nodePath.dirname(filePath), { recursive: true });
} catch {}
await new Promise<void>((resolve, reject) => {
const ws = nodeFs.createWriteStream(filePath);
ws.on('finish', () => resolve());
ws.on('error', reject);
readable.on('error', reject);
(readable as any).pipe(ws);
});
}

View File

@@ -11,6 +11,7 @@ import * as os from 'os';
import * as path from 'path';
import * as fs from 'fs';
import * as crypto from 'crypto';
import * as stream from 'stream';
import { EventEmitter } from 'events';
export { net, os, path, fs, crypto, EventEmitter };
export { net, os, path, fs, crypto, stream, EventEmitter };