fix(smartduplex): now has a .getWebStreams method, that exposes a web streams compatible API

This commit is contained in:
2024-06-02 23:40:52 +02:00
parent 70cf93595c
commit 6451e93c12
5 changed files with 51 additions and 69 deletions

View File

@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartstream',
version: '3.0.39',
version: '3.0.40',
description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.'
}

View File

@ -157,4 +157,53 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
this.backpressuredArray.push(null);
callback();
}
public getWebStreams(): { readable: ReadableStream, writable: WritableStream } {
const duplex = this;
const readable = new ReadableStream({
start(controller) {
duplex.on('readable', () => {
let chunk;
while (null !== (chunk = duplex.read())) {
controller.enqueue(chunk);
}
});
duplex.on('end', () => {
controller.close();
});
},
cancel(reason) {
duplex.destroy(new Error(reason));
}
});
const writable = new WritableStream({
write(chunk) {
return new Promise<void>((resolve, reject) => {
const isBackpressured = !duplex.write(chunk, (error) => {
if (error) {
reject(error);
} else {
resolve();
}
});
if (isBackpressured) {
duplex.once('drain', resolve);
}
});
},
close() {
return new Promise<void>((resolve, reject) => {
duplex.end(resolve);
});
},
abort(reason) {
duplex.destroy(new Error(reason));
}
});
return { readable, writable };
}
}