import { expect, tap } from '@push.rocks/tapbundle';
import * as fs from 'fs';
import * as smartstream from '../ts/index.js';
import { SmartDuplex } from '../ts/smartstream.classes.smartduplex.js';

// =============================================
// Test helpers
// =============================================

/**
 * Subscribes to `data`, `end` and `error` on the given stream and returns a
 * promise for all chunks emitted before `end`.
 *
 * Listeners are attached synchronously, so call this BEFORE writing to the
 * stream. Assertions should run on the awaited result in the test body rather
 * than inside event handlers: an exception thrown from an event handler is an
 * uncaught exception, not a test failure.
 *
 * @param stream the readable side to observe
 * @returns resolves with the collected chunks on `end`, rejects on `error`
 */
function collectChunks<T = unknown>(stream: NodeJS.ReadableStream): Promise<T[]> {
  return new Promise<T[]>((resolve, reject) => {
    const chunks: T[] = [];
    stream.on('data', (chunk: T) => {
      chunks.push(chunk);
    });
    stream.on('end', () => resolve(chunks));
    stream.on('error', reject);
  });
}

/**
 * Returns a promise that resolves with the first `error` emitted by the stream.
 * The listener is attached synchronously, so call this before triggering the error.
 *
 * @param stream the stream expected to fail
 * @returns resolves with the emitted error
 */
function waitForError(stream: NodeJS.EventEmitter): Promise<Error> {
  return new Promise<Error>((resolve) => {
    // `on` (not `once`) so that any follow-up error still has a listener.
    stream.on('error', (err: Error) => resolve(err));
  });
}

// =============================================
// Constructor
// =============================================

tap.test('SmartDuplex: should construct with no options', async () => {
  const duplex = new SmartDuplex();
  expect(duplex).toBeInstanceOf(SmartDuplex);
});

tap.test('SmartDuplex: should construct with options', async () => {
  const duplex = new SmartDuplex({
    objectMode: true,
    writeFunction: async (chunk) => chunk,
  });
  expect(duplex).toBeInstanceOf(SmartDuplex);
});

// =============================================
// fromBuffer
// =============================================

tap.test('SmartDuplex: should create from a Buffer', async () => {
  const bufferData = Buffer.from('This is a test buffer');
  const stream = SmartDuplex.fromBuffer(bufferData, {});

  const chunks = await collectChunks<Buffer>(stream);

  expect(Buffer.concat(chunks).toString()).toEqual(bufferData.toString());
});

// =============================================
// writeFunction
// =============================================

tap.test('SmartDuplex: should transform chunks via writeFunction', async () => {
  const transform = new SmartDuplex({
    objectMode: true,
    writeFunction: async (chunk) => {
      return chunk.toUpperCase();
    },
  });

  const collected = collectChunks<string>(transform);
  transform.write('hello');
  transform.write('world');
  transform.end();
  const results = await collected;

  expect(results).toContain('HELLO');
  expect(results).toContain('WORLD');
});

tap.test('SmartDuplex: writeFunction returning undefined should not push', async () => {
  const transform = new SmartDuplex({
    objectMode: true,
    writeFunction: async () => {
      return undefined;
    },
  });

  const collected = collectChunks(transform);
  transform.write('hello');
  transform.end();
  const results = await collected;

  expect(results.length).toEqual(0);
});

// =============================================
// tools.push — multiple outputs
// =============================================

tap.test('SmartDuplex: should emit multiple chunks via tools.push', async () => {
  const splitter = new SmartDuplex({
    objectMode: true,
    writeFunction: async (chunk, streamTools) => {
      const words = chunk.split(' ');
      for (const word of words) {
        await streamTools.push(word);
      }
    },
  });

  const collected = collectChunks<string>(splitter);
  splitter.write('hello beautiful world');
  splitter.end();
  const results = await collected;

  expect(results).toContain('hello');
  expect(results).toContain('beautiful');
  expect(results).toContain('world');
});

// =============================================
// finalFunction
// =============================================

tap.test('SmartDuplex: should emit final chunk via finalFunction', async () => {
  let count = 0;
  const aggregator = new SmartDuplex({
    objectMode: true,
    writeFunction: async () => {
      count++;
      return undefined;
    },
    finalFunction: async () => {
      return `total: ${count}`;
    },
  });

  const collected = collectChunks<string>(aggregator);
  aggregator.write('a');
  aggregator.write('b');
  aggregator.end();
  const results = await collected;

  expect(results.length).toEqual(1);
  expect(results[0]).toEqual('total: 2');
});

