import * as plugins from './smartstream.plugins.js';
import { Duplex, type DuplexOptions } from 'stream';

/**
 * Helpers handed to write/final functions so they can interact with the stream.
 */
export interface IStreamTools {
  /** ends the readable side early and acknowledges the current operation */
  truncate: () => void;
  /** queues an object for the readable side; resolves with the result of the underlying push */
  push: (pipeObject: any) => Promise<boolean>;
}

/**
 * Signature of the per-chunk write function.
 * Type parameters default to `any` so that references without type arguments keep compiling.
 */
export interface IStreamWriteFunction<TChunk = any, TResult = any> {
  (chunkArg: TChunk, toolsArg: IStreamTools): Promise<TResult>;
}

/**
 * Signature of the function that runs once when the writable side finishes.
 */
export interface IStreamFinalFunction<TResult = any> {
  (toolsArg: IStreamTools): Promise<TResult>;
}

export interface ISmartDuplexOptions<TInput = any, TOutput = any> extends DuplexOptions {
  /**
   * whether to print debug logs
   */
  debug?: boolean;
  /**
   * the name of the stream
   */
  name?: string;
  /**
   * a function that is being called to read more stuff from wherever to be processed by the stream
   * @returns
   */
  readFunction?: () => Promise<void>;
  /**
   * the write function is called for every chunk that is being written to the stream
   * it can push or return chunks (but does not have to) to be written to the readable side of the stream
   */
  writeFunction?: IStreamWriteFunction<TInput, TOutput>;
  /**
   * a final function that is run at the end of the stream
   */
  finalFunction?: IStreamFinalFunction<TOutput>;
}

/**
 * A Duplex stream whose readable side is fed from an internal backpressured buffer.
 * Chunks written to the stream are passed to `options.writeFunction`; whatever that
 * function pushes or returns is queued and later handed to consumers in `_read`.
 */
