fix(core): update

This commit is contained in:
Philipp Kunz 2023-11-13 17:52:11 +01:00
parent 116a281c6c
commit 00d1455367
7 changed files with 75 additions and 68 deletions

View File

@ -25,7 +25,6 @@
"@git.zone/tsbuild": "^2.1.66",
"@git.zone/tsrun": "^1.2.44",
"@git.zone/tstest": "^1.0.84",
"@push.rocks/smartdelay": "^3.0.5",
"@push.rocks/smartfile": "^11.0.0",
"@push.rocks/tapbundle": "^5.0.15",
"@types/node": "^20.9.0"

View File

@ -25,9 +25,6 @@ devDependencies:
'@git.zone/tstest':
specifier: ^1.0.84
version: 1.0.84(@types/node@20.9.0)(sinon@17.0.1)
'@push.rocks/smartdelay':
specifier: ^3.0.5
version: 3.0.5
'@push.rocks/smartfile':
specifier: ^11.0.0
version: 11.0.0

View File

@ -46,3 +46,5 @@ hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow

View File

@ -1,62 +0,0 @@
import { SmartDuplex, type ISmartDuplexOptions, StreamWrapper } from '../ts/index.js';
import * as smartdelay from '@push.rocks/smartdelay';
async function testBackpressure() {
const stream1 = new SmartDuplex({
name: 'stream1',
objectMode: true,
handleBackpressure: true,
writeFunction: async (chunk, tools) => {
console.log(`processed chunk ${chunk} in stream 1`);
return chunk; // Fast processing
}
});
const stream2 = new SmartDuplex({
name: 'stream2',
objectMode: true,
handleBackpressure: true,
writeFunction: async (chunk, tools) => {
await new Promise(resolve => setTimeout(resolve, 1)); // Slow processing
console.log(`processed chunk ${chunk} in stream 2`);
return chunk;
}
}); // This stream processes data more slowly
const stream3 = new SmartDuplex({
objectMode: true,
name: 'stream3',
handleBackpressure: true,
writeFunction: async (chunk, tools) => {
await new Promise(resolve => setTimeout(resolve, 200)); // Slow processing
console.log(`processed chunk ${chunk} in stream 3`);
}
});
stream1.pipe(stream2).pipe(stream3);
let backpressured = false;
for (let i = 0; i < 1000; i++) {
const canContinue = stream1.write(`Chunk ${i}`, 'utf8');
if (!canContinue) {
backpressured = true;
console.log(`Backpressure at chunk ${i}`);
}
}
stream1.end();
stream1.on('finish', () => {
console.log('Stream 1 finished processing.');
});
stream2.on('finish', () => {
console.log('Stream 2 finished processing.');
});
stream3.on('finish', () => {
console.log('Stream 3 finished processing.');
if (!backpressured) {
console.log('No backpressure was observed.');
}
});
}
testBackpressure();

70
test/test.backpressure.ts Normal file
View File

@ -0,0 +1,70 @@
import { tap, expect } from '@push.rocks/tapbundle';
import { SmartDuplex, type ISmartDuplexOptions, StreamWrapper } from '../ts/index.js';
tap.test('should run backpressure test', async (toolsArg) => {
const done = toolsArg.defer();
async function testBackpressure() {
const stream1 = new SmartDuplex({
name: 'stream1',
objectMode: true,
handleBackpressure: true,
writeFunction: async (chunk, tools) => {
await new Promise((resolve) => setTimeout(resolve, 10)); // Slow processing
console.log(`processed chunk ${chunk} in stream 1`);
return chunk; // Fast processing
},
});
const stream2 = new SmartDuplex({
name: 'stream2',
objectMode: true,
handleBackpressure: true,
writeFunction: async (chunk, tools) => {
await new Promise((resolve) => setTimeout(resolve, 20)); // Slow processing
console.log(`processed chunk ${chunk} in stream 2`);
return chunk;
},
}); // This stream processes data more slowly
const stream3 = new SmartDuplex({
objectMode: true,
name: 'stream3',
handleBackpressure: true,
writeFunction: async (chunk, tools) => {
await new Promise((resolve) => setTimeout(resolve, 100)); // Slow processing
console.log(`processed chunk ${chunk} in stream 3`);
},
});
stream1.pipe(stream2).pipe(stream3);
let backpressured = false;
for (let i = 0; i < 200; i++) {
const canContinue = stream1.write(`Chunk ${i}`, 'utf8');
if (!canContinue) {
backpressured = true;
console.log(`Backpressure at chunk ${i}`);
}
}
stream1.end();
stream1.on('finish', () => {
console.log('Stream 1 finished processing.');
});
stream2.on('finish', () => {
console.log('Stream 2 finished processing.');
});
stream3.on('finish', () => {
console.log('Stream 3 finished processing.');
if (!backpressured) {
throw new Error('No backpressure was observed.');
} else {
done.resolve();
}
});
}
testBackpressure();
await done.promise;
});
await tap.start();

View File

@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartstream',
version: '3.0.20',
version: '3.0.21',
description: 'simplifies access to node streams'
}

View File

@ -51,8 +51,9 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
}
public async _read(size: number): Promise<void> {
await this.backpressuredArray.waitForItems();
this.debugLog(`${this.options.name}: read was called`);
await this.backpressuredArray.waitForItems();
this.debugLog(`${this.options.name}: successfully waited for items.`);
if (this.options.readFunction) {
await this.options.readFunction();
}