import * as plugins from './smartstream.plugins.js'; import { Duplex, type DuplexOptions } from 'stream'; export interface IStreamTools { truncate: () => void; push: (pipeObject: any) => Promise; } export interface IStreamWriteFunction { (chunkArg: T, toolsArg: IStreamTools): Promise; } export interface IStreamFinalFunction { (toolsArg: IStreamTools): Promise; } export interface ISmartDuplexOptions extends DuplexOptions { /** * wether to print debug logs */ debug?: boolean; /** * the name of the stream */ name?: string; /** * a function that is being called to read more stuff from whereever to be processed by the stream * @returns */ readFunction?: () => Promise; /** * the write function is called for every chunk that is being written to the stream * it can push or return chunks (but does not have to) to be written to the readable side of the stream */ writeFunction?: IStreamWriteFunction; /** * a final function that is run at the end of the stream */ finalFunction?: IStreamFinalFunction; } export class SmartDuplex extends Duplex { // STATIC static fromBuffer(buffer: Buffer, options?: ISmartDuplexOptions): SmartDuplex { const smartDuplex = new SmartDuplex(options); process.nextTick(() => { smartDuplex.push(buffer); smartDuplex.push(null); // Signal the end of the data }); return smartDuplex; } public static fromWebReadableStream( readableStream: ReadableStream ): SmartDuplex { const smartDuplex = new SmartDuplex({ objectMode: true, }); // Acquire reader ONCE const reader = readableStream.getReader(); let reading = false; // Override _read to pull from the web reader smartDuplex._read = function (_size: number) { if (reading) return; reading = true; reader.read().then( ({ value, done }) => { reading = false; if (done) { smartDuplex.push(null); } else { smartDuplex.push(value); } }, (err) => { reading = false; smartDuplex.destroy(err); } ); }; // Cancel reader on destroy smartDuplex.on('close', () => { reader.cancel().catch(() => {}); }); return smartDuplex; } // INSTANCE private backpressuredArray: plugins.lik.BackpressuredArray; public options: ISmartDuplexOptions; private _consumerWantsData = false; private _readFunctionRunning = false; private debugLog(messageArg: string) { if (this.options.debug) { console.log(messageArg); } } constructor(optionsArg?: ISmartDuplexOptions) { const safeOptions = optionsArg || {} as ISmartDuplexOptions; super( Object.assign( { highWaterMark: 1, }, safeOptions ) ); this.options = safeOptions; this.backpressuredArray = new plugins.lik.BackpressuredArray( this.options.highWaterMark || 1 ); } /** * Synchronously drains items from the backpressuredArray into the readable side. * Stops when push() returns false (consumer is full) or array is empty. */ private _drainBackpressuredArray(): void { while (this.backpressuredArray.data.length > 0) { const nextChunk = this.backpressuredArray.shift(); if (nextChunk === null) { // EOF signal — push null to end readable side this.push(null); this._consumerWantsData = false; return; } const canPushMore = this.push(nextChunk); if (!canPushMore) { this._consumerWantsData = false; return; } } } // _read must NOT be async — Node.js ignores the return value public _read(size: number): void { this.debugLog(`${this.options.name}: read was called`); this._consumerWantsData = true; // Drain any buffered items first if (this.backpressuredArray.data.length > 0) { this._drainBackpressuredArray(); } // If readFunction exists and is not already running, start it if (this.options.readFunction && !this._readFunctionRunning) { this._readFunctionRunning = true; this.options.readFunction().then( () => { this._readFunctionRunning = false; }, (err) => { this._readFunctionRunning = false; this.destroy(err); } ); } } public async backpressuredPush(pushArg: TOutput) { const canPushMore = this.backpressuredArray.push(pushArg); // Try to drain if the consumer wants data if (this._consumerWantsData) { this._drainBackpressuredArray(); } if (!canPushMore) { this.debugLog(`${this.options.name}: cannot push more`); await this.backpressuredArray.waitForSpace(); this.debugLog(`${this.options.name}: can push more again`); } return canPushMore; } private asyncWritePromiseObjectmap = new plugins.lik.ObjectMap>(); // _write must NOT be async — Node.js ignores the return value public _write(chunk: TInput, encoding: string, callback: (error?: Error | null) => void) { if (!this.options.writeFunction) { return callback(new Error('No stream function provided')); } let callbackCalled = false; const safeCallback = (err?: Error | null) => { if (!callbackCalled) { callbackCalled = true; callback(err); } }; let isTruncated = false; const tools: IStreamTools = { truncate: () => { isTruncated = true; safeCallback(); this.push(null); }, push: async (pushArg: TOutput) => { return await this.backpressuredPush(pushArg); }, }; const writeDeferred = plugins.smartpromise.defer(); this.asyncWritePromiseObjectmap.add(writeDeferred.promise); this.options.writeFunction(chunk, tools).then( (modifiedChunk) => { if (isTruncated) { writeDeferred.resolve(); this.asyncWritePromiseObjectmap.remove(writeDeferred.promise); return; } const finish = () => { safeCallback(); writeDeferred.resolve(); this.asyncWritePromiseObjectmap.remove(writeDeferred.promise); }; if (modifiedChunk !== undefined && modifiedChunk !== null) { this.backpressuredPush(modifiedChunk).then(finish, (err) => { safeCallback(err); writeDeferred.resolve(); this.asyncWritePromiseObjectmap.remove(writeDeferred.promise); }); } else { finish(); } }, (err) => { safeCallback(err); writeDeferred.resolve(); this.asyncWritePromiseObjectmap.remove(writeDeferred.promise); } ); } // _final must NOT be async — Node.js ignores the return value public _final(callback: (error?: Error | null) => void) { let callbackCalled = false; const safeCallback = (err?: Error | null) => { if (!callbackCalled) { callbackCalled = true; callback(err); } }; Promise.all(this.asyncWritePromiseObjectmap.getArray()).then(() => { if (this.options.finalFunction) { const tools: IStreamTools = { truncate: () => safeCallback(), push: async (pipeObject) => { return await this.backpressuredPush(pipeObject); }, }; this.options.finalFunction(tools).then( (finalChunk) => { const pushNull = () => { this.backpressuredArray.push(null); if (this._consumerWantsData) { this._drainBackpressuredArray(); } safeCallback(); }; if (finalChunk !== undefined && finalChunk !== null) { this.backpressuredPush(finalChunk).then(pushNull, (err) => { safeCallback(err); }); } else { pushNull(); } }, (err) => { this.backpressuredArray.push(null); if (this._consumerWantsData) { this._drainBackpressuredArray(); } safeCallback(err); } ); } else { this.backpressuredArray.push(null); if (this._consumerWantsData) { this._drainBackpressuredArray(); } safeCallback(); } }, (err) => { safeCallback(err); }); } public async getWebStreams(): Promise<{ readable: ReadableStream; writable: WritableStream }> { const duplex = this; let readableClosed = false; const readable = new ReadableStream({ start(controller) { const onReadable = () => { let chunk; while (null !== (chunk = duplex.read())) { controller.enqueue(chunk); } }; const onEnd = () => { if (!readableClosed) { readableClosed = true; controller.close(); } cleanup(); }; const cleanup = () => { duplex.removeListener('readable', onReadable); duplex.removeListener('end', onEnd); }; duplex.on('readable', onReadable); duplex.on('end', onEnd); }, cancel(reason) { duplex.destroy(new Error(reason)); }, }); const writable = new WritableStream({ write(chunk) { return new Promise((resolve, reject) => { let resolved = false; const onDrain = () => { if (!resolved) { resolved = true; resolve(); } }; const isBackpressured = !duplex.write(chunk, (error) => { if (error) { if (!resolved) { resolved = true; duplex.removeListener('drain', onDrain); reject(error); } } else if (!isBackpressured && !resolved) { resolved = true; resolve(); } }); if (isBackpressured) { duplex.once('drain', onDrain); } }); }, close() { return new Promise((resolve, reject) => { duplex.end((err: Error | null) => { if (err) reject(err); else resolve(); }); }); }, abort(reason) { duplex.destroy(new Error(reason)); }, }); return { readable, writable }; } }