From 9421c652a2e465c61705747a05c27cb99f73a538 Mon Sep 17 00:00:00 2001 From: Philipp Kunz Date: Mon, 13 Nov 2023 18:19:11 +0100 Subject: [PATCH] fix(core): update --- ts/00_commitinfo_data.ts | 2 +- ts/smartstream.classes.smartduplex.ts | 22 ++++++++++++---------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 95f92ab..cadfcb7 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartstream', - version: '3.0.21', + version: '3.0.22', description: 'simplifies access to node streams' } diff --git a/ts/smartstream.classes.smartduplex.ts b/ts/smartstream.classes.smartduplex.ts index bdecf2c..3b4b951 100644 --- a/ts/smartstream.classes.smartduplex.ts +++ b/ts/smartstream.classes.smartduplex.ts @@ -3,7 +3,7 @@ import { Duplex, type DuplexOptions } from 'stream'; export interface IStreamTools { truncate: () => void; - push: (pipeObject: any) => void; + push: (pipeObject: any) => Promise; } export interface IStreamWriteFunction { @@ -80,8 +80,13 @@ export class SmartDuplex extends Duplex { isTruncated = true; callback(); }, - push: (pushArg: TOutput) => { - this.backpressuredArray.push(pushArg); + push: async (pushArg: TOutput) => { + const canPushMore = this.backpressuredArray.push(pushArg); + if (!canPushMore) { + this.debugLog(`${this.options.name}: cannot push more`); + await this.backpressuredArray.waitForSpace(); + this.debugLog(`${this.options.name}: can push more again`); + } }, }; @@ -93,12 +98,7 @@ export class SmartDuplex extends Duplex { return; } if (modifiedChunk) { - const canPushMore = this.backpressuredArray.push(modifiedChunk); - if (!canPushMore) { - this.debugLog(`${this.options.name}: cannot push more`); - await this.backpressuredArray.waitForSpace(); - this.debugLog(`${this.options.name}: can push more again`); - } + await tools.push(modifiedChunk); } callback(); writeDeferred.resolve(); @@ -115,7 +115,9 @@ export class SmartDuplex extends Duplex { if (this.options.finalFunction) { const tools: IStreamTools = { truncate: () => callback(), - push: (pipeObject) => this.push(pipeObject), + push: async (pipeObject) => { + this.push(pipeObject); + }, }; try {