From 22d18dc21f2660252ab2c26bcb7df6de7a26e793 Mon Sep 17 00:00:00 2001 From: Philipp Kunz Date: Mon, 3 Jun 2024 14:59:40 +0200 Subject: [PATCH] fix(core): update --- ts/00_commitinfo_data.ts | 2 +- ts/smartstream.classes.streamintake.ts | 39 ++++++++++++++++++++++++++ ts_web/00_commitinfo_data.ts | 2 +- 3 files changed, 41 insertions(+), 2 deletions(-) diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 8ae4d38..9afff3a 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartstream', - version: '3.0.41', + version: '3.0.42', description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.' } diff --git a/ts/smartstream.classes.streamintake.ts b/ts/smartstream.classes.streamintake.ts index 99f10ad..91cdef2 100644 --- a/ts/smartstream.classes.streamintake.ts +++ b/ts/smartstream.classes.streamintake.ts @@ -1,6 +1,45 @@ import * as plugins from './smartstream.plugins.js'; export class StreamIntake extends plugins.stream.Readable { + // STATIC + public static fromStream(inputStream: plugins.stream.Readable | ReadableStream, options?: plugins.stream.ReadableOptions): StreamIntake { + const intakeStream = new StreamIntake(options); + + if (inputStream instanceof plugins.stream.Readable) { + inputStream.on('data', (chunk: U) => { + intakeStream.pushData(chunk); + }); + + inputStream.on('end', () => { + intakeStream.signalEnd(); + }); + + inputStream.on('error', (err: Error) => { + intakeStream.destroy(err); + }); + } else { + const reader = (inputStream as ReadableStream).getReader(); + + const readChunk = () => { + reader.read().then(({ done, value }) => { + if (done) { + intakeStream.signalEnd(); + } else { + intakeStream.pushData(value); + readChunk(); + } + }).catch((err) => { + intakeStream.destroy(err); + }); + }; + + readChunk(); + } + + return intakeStream; + } + + // INSTANCE private signalEndBoolean = false; private chunkStore: T[] = []; public pushNextObservable = new plugins.smartrx.ObservableIntake(); diff --git a/ts_web/00_commitinfo_data.ts b/ts_web/00_commitinfo_data.ts index 8ae4d38..9afff3a 100644 --- a/ts_web/00_commitinfo_data.ts +++ b/ts_web/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartstream', - version: '3.0.41', + version: '3.0.42', description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.' }