Compare commits

...

8 Commits

Author SHA1 Message Date
a490f521ab 3.0.43 2024-06-03 15:29:15 +02:00
59027782dc fix(core): update 2024-06-03 15:29:14 +02:00
8c7dd7970c 3.0.42 2024-06-03 14:59:41 +02:00
22d18dc21f fix(core): update 2024-06-03 14:59:40 +02:00
1cb6f727af 3.0.41 2024-06-03 10:27:08 +02:00
824c44d165 fix(core): update 2024-06-03 10:27:07 +02:00
3e062103f8 3.0.40 2024-06-02 23:40:52 +02:00
6451e93c12 fix(smartduplex): now has a .getWebStreams method, that exposes a web streams compatible API 2024-06-02 23:40:52 +02:00
7 changed files with 91 additions and 70 deletions

View File

@ -1,6 +1,6 @@
{ {
"name": "@push.rocks/smartstream", "name": "@push.rocks/smartstream",
"version": "3.0.39", "version": "3.0.43",
"private": false, "private": false,
"description": "A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.", "description": "A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.",
"type": "module", "type": "module",

View File

@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartstream', name: '@push.rocks/smartstream',
version: '3.0.39', version: '3.0.43',
description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.' description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.'
} }

View File

@ -157,4 +157,53 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
this.backpressuredArray.push(null); this.backpressuredArray.push(null);
callback(); callback();
} }
public async getWebStreams(): Promise<{ readable: ReadableStream, writable: WritableStream }> {
const duplex = this;
const readable = new ReadableStream({
start(controller) {
duplex.on('readable', () => {
let chunk;
while (null !== (chunk = duplex.read())) {
controller.enqueue(chunk);
}
});
duplex.on('end', () => {
controller.close();
});
},
cancel(reason) {
duplex.destroy(new Error(reason));
}
});
const writable = new WritableStream({
write(chunk) {
return new Promise<void>((resolve, reject) => {
const isBackpressured = !duplex.write(chunk, (error) => {
if (error) {
reject(error);
} else {
resolve();
}
});
if (isBackpressured) {
duplex.once('drain', resolve);
}
});
},
close() {
return new Promise<void>((resolve, reject) => {
duplex.end(resolve);
});
},
abort(reason) {
duplex.destroy(new Error(reason));
}
});
return { readable, writable };
}
} }

View File

@ -1,6 +1,45 @@
import * as plugins from './smartstream.plugins.js'; import * as plugins from './smartstream.plugins.js';
export class StreamIntake<T> extends plugins.stream.Readable { export class StreamIntake<T> extends plugins.stream.Readable {
// STATIC
public static async fromStream<U>(inputStream: plugins.stream.Readable | ReadableStream, options?: plugins.stream.ReadableOptions): StreamIntake<U> {
const intakeStream = new StreamIntake<U>(options);
if (inputStream instanceof plugins.stream.Readable) {
inputStream.on('data', (chunk: U) => {
intakeStream.pushData(chunk);
});
inputStream.on('end', () => {
intakeStream.signalEnd();
});
inputStream.on('error', (err: Error) => {
intakeStream.destroy(err);
});
} else {
const reader = (inputStream as ReadableStream).getReader();
const readChunk = () => {
reader.read().then(({ done, value }) => {
if (done) {
intakeStream.signalEnd();
} else {
intakeStream.pushData(value);
readChunk();
}
}).catch((err) => {
intakeStream.destroy(err);
});
};
readChunk();
}
return intakeStream;
}
// INSTANCE
private signalEndBoolean = false; private signalEndBoolean = false;
private chunkStore: T[] = []; private chunkStore: T[] = [];
public pushNextObservable = new plugins.smartrx.ObservableIntake<any>(); public pushNextObservable = new plugins.smartrx.ObservableIntake<any>();

View File

@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartstream', name: '@push.rocks/smartstream',
version: '3.0.39', version: '3.0.43',
description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.' description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.'
} }

View File

@ -1,64 +0,0 @@
export interface IDuplexStream {
read(): any;
write(chunk: any, callback?: (error?: Error | null) => void): boolean;
on(event: string, listener: (...args: any[]) => void): this;
once(event: string, listener: (...args: any[]) => void): this;
end(callback?: () => void): void;
destroy(error?: Error): void;
}
export interface IReadableStreamOptions {
highWaterMark?: number;
}
export interface IWritableStreamOptions {
highWaterMark?: number;
}
export function convertDuplexToWebStream(duplex: IDuplexStream): { readable: ReadableStream, writable: WritableStream } {
const readable = new ReadableStream({
start(controller) {
duplex.on('readable', () => {
let chunk;
while (null !== (chunk = duplex.read())) {
controller.enqueue(chunk);
}
});
duplex.on('end', () => {
controller.close();
});
},
cancel(reason) {
duplex.destroy(new Error(reason));
}
});
const writable = new WritableStream({
write(chunk) {
return new Promise<void>((resolve, reject) => {
const isBackpressured = !duplex.write(chunk, (error) => {
if (error) {
reject(error);
} else {
resolve();
}
});
if (isBackpressured) {
duplex.once('drain', resolve);
}
});
},
close() {
return new Promise<void>((resolve, reject) => {
duplex.end(resolve);
});
},
abort(reason) {
duplex.destroy(new Error(reason));
}
});
return { readable, writable };
}

View File

@ -1,5 +1,2 @@
import './plugins.js'; import './plugins.js';
export * from './classes.webduplexstream.js'; export * from './classes.webduplexstream.js';
export {
convertDuplexToWebStream,
} from './convert.js';