2023-11-13 16:52:11 +00:00
|
|
|
import { tap, expect } from '@push.rocks/tapbundle';
|
|
|
|
import { SmartDuplex, type ISmartDuplexOptions, StreamWrapper } from '../ts/index.js';
|
|
|
|
|
|
|
|
/**
 * Backpressure test for a three-stage SmartDuplex pipeline.
 *
 * stream1 -> stream2 -> stream3 are piped together, with per-chunk delays of
 * 10 ms, 20 ms and 100 ms respectively, so each downstream stage is slower
 * than the one feeding it. 200 chunks are written to stream1 without waiting
 * for 'drain'. The test passes when at least one `stream1.write()` call
 * returned `false` (the Node.js stream signal for backpressure) and stream3
 * has emitted 'finish'.
 *
 * Every failure path settles the deferred via `done.reject(...)` so that the
 * awaited promise rejects and the test fails, instead of throwing inside an
 * event listener where the error cannot reach the test runner.
 */
tap.test('should run backpressure test', async (toolsArg) => {
  const done = toolsArg.defer();
  const chunkCount = 200;

  // Synchronous on purpose: nothing in here is awaited, and a synchronous
  // throw during setup propagates out of the test callback and fails the test.
  function testBackpressure() {
    const stream1 = new SmartDuplex({
      name: 'stream1',
      objectMode: true,
      writeFunction: async (chunk) => {
        // 10 ms per chunk: the fastest stage of the pipeline.
        await new Promise((resolve) => setTimeout(resolve, 10));
        console.log(`processed chunk ${chunk} in stream 1`);
        return chunk;
      },
    });

    const stream2 = new SmartDuplex({
      name: 'stream2',
      objectMode: true,
      writeFunction: async (chunk) => {
        // 20 ms per chunk: slower than stream1.
        await new Promise((resolve) => setTimeout(resolve, 20));
        console.log(`processed chunk ${chunk} in stream 2`);
        return chunk;
      },
    });

    const stream3 = new SmartDuplex({
      objectMode: true,
      name: 'stream3',
      writeFunction: async (chunk) => {
        // 100 ms per chunk: the slowest stage. Returns nothing, unlike the
        // two upstream stages which return the chunk.
        await new Promise((resolve) => setTimeout(resolve, 100));
        console.log(`processed chunk ${chunk} in stream 3`);
      },
    });

    // pipe() does not forward errors between streams, so each stage needs its
    // own listener; otherwise a failing stage would leave the test hanging.
    for (const stream of [stream1, stream2, stream3]) {
      stream.on('error', (err) => done.reject(err));
    }

    let backpressured = false;

    // Listeners are registered before any data is written.
    stream1.on('finish', () => {
      console.log('Stream 1 finished processing.');
    });
    stream2.on('finish', () => {
      console.log('Stream 2 finished processing.');
    });
    stream3.on('finish', () => {
      console.log('Stream 3 finished processing.');
      if (!backpressured) {
        // Reject instead of throwing: an exception thrown from an event
        // listener would not settle `done.promise`.
        done.reject(new Error('No backpressure was observed.'));
        return;
      }
      done.resolve();
    });

    stream1.pipe(stream2).pipe(stream3);

    // Deliberately ignore the write() return value for flow control and keep
    // writing; we only record whether backpressure was ever signalled.
    for (let i = 0; i < chunkCount; i++) {
      const canContinue = stream1.write(`Chunk ${i}`, 'utf8');
      if (!canContinue) {
        backpressured = true;
        console.log(`Backpressure at chunk ${i}`);
      }
    }

    stream1.end();
  }

  testBackpressure();
  await done.promise;
});
|
|
|
|
|
|
|
|
// Start the tap runner; presumably this executes the tests registered via tap.test(...) — confirm against tapbundle docs.
await tap.start();
|