import * as plugins from './smartstream.plugins.js'; import { Duplex, type DuplexOptions } from 'stream'; export interface IStreamTools { truncate: () => void; push: (pipeObject: any) => void; } export interface IStreamWriteFunction { (chunkArg: T, toolsArg: IStreamTools): Promise; } export interface IStreamFinalFunction { (toolsArg: IStreamTools): Promise; } export interface ISmartDuplexOptions extends DuplexOptions { handleBackpressure?: boolean; readFunction?: () => Promise; writeFunction?: IStreamWriteFunction; finalFunction?: IStreamFinalFunction; // Add other custom options if necessary } export class SmartDuplex extends Duplex { // STATIC static fromBuffer(buffer: Buffer, options?: ISmartDuplexOptions): SmartDuplex { const smartDuplex = new SmartDuplex(options); process.nextTick(() => { smartDuplex.push(buffer); smartDuplex.push(null); // Signal the end of the data }); return smartDuplex; } static fromObservable( observable: plugins.smartrx.rxjs.Observable, options?: ISmartDuplexOptions ): SmartDuplex { const smartStream = new SmartDuplex(options); smartStream.observableSubscription = observable.subscribe({ next: (data) => { if (!smartStream.push(data) && smartStream.handleBackpressure) { // Pause the observable if the stream buffer is full smartStream.observableSubscription?.unsubscribe(); smartStream.once('drain', () => { // Resume the observable when the stream buffer is drained smartStream.observableSubscription?.unsubscribe(); smartStream.observableSubscription = observable.subscribe((data) => { smartStream.push(data); }); }); } }, error: (err) => { smartStream.emit('error', err); }, complete: () => { smartStream.push(null); // Signal the end of the data }, }); return smartStream; } static fromReplaySubject( replaySubject: plugins.smartrx.rxjs.ReplaySubject, options?: DuplexOptions ): SmartDuplex { const smartStream = new SmartDuplex(options); let isBackpressured = false; // Subscribe to the ReplaySubject const subscription = replaySubject.subscribe({ next: (data) => { const canPush = smartStream.push(data); if (!canPush) { // If push returns false, pause the subscription because of backpressure isBackpressured = true; subscription.unsubscribe(); } }, error: (err) => { smartStream.emit('error', err); }, complete: () => { smartStream.push(null); // End the stream when the ReplaySubject completes }, }); // Listen for 'drain' event to resume the subscription if it was paused smartStream.on('drain', () => { if (isBackpressured) { isBackpressured = false; // Resubscribe to the ReplaySubject since we previously paused smartStream.observableSubscription = replaySubject.subscribe({ next: (data) => { if (!smartStream.push(data)) { smartStream.observableSubscription?.unsubscribe(); isBackpressured = true; } }, // No need to repeat error and complete handling here because it's already set up above }); } }); return smartStream; } // INSTANCE private readFunction?: () => Promise; private handleBackpressure: boolean; private writeFunction?: IStreamWriteFunction; private finalFunction?: IStreamFinalFunction; private observableSubscription?: plugins.smartrx.rxjs.Subscription; constructor(optionsArg?: ISmartDuplexOptions) { super(optionsArg); this.readFunction = optionsArg?.readFunction; this.writeFunction = optionsArg?.writeFunction; this.finalFunction = optionsArg?.finalFunction; this.handleBackpressure = optionsArg?.handleBackpressure ?? true; } public async _read(size: number): Promise { if (this.readFunction) { await this.readFunction(); } } private asyncWritePromiseObjectmap = new plugins.lik.ObjectMap>(); // Ensure the _write method types the chunk as TInput and encodes TOutput public async _write(chunk: TInput, encoding: string, callback: (error?: Error | null) => void) { if (!this.writeFunction) { return callback(new Error('No stream function provided')); } const tools: IStreamTools = { truncate: () => { this.push(null); callback(); }, push: (pushArg: TOutput) => this.push(pushArg), }; try { const writeDeferred = plugins.smartpromise.defer(); this.asyncWritePromiseObjectmap.add(writeDeferred.promise); const modifiedChunk = await this.writeFunction(chunk, tools); if (modifiedChunk) { const drainDeferred = plugins.smartpromise.defer(); this.once('drain', () => { drainDeferred.resolve(); }); const canPushMore = this.push(modifiedChunk); if (!canPushMore) { await drainDeferred.promise; console.log('jojojo'); callback(); writeDeferred.resolve(); } else { callback(); writeDeferred.resolve(); } } else { callback(); writeDeferred.resolve(); } writeDeferred.resolve(); writeDeferred.promise.then(() => { this.asyncWritePromiseObjectmap.remove(writeDeferred.promise); }); } catch (err) { callback(err); } } public async _final(callback: (error?: Error | null) => void) { await Promise.all(this.asyncWritePromiseObjectmap.getArray()); if (this.finalFunction) { const tools: IStreamTools = { truncate: () => callback(), push: (pipeObject) => this.push(pipeObject), }; try { const finalChunk = await this.finalFunction(tools); if (finalChunk) { this.push(finalChunk); } } catch (err) { this.push(null); callback(err); return; } } this.push(null); callback(); } }