From 994b1d20fb6a4218ddde411b02fcd888bfae7bb9 Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Sat, 30 Aug 2025 23:02:49 +0000 Subject: [PATCH] feat(streaming): Add streaming support: chunked stream transfers, file send/receive, stream events and helpers --- changelog.md | 13 +++ readme.md | 85 ++++++++++++++ test/test.streaming.ts | 218 +++++++++++++++++++++++++++++++++++ ts/00_commitinfo_data.ts | 2 +- ts/classes.ipcchannel.ts | 238 ++++++++++++++++++++++++++++++++++++++- ts/classes.ipcclient.ts | 59 ++++++++-- ts/classes.ipcserver.ts | 52 +++++++++ ts/index.ts | 19 ++++ ts/smartipc.plugins.ts | 3 +- 9 files changed, 678 insertions(+), 11 deletions(-) create mode 100644 test/test.streaming.ts diff --git a/changelog.md b/changelog.md index bec3dcc..347c5e0 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,18 @@ # Changelog +## 2025-08-30 - 2.3.0 - feat(streaming) +Add streaming support: chunked stream transfers, file send/receive, stream events and helpers + +- Implement chunked streaming protocol in IpcChannel (init / chunk / end / error / cancel messages) +- Add sendStream, cancelOutgoingStream and cancelIncomingStream methods to IpcChannel +- Expose high-level streaming API on client: sendStream, sendFile, cancelOutgoingStream, cancelIncomingStream +- Expose high-level streaming API on server: sendStreamToClient, sendFileToClient, cancelIncomingStreamFromClient, cancelOutgoingStreamToClient +- Emit 'stream' events from channels/servers/clients with (info, readable) where info includes streamId, meta, headers and clientId +- Add maxConcurrentStreams option (default 32) and enforce concurrent stream limits for incoming/outgoing +- Add SmartIpc.pipeStreamToFile helper to persist incoming streams to disk +- Export stream in smartipc.plugins and update README with streaming usage and examples +- Add comprehensive streaming tests (test/test.streaming.ts) covering large payloads, file transfer, cancellation and concurrency limits + ## 2025-08-29 - 2.2.2 - fix(ipc) Propagate per-client disconnects, add proper routing for targeted messages, and remove unused node-ipc deps diff --git a/readme.md b/readme.md index 67c6506..32e38fa 100644 --- a/readme.md +++ b/readme.md @@ -16,6 +16,7 @@ SmartIPC delivers bulletproof Inter-Process Communication for Node.js applicatio - **CI/Test Ready** - Built-in helpers and race condition prevention for testing - **Observable** - Real-time metrics, connection tracking, and health monitoring - **Multiple Patterns** - Request/Response, Pub/Sub, and Fire-and-Forget messaging +- **Streaming Support** - Efficient, backpressure‑aware streaming for large data and files ## 📦 Installation @@ -184,6 +185,90 @@ await publisher.publish('user.login', { ## 💪 Advanced Features +### 📦 Streaming Large Data & Files + +SmartIPC supports efficient, backpressure-aware streaming of large payloads using chunked messages. Streams work both directions and emit a high-level `stream` event for consumption. + +Client → Server streaming: + +```typescript +// Server side: receive stream +server.on('stream', async (info, readable) => { + if (info.meta?.type === 'file') { + console.log('Receiving file', info.meta.basename, 'from', info.clientId); + } + // Pipe to disk or process chunks + await SmartIpc.pipeStreamToFile(readable, '/tmp/incoming.bin'); +}); + +// Client side: send a stream +const readable = fs.createReadStream('/path/to/local.bin'); +await client.sendStream(readable, { + meta: { type: 'file', basename: 'local.bin' }, + chunkSize: 64 * 1024 // optional, defaults to 64k +}); +``` + +Server → Client streaming: + +```typescript +client.on('stream', async (info, readable) => { + console.log('Got stream from server', info.meta); + await SmartIpc.pipeStreamToFile(readable, '/tmp/from-server.bin'); +}); + +await server.sendStreamToClient(client.getClientId(), fs.createReadStream('/path/server.bin'), { + meta: { type: 'file', basename: 'server.bin' } +}); +``` + +High-level helpers for files: + +```typescript +// Client → Server +await client.sendFile('/path/to/bigfile.iso'); + +// Server → Client +await server.sendFileToClient(clientId, '/path/to/backup.tar'); + +// Save an incoming stream to a file (both sides) +server.on('stream', async (info, readable) => { + await SmartIpc.pipeStreamToFile(readable, '/data/uploaded/' + info.meta?.basename); +}); +``` + +Events & metadata: + +- `channel/server/client` emit `stream` with `(info, readable)` +- `info` contains: `streamId`, `meta` (your metadata, e.g., filename/size), `headers`, and `clientId` (if available) + +API summary: + +- Client: `sendStream(readable, opts)`, `sendFile(filePath, opts)`, `cancelOutgoingStream(id)`, `cancelIncomingStream(id)` +- Server: `sendStreamToClient(clientId, readable, opts)`, `sendFileToClient(clientId, filePath, opts)`, `cancelIncomingStreamFromClient(clientId, id)`, `cancelOutgoingStreamToClient(clientId, id)` +- Utility: `SmartIpc.pipeStreamToFile(readable, filePath)` + +Concurrency and cancelation: + +```typescript +// Limit concurrent streams per connection +const server = SmartIpc.createServer({ + id: 'svc', socketPath: '/tmp/svc.sock', maxConcurrentStreams: 2 +}); + +// Cancel a stream from the receiver side +server.on('stream', (info, readable) => { + if (info.meta?.shouldCancel) { + (server as any).primaryChannel.cancelIncomingStream(info.streamId, { clientId: info.clientId }); + } +}); +``` + +Notes: +- Streaming uses chunked messages under the hood and respects socket backpressure. +- Include `meta` to share context like filename/size; it’s delivered with the `stream` event. +- Configure `maxConcurrentStreams` (default: 32) to guard resources. + ### 🏁 Server Readiness Detection Eliminate race conditions in tests and production: diff --git a/test/test.streaming.ts b/test/test.streaming.ts new file mode 100644 index 0000000..42a7137 --- /dev/null +++ b/test/test.streaming.ts @@ -0,0 +1,218 @@ +import { expect, tap } from '@git.zone/tstest/tapbundle'; +import * as smartipc from '../ts/index.js'; +import * as smartdelay from '@push.rocks/smartdelay'; +import * as plugins from '../ts/smartipc.plugins.js'; +import * as fs from 'fs'; +import * as path from 'path'; + +let server: smartipc.IpcServer; +let client: smartipc.IpcClient; + +tap.test('setup TCP server and client (streaming)', async () => { + server = smartipc.SmartIpc.createServer({ + id: 'stream-test-server', + host: '127.0.0.1', + port: 19876, + heartbeat: false + }); + await server.start(); + + client = smartipc.SmartIpc.createClient({ + id: 'stream-test-server', + host: '127.0.0.1', + port: 19876, + clientId: 'stream-client-1', + heartbeat: false + }); + await client.connect(); + expect(client.getIsConnected()).toBeTrue(); +}); + +tap.test('client -> server streaming large payload', async () => { + // Create ~5MB buffer + const size = 5 * 1024 * 1024 + 123; // add some non-chunk-aligned bytes + const data = Buffer.alloc(size); + for (let i = 0; i < size; i++) data[i] = i % 251; + + const received: Buffer[] = []; + const done = new Promise((resolve, reject) => { + server.on('stream', (info: any, readable: plugins.stream.Readable) => { + // only handle our test stream + if (info?.meta?.direction === 'client-to-server') { + readable.on('data', chunk => received.push(Buffer.from(chunk))); + readable.on('end', resolve); + readable.on('error', reject); + } + }); + }); + + // Send stream from client + const readable = plugins.stream.Readable.from(data); + await client.sendStream(readable, { meta: { direction: 'client-to-server' }, chunkSize: 64 * 1024 }); + + await done; + const result = Buffer.concat(received); + expect(result.length).toEqual(data.length); + expect(result.equals(data)).toBeTrue(); +}); + +tap.test('server -> client streaming large payload', async () => { + const size = 6 * 1024 * 1024 + 7; + const data = Buffer.alloc(size); + for (let i = 0; i < size; i++) data[i] = (i * 7) % 255; + + const received: Buffer[] = []; + const done = new Promise((resolve, reject) => { + client.on('stream', (info: any, readable: plugins.stream.Readable) => { + if (info?.meta?.direction === 'server-to-client') { + readable.on('data', chunk => received.push(Buffer.from(chunk))); + readable.on('end', resolve); + readable.on('error', reject); + } + }); + }); + + const readable = plugins.stream.Readable.from(data); + await server.sendStreamToClient('stream-client-1', readable, { meta: { direction: 'server-to-client' }, chunkSize: 64 * 1024 }); + + await done; + const result = Buffer.concat(received); + expect(result.length).toEqual(data.length); + expect(result.equals(data)).toBeTrue(); +}); + +tap.test('client -> server file transfer to disk', async () => { + const baseTmp1 = path.join(process.cwd(), '.nogit', 'tmp'); + fs.mkdirSync(baseTmp1, { recursive: true }); + const tmpDir = fs.mkdtempSync(path.join(baseTmp1, 'tmp-')); + const srcPath = path.join(tmpDir, 'src.bin'); + const dstPath = path.join(tmpDir, 'dst.bin'); + + // Prepare file ~1MB + const size = 1024 * 1024 + 333; + const buf = Buffer.alloc(size); + for (let i = 0; i < size; i++) buf[i] = (i * 11) % 255; + fs.writeFileSync(srcPath, buf); + + const done = new Promise((resolve, reject) => { + server.on('stream', async (info: any, readable: plugins.stream.Readable) => { + if (info?.meta?.type === 'file' && info?.meta?.basename === 'src.bin') { + try { + await smartipc.pipeStreamToFile(readable, dstPath); + resolve(); + } catch (e) { + reject(e); + } + } + }); + }); + + await client.sendFile(srcPath); + await done; + const out = fs.readFileSync(dstPath); + expect(out.equals(buf)).toBeTrue(); +}); + +tap.test('server -> client file transfer to disk', async () => { + const baseTmp2 = path.join(process.cwd(), '.nogit', 'tmp'); + fs.mkdirSync(baseTmp2, { recursive: true }); + const tmpDir = fs.mkdtempSync(path.join(baseTmp2, 'tmp-')); + const srcPath = path.join(tmpDir, 'serverfile.bin'); + const dstPath = path.join(tmpDir, 'clientfile.bin'); + + const size = 512 * 1024 + 77; + const buf = Buffer.alloc(size); + for (let i = 0; i < size; i++) buf[i] = (i * 17) % 251; + fs.writeFileSync(srcPath, buf); + + const done = new Promise((resolve, reject) => { + client.on('stream', async (info: any, readable: plugins.stream.Readable) => { + if (info?.meta?.type === 'file' && info?.meta?.basename === 'serverfile.bin') { + try { + await smartipc.pipeStreamToFile(readable, dstPath); + resolve(); + } catch (e) { + reject(e); + } + } + }); + }); + + await server.sendFileToClient('stream-client-1', srcPath); + await done; + const out = fs.readFileSync(dstPath); + expect(out.equals(buf)).toBeTrue(); +}); + +tap.test('receiver cancels an incoming stream', async () => { + // Create a slow readable that emits many chunks + const bigChunk = Buffer.alloc(128 * 1024, 1); + let pushed = 0; + const readable = new plugins.stream.Readable({ + read() { + setTimeout(() => { + if (pushed > 200) { + this.push(null); + } else { + this.push(bigChunk); + pushed++; + } + }, 5); + } + }); + + let cancelled = false; + const cancelPromise = new Promise((resolve) => { + server.on('stream', (info: any, r: plugins.stream.Readable) => { + if (info?.meta?.direction === 'client-to-server-cancel') { + // cancel after first chunk + r.once('data', async () => { + cancelled = true; + // send cancel back to sender + await (server as any).primaryChannel.cancelIncomingStream(info.streamId, { clientId: info.clientId }); + resolve(); + }); + r.on('error', () => { /* ignore cancellation error */ }); + // drain to trigger data + r.resume(); + } + }); + }); + + const sendPromise = client + .sendStream(readable, { meta: { direction: 'client-to-server-cancel' } }) + .catch(() => { /* expected due to cancel */ }); + await cancelPromise; + expect(cancelled).toBeTrue(); + await sendPromise; +}); + +tap.test('enforce maxConcurrentStreams option', async () => { + // Setup separate low-limit server/client + const srv = smartipc.SmartIpc.createServer({ id: 'limit-srv', host: '127.0.0.1', port: 19999, heartbeat: false, maxConcurrentStreams: 1 }); + await srv.start(); + const cli = smartipc.SmartIpc.createClient({ id: 'limit-srv', host: '127.0.0.1', port: 19999, clientId: 'limit-client', heartbeat: false, maxConcurrentStreams: 1 }); + await cli.connect(); + + const r1 = plugins.stream.Readable.from(Buffer.alloc(256 * 1024)); + const r2 = plugins.stream.Readable.from(Buffer.alloc(256 * 1024)); + const p1 = cli.sendStream(r1, { meta: { n: 1 } }); + let threw = false; + try { + await cli.sendStream(r2, { meta: { n: 2 } }); + } catch (e) { + threw = true; + } + expect(threw).toBeTrue(); + await p1; + await cli.disconnect(); + await srv.stop(); +}); + +tap.test('cleanup streaming test', async () => { + await client.disconnect(); + await server.stop(); + await smartdelay.delayFor(50); +}); + +export default tap.start(); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 1ef50a8..15b3561 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartipc', - version: '2.2.2', + version: '2.3.0', description: 'A library for node inter process communication, providing an easy-to-use API for IPC.' } diff --git a/ts/classes.ipcchannel.ts b/ts/classes.ipcchannel.ts index 65ade83..5996494 100644 --- a/ts/classes.ipcchannel.ts +++ b/ts/classes.ipcchannel.ts @@ -26,6 +26,8 @@ export interface IIpcChannelOptions extends IIpcTransportOptions { heartbeatInitialGracePeriodMs?: number; /** Throw on heartbeat timeout (default: true, set false to emit event instead) */ heartbeatThrowOnTimeout?: boolean; + /** Maximum concurrent streams (incoming/outgoing) */ + maxConcurrentStreams?: number; } /** @@ -54,6 +56,12 @@ export class IpcChannel extends plugins.EventEm private connectionStartTime: number = Date.now(); private isReconnecting = false; private isClosing = false; + // Streaming state + private incomingStreams = new Map(); + private incomingStreamMeta = new Map>(); + private outgoingStreams = new Map void }>(); + private activeIncomingStreams = 0; + private activeOutgoingStreams = 0; // Metrics private metrics = { @@ -79,6 +87,7 @@ export class IpcChannel extends plugins.EventEm heartbeat: true, heartbeatInterval: 5000, heartbeatTimeout: 10000, + maxConcurrentStreams: 32, ...options }; @@ -303,7 +312,7 @@ export class IpcChannel extends plugins.EventEm // Track metrics this.metrics.messagesReceived++; this.metrics.bytesReceived += JSON.stringify(message).length; - + // Handle heartbeat and send response if (message.type === '__heartbeat__') { this.lastHeartbeat = Date.now(); @@ -325,6 +334,105 @@ export class IpcChannel extends plugins.EventEm return; } + // Handle streaming control messages + if (message.type === '__stream_init__') { + const streamId = (message.payload as any)?.streamId as string; + const meta = (message.payload as any)?.meta as Record | undefined; + if (typeof streamId === 'string' && streamId.length) { + // Enforce max concurrent incoming streams + if (this.activeIncomingStreams >= (this.options.maxConcurrentStreams || Infinity)) { + const response: IIpcMessageEnvelope = { + id: plugins.crypto.randomUUID(), + type: '__stream_error__', + timestamp: Date.now(), + payload: { streamId, error: 'Max concurrent streams exceeded' }, + headers: message.headers?.clientId ? { clientId: message.headers.clientId } : undefined + }; + this.transport.send(response).catch(() => {}); + return; + } + const pass = new plugins.stream.PassThrough(); + this.incomingStreams.set(streamId, pass); + if (meta) this.incomingStreamMeta.set(streamId, meta); + this.activeIncomingStreams++; + // Emit a high-level stream event + const headersClientId = message.headers?.clientId; + const eventPayload = { + streamId, + meta: meta || {}, + headers: message.headers || {}, + clientId: headersClientId, + }; + // Emit as ('stream', info, readable) + this.emit('stream', eventPayload, pass); + } + return; + } + + if (message.type === '__stream_chunk__') { + const streamId = (message.payload as any)?.streamId as string; + const chunkB64 = (message.payload as any)?.chunk as string; + const pass = this.incomingStreams.get(streamId); + if (pass && typeof chunkB64 === 'string') { + try { + const chunk = Buffer.from(chunkB64, 'base64'); + pass.write(chunk); + } catch (e) { + // If decode fails, destroy stream + pass.destroy(e as Error); + this.incomingStreams.delete(streamId); + this.incomingStreamMeta.delete(streamId); + } + } + return; + } + + if (message.type === '__stream_end__') { + const streamId = (message.payload as any)?.streamId as string; + const pass = this.incomingStreams.get(streamId); + if (pass) { + pass.end(); + this.incomingStreams.delete(streamId); + this.incomingStreamMeta.delete(streamId); + this.activeIncomingStreams = Math.max(0, this.activeIncomingStreams - 1); + } + return; + } + + if (message.type === '__stream_error__') { + const streamId = (message.payload as any)?.streamId as string; + const errMsg = (message.payload as any)?.error as string; + const pass = this.incomingStreams.get(streamId); + if (pass) { + pass.destroy(new Error(errMsg || 'stream error')); + this.incomingStreams.delete(streamId); + this.incomingStreamMeta.delete(streamId); + this.activeIncomingStreams = Math.max(0, this.activeIncomingStreams - 1); + } + return; + } + + if (message.type === '__stream_cancel__') { + const streamId = (message.payload as any)?.streamId as string; + // Cancel outgoing stream with same id if present + const ctrl = this.outgoingStreams.get(streamId); + if (ctrl) { + ctrl.cancelled = true; + try { ctrl.abort?.(); } catch {} + this.outgoingStreams.delete(streamId); + this.activeOutgoingStreams = Math.max(0, this.activeOutgoingStreams - 1); + } + // Also cancel any incoming stream if tracked + const pass = this.incomingStreams.get(streamId); + if (pass) { + try { pass.destroy(new Error('stream cancelled')); } catch {} + this.incomingStreams.delete(streamId); + this.incomingStreamMeta.delete(streamId); + this.activeIncomingStreams = Math.max(0, this.activeIncomingStreams - 1); + } + return; + } + // Handle request/response if (message.correlationId && this.pendingRequests.has(message.correlationId)) { const pending = this.pendingRequests.get(message.correlationId)!; @@ -468,7 +576,7 @@ export class IpcChannel extends plugins.EventEm * Register a message handler */ public on(event: string, handler: (payload: any) => any | Promise): this { - if (event === 'message' || event === 'connect' || event === 'disconnect' || event === 'error' || event === 'reconnecting' || event === 'drain' || event === 'heartbeatTimeout' || event === 'clientDisconnected') { + if (event === 'message' || event === 'connect' || event === 'disconnect' || event === 'error' || event === 'reconnecting' || event === 'drain' || event === 'heartbeatTimeout' || event === 'clientDisconnected' || event === 'stream') { // Special handling for channel events super.on(event, handler); } else { @@ -530,3 +638,129 @@ export class IpcChannel extends plugins.EventEm }; } } + +/** + * Streaming helpers + */ +export interface IStreamSendOptions { + headers?: Record; + chunkSize?: number; // bytes, default 64k + streamId?: string; + meta?: Record; +} + +export type ReadableLike = NodeJS.ReadableStream | plugins.stream.Readable; + +// Extend IpcChannel with a sendStream method +export interface IpcChannel { + sendStream(readable: ReadableLike, options?: IStreamSendOptions): Promise; + cancelOutgoingStream(streamId: string, headers?: Record): Promise; + cancelIncomingStream(streamId: string, headers?: Record): Promise; +} + +IpcChannel.prototype.sendStream = async function(this: IpcChannel, readable: ReadableLike, options?: IStreamSendOptions): Promise { + const streamId = options?.streamId || (plugins.crypto.randomUUID ? plugins.crypto.randomUUID() : `${Date.now()}-${Math.random()}`); + const headers = options?.headers || {}; + const chunkSize = Math.max(1024, Math.min(options?.chunkSize || 64 * 1024, (this as any).options.maxMessageSize || 8 * 1024 * 1024)); + const self: any = this; + + // Enforce max concurrent outgoing streams (reserve a slot synchronously) + if (self.activeOutgoingStreams >= (self.options.maxConcurrentStreams || Infinity)) { + throw new Error('Max concurrent streams exceeded'); + } + self.activeOutgoingStreams++; + self.outgoingStreams.set(streamId, { + cancelled: false, + abort: () => { + try { (readable as any).destroy?.(new Error('stream cancelled')); } catch {} + } + }); + try { + // Send init after reserving slot + await (this as any).sendMessage('__stream_init__', { streamId, meta: options?.meta || {} }, headers); + } catch (e) { + self.outgoingStreams.delete(streamId); + self.activeOutgoingStreams = Math.max(0, self.activeOutgoingStreams - 1); + throw e; + } + + const readChunkAndSend = async (buf: Buffer) => { + // Slice into chunkSize frames if needed + for (let offset = 0; offset < buf.length; offset += chunkSize) { + const ctrl = self.outgoingStreams.get(streamId); + if (ctrl?.cancelled) { + return; + } + const slice = buf.subarray(offset, Math.min(offset + chunkSize, buf.length)); + const chunkB64 = slice.toString('base64'); + await (this as any).sendMessage('__stream_chunk__', { streamId, chunk: chunkB64 }, headers); + } + }; + + await new Promise((resolve, reject) => { + let sending = Promise.resolve(); + readable.on('data', (chunk: any) => { + const buf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); + // Ensure sequential sending to avoid write races + sending = sending.then(() => readChunkAndSend(buf)); + sending.catch(reject); + }); + readable.on('end', async () => { + try { + await sending; + await (this as any).sendMessage('__stream_end__', { streamId }, headers); + self.outgoingStreams.delete(streamId); + self.activeOutgoingStreams = Math.max(0, self.activeOutgoingStreams - 1); + resolve(); + } catch (e) { + self.outgoingStreams.delete(streamId); + self.activeOutgoingStreams = Math.max(0, self.activeOutgoingStreams - 1); + reject(e); + } + }); + readable.on('error', async (err: Error) => { + try { + await sending.catch(() => {}); + await (this as any).sendMessage('__stream_error__', { streamId, error: err.message }, headers); + } finally { + self.outgoingStreams.delete(streamId); + self.activeOutgoingStreams = Math.max(0, self.activeOutgoingStreams - 1); + reject(err); + } + }); + // In case the stream is already ended + const r = readable as any; + if (r.readableEnded) { + (async () => { + await (this as any).sendMessage('__stream_end__', { streamId }, headers); + self.outgoingStreams.delete(streamId); + self.activeOutgoingStreams = Math.max(0, self.activeOutgoingStreams - 1); + resolve(); + })().catch(reject); + } + }); +}; + +IpcChannel.prototype.cancelOutgoingStream = async function(this: IpcChannel, streamId: string, headers?: Record): Promise { + const self: any = this; + const ctrl = self.outgoingStreams.get(streamId); + if (ctrl) { + ctrl.cancelled = true; + try { ctrl.abort?.(); } catch {} + self.outgoingStreams.delete(streamId); + self.activeOutgoingStreams = Math.max(0, self.activeOutgoingStreams - 1); + } + await (this as any).sendMessage('__stream_cancel__', { streamId }, headers || {}); +}; + +IpcChannel.prototype.cancelIncomingStream = async function(this: IpcChannel, streamId: string, headers?: Record): Promise { + const self: any = this; + const pass = self.incomingStreams.get(streamId); + if (pass) { + try { pass.destroy(new Error('stream cancelled')); } catch {} + self.incomingStreams.delete(streamId); + self.incomingStreamMeta.delete(streamId); + self.activeIncomingStreams = Math.max(0, self.activeIncomingStreams - 1); + } + await (this as any).sendMessage('__stream_cancel__', { streamId }, headers || {}); +}; diff --git a/ts/classes.ipcclient.ts b/ts/classes.ipcclient.ts index 24ddd4f..94e9689 100644 --- a/ts/classes.ipcclient.ts +++ b/ts/classes.ipcclient.ts @@ -122,25 +122,27 @@ export class IpcClient extends plugins.EventEmitter { // If waitForReady is specified, wait for server socket to exist first if (connectOptions.waitForReady) { const waitTimeout = connectOptions.waitTimeout || 10000; + // For Unix domain sockets / named pipes: wait explicitly using helper that probes with clientOnly + if (this.options.socketPath) { + const { SmartIpc } = await import('./index.js'); + await (SmartIpc as any).waitForServer({ socketPath: this.options.socketPath, timeoutMs: waitTimeout }); + await attemptConnection(); + return; + } + // Fallback (e.g., TCP): retry-connect loop const startTime = Date.now(); - while (Date.now() - startTime < waitTimeout) { try { - // Try to connect await attemptConnection(); return; // Success! } catch (error) { - // If it's a connection refused error, server might not be ready yet - if ((error as any).message?.includes('ECONNREFUSED') || - (error as any).message?.includes('ENOENT')) { + if ((error as any).message?.includes('ECONNREFUSED')) { await new Promise(resolve => setTimeout(resolve, 100)); continue; } - // Other errors should be thrown throw error; } } - throw new Error(`Server not ready after ${waitTimeout}ms`); } else { // Normal connection attempt @@ -233,6 +235,13 @@ export class IpcClient extends plugins.EventEmitter { this.emit('reconnecting', info); }); + // Forward streaming events + // Emitted as ('stream', info, readable) + // info contains { streamId, meta, headers, clientId } + this.channel.on('stream', (info: any, readable: plugins.stream.Readable) => { + this.emit('stream', info, readable); + }); + // Handle messages this.channel.on('message', (message) => { // Check if we have a handler for this message type @@ -361,4 +370,40 @@ export class IpcClient extends plugins.EventEmitter { public getStats(): any { return this.channel.getStats(); } + + /** + * Send a Node.js readable stream to the server + */ + public async sendStream(readable: plugins.stream.Readable | NodeJS.ReadableStream, options?: { headers?: Record; chunkSize?: number; streamId?: string; meta?: Record }): Promise { + const headers = { ...(options?.headers || {}), clientId: this.clientId }; + await (this as any).channel.sendStream(readable as any, { ...options, headers }); + } + + /** + * Send a file to the server via streaming + */ + public async sendFile(filePath: string, options?: { headers?: Record; chunkSize?: number; streamId?: string; meta?: Record }): Promise { + const fs = plugins.fs; + const path = plugins.path; + const stat = fs.statSync(filePath); + const meta = { + ...(options?.meta || {}), + type: 'file', + basename: path.basename(filePath), + size: stat.size, + mtimeMs: stat.mtimeMs + }; + const rs = fs.createReadStream(filePath); + await this.sendStream(rs, { ...options, meta }); + } + + /** Cancel an outgoing stream by id */ + public async cancelOutgoingStream(streamId: string): Promise { + await (this as any).channel.cancelOutgoingStream(streamId, { clientId: this.clientId }); + } + + /** Cancel an incoming stream by id */ + public async cancelIncomingStream(streamId: string): Promise { + await (this as any).channel.cancelIncomingStream(streamId, { clientId: this.clientId }); + } } diff --git a/ts/classes.ipcserver.ts b/ts/classes.ipcserver.ts index 171f5ee..98d8dfa 100644 --- a/ts/classes.ipcserver.ts +++ b/ts/classes.ipcserver.ts @@ -200,6 +200,12 @@ export class IpcServer extends plugins.EventEmitter { this.emit('error', error, 'server'); }); + // Forward streaming events to server level + this.primaryChannel.on('stream', (info: any, readable: plugins.stream.Readable) => { + // Emit ('stream', info, readable) + this.emit('stream', info, readable); + }); + this.primaryChannel.on('heartbeatTimeout', (error) => { // Forward heartbeatTimeout event (when heartbeatThrowOnTimeout is false) this.emit('heartbeatTimeout', error, 'server'); @@ -396,6 +402,52 @@ export class IpcServer extends plugins.EventEmitter { await client.channel.sendMessage(type, payload, routedHeaders); } + /** + * Send a Node.js readable stream to a specific client + */ + public async sendStreamToClient(clientId: string, readable: plugins.stream.Readable | NodeJS.ReadableStream, options?: { headers?: Record; chunkSize?: number; streamId?: string; meta?: Record }): Promise { + const client = this.clients.get(clientId); + if (!client) { + throw new Error(`Client ${clientId} not found`); + } + const headers = { ...(options?.headers || {}), clientId }; + await (client.channel as any).sendStream(readable as any, { ...options, headers }); + } + + /** + * Send a file to a specific client via streaming + */ + public async sendFileToClient(clientId: string, filePath: string, options?: { headers?: Record; chunkSize?: number; streamId?: string; meta?: Record }): Promise { + const client = this.clients.get(clientId); + if (!client) { + throw new Error(`Client ${clientId} not found`); + } + const fs = plugins.fs; + const path = plugins.path; + const stat = fs.statSync(filePath); + const meta = { + ...(options?.meta || {}), + type: 'file', + basename: path.basename(filePath), + size: stat.size, + mtimeMs: stat.mtimeMs + }; + const rs = fs.createReadStream(filePath); + await this.sendStreamToClient(clientId, rs, { ...options, meta }); + } + + /** Cancel a stream incoming from a client (server side) */ + public async cancelIncomingStreamFromClient(clientId: string, streamId: string): Promise { + if (!this.primaryChannel) return; + await (this.primaryChannel as any).cancelIncomingStream(streamId, { clientId }); + } + + /** Cancel a server->client outgoing stream */ + public async cancelOutgoingStreamToClient(clientId: string, streamId: string): Promise { + if (!this.primaryChannel) return; + await (this.primaryChannel as any).cancelOutgoingStream(streamId, { clientId }); + } + /** * Send request to specific client and wait for response */ diff --git a/ts/index.ts b/ts/index.ts index d8ae02f..e5e7de5 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -6,6 +6,7 @@ export * from './classes.ipcclient.js'; import { IpcServer } from './classes.ipcserver.js'; import { IpcClient } from './classes.ipcclient.js'; import { IpcChannel } from './classes.ipcchannel.js'; +import { stream as nodeStream, fs as nodeFs, path as nodePath } from './smartipc.plugins.js'; import type { IIpcServerOptions } from './classes.ipcserver.js'; import type { IIpcClientOptions, IConnectRetryConfig } from './classes.ipcclient.js'; import type { IIpcChannelOptions } from './classes.ipcchannel.js'; @@ -129,3 +130,21 @@ export class SmartIpc { // Export the main class as default export default SmartIpc; + +/** + * Helper: pipe an incoming SmartIPC readable stream to a file path. + * Ensures directory exists; resolves on finish. + */ +export async function pipeStreamToFile(readable: NodeJS.ReadableStream, filePath: string): Promise { + // Ensure directory exists + try { + nodeFs.mkdirSync(nodePath.dirname(filePath), { recursive: true }); + } catch {} + await new Promise((resolve, reject) => { + const ws = nodeFs.createWriteStream(filePath); + ws.on('finish', () => resolve()); + ws.on('error', reject); + readable.on('error', reject); + (readable as any).pipe(ws); + }); +} diff --git a/ts/smartipc.plugins.ts b/ts/smartipc.plugins.ts index 7d03050..a412826 100644 --- a/ts/smartipc.plugins.ts +++ b/ts/smartipc.plugins.ts @@ -11,6 +11,7 @@ import * as os from 'os'; import * as path from 'path'; import * as fs from 'fs'; import * as crypto from 'crypto'; +import * as stream from 'stream'; import { EventEmitter } from 'events'; -export { net, os, path, fs, crypto, EventEmitter }; +export { net, os, path, fs, crypto, stream, EventEmitter };