import * as plugins from './smartstream.plugins.js';

/**
 * An object-mode Readable that is fed programmatically.
 *
 * Producers hand chunks to the stream with {@link StreamIntake.pushData} and
 * finish it with {@link StreamIntake.signalEnd}. Whenever the consumer asks for
 * data and nothing is buffered, the string `'please push next'` is pushed into
 * {@link StreamIntake.pushNextObservable} so a producer can react to demand.
 *
 * @typeParam T - type of the chunks carried by the stream.
 */
export class StreamIntake<T = unknown> extends plugins.stream.Readable {
  // STATIC

  /**
   * Creates a StreamIntake that mirrors another stream.
   *
   * A Node.js Readable is consumed through its 'data', 'end' and 'error'
   * events; any other input is treated as a web ReadableStream and consumed
   * through a reader obtained from `getReader()`. In both cases every chunk is
   * buffered via `pushData()` as soon as the source delivers it, independent of
   * how fast the returned stream is being read.
   *
   * @typeParam U - type of the chunks produced by `inputStream`.
   * @param inputStream - the source stream to copy chunks from.
   * @param options - Readable options; `objectMode` is always forced to `true`.
   * @returns a promise resolving to the intake stream fed by `inputStream`.
   */
  public static async fromStream<U = unknown>(
    inputStream: plugins.stream.Readable | ReadableStream,
    options?: plugins.stream.ReadableOptions,
  ): Promise<StreamIntake<U>> {
    const intakeStream = new StreamIntake<U>(options);

    if (inputStream instanceof plugins.stream.Readable) {
      inputStream.on('data', (chunk: U) => {
        intakeStream.pushData(chunk);
      });
      inputStream.on('end', () => {
        intakeStream.signalEnd();
      });
      inputStream.on('error', (err: Error) => {
        intakeStream.destroy(err);
      });
    } else {
      const reader = (inputStream as ReadableStream).getReader();

      // Pull from the web stream until it reports `done`; any failure while
      // reading or forwarding destroys the intake stream with that error.
      const pump = async (): Promise<void> => {
        try {
          for (;;) {
            const { done, value } = await reader.read();
            if (done) {
              intakeStream.signalEnd();
              return;
            }
            intakeStream.pushData(value);
          }
        } catch (err: unknown) {
          intakeStream.destroy(err instanceof Error ? err : new Error(String(err)));
        }
      };
      void pump();
    }

    return intakeStream;
  }

  // INSTANCE

  /** Set by signalEnd(); once the buffer is drained the stream is ended with `push(null)`. */
  private signalEndBoolean = false;

  /** Chunks accepted by pushData() that have not yet been handed to the Readable. */
  private chunkStore: T[] = [];

  /** True while a wake-up callback is registered on `pushedNextDeferred`. */
  private awaitingChunk = false;

  /** Receives `'please push next'` each time the stream starts waiting for data. */
  public pushNextObservable = new plugins.smartrx.ObservableIntake();

  /** Resolved by pushData()/signalEnd() to wake a pending read. */
  private pushedNextDeferred = plugins.smartpromise.defer();

  /**
   * @param options - Readable options; `objectMode` is always forced to `true`.
   */
  constructor(options?: plugins.stream.ReadableOptions) {
    super({ ...options, objectMode: true }); // Ensure that we are in object mode.
    this.pushNextObservable.push('please push next');
  }

  /**
   * Readable hook: hands buffered chunks to the consumer, ends the stream if
   * signalEnd() was called, or otherwise requests more data and waits.
   */
  _read(_size: number): void {
    this.drainChunkStore();
  }

  /**
   * Moves buffered chunks into the Readable. When the buffer runs dry it either
   * ends the stream or registers a single wake-up for the next
   * pushData()/signalEnd() call.
   */
  private drainChunkStore(): void {
    while (this.chunkStore.length > 0) {
      // If push returns false the consumer is saturated; stop until _read is called again.
      if (!this.push(this.chunkStore.shift())) {
        return;
      }
    }

    if (this.signalEndBoolean) {
      // Nothing left and the producer is done: signal the end of the stream.
      this.push(null);
      return;
    }

    if (this.awaitingChunk) {
      // A wake-up is already registered. Registering another one would emit a
      // duplicate request and run this drain several times per pushed chunk.
      return;
    }

    // The buffer is empty and nobody is waiting, so any earlier resolution of
    // the deferred is stale. Start from a fresh one before asking for data, so
    // that a producer answering synchronously resolves the deferred we wait on.
    this.awaitingChunk = true;
    const wakeup = plugins.smartpromise.defer();
    this.pushedNextDeferred = wakeup;
    this.pushNextObservable.push('please push next');
    void wakeup.promise.then(() => {
      this.awaitingChunk = false;
      this.drainChunkStore();
    });
  }

  /**
   * Buffers a chunk for the consumer and wakes a pending read, if any.
   *
   * @param chunkData - the chunk to append to the buffer.
   */
  public pushData(chunkData: T) {
    this.chunkStore.push(chunkData);
    this.pushedNextDeferred.resolve();
  }

  /**
   * Marks the stream as finished: the remaining buffered chunks are still
   * delivered, then the stream ends. Also completes `pushNextObservable`.
   */
  public signalEnd() {
    this.signalEndBoolean = true;
    this.pushedNextDeferred.resolve();
    this.pushNextObservable.signalComplete();
  }
}