import { expect, tap } from '@push.rocks/tapbundle';
import * as fs from 'fs';
import { StreamIntake, SmartDuplex } from '../ts/index.js';
import * as stream from 'stream';

// =============================================
// Basic StreamIntake
// =============================================

/**
 * Pushes five items into a StreamIntake that is piped into an object-mode
 * SmartDuplex and checks that the duplex's writeFunction saw at least one.
 */
tap.test('StreamIntake: should push data and signal end', async (tools) => {
  const intake = new StreamIntake();
  const results: string[] = [];

  intake.pipe(
    new SmartDuplex({
      objectMode: true,
      writeFunction: async (chunk) => {
        results.push(chunk);
        return chunk;
      },
    })
  );

  const done = tools.defer();
  let counter = 0;
  // Each emission of pushNextObservable triggers one push until five items
  // have been sent; the next emission ends the intake.
  // NOTE(review): presumably the observable fires whenever the intake wants
  // more data — confirm against StreamIntake.
  intake.pushNextObservable.subscribe(() => {
    if (counter < 5) {
      counter++;
      intake.pushData(`item-${counter}`);
    } else {
      intake.signalEnd();
      done.resolve();
    }
  });

  await done.promise;
  // Give streams time to flush
  await new Promise((resolve) => setTimeout(resolve, 100));
  expect(results.length).toBeGreaterThan(0);
});

/**
 * Pipes a StreamIntake through a pass-through SmartDuplex into a file and
 * waits until the file stream has actually finished before the test ends.
 */
tap.test('StreamIntake: should pipe to a writable', async (tools) => {
  const intake = new StreamIntake();
  const fileStream = fs.createWriteStream('./test/assets/writabletext.txt');

  // Register completion tracking before any data flows so neither 'finish'
  // nor an early 'error' can be missed; the promise rejects on a stream error.
  const fileFinished = stream.promises.finished(fileStream);

  intake
    .pipe(
      new SmartDuplex({
        objectMode: true,
        writeFunction: async (chunk: string) => {
          return chunk;
        },
      })
    )
    .pipe(fileStream);

  const done = tools.defer();
  let counter = 0;
  intake.pushNextObservable.subscribe(() => {
    if (counter < 10) {
      counter++;
      intake.pushData('data\n');
    } else {
      intake.signalEnd();
      done.resolve();
    }
  });

  await done.promise;
  // Signalling the end is not enough: wait for the writable to flush so the
  // test does not complete with pending writes or an open file handle.
  await fileFinished;
});

// =============================================
// StreamIntake.fromStream (Node Readable)
// =============================================

/**
 * Wraps a Node file read stream with StreamIntake.fromStream and checks that
 * non-empty content arrives before 'end'.
 */
tap.test('StreamIntake: fromStream should wrap a Node readable', async (tools) => {
  const nodeReadable = fs.createReadStream('./test/assets/readabletext.txt');
  const intake = await StreamIntake.fromStream(nodeReadable);

  const chunks: Buffer[] = [];
  const done = tools.defer();

  intake.on('data', (chunk: Buffer) => {
    chunks.push(chunk);
  });
  // Fail the test on a stream error instead of waiting forever for 'end'.
  intake.on('error', (err: Error) => {
    done.reject(err);
  });
  intake.on('end', () => {
    done.resolve();
  });

  await done.promise;

  // Assert outside the event handler: a throw inside an emitter callback
  // would surface as an uncaught exception rather than a test failure.
  expect(chunks.length).toBeGreaterThan(0);
  const content = Buffer.concat(chunks).toString();
  expect(content.length).toBeGreaterThan(0);
});

// =============================================
// StreamIntake.fromStream (Web ReadableStream)
// =============================================

/**
 * Wraps a Web ReadableStream with StreamIntake.fromStream and checks that the
 * enqueued chunks come out unchanged and in order.
 */
tap.test('StreamIntake: fromStream should wrap a Web ReadableStream', async (tools) => {
  const data = ['chunk1', 'chunk2', 'chunk3'];
  const webReadable = new ReadableStream({
    start(controller) {
      for (const item of data) {
        controller.enqueue(item);
      }
      controller.close();
    },
  });

  const intake = await StreamIntake.fromStream(webReadable);

  const results: string[] = [];
  const done = tools.defer();

  intake.on('data', (chunk: string) => {
    results.push(chunk);
  });
  // Fail the test on a stream error instead of waiting forever for 'end'.
  intake.on('error', (err: Error) => {
    done.reject(err);
  });
  intake.on('end', () => {
    done.resolve();
  });

  await done.promise;

  // Assert after the stream has ended so a mismatch is reported as a normal
  // test failure.
  expect(results).toEqual(data);
});

export default tap.start();