fix(core): update

This commit is contained in:
Philipp Kunz 2024-06-03 14:59:40 +02:00
parent 1cb6f727af
commit 22d18dc21f
3 changed files with 41 additions and 2 deletions

View File

@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartstream',
version: '3.0.41',
version: '3.0.42',
description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.'
}

View File

@ -1,6 +1,45 @@
import * as plugins from './smartstream.plugins.js';
export class StreamIntake<T> extends plugins.stream.Readable {
// STATIC
public static fromStream<U>(inputStream: plugins.stream.Readable | ReadableStream, options?: plugins.stream.ReadableOptions): StreamIntake<U> {
const intakeStream = new StreamIntake<U>(options);
if (inputStream instanceof plugins.stream.Readable) {
inputStream.on('data', (chunk: U) => {
intakeStream.pushData(chunk);
});
inputStream.on('end', () => {
intakeStream.signalEnd();
});
inputStream.on('error', (err: Error) => {
intakeStream.destroy(err);
});
} else {
const reader = (inputStream as ReadableStream).getReader();
const readChunk = () => {
reader.read().then(({ done, value }) => {
if (done) {
intakeStream.signalEnd();
} else {
intakeStream.pushData(value);
readChunk();
}
}).catch((err) => {
intakeStream.destroy(err);
});
};
readChunk();
}
return intakeStream;
}
// INSTANCE
private signalEndBoolean = false;
private chunkStore: T[] = [];
public pushNextObservable = new plugins.smartrx.ObservableIntake<any>();

View File

@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartstream',
version: '3.0.41',
version: '3.0.42',
description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.'
}