import { expect, tap } from '@push.rocks/tapbundle';
import { WebDuplexStream } from '../ts_web/index.js';

/**
 * Helper: collect all chunks from a readable.
 *
 * Reads from `reader` until a read reports `done`, accumulating each chunk
 * in the order it was read.
 *
 * @param reader - Reader to pull chunks from.
 * @returns Every chunk that was read before the reader reported `done`.
 */
async function collectAll<T>(reader: ReadableStreamDefaultReader<T>): Promise<T[]> {
  const results: T[] = [];
  while (true) {
    // Narrow on `done` via the result object so `value` is typed as T below.
    const result = await reader.read();
    if (result.done) break;
    results.push(result.value);
  }
  return results;
}

// =============================================
// Basic transform
// =============================================

tap.test('WebDuplexStream: should transform chunks via writeFunction', async () => {
  const stream = new WebDuplexStream({
    writeFunction: async (chunk, { push }) => {
      push(chunk * 2);
    },
  });

  const writer = stream.writable.getWriter();
  const reader = stream.readable.getReader();

  // Read and write concurrently to avoid backpressure deadlock
  const readPromise = collectAll(reader);

  await writer.write(5);
  await writer.write(10);
  await writer.close();

  const results = await readPromise;
  expect(results).toContain(10);
  expect(results).toContain(20);
});

// =============================================
// writeFunction return value
// =============================================

tap.test('WebDuplexStream: should enqueue returned non-null values', async () => {
  const stream = new WebDuplexStream({
    writeFunction: async (chunk) => {
      return chunk.toUpperCase();
    },
  });

  const writer = stream.writable.getWriter();
  const reader = stream.readable.getReader();

  // Start reading before writing so the writes below are not left waiting on a consumer.
  const readPromise = collectAll(reader);

  await writer.write('hello');
  await writer.close();

  const results = await readPromise;
  expect(results[0]).toEqual('HELLO');
});

// =============================================
// fromUInt8Array
// =============================================

tap.test('WebDuplexStream: fromUInt8Array should produce data', async () => {
  const data = new Uint8Array([1, 2, 3, 4, 5]);
  const stream = WebDuplexStream.fromUInt8Array(data);
  const reader = stream.readable.getReader();

  // Only the first chunk is inspected; it is expected to carry all five bytes.
  const { value } = await reader.read();
  expect(value).toBeTruthy();
  expect(value.length).toEqual(5);
});

// =============================================
// readFunction
// =============================================

tap.test('WebDuplexStream: readFunction should supply data to the stream', async () => {
  const stream = new WebDuplexStream({
    readFunction: async (tools) => {
      await tools.write('chunk1');
      await tools.write('chunk2');
      tools.done();
    },
    writeFunction: async (chunk, { push }) => {
      push(chunk.toUpperCase());
    },
  });

  // No writer is acquired here: all input comes from readFunction.
  const reader = stream.readable.getReader();
  const results = await collectAll(reader);
  expect(results).toContain('CHUNK1');
  expect(results).toContain('CHUNK2');
});

// =============================================
// finalFunction
// =============================================

tap.test('WebDuplexStream: finalFunction should emit final data', async () => {
  const stream = new WebDuplexStream({
    writeFunction: async (chunk) => {
      return chunk;
    },
    finalFunction: async (tools) => {
      tools.push('final');
      return undefined;
    },
  });

  const writer = stream.writable.getWriter();
  const reader = stream.readable.getReader();

  // Start reading before writing so the writes below are not left waiting on a consumer.
  const readPromise = collectAll(reader);

  await writer.write('hello');
  await writer.close();

  const results = await readPromise;
  expect(results).toContain('hello');
  expect(results).toContain('final');
});

// =============================================
// No writeFunction = passthrough
// =============================================

tap.test('WebDuplexStream: no writeFunction should passthrough', async () => {
  const stream = new WebDuplexStream({});

  const writer = stream.writable.getWriter();
  const reader = stream.readable.getReader();

  // Start reading before writing so the writes below are not left waiting on a consumer.
  const readPromise = collectAll(reader);

  await writer.write('pass');
  await writer.close();

  const results = await readPromise;
  expect(results[0]).toEqual('pass');
});

export default tap.start();