2022-06-07 14:16:14 +00:00
|
|
|
import * as plugins from './smartstream.plugins.js';
|
|
|
|
|
2023-11-03 12:55:56 +00:00
|
|
|
export class StreamIntake<T> extends plugins.stream.Readable {
|
2024-06-03 12:59:40 +00:00
|
|
|
// STATIC
|
2024-06-04 16:58:08 +00:00
|
|
|
public static async fromStream<U>(inputStream: plugins.stream.Readable | ReadableStream, options?: plugins.stream.ReadableOptions): Promise<StreamIntake<U>> {
|
2024-06-03 12:59:40 +00:00
|
|
|
const intakeStream = new StreamIntake<U>(options);
|
|
|
|
|
|
|
|
if (inputStream instanceof plugins.stream.Readable) {
|
|
|
|
inputStream.on('data', (chunk: U) => {
|
|
|
|
intakeStream.pushData(chunk);
|
|
|
|
});
|
|
|
|
|
|
|
|
inputStream.on('end', () => {
|
|
|
|
intakeStream.signalEnd();
|
|
|
|
});
|
|
|
|
|
|
|
|
inputStream.on('error', (err: Error) => {
|
|
|
|
intakeStream.destroy(err);
|
|
|
|
});
|
|
|
|
} else {
|
|
|
|
const reader = (inputStream as ReadableStream).getReader();
|
|
|
|
|
|
|
|
const readChunk = () => {
|
|
|
|
reader.read().then(({ done, value }) => {
|
|
|
|
if (done) {
|
|
|
|
intakeStream.signalEnd();
|
|
|
|
} else {
|
|
|
|
intakeStream.pushData(value);
|
|
|
|
readChunk();
|
|
|
|
}
|
|
|
|
}).catch((err) => {
|
|
|
|
intakeStream.destroy(err);
|
|
|
|
});
|
|
|
|
};
|
|
|
|
|
|
|
|
readChunk();
|
|
|
|
}
|
|
|
|
|
|
|
|
return intakeStream;
|
|
|
|
}
|
|
|
|
|
|
|
|
// INSTANCE
|
2022-06-07 14:16:14 +00:00
|
|
|
private signalEndBoolean = false;
|
|
|
|
private chunkStore: T[] = [];
|
|
|
|
public pushNextObservable = new plugins.smartrx.ObservableIntake<any>();
|
|
|
|
private pushedNextDeferred = plugins.smartpromise.defer();
|
|
|
|
|
2023-11-03 12:55:56 +00:00
|
|
|
constructor(options?: plugins.stream.ReadableOptions) {
|
|
|
|
super({ ...options, objectMode: true }); // Ensure that we are in object mode.
|
2022-06-07 14:16:14 +00:00
|
|
|
this.pushNextObservable.push('please push next');
|
|
|
|
}
|
|
|
|
|
2023-11-03 12:55:56 +00:00
|
|
|
_read(size: number): void {
|
|
|
|
// console.log('get next');
|
|
|
|
const pushChunk = (): void => {
|
2024-05-17 16:40:32 +00:00
|
|
|
while (this.chunkStore.length > 0) {
|
2023-11-03 12:55:56 +00:00
|
|
|
// If push returns false, then we should stop reading
|
|
|
|
if (!this.push(this.chunkStore.shift())) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (this.chunkStore.length === 0) {
|
|
|
|
if (this.signalEndBoolean) {
|
|
|
|
// If we're done, push null to signal the end of the stream
|
|
|
|
this.push(null);
|
|
|
|
} else {
|
|
|
|
// Ask for more data and wait
|
|
|
|
this.pushNextObservable.push('please push next');
|
|
|
|
this.pushedNextDeferred.promise.then(() => {
|
|
|
|
this.pushedNextDeferred = plugins.smartpromise.defer(); // Reset the deferred
|
|
|
|
pushChunk(); // Try pushing the next chunk
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
pushChunk();
|
2022-06-07 14:16:14 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
public pushData(chunkData: T) {
|
|
|
|
this.chunkStore.push(chunkData);
|
|
|
|
this.pushedNextDeferred.resolve();
|
|
|
|
}
|
|
|
|
|
|
|
|
public signalEnd() {
|
|
|
|
this.signalEndBoolean = true;
|
|
|
|
this.pushedNextDeferred.resolve();
|
|
|
|
this.pushNextObservable.signalComplete();
|
|
|
|
}
|
|
|
|
}
|