import { tap, expect } from '@push.rocks/tapbundle'; import { SmartDuplex } from '../ts/index.js'; tap.test('Backpressure: should apply backpressure across piped streams', async (toolsArg) => { const done = toolsArg.defer(); const stream1 = new SmartDuplex({ name: 'stream1', objectMode: true, writeFunction: async (chunk, tools) => { await new Promise((resolve) => setTimeout(resolve, 10)); return chunk; }, }); const stream2 = new SmartDuplex({ name: 'stream2', objectMode: true, writeFunction: async (chunk, tools) => { await new Promise((resolve) => setTimeout(resolve, 20)); await tools.push(chunk); }, }); const stream3 = new SmartDuplex({ objectMode: true, name: 'stream3', writeFunction: async (chunk, tools) => { await new Promise((resolve) => setTimeout(resolve, 200)); }, }); stream1.pipe(stream2).pipe(stream3); let backpressured = false; for (let i = 0; i < 200; i++) { const canContinue = stream1.write(`Chunk ${i}`, 'utf8'); if (!canContinue) { backpressured = true; } } stream1.end(); stream3.on('finish', () => { if (!backpressured) { throw new Error('No backpressure was observed.'); } else { done.resolve(); } }); await done.promise; }); export default tap.start();