import * as plugins from './smartstream.plugins.js';
import { Duplex, type DuplexOptions } from 'stream';

/**
 * Helper functions handed to the write and final functions of a SmartDuplex.
 */
export interface IStreamTools {
  /**
   * Stops processing early. Inside a write function this also ends the
   * readable side (a `null` is pushed) before the write is acknowledged.
   */
  truncate: () => void;
  /**
   * Queues a chunk for the readable side of the stream.
   * Resolves with the boolean reported by the internal backpressured array.
   */
  push: (pipeObject: any) => Promise<boolean>;
}

/**
 * Signature of the per-chunk write function.
 * The `any` defaults keep un-parameterised usages of this interface compiling.
 */
export interface IStreamWriteFunction<T = any, rT = any> {
  (chunkArg: T, toolsArg: IStreamTools): Promise<rT>;
}

/**
 * Signature of the function that runs once when the writable side is finishing.
 */
export interface IStreamFinalFunction<rT = any> {
  (toolsArg: IStreamTools): Promise<rT>;
}

export interface ISmartDuplexOptions<TInput = any, TOutput = any> extends DuplexOptions {
  /**
   * whether to print debug logs
   */
  debug?: boolean;
  /**
   * the name of the stream (only used as a prefix in debug logs)
   */
  name?: string;
  /**
   * a function that is called on every read to fetch more data from wherever
   * it lives, so it can be processed by the stream
   */
  readFunction?: () => Promise<void>;
  /**
   * the write function is called for every chunk that is being written to the stream
   * it can push or return chunks (but does not have to) to be written to the readable side of the stream
   */
  writeFunction?: IStreamWriteFunction<TInput, TOutput>;
  /**
   * a final function that is run at the end of the stream
   */
  finalFunction?: IStreamFinalFunction<TOutput>;
}

/**
 * A Duplex stream whose writable side is driven by an async `writeFunction`
 * and whose readable side is fed from an internal backpressured array.
 */
