67 lines
1.6 KiB
TypeScript
67 lines
1.6 KiB
TypeScript
import * as plugins from './smartstream.plugins.js';
|
|
|
|
/**
 * Buffers chunks handed in through `pushData()` and exposes them as an
 * object-mode readable stream.
 *
 * Producers can work in two ways:
 * - push-driven: call `pushData()` whenever data is available
 * - pull-driven: subscribe to `pushNextObservable` and call `pushData()` each
 *   time it emits `'please push next'`
 *
 * Call `signalEnd()` once no more data will follow; the stream ends after the
 * chunks that are still buffered have been delivered.
 */
export class StreamIntake<T> {
  /** set by `signalEnd()`; the stream is ended as soon as the buffer is drained */
  private signalEndBoolean = false;

  /** chunks that were pushed in but not yet handed to the stream, oldest first */
  private chunkStore: T[] = [];

  /**
   * emits `'please push next'` once on construction and afterwards every time
   * the stream is asked for data while the buffer is empty
   */
  public pushNextObservable = new plugins.smartrx.ObservableIntake<any>();

  /**
   * resolved by `pushData()` and `signalEnd()` to wake up a pending read;
   * replaced with a fresh deferred after every wake-up
   */
  private pushedNextDeferred = plugins.smartpromise.defer();

  private readableStream = plugins.from2.obj(async (size, next) => {
    // Every read request is answered with exactly ONE `next` call. from2 asks
    // again by itself after a delivered chunk, so at most one invocation of
    // this function is ever waiting on `pushedNextDeferred`. Answering a single
    // request several times starts overlapping invocations that replace each
    // other's deferred, which leaves a reader waiting on a promise that
    // `pushData()` no longer resolves.
    while (true) {
      // buffered data always goes out first, also after the end was signalled
      if (this.chunkStore.length > 0) {
        next(null, this.chunkStore.shift());
        return;
      }

      // buffer is drained and no more data will come: end the stream
      if (this.signalEndBoolean) {
        next(null, null);
        return;
      }

      // nothing to deliver yet: ask the producer and wait for
      // `pushData()` / `signalEnd()`
      this.pushNextObservable.push('please push next');
      await this.pushedNextDeferred.promise;

      // Arm a fresh deferred for the next wait. Looping back re-checks the
      // buffer before waiting again, so a chunk pushed at any point in between
      // is never missed.
      this.pushedNextDeferred = plugins.smartpromise.defer();
    }
  });

  constructor() {
    // give pull-driven producers an initial request before the first read
    this.pushNextObservable.push('please push next');
  }

  /**
   * returns a new style readable stream (object mode) that wraps the
   * underlying stream
   */
  public getReadable() {
    const readable = new plugins.stream.Readable({
      objectMode: true,
    });
    return readable.wrap(this.readableStream);
  }

  /**
   * returns the underlying old style readable stream
   */
  public getReadableStream() {
    return this.readableStream;
  }

  /**
   * queues a chunk for delivery and wakes up a pending read
   * @param chunkData the chunk to append to the stream
   */
  public pushData(chunkData: T) {
    this.chunkStore.push(chunkData);
    this.pushedNextDeferred.resolve();
  }

  /**
   * signals that no more data will be pushed: wakes up a pending read so the
   * stream can end after the buffered chunks, and completes `pushNextObservable`
   */
  public signalEnd() {
    this.signalEndBoolean = true;
    this.pushedNextDeferred.resolve();
    this.pushNextObservable.signalComplete();
  }
}
|