Compare commits

...

4 Commits

Author SHA1 Message Date
1cb6f727af 3.0.41 2024-06-03 10:27:08 +02:00
824c44d165 fix(core): update 2024-06-03 10:27:07 +02:00
3e062103f8 3.0.40 2024-06-02 23:40:52 +02:00
6451e93c12 fix(smartduplex): now has a .getWebStreams method, that exposes a web streams compatible API 2024-06-02 23:40:52 +02:00
6 changed files with 52 additions and 70 deletions

View File

@ -1,6 +1,6 @@
{
"name": "@push.rocks/smartstream",
"version": "3.0.39",
"version": "3.0.41",
"private": false,
"description": "A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.",
"type": "module",

View File

@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartstream',
version: '3.0.39',
version: '3.0.41',
description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.'
}

View File

@ -157,4 +157,53 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
this.backpressuredArray.push(null);
callback();
}
public async getWebStreams(): Promise<{ readable: ReadableStream, writable: WritableStream }> {
const duplex = this;
const readable = new ReadableStream({
start(controller) {
duplex.on('readable', () => {
let chunk;
while (null !== (chunk = duplex.read())) {
controller.enqueue(chunk);
}
});
duplex.on('end', () => {
controller.close();
});
},
cancel(reason) {
duplex.destroy(new Error(reason));
}
});
const writable = new WritableStream({
write(chunk) {
return new Promise<void>((resolve, reject) => {
const isBackpressured = !duplex.write(chunk, (error) => {
if (error) {
reject(error);
} else {
resolve();
}
});
if (isBackpressured) {
duplex.once('drain', resolve);
}
});
},
close() {
return new Promise<void>((resolve, reject) => {
duplex.end(resolve);
});
},
abort(reason) {
duplex.destroy(new Error(reason));
}
});
return { readable, writable };
}
}

View File

@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartstream',
version: '3.0.39',
version: '3.0.41',
description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.'
}

View File

@ -1,64 +0,0 @@
export interface IDuplexStream {
read(): any;
write(chunk: any, callback?: (error?: Error | null) => void): boolean;
on(event: string, listener: (...args: any[]) => void): this;
once(event: string, listener: (...args: any[]) => void): this;
end(callback?: () => void): void;
destroy(error?: Error): void;
}
export interface IReadableStreamOptions {
highWaterMark?: number;
}
export interface IWritableStreamOptions {
highWaterMark?: number;
}
export function convertDuplexToWebStream(duplex: IDuplexStream): { readable: ReadableStream, writable: WritableStream } {
const readable = new ReadableStream({
start(controller) {
duplex.on('readable', () => {
let chunk;
while (null !== (chunk = duplex.read())) {
controller.enqueue(chunk);
}
});
duplex.on('end', () => {
controller.close();
});
},
cancel(reason) {
duplex.destroy(new Error(reason));
}
});
const writable = new WritableStream({
write(chunk) {
return new Promise<void>((resolve, reject) => {
const isBackpressured = !duplex.write(chunk, (error) => {
if (error) {
reject(error);
} else {
resolve();
}
});
if (isBackpressured) {
duplex.once('drain', resolve);
}
});
},
close() {
return new Promise<void>((resolve, reject) => {
duplex.end(resolve);
});
},
abort(reason) {
duplex.destroy(new Error(reason));
}
});
return { readable, writable };
}

View File

@ -1,5 +1,2 @@
import './plugins.js';
export * from './classes.webduplexstream.js';
export {
convertDuplexToWebStream,
} from './convert.js';