smartstream/ts/smartstream.classes.streamintake.ts

67 lines
1.6 KiB
TypeScript
Raw Normal View History

2022-06-07 14:16:14 +00:00
import * as plugins from './smartstream.plugins.js';
/**
 * Bridges push-style producers to a pull-based object stream.
 *
 * Producers queue chunks with `pushData()` and finish with `signalEnd()`.
 * Consumers read from the stream returned by `getReadable()` or
 * `getReadableStream()`. Whenever the stream is asked for data while the
 * queue is empty, `'please push next'` is emitted on `pushNextObservable`
 * so producers know when to supply more.
 */
export class StreamIntake<T> {
  /** True once `signalEnd()` was called; the stream ends after the queue is drained. */
  private signalEndBoolean = false;

  /** Chunks queued by `pushData()` that have not been handed to the stream yet. */
  private chunkStore: T[] = [];

  /** Emits `'please push next'` when the intake wants more data; completed by `signalEnd()`. */
  public pushNextObservable = new plugins.smartrx.ObservableIntake<any>();

  /** Resolved by `pushData()` / `signalEnd()` to wake up a read request that is waiting. */
  private pushedNextDeferred = plugins.smartpromise.defer();

  /**
   * The underlying from2 object stream.
   *
   * Each read request is answered with exactly ONE `next` call: either the
   * oldest queued chunk or the end marker (`null`). Answering a single request
   * several times, or continuing after the end marker, is what the previous
   * implementation did; it could leave a request unanswered (stalling the
   * stream) when it woke up from an already-resolved deferred with an empty
   * queue, and it kept pushing into the completed observable after the end.
   */
  private readableStream = plugins.from2.obj(async (size, next) => {
    // Wait on state, not on the deferred alone: a wake-up without data
    // (the deferred was resolved by an earlier, already consumed push)
    // simply loops and asks again.
    while (this.chunkStore.length === 0 && !this.signalEndBoolean) {
      this.pushNextObservable.push('please push next');
      await this.pushedNextDeferred.promise;
      // re-arm for the next wait cycle
      this.pushedNextDeferred = plugins.smartpromise.defer();
    }

    if (this.chunkStore.length > 0) {
      // queued data is always delivered before the end marker
      next(null, this.chunkStore.shift());
      return;
    }

    // queue is empty and the end was signalled
    next(null, null);
  });

  constructor() {
    // announce right away that the intake is ready to receive data
    this.pushNextObservable.push('please push next');
  }

  /**
   * Returns a new-style readable stream (object mode) wrapping the
   * underlying from2 stream.
   */
  public getReadable() {
    const readable = new plugins.stream.Readable({
      objectMode: true,
    });
    return readable.wrap(this.readableStream);
  }

  /**
   * Returns the underlying old-style readable stream as created by from2.
   */
  public getReadableStream() {
    return this.readableStream;
  }

  /**
   * Queues a chunk for the stream and wakes up a waiting read request.
   * @param chunkData the chunk to emit on the stream
   */
  public pushData(chunkData: T) {
    this.chunkStore.push(chunkData);
    this.pushedNextDeferred.resolve();
  }

  /**
   * Marks the intake as finished. Chunks that are already queued are still
   * emitted; afterwards the stream ends. Also completes `pushNextObservable`.
   */
  public signalEnd() {
    this.signalEndBoolean = true;
    // wake up a waiting read request so it can emit the end marker
    this.pushedNextDeferred.resolve();
    this.pushNextObservable.signalComplete();
  }
}