Compare commits

..

4 Commits

Author SHA1 Message Date
a490f521ab 3.0.43 2024-06-03 15:29:15 +02:00
59027782dc fix(core): update 2024-06-03 15:29:14 +02:00
8c7dd7970c 3.0.42 2024-06-03 14:59:41 +02:00
22d18dc21f fix(core): update 2024-06-03 14:59:40 +02:00
4 changed files with 42 additions and 3 deletions

View File

@ -1,6 +1,6 @@
{
"name": "@push.rocks/smartstream",
"version": "3.0.41",
"version": "3.0.43",
"private": false,
"description": "A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.",
"type": "module",

View File

@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartstream',
version: '3.0.41',
version: '3.0.43',
description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.'
}

View File

@ -1,6 +1,45 @@
import * as plugins from './smartstream.plugins.js';
export class StreamIntake<T> extends plugins.stream.Readable {
// STATIC
public static async fromStream<U>(inputStream: plugins.stream.Readable | ReadableStream, options?: plugins.stream.ReadableOptions): StreamIntake<U> {
const intakeStream = new StreamIntake<U>(options);
if (inputStream instanceof plugins.stream.Readable) {
inputStream.on('data', (chunk: U) => {
intakeStream.pushData(chunk);
});
inputStream.on('end', () => {
intakeStream.signalEnd();
});
inputStream.on('error', (err: Error) => {
intakeStream.destroy(err);
});
} else {
const reader = (inputStream as ReadableStream).getReader();
const readChunk = () => {
reader.read().then(({ done, value }) => {
if (done) {
intakeStream.signalEnd();
} else {
intakeStream.pushData(value);
readChunk();
}
}).catch((err) => {
intakeStream.destroy(err);
});
};
readChunk();
}
return intakeStream;
}
// INSTANCE
private signalEndBoolean = false;
private chunkStore: T[] = [];
public pushNextObservable = new plugins.smartrx.ObservableIntake<any>();

View File

@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartstream',
version: '3.0.41',
version: '3.0.43',
description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.'
}