export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
  // STATIC
  /**
   * Creates a SmartDuplex whose readable side emits the given buffer and then ends.
   *
   * @param buffer - the data to emit
   * @param options - optional stream options, forwarded to the constructor
   * @returns the new stream; the buffer is pushed on the next tick
   */
  static fromBuffer(buffer: Buffer, options?: ISmartDuplexOptions): SmartDuplex {
    const smartDuplex = new SmartDuplex(options);
    process.nextTick(() => {
      smartDuplex.push(buffer);
      smartDuplex.push(null); // Signal the end of the data
    });
    return smartDuplex;
  }

  // INSTANCE
  private backpressuredArray: plugins.lik.BackpressuredArray<TOutput>; // an array that only takes a defined amount of items
  public options: ISmartDuplexOptions<TInput, TOutput>;
  private observableSubscription?: plugins.smartrx.rxjs.Subscription;

  /**
   * Prints a message to the console when `options.debug` is set.
   */
  private debugLog(messageArg: string) {
    // optional debug log
    if (this.options.debug) {
      console.log(messageArg);
    }
  }

  /**
   * @param optionsArg - stream options; `highWaterMark` defaults to 1 both for
   *   the underlying Duplex and for the internal backpressured array
   */
  constructor(optionsArg?: ISmartDuplexOptions<TInput, TOutput>) {
    super(
      Object.assign(
        {
          highWaterMark: 1,
        },
        optionsArg
      )
    );
    // optionsArg is optional: fall back to an empty object so later reads of
    // this.options.* do not throw when the stream is built without options.
    this.options = optionsArg ?? {};
    this.backpressuredArray = new plugins.lik.BackpressuredArray<TOutput>(
      this.options.highWaterMark || 1
    );
  }

  /**
   * Readable-side hook: waits until the internal array holds items, runs the
   * optional `readFunction`, then forwards queued chunks to the readable
   * buffer until the array is empty or `push` reports backpressure.
   */
  public async _read(size: number): Promise<void> {
    this.debugLog(`${this.options.name}: read was called`);
    await this.backpressuredArray.waitForItems();
    this.debugLog(`${this.options.name}: successfully waited for items.`);
    if (this.options.readFunction) {
      await this.options.readFunction();
    }
    let canPushMore = true;
    while (this.backpressuredArray.data.length > 0 && canPushMore) {
      const nextChunk = this.backpressuredArray.shift();
      canPushMore = this.push(nextChunk);
    }
  }

  /**
   * Queues a chunk in the internal array and, when the array reports that it
   * cannot take more, waits until space is available again.
   *
   * @param pushArg - the chunk to queue for the readable side
   * @returns the boolean returned by the array's `push` (false means the
   *   caller had to wait for space)
   */
  public async backpressuredPush(pushArg: TOutput) {
    const canPushMore = this.backpressuredArray.push(pushArg);
    if (!canPushMore) {
      this.debugLog(`${this.options.name}: cannot push more`);
      await this.backpressuredArray.waitForSpace();
      this.debugLog(`${this.options.name}: can push more again`);
    }
    return canPushMore;
  }

  // Promises of writes that are currently in flight; _final waits for them.
  private asyncWritePromiseObjectmap = new plugins.lik.ObjectMap<Promise<any>>();

  /**
   * Writable-side hook: runs `writeFunction` for the chunk and queues whatever
   * it returns (if truthy) for the readable side.
   *
   * @param chunk - the incoming chunk
   * @param encoding - unused
   * @param callback - Node's write acknowledgement; invoked exactly once
   */
  public async _write(chunk: TInput, encoding: string, callback: (error?: Error | null) => void) {
    if (!this.options.writeFunction) {
      return callback(new Error('No stream function provided'));
    }

    let isTruncated = false;
    const tools: IStreamTools = {
      truncate: () => {
        this.push(null);
        isTruncated = true;
        callback();
      },
      push: async (pushArg: TOutput) => {
        return await this.backpressuredPush(pushArg);
      },
    };

    const writeDeferred = plugins.smartpromise.defer();
    this.asyncWritePromiseObjectmap.add(writeDeferred.promise);
    try {
      const modifiedChunk = await this.options.writeFunction(chunk, tools);
      if (isTruncated) {
        // truncate() has already acknowledged this write
        return;
      }
      if (modifiedChunk) {
        await tools.push(modifiedChunk);
      }
      callback();
    } catch (err) {
      if (isTruncated) {
        // The callback was already consumed by truncate(); calling it again
        // would be a multiple-callback violation, so surface the error by
        // destroying the stream instead.
        this.destroy(err as Error);
      } else {
        callback(err as Error);
      }
    } finally {
      // Always settle and unregister the in-flight marker. Previously it stayed
      // pending after truncate() or an error, which made _final wait forever.
      writeDeferred.resolve();
      this.asyncWritePromiseObjectmap.remove(writeDeferred.promise);
    }
  }

  /**
   * Writable-side hook that runs when the writable side ends: waits for all
   * in-flight writes, runs the optional `finalFunction`, queues its result
   * (if truthy) and finally queues `null` to end the readable side.
   *
   * @param callback - Node's final acknowledgement; invoked exactly once
   */
  public async _final(callback: (error?: Error | null) => void) {
    await Promise.all(this.asyncWritePromiseObjectmap.getArray());

    // truncate() and the regular end of this method both want to acknowledge;
    // make sure Node's callback only ever fires once.
    let callbackCalled = false;
    const finish = (error?: Error | null) => {
      if (callbackCalled) {
        return;
      }
      callbackCalled = true;
      callback(error);
    };

    if (this.options.finalFunction) {
      const tools: IStreamTools = {
        truncate: () => finish(),
        push: async (pipeObject) => {
          return this.backpressuredArray.push(pipeObject);
        },
      };

      try {
        const finalChunk = await this.options.finalFunction(tools);
        if (finalChunk) {
          this.backpressuredArray.push(finalChunk);
        }
      } catch (err) {
        this.backpressuredArray.push(null);
        if (callbackCalled) {
          // already acknowledged through truncate(): report via destroy
          this.destroy(err as Error);
        } else {
          finish(err as Error);
        }
        return;
      }
    }
    this.backpressuredArray.push(null);
    finish();
  }

  /**
   * Exposes this duplex as a pair of WHATWG web streams.
   *
   * @returns `readable` mirrors the data read from this duplex and closes on
   *   'end'; `writable` forwards chunks to `write()` and ends the duplex on close.
   *   Cancelling or aborting either side destroys the duplex.
   */
  public async getWebStreams(): Promise<{ readable: ReadableStream; writable: WritableStream }> {
    const duplex = this;
    const readable = new ReadableStream({
      start(controller) {
        duplex.on('readable', () => {
          let chunk;
          // drain everything currently buffered on the node side
          while (null !== (chunk = duplex.read())) {
            controller.enqueue(chunk);
          }
        });

        duplex.on('end', () => {
          controller.close();
        });
      },
      cancel(reason) {
        duplex.destroy(new Error(reason));
      },
    });

    const writable = new WritableStream({
      write(chunk) {
        return new Promise<void>((resolve, reject) => {
          const isBackpressured = !duplex.write(chunk, (error) => {
            if (error) {
              reject(error);
            } else {
              resolve();
            }
          });

          if (isBackpressured) {
            // also settle on 'drain'; whichever fires first wins
            duplex.once('drain', resolve);
          }
        });
      },
      close() {
        return new Promise<void>((resolve) => {
          duplex.end(resolve);
        });
      },
      abort(reason) {
        duplex.destroy(new Error(reason));
      },
    });

    return { readable, writable };
  }
}