import * as plugins from './smartstream.plugins.js';
import { Duplex, type DuplexOptions } from 'stream';

/**
 * Construction options for {@link SmartStream}.
 *
 * Currently identical to Node's `DuplexOptions`; kept as a separate interface
 * so custom options can be added later without changing the constructor
 * signature.
 */
export interface SmartStreamOptions extends DuplexOptions {
  // You can add more custom options relevant to TInput and TOutput if necessary
}

/**
 * A Duplex stream that forwards everything written to it to its readable
 * side, optionally passing each chunk through an async modifier first.
 *
 * Static factories additionally allow creating a stream whose readable side
 * is fed from a Buffer, an RxJS Observable or an RxJS ReplaySubject.
 *
 * The type parameters default to `any` (matching the untyped chunks of
 * Node's `Duplex`) so that un-parameterised usages keep compiling.
 *
 * @typeParam TInput  - type of the chunks accepted by the writable side
 * @typeParam TOutput - type of the chunks emitted by the readable side
 */
export class SmartStream<TInput = any, TOutput = any> extends Duplex {
  /** Subscription to the source feeding this stream (factory-created streams only). */
  private observableSubscription?: plugins.smartrx.rxjs.Subscription;

  /** Optional per-chunk transformation applied in `_write`. */
  private asyncChunkModifier?: (chunk: TInput) => Promise<TOutput>;

  /**
   * @param options               - standard Duplex options (objectMode, highWaterMark, ...)
   * @param asyncChunkModifierArg - optional async function applied to every written chunk;
   *                                its resolved value is what gets pushed to the readable side
   */
  constructor(
    options?: SmartStreamOptions,
    asyncChunkModifierArg?: (chunk: TInput) => Promise<TOutput>,
  ) {
    super(options);
    this.asyncChunkModifier = asyncChunkModifierArg;
  }

  /**
   * Readable-side hook required by `Duplex`.
   *
   * All data is pushed from `_write` or from a subscribed source, so there is
   * nothing to pull here. The method must still exist: the base
   * implementation throws `ERR_METHOD_NOT_IMPLEMENTED`, which would error the
   * stream on the first read.
   */
  public _read(_size: number): void {
    // Intentionally empty: this stream is push-driven.
  }

  /**
   * Writable-side hook: forwards the chunk (optionally modified) to the
   * readable side.
   *
   * The return value of `push` is not acted on; the readable buffer absorbs
   * chunks that exceed the high-water mark.
   *
   * @param chunk    - the chunk being written
   * @param encoding - encoding of the chunk when it is a string (unused)
   * @param callback - must be invoked exactly once; receives the error when
   *                   the modifier rejects or the push throws
   */
  public async _write(
    chunk: TInput,
    encoding: string,
    callback: (error?: Error | null) => void,
  ) {
    let failure: Error | null = null;
    try {
      const outgoing: TOutput = this.asyncChunkModifier
        ? await this.asyncChunkModifier(chunk)
        : (chunk as unknown as TOutput);
      this.push(outgoing);
    } catch (err) {
      // Catch variables are `unknown`; normalise so the callback always gets an Error.
      failure = err instanceof Error ? err : new Error(String(err));
    }
    // Invoked outside the try block so a throwing callback is never called a second time.
    callback(failure);
  }

  /**
   * Tear-down hook: releases the source subscription (if any) so a destroyed
   * stream no longer keeps its Observable alive.
   */
  public _destroy(error: Error | null, callback: (error?: Error | null) => void): void {
    this.observableSubscription?.unsubscribe();
    this.observableSubscription = undefined;
    callback(error);
  }

  /**
   * Creates a stream whose readable side emits the given buffer and then ends.
   *
   * @param buffer  - the data to emit
   * @param options - standard Duplex options
   */
  static fromBuffer(buffer: Buffer, options?: DuplexOptions): SmartStream<Buffer, Buffer> {
    const smartStream = new SmartStream<Buffer, Buffer>(options);
    // Deferred so the caller can attach listeners before data is queued.
    process.nextTick(() => {
      smartStream.push(buffer);
      smartStream.push(null); // Signal the end of the data
    });
    return smartStream;
  }

  /**
   * Creates a stream whose readable side emits every value of the observable
   * and ends when the observable completes. An observable error destroys the
   * stream with that error.
   *
   * @param observable - the source of values
   * @param options    - standard Duplex options (use `objectMode` for non-binary values)
   */
  static fromObservable<T = any>(
    observable: plugins.smartrx.rxjs.Observable<T>,
    options?: DuplexOptions,
  ): SmartStream<T, T> {
    const smartStream = new SmartStream<T, T>(options);
    SmartStream.feedFromSource(observable, smartStream);
    return smartStream;
  }

  /**
   * Creates a stream whose readable side emits every value of the
   * ReplaySubject (replayed values first, then live ones) and ends when the
   * subject completes. A subject error destroys the stream with that error.
   *
   * @param replaySubject - the source of values
   * @param options       - standard Duplex options (use `objectMode` for non-binary values)
   */
  static fromReplaySubject<T = any>(
    replaySubject: plugins.smartrx.rxjs.ReplaySubject<T>,
    options?: DuplexOptions,
  ): SmartStream<T, T> {
    const smartStream = new SmartStream<T, T>(options);
    SmartStream.feedFromSource(replaySubject, smartStream);
    return smartStream;
  }

  /**
   * Subscribes `target` to `source` exactly once.
   *
   * A push-based source cannot be paused without either dropping values (hot
   * sources) or replaying them (cold sources / ReplaySubject), so every value
   * is pushed immediately and the readable buffer absorbs bursts above the
   * high-water mark. Nothing is lost, duplicated or re-ordered.
   */
  private static feedFromSource<T>(
    source: plugins.smartrx.rxjs.Observable<T>,
    target: SmartStream<T, T>,
  ): void {
    let settled = false;
    const subscription = source.subscribe({
      next: (value: T) => {
        if (settled || target.destroyed) {
          return;
        }
        target.push(value);
      },
      error: (err: unknown) => {
        if (settled) {
          return;
        }
        settled = true;
        target.destroy(err instanceof Error ? err : new Error(String(err)));
      },
      complete: () => {
        if (settled || target.destroyed) {
          return;
        }
        settled = true;
        target.push(null); // Signal the end of the data
      },
    });
    target.observableSubscription = subscription;
    // The source may have errored synchronously during subscribe(), in which
    // case _destroy ran before the subscription handle existed.
    if (target.destroyed) {
      subscription.unsubscribe();
    }
  }
}