tap.test('SmartDuplex: finalFunction can push multiple chunks via tools.push', async () => {
  const stream = new SmartDuplex({
    objectMode: true,
    writeFunction: async (chunk) => chunk,
    finalFunction: async (streamTools) => {
      await streamTools.push('final1');
      await streamTools.push('final2');
    },
  });

  const collected = collectChunks<string>(stream);
  stream.write('hello');
  stream.end();
  const results = await collected;

  expect(results).toContain('hello');
  expect(results).toContain('final1');
  expect(results).toContain('final2');
});

// =============================================
// truncate
// =============================================

tap.test('SmartDuplex: should truncate stream early', async () => {
  const limiter = new SmartDuplex({
    objectMode: true,
    writeFunction: async (chunk, streamTools) => {
      if (chunk === 'STOP') {
        streamTools.truncate();
        return undefined;
      }
      return chunk;
    },
  });

  const collected = collectChunks<string>(limiter);
  limiter.write('a');
  limiter.write('b');
  // Write STOP on next tick to allow previous writes to flush.
  // No explicit end(): the test relies on truncate() to finish the stream.
  process.nextTick(() => {
    limiter.write('STOP');
  });
  const results = await collected;

  expect(results).toContain('a');
  expect(results).toContain('b');
  expect(results).not.toContain('STOP');
});

// =============================================
// Error handling
// =============================================

tap.test('SmartDuplex: should emit error when writeFunction throws', async () => {
  const stream = new SmartDuplex({
    objectMode: true,
    writeFunction: async () => {
      throw new Error('write error');
    },
  });

  const errored = waitForError(stream);
  stream.write('test');
  const err = await errored;

  expect(err.message).toEqual('write error');
});

tap.test('SmartDuplex: should error when no writeFunction and data is written', async () => {
  const stream = new SmartDuplex({
    objectMode: true,
  });

  const errored = waitForError(stream);
  stream.write('test');
  const err = await errored;

  expect(err.message).toEqual('No stream function provided');
});

// =============================================
// fromWebReadableStream
// =============================================

tap.test('SmartDuplex: should create from a Web ReadableStream', async () => {
  const chunks = ['hello', 'world', 'foo'];
  const webReadable = new ReadableStream({
    start(controller) {
      for (const chunk of chunks) {
        controller.enqueue(chunk);
      }
      controller.close();
    },
  });

  const duplex = SmartDuplex.fromWebReadableStream(webReadable);
  const results = await collectChunks<string>(duplex);

  expect(results).toEqual(chunks);
});

// =============================================
// getWebStreams
// =============================================

tap.test('SmartDuplex: should provide web streams via getWebStreams()', async () => {
  const duplex = new SmartDuplex({
    objectMode: true,
    writeFunction: async (chunk) => {
      return chunk.toUpperCase();
    },
  });

  const { readable, writable } = await duplex.getWebStreams();
  const writer = writable.getWriter();
  const reader = readable.getReader();

  await writer.write('hello');
  await writer.write('world');
  await writer.close();

  // Drain the readable side until the reader reports completion.
  const results: string[] = [];
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    results.push(value);
  }

  expect(results).toContain('HELLO');
  expect(results).toContain('WORLD');
});

// =============================================
// Debug mode
// =============================================

tap.test('SmartDuplex: debug mode should not crash', async () => {
  const stream = new SmartDuplex({
    name: 'DebugStream',
    debug: true,
    objectMode: true,
    writeFunction: async (chunk) => chunk,
  });

  // Only completion matters here; the collected chunks are ignored.
  const finished = collectChunks(stream);
  stream.write('test');
  stream.end();
  await finished;
});

// =============================================
// Pipe with file read
// =============================================

tap.test('SmartDuplex: should handle a read stream pipeline', async () => {
  const streamWrapper = new smartstream.StreamWrapper([
    fs.createReadStream('./test/assets/readabletext.txt'),
    new smartstream.SmartDuplex({
      writeFunction: async (chunkStringArg: Buffer, streamTools) => {
        // Forward only the first 100 characters of each chunk, preceded by a marker line.
        const result = chunkStringArg.toString().slice(0, 100);
        streamTools.push('wow =========== \n');
        return Buffer.from(result);
      },
      finalFunction: async () => {
        return Buffer.from('this is the end');
      },
    }),
    new smartstream.SmartDuplex({
      writeFunction: async () => {
        // consume data
      },
      finalFunction: async (streamTools) => {
        streamTools.push(null);
      },
    }),
  ]);
  await streamWrapper.run();
});

export default tap.start();