54 lines
1.6 KiB
TypeScript
54 lines
1.6 KiB
TypeScript
import * as plugins from './smartstream.plugins.js';
|
|
|
|
/**
 * An object-mode Readable that is fed imperatively.
 *
 * Producers hand chunks in through `pushData()` and finish with `signalEnd()`.
 * Chunks are parked in an internal store and moved into the Readable's own
 * buffer from `_read()`, i.e. only when the consuming side asks for data.
 * Whenever `_read()` finds the store empty (and the end has not been
 * signalled) the string 'please push next' is emitted on `pushNextObservable`
 * so a producer can react by supplying more data.
 */
export class StreamIntake<T> extends plugins.stream.Readable {
  /** Becomes true once `signalEnd()` has been called. */
  private signalEndBoolean = false;

  /** Chunks received via `pushData()` that `_read()` has not yet pushed. */
  private chunkStore: T[] = [];

  /** Emits 'please push next' on construction and whenever `_read()` runs dry. */
  public pushNextObservable = new plugins.smartrx.ObservableIntake<any>();

  /**
   * Resolved by `pushData()` / `signalEnd()` to wake up a `_read()` that is
   * waiting for input; replaced with a fresh deferred after each wake-up.
   */
  private pushedNextDeferred = plugins.smartpromise.defer();

  /**
   * @param options Readable options; `objectMode` is always forced to `true`.
   */
  constructor(options?: plugins.stream.ReadableOptions) {
    super({ ...options, objectMode: true }); // Ensure that we are in object mode.
    this.pushNextObservable.push('please push next');
  }

  /**
   * Readable hook: moves at most one stored chunk into the stream per
   * attempt. When the store is empty it either ends the stream (if the end
   * was signalled) or requests more data and retries once woken up.
   *
   * @param size Advisory read size passed by the stream machinery; unused.
   */
  _read(size: number): void {
    const pushChunk = (): void => {
      if (this.chunkStore.length > 0) {
        // If push returns false, the consumer is saturated: stop here and
        // wait for the next _read() call.
        if (!this.push(this.chunkStore.shift())) {
          return;
        }
      }

      if (this.chunkStore.length === 0) {
        if (this.signalEndBoolean) {
          // Everything has been handed over and the producer is done:
          // push null to signal the end of the stream.
          this.push(null);
        } else {
          // Ask for more data and wait until pushData()/signalEnd() wakes us.
          this.pushNextObservable.push('please push next');
          this.pushedNextDeferred.promise.then(() => {
            this.pushedNextDeferred = plugins.smartpromise.defer(); // Reset the deferred
            pushChunk(); // Try pushing the next chunk
          });
        }
      }
    };

    pushChunk();
  }

  /**
   * Queues a chunk for delivery and wakes up a waiting `_read()`, if any.
   *
   * @param chunkData The chunk to enqueue.
   */
  public pushData(chunkData: T) {
    this.chunkStore.push(chunkData);
    this.pushedNextDeferred.resolve();
  }

  /**
   * Declares that no further data will be supplied.
   *
   * The end-of-stream marker is only pushed right away when nothing is left
   * in the store. If chunks are still queued, pushing null here would end the
   * stream before they are delivered (and a later push of those chunks would
   * happen after EOF); in that case `_read()` drains the store first and then
   * pushes null itself because `signalEndBoolean` is set.
   */
  public signalEnd() {
    this.signalEndBoolean = true;
    this.pushedNextDeferred.resolve();
    this.pushNextObservable.signalComplete();
    if (this.chunkStore.length === 0) {
      this.push(null);
    }
  }
}
|