From 1a92aa663025987c20584f55ebfe9cf85ac3cf36 Mon Sep 17 00:00:00 2001 From: Philipp Kunz Date: Mon, 14 Oct 2024 01:31:58 +0200 Subject: [PATCH] fix(core): Fix incorrect backpressure logic in VirtualStream class --- changelog.md | 6 ++++++ ts/00_commitinfo_data.ts | 2 +- ts/classes.virtualstream.ts | 15 +++++++++++---- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/changelog.md b/changelog.md index 066e461..e5e0179 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,11 @@ # Changelog +## 2024-10-14 - 3.1.2 - fix(core) +Fix incorrect backpressure logic in VirtualStream class + +- Corrected the logic for determining backpressure status by checking the available space in the receiveBackpressuredArray. +- Introduced a looping mechanism to wait when the other side is backpressured before sending more data. + ## 2024-10-14 - 3.1.1 - fix(virtualstream) Fix handling of virtual streams for proper shutdown diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 6150e16..5749d3b 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@api.global/typedrequest', - version: '3.1.1', + version: '3.1.2', description: 'A TypeScript library for making typed requests towards APIs, including facilities for handling requests, routing, and virtual stream handling.' } diff --git a/ts/classes.virtualstream.ts b/ts/classes.virtualstream.ts index 37e4179..2d887a8 100644 --- a/ts/classes.virtualstream.ts +++ b/ts/classes.virtualstream.ts @@ -162,7 +162,7 @@ export class VirtualStream implements plugins.typedRequestInterf cycle: 'request', mainPurpose: 'feedback', next: this.sendBackpressuredArray.data.length > 0, - backpressure: this.receiveBackpressuredArray.checkSpaceAvailable(), + backpressure: !this.receiveBackpressuredArray.checkSpaceAvailable(), }, response: null, }).catch(() => { @@ -178,6 +178,12 @@ export class VirtualStream implements plugins.typedRequestInterf // do work loop while (this.sendBackpressuredArray.data.length > 0 || otherSideHasNext) { + if (otherSideIsBackpressured) { + while (otherSideIsBackpressured) { + await plugins.smartdelay.delayFor(50); + await getFeedback(); + } + } let dataArg: typeof this.sendBackpressuredArray.data[0]; if (this.sendBackpressuredArray.data.length > 0) { dataArg = this.sendBackpressuredArray.shift(); @@ -204,7 +210,8 @@ export class VirtualStream implements plugins.typedRequestInterf if (streamTr && streamTr.response && streamTr.response.chunkData) { this.receiveBackpressuredArray.push(streamTr.response.chunkData); } - thisSideIsBackpressured = this.receiveBackpressuredArray.checkSpaceAvailable(); + otherSideIsBackpressured = streamTr && streamTr.response && streamTr.response.backpressure; + thisSideIsBackpressured = !this.receiveBackpressuredArray.checkSpaceAvailable(); // lets care about looping otherSideHasNext = streamTr && streamTr.response && streamTr.response.next; @@ -235,7 +242,7 @@ export class VirtualStream implements plugins.typedRequestInterf mainPurpose: 'keepAlive', keepAlive: this.keepAlive, next: this.sendBackpressuredArray.data.length > 0, - backpressure: this.receiveBackpressuredArray.checkSpaceAvailable(), + backpressure: !this.receiveBackpressuredArray.checkSpaceAvailable(), }; } @@ -247,7 +254,7 @@ export class VirtualStream implements plugins.typedRequestInterf cycle: 'response', mainPurpose: 'feedback', next: this.sendBackpressuredArray.data.length > 0, - backpressure: this.receiveBackpressuredArray.checkSpaceAvailable(), + backpressure: !this.receiveBackpressuredArray.checkSpaceAvailable(), }; }