export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
  // STATIC
  /**
   * Creates a stream that emits the given buffer as a single chunk and then ends.
   * @param buffer the data to emit
   * @param options optional stream options
   */
  static fromBuffer(buffer: Buffer, options?: ISmartDuplexOptions<any, any>): SmartDuplex<Buffer, Buffer> {
    const smartDuplex = new SmartDuplex<Buffer, Buffer>(options);
    process.nextTick(() => {
      smartDuplex.push(buffer);
      smartDuplex.push(null); // Signal the end of the data
    });
    return smartDuplex;
  }

  /**
   * Wraps a Node-style stream callback so that only the first invocation goes through.
   * Guards against acknowledging the same write/final operation twice.
   */
  private static callOnce(
    callback: (error?: Error | null) => void,
  ): (error?: Error | null) => void {
    let alreadyCalled = false;
    return (error?: Error | null) => {
      if (alreadyCalled) {
        return;
      }
      alreadyCalled = true;
      callback(error);
    };
  }

  // INSTANCE
  // `null` is queued as the end-of-stream marker, hence part of the element type
  private backpressuredArray: plugins.lik.BackpressuredArray<TOutput | null>;
  public options: ISmartDuplexOptions<TInput, TOutput>;
  private observableSubscription?: plugins.smartrx.rxjs.Subscription;
  // tracks writes that are still in flight so that _final can wait for them
  private asyncWritePromiseObjectmap = new plugins.lik.ObjectMap<Promise<unknown>>();

  private debugLog(messageArg: string) {
    if (this.options.debug) {
      console.log(messageArg);
    }
  }

  /**
   * @param optionsArg stream options; may be omitted, in which case defaults apply
   */
  constructor(optionsArg?: ISmartDuplexOptions<TInput, TOutput>) {
    super(
      Object.assign(
        {
          highWaterMark: 1,
        },
        optionsArg,
      ),
    );
    // fall back to an empty options object so later property reads never hit undefined
    this.options = optionsArg ?? {};
    this.backpressuredArray = new plugins.lik.BackpressuredArray<TOutput | null>(
      this.options.highWaterMark || 1,
    );
  }

  /**
   * Waits until the internal buffer has items, optionally runs `options.readFunction`,
   * then moves buffered items to the readable side until it signals backpressure.
   * A failure while doing so destroys the stream with that error instead of
   * surfacing as an unhandled promise rejection.
   */
  public async _read(size: number): Promise<void> {
    try {
      this.debugLog(`${this.options.name}: read was called`);
      await this.backpressuredArray.waitForItems();
      this.debugLog(`${this.options.name}: successfully waited for items.`);
      if (this.options.readFunction) {
        await this.options.readFunction();
      }
      let canPushMore = true;
      while (this.backpressuredArray.data.length > 0 && canPushMore) {
        const nextChunk = this.backpressuredArray.shift();
        canPushMore = this.push(nextChunk);
      }
    } catch (err) {
      this.destroy(err as Error);
    }
  }

  /**
   * Queues an item for the readable side and, when the buffer reports it cannot
   * take more, waits until there is space again.
   * @returns the value the buffer's push reported before any waiting took place
   */
  public async backpressuredPush(pushArg: TOutput): Promise<boolean> {
    const canPushMore = this.backpressuredArray.push(pushArg);
    if (!canPushMore) {
      this.debugLog(`${this.options.name}: cannot push more`);
      await this.backpressuredArray.waitForSpace();
      this.debugLog(`${this.options.name}: can push more again`);
    }
    return canPushMore;
  }

  /**
   * Runs `options.writeFunction` for the incoming chunk. A truthy return value is
   * pushed to the readable side. The callback is invoked exactly once.
   */
  public async _write(
    chunk: TInput,
    encoding: string,
    callback: (error?: Error | null) => void,
  ) {
    if (!this.options.writeFunction) {
      return callback(new Error('No stream function provided'));
    }

    // truncate() and the regular/error paths may all try to acknowledge the write
    const done = SmartDuplex.callOnce(callback);

    let isTruncated = false;
    const tools: IStreamTools = {
      truncate: () => {
        this.push(null);
        isTruncated = true;
        done();
      },
      push: async (pushArg: TOutput) => {
        return await this.backpressuredPush(pushArg);
      },
    };

    const writeDeferred = plugins.smartpromise.defer();
    this.asyncWritePromiseObjectmap.add(writeDeferred.promise);
    try {
      const modifiedChunk = await this.options.writeFunction(chunk, tools);
      if (isTruncated) {
        return;
      }
      if (modifiedChunk) {
        await tools.push(modifiedChunk);
      }
      done();
    } catch (err) {
      done(err as Error);
    } finally {
      // always settle and untrack the write, otherwise _final would wait forever
      // after an error or a truncate
      writeDeferred.resolve();
      this.asyncWritePromiseObjectmap.remove(writeDeferred.promise);
    }
  }

  /**
   * Waits for all in-flight writes, runs `options.finalFunction` if present, queues a
   * truthy result, and then queues `null` to end the readable side.
   * The callback is invoked exactly once.
   */
  public async _final(callback: (error?: Error | null) => void) {
    // truncate() acknowledges early; the tail of this method must not acknowledge again
    const done = SmartDuplex.callOnce(callback);

    await Promise.all(this.asyncWritePromiseObjectmap.getArray());
    if (this.options.finalFunction) {
      const tools: IStreamTools = {
        truncate: () => done(),
        // NOTE(review): unlike _write, this pushes straight into the buffer without
        // waiting for space — confirm whether that is intended.
        push: async (pipeObject) => {
          return this.backpressuredArray.push(pipeObject);
        },
      };

      try {
        const finalChunk = await this.options.finalFunction(tools);
        if (finalChunk) {
          this.backpressuredArray.push(finalChunk);
        }
      } catch (err) {
        this.backpressuredArray.push(null);
        done(err as Error);
        return;
      }
    }
    this.backpressuredArray.push(null);
    done();
  }
}