import * as plugins from './smartstream.plugins.js';
import { Duplex, type DuplexOptions } from 'stream';

/**
 * A Duplex stream with convenience factories for turning a Buffer or an
 * RxJS observable into a readable stream.
 *
 * The writable side accepts and discards every chunk; subclasses may override
 * `_write` to do something useful with written data.
 */
export class SmartStream extends Duplex {
  /** Subscription to the source observable, if created via `fromObservable`. */
  private observableSubscription?: plugins.smartrx.rxjs.Subscription;

  /** Chunks received from the observable that the readable side has not accepted yet. */
  private pendingChunks: unknown[] = [];

  /** False after `push()` reported a full buffer, true again once `_read` is called. */
  private acceptingChunks = true;

  /** Set once the source observable has completed. */
  private sourceCompleted = false;

  /** Set once EOF (`push(null)`) has been sent, so it is never sent twice. */
  private endSignalled = false;

  constructor(options?: DuplexOptions) {
    super(options);
  }

  /**
   * Called by the stream machinery when the consumer wants more data.
   * Re-opens the gate closed by a previous `push()` that returned false and
   * hands over any chunks that were parked in the meantime.
   */
  override _read(size: number): void {
    this.acceptingChunks = true;
    this.flushPendingChunks();
  }

  /**
   * Default writable side: acknowledges the chunk without using it.
   */
  override _write(chunk: any, encoding: string, callback: (error?: Error | null) => void): void {
    callback();
  }

  /**
   * Releases the observable subscription and drops parked chunks when the
   * stream is destroyed, so an abandoned stream does not keep its source alive.
   */
  override _destroy(error: Error | null, callback: (error?: Error | null) => void): void {
    this.observableSubscription?.unsubscribe();
    this.observableSubscription = undefined;
    this.pendingChunks.length = 0;
    callback(error);
  }

  /**
   * Pushes parked chunks to the readable side for as long as it accepts them,
   * then signals EOF once the source has completed and nothing is left to push.
   */
  private flushPendingChunks(): void {
    if (this.destroyed || this.endSignalled) {
      return;
    }
    while (this.acceptingChunks && this.pendingChunks.length > 0) {
      // push() returns false when the internal buffer reached highWaterMark;
      // remaining chunks stay parked until the next _read() call.
      this.acceptingChunks = this.push(this.pendingChunks.shift());
    }
    if (this.sourceCompleted && this.pendingChunks.length === 0) {
      this.endSignalled = true;
      this.push(null); // Signal the end of the data
    }
  }

  /**
   * Creates a stream that emits the given buffer as a single chunk and then ends.
   *
   * @param buffer - Data to emit.
   * @param options - Options forwarded to the Duplex constructor.
   * @returns A stream whose readable side yields `buffer` followed by EOF.
   */
  static fromBuffer(buffer: Buffer, options?: DuplexOptions): SmartStream {
    const smartStream = new SmartStream(options);
    // Deferred so the caller can attach listeners before data is pushed.
    process.nextTick(() => {
      smartStream.push(buffer);
      smartStream.push(null); // Signal the end of the data
    });
    return smartStream;
  }

  /**
   * Creates a stream whose readable side emits every value of the observable,
   * in order, and ends when the observable completes.
   *
   * The observable is subscribed to exactly once. Because an observable cannot
   * be paused, values arriving while the stream buffer is full are parked in
   * memory (unbounded) and delivered as the consumer catches up. An observable
   * error destroys the stream with that error; destroying the stream
   * unsubscribes from the observable.
   *
   * @param observable - Source of chunks; values must be valid for the stream's mode.
   * @param options - Options forwarded to the Duplex constructor.
   * @returns The stream fed by `observable`.
   */
  static fromObservable(
    observable: plugins.smartrx.rxjs.Observable<unknown>,
    options?: DuplexOptions
  ): SmartStream {
    const smartStream = new SmartStream(options);
    const subscription = observable.subscribe({
      next: (data) => {
        if (smartStream.destroyed) {
          return;
        }
        smartStream.pendingChunks.push(data);
        smartStream.flushPendingChunks();
      },
      error: (err: unknown) => {
        smartStream.destroy(err instanceof Error ? err : new Error(String(err)));
      },
      complete: () => {
        smartStream.sourceCompleted = true;
        smartStream.flushPendingChunks();
      },
    });
    if (smartStream.destroyed) {
      // The source failed synchronously during subscribe(); nothing to keep.
      subscription.unsubscribe();
    } else {
      smartStream.observableSubscription = subscription;
    }
    return smartStream;
  }
}