smartstream/ts/smartstream.classes.streamintake.ts
2024-05-17 18:40:32 +02:00

53 lines
1.6 KiB
TypeScript

import * as plugins from './smartstream.plugins.js';
export class StreamIntake<T> extends plugins.stream.Readable {
private signalEndBoolean = false;
private chunkStore: T[] = [];
public pushNextObservable = new plugins.smartrx.ObservableIntake<any>();
private pushedNextDeferred = plugins.smartpromise.defer();
constructor(options?: plugins.stream.ReadableOptions) {
super({ ...options, objectMode: true }); // Ensure that we are in object mode.
this.pushNextObservable.push('please push next');
}
_read(size: number): void {
// console.log('get next');
const pushChunk = (): void => {
while (this.chunkStore.length > 0) {
// If push returns false, then we should stop reading
if (!this.push(this.chunkStore.shift())) {
return;
}
}
if (this.chunkStore.length === 0) {
if (this.signalEndBoolean) {
// If we're done, push null to signal the end of the stream
this.push(null);
} else {
// Ask for more data and wait
this.pushNextObservable.push('please push next');
this.pushedNextDeferred.promise.then(() => {
this.pushedNextDeferred = plugins.smartpromise.defer(); // Reset the deferred
pushChunk(); // Try pushing the next chunk
});
}
}
};
pushChunk();
}
public pushData(chunkData: T) {
this.chunkStore.push(chunkData);
this.pushedNextDeferred.resolve();
}
public signalEnd() {
this.signalEndBoolean = true;
this.pushedNextDeferred.resolve();
this.pushNextObservable.signalComplete();
}
}