From 00d1455367e37b6a8baa99585683638c4f5dbae0 Mon Sep 17 00:00:00 2001 From: Philipp Kunz Date: Mon, 13 Nov 2023 17:52:11 +0100 Subject: [PATCH] fix(core): update --- package.json | 1 - pnpm-lock.yaml | 3 -- test/assets/writabletext.txt | 2 + test/backpressure.ts | 62 ------------------------ test/test.backpressure.ts | 70 +++++++++++++++++++++++++++ ts/00_commitinfo_data.ts | 2 +- ts/smartstream.classes.smartduplex.ts | 3 +- 7 files changed, 75 insertions(+), 68 deletions(-) delete mode 100644 test/backpressure.ts create mode 100644 test/test.backpressure.ts diff --git a/package.json b/package.json index a2e0ea4..fec132f 100644 --- a/package.json +++ b/package.json @@ -25,7 +25,6 @@ "@git.zone/tsbuild": "^2.1.66", "@git.zone/tsrun": "^1.2.44", "@git.zone/tstest": "^1.0.84", - "@push.rocks/smartdelay": "^3.0.5", "@push.rocks/smartfile": "^11.0.0", "@push.rocks/tapbundle": "^5.0.15", "@types/node": "^20.9.0" diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index ac925c6..27ad4d2 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -25,9 +25,6 @@ devDependencies: '@git.zone/tstest': specifier: ^1.0.84 version: 1.0.84(@types/node@20.9.0)(sinon@17.0.1) - '@push.rocks/smartdelay': - specifier: ^3.0.5 - version: 3.0.5 '@push.rocks/smartfile': specifier: ^11.0.0 version: 11.0.0 diff --git a/test/assets/writabletext.txt b/test/assets/writabletext.txt index a3a236a..e922c03 100644 --- a/test/assets/writabletext.txt +++ b/test/assets/writabletext.txt @@ -46,3 +46,5 @@ hi+wow hi+wow hi+wow hi+wow +hi+wow +hi+wow diff --git a/test/backpressure.ts b/test/backpressure.ts deleted file mode 100644 index c5c7812..0000000 --- a/test/backpressure.ts +++ /dev/null @@ -1,62 +0,0 @@ -import { SmartDuplex, type ISmartDuplexOptions, StreamWrapper } from '../ts/index.js'; - -import * as smartdelay from '@push.rocks/smartdelay'; - -async function testBackpressure() { - const stream1 = new SmartDuplex({ - name: 'stream1', - objectMode: true, - handleBackpressure: true, - writeFunction: async (chunk, tools) => { - console.log(`processed chunk ${chunk} in stream 1`); - return chunk; // Fast processing - } - }); - const stream2 = new SmartDuplex({ - name: 'stream2', - objectMode: true, - handleBackpressure: true, - writeFunction: async (chunk, tools) => { - await new Promise(resolve => setTimeout(resolve, 1)); // Slow processing - console.log(`processed chunk ${chunk} in stream 2`); - return chunk; - } - }); // This stream processes data more slowly - const stream3 = new SmartDuplex({ - objectMode: true, - name: 'stream3', - handleBackpressure: true, - writeFunction: async (chunk, tools) => { - await new Promise(resolve => setTimeout(resolve, 200)); // Slow processing - console.log(`processed chunk ${chunk} in stream 3`); - } - }); - - stream1.pipe(stream2).pipe(stream3); - - let backpressured = false; - for (let i = 0; i < 1000; i++) { - const canContinue = stream1.write(`Chunk ${i}`, 'utf8'); - if (!canContinue) { - backpressured = true; - console.log(`Backpressure at chunk ${i}`); - } - } - - stream1.end(); - - stream1.on('finish', () => { - console.log('Stream 1 finished processing.'); - }); - stream2.on('finish', () => { - console.log('Stream 2 finished processing.'); - }); - stream3.on('finish', () => { - console.log('Stream 3 finished processing.'); - if (!backpressured) { - console.log('No backpressure was observed.'); - } - }); -} - -testBackpressure(); \ No newline at end of file diff --git a/test/test.backpressure.ts b/test/test.backpressure.ts new file mode 100644 index 0000000..d071a47 --- /dev/null +++ b/test/test.backpressure.ts @@ -0,0 +1,70 @@ +import { tap, expect } from '@push.rocks/tapbundle'; +import { SmartDuplex, type ISmartDuplexOptions, StreamWrapper } from '../ts/index.js'; + +tap.test('should run backpressure test', async (toolsArg) => { + const done = toolsArg.defer(); + async function testBackpressure() { + const stream1 = new SmartDuplex({ + name: 'stream1', + objectMode: true, + handleBackpressure: true, + writeFunction: async (chunk, tools) => { + await new Promise((resolve) => setTimeout(resolve, 10)); // Slow processing + console.log(`processed chunk ${chunk} in stream 1`); + return chunk; // Fast processing + }, + }); + const stream2 = new SmartDuplex({ + name: 'stream2', + objectMode: true, + handleBackpressure: true, + writeFunction: async (chunk, tools) => { + await new Promise((resolve) => setTimeout(resolve, 20)); // Slow processing + console.log(`processed chunk ${chunk} in stream 2`); + return chunk; + }, + }); // This stream processes data more slowly + const stream3 = new SmartDuplex({ + objectMode: true, + name: 'stream3', + handleBackpressure: true, + writeFunction: async (chunk, tools) => { + await new Promise((resolve) => setTimeout(resolve, 100)); // Slow processing + console.log(`processed chunk ${chunk} in stream 3`); + }, + }); + + stream1.pipe(stream2).pipe(stream3); + + let backpressured = false; + for (let i = 0; i < 200; i++) { + const canContinue = stream1.write(`Chunk ${i}`, 'utf8'); + if (!canContinue) { + backpressured = true; + console.log(`Backpressure at chunk ${i}`); + } + } + + stream1.end(); + + stream1.on('finish', () => { + console.log('Stream 1 finished processing.'); + }); + stream2.on('finish', () => { + console.log('Stream 2 finished processing.'); + }); + stream3.on('finish', () => { + console.log('Stream 3 finished processing.'); + if (!backpressured) { + throw new Error('No backpressure was observed.'); + } else { + done.resolve(); + } + }); + } + + testBackpressure(); + await done.promise; +}); + +await tap.start(); \ No newline at end of file diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 37e9019..95f92ab 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartstream', - version: '3.0.20', + version: '3.0.21', description: 'simplifies access to node streams' } diff --git a/ts/smartstream.classes.smartduplex.ts b/ts/smartstream.classes.smartduplex.ts index 357617b..bdecf2c 100644 --- a/ts/smartstream.classes.smartduplex.ts +++ b/ts/smartstream.classes.smartduplex.ts @@ -51,8 +51,9 @@ export class SmartDuplex extends Duplex { } public async _read(size: number): Promise { - await this.backpressuredArray.waitForItems(); this.debugLog(`${this.options.name}: read was called`); + await this.backpressuredArray.waitForItems(); + this.debugLog(`${this.options.name}: successfully waited for items.`); if (this.options.readFunction) { await this.options.readFunction(); }