From d161d6613a018380f83bdcfdf17dcb5f84b1b873 Mon Sep 17 00:00:00 2001 From: Philipp Kunz Date: Sat, 11 Nov 2023 20:56:46 +0100 Subject: [PATCH] fix(core): update --- test/test.streamfunction.ts | 2 +- ts/00_commitinfo_data.ts | 2 +- ts/smartstream.classes.smartduplex.ts | 15 +++++++++++++-- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/test/test.streamfunction.ts b/test/test.streamfunction.ts index b1ae09f..eb781d0 100644 --- a/test/test.streamfunction.ts +++ b/test/test.streamfunction.ts @@ -29,7 +29,7 @@ tap.test('should handle a read stream', async (tools) => { }, }) ]); - // await streamWrapper.run(); + await streamWrapper.run(); }); tap.test('should create a valid Intake', async (tools) => { diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 8fcbe73..9b2f99a 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartstream', - version: '3.0.17', + version: '3.0.18', description: 'simplifies access to node streams' } diff --git a/ts/smartstream.classes.smartduplex.ts b/ts/smartstream.classes.smartduplex.ts index cbfa6a5..858a45d 100644 --- a/ts/smartstream.classes.smartduplex.ts +++ b/ts/smartstream.classes.smartduplex.ts @@ -130,6 +130,9 @@ export class SmartDuplex extends Duplex { } public notBackpressured = true; + public get backpressured(): boolean { + return !this.notBackpressured; + } public push(chunkArg?: TOutput | null): boolean { const result = super.push(chunkArg); if (!result && this.handleBackpressure) { @@ -161,9 +164,17 @@ export class SmartDuplex extends Duplex { try { const modifiedChunk = await this.writeFunction(chunk, tools); if (modifiedChunk) { - this.push(modifiedChunk); + this.push(modifiedChunk) + if (this.backpressured && this.handleBackpressure) { + this.once('drain', () => { + callback(); + }); + } else { + callback(); + } + } else { + callback(); } - callback(); } catch (err) { callback(err); }