145 lines
4.1 KiB
TypeScript
145 lines
4.1 KiB
TypeScript
import { expect, tap } from '@push.rocks/tapbundle';
|
|
import { WebDuplexStream } from '../ts_web/index.js';
|
|
|
|
// Helper: collect all chunks from a readable
|
|
async function collectAll<T>(reader: ReadableStreamDefaultReader<T>): Promise<T[]> {
|
|
const results: T[] = [];
|
|
while (true) {
|
|
const { value, done } = await reader.read();
|
|
if (done) break;
|
|
results.push(value);
|
|
}
|
|
return results;
|
|
}
|
|
|
|
// =============================================
|
|
// Basic transform
|
|
// =============================================
|
|
|
|
tap.test('WebDuplexStream: should transform chunks via writeFunction', async () => {
|
|
const stream = new WebDuplexStream<number, number>({
|
|
writeFunction: async (chunk, { push }) => {
|
|
push(chunk * 2);
|
|
},
|
|
});
|
|
|
|
const writer = stream.writable.getWriter();
|
|
const reader = stream.readable.getReader();
|
|
|
|
// Read and write concurrently to avoid backpressure deadlock
|
|
const readPromise = collectAll(reader);
|
|
await writer.write(5);
|
|
await writer.write(10);
|
|
await writer.close();
|
|
const results = await readPromise;
|
|
|
|
expect(results).toContain(10);
|
|
expect(results).toContain(20);
|
|
});
|
|
|
|
// =============================================
|
|
// writeFunction return value
|
|
// =============================================
|
|
|
|
tap.test('WebDuplexStream: should enqueue returned non-null values', async () => {
|
|
const stream = new WebDuplexStream<string, string>({
|
|
writeFunction: async (chunk) => {
|
|
return chunk.toUpperCase();
|
|
},
|
|
});
|
|
|
|
const writer = stream.writable.getWriter();
|
|
const reader = stream.readable.getReader();
|
|
|
|
const readPromise = collectAll(reader);
|
|
await writer.write('hello');
|
|
await writer.close();
|
|
const results = await readPromise;
|
|
|
|
expect(results[0]).toEqual('HELLO');
|
|
});
|
|
|
|
// =============================================
|
|
// fromUInt8Array
|
|
// =============================================
|
|
|
|
tap.test('WebDuplexStream: fromUInt8Array should produce data', async () => {
|
|
const data = new Uint8Array([1, 2, 3, 4, 5]);
|
|
const stream = WebDuplexStream.fromUInt8Array(data);
|
|
const reader = stream.readable.getReader();
|
|
|
|
const { value } = await reader.read();
|
|
expect(value).toBeTruthy();
|
|
expect(value.length).toEqual(5);
|
|
});
|
|
|
|
// =============================================
|
|
// readFunction
|
|
// =============================================
|
|
|
|
tap.test('WebDuplexStream: readFunction should supply data to the stream', async () => {
|
|
const stream = new WebDuplexStream<string, string>({
|
|
readFunction: async (tools) => {
|
|
await tools.write('chunk1');
|
|
await tools.write('chunk2');
|
|
tools.done();
|
|
},
|
|
writeFunction: async (chunk, { push }) => {
|
|
push(chunk.toUpperCase());
|
|
},
|
|
});
|
|
|
|
const reader = stream.readable.getReader();
|
|
const results = await collectAll(reader);
|
|
|
|
expect(results).toContain('CHUNK1');
|
|
expect(results).toContain('CHUNK2');
|
|
});
|
|
|
|
// =============================================
|
|
// finalFunction
|
|
// =============================================
|
|
|
|
tap.test('WebDuplexStream: finalFunction should emit final data', async () => {
|
|
const stream = new WebDuplexStream<string, string>({
|
|
writeFunction: async (chunk) => {
|
|
return chunk;
|
|
},
|
|
finalFunction: async (tools) => {
|
|
tools.push('final');
|
|
return undefined;
|
|
},
|
|
});
|
|
|
|
const writer = stream.writable.getWriter();
|
|
const reader = stream.readable.getReader();
|
|
|
|
const readPromise = collectAll(reader);
|
|
await writer.write('hello');
|
|
await writer.close();
|
|
const results = await readPromise;
|
|
|
|
expect(results).toContain('hello');
|
|
expect(results).toContain('final');
|
|
});
|
|
|
|
// =============================================
|
|
// No writeFunction = passthrough
|
|
// =============================================
|
|
|
|
tap.test('WebDuplexStream: no writeFunction should passthrough', async () => {
|
|
const stream = new WebDuplexStream<string, string>({});
|
|
|
|
const writer = stream.writable.getWriter();
|
|
const reader = stream.readable.getReader();
|
|
|
|
const readPromise = collectAll(reader);
|
|
await writer.write('pass');
|
|
await writer.close();
|
|
const results = await readPromise;
|
|
|
|
expect(results[0]).toEqual('pass');
|
|
});
|
|
|
|
export default tap.start();
|