diff --git a/changelog.md b/changelog.md index 174ac67..b21184f 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,11 @@ # Changelog +## 2024-10-13 - 3.0.46 - fix(WebDuplexStream) +Fix errors in WebDuplexStream transformation and test logic + +- Corrected async handling in WebDuplexStream write function +- Fixed `WebDuplexStream` tests to properly handle asynchronous reading and writing + ## 2024-10-13 - 3.0.45 - fix(ts) Fixed formatting issues in SmartDuplex class diff --git a/test/test.ts_web.both.ts b/test/test.ts_web.both.ts index 952a6bc..c96e111 100644 --- a/test/test.ts_web.both.ts +++ b/test/test.ts_web.both.ts @@ -1,37 +1,37 @@ -import { expect, expectAsync, tap } from '@push.rocks/tapbundle'; +import { expect, tap } from '@push.rocks/tapbundle'; import * as webstream from '../ts_web/index.js'; -tap.test('WebDuplexStream', async (toolsArg) => { - const testDone = toolsArg.defer(); // Create a deferred object to control test completion. +tap.test('WebDuplexStream fromUInt8Array should read back the same Uint8Array', async () => { const inputUint8Array = new Uint8Array([1, 2, 3, 4, 5]); const stream = webstream.WebDuplexStream.fromUInt8Array(inputUint8Array); const reader = stream.readable.getReader(); let readUint8Array = new Uint8Array(); - reader.read().then(function processText({ done, value }) { - if (done) { - expect(readUint8Array).toEqual(inputUint8Array); - testDone.resolve(); // Correctly signal that the test is done. - return; + // Read from the stream + while (true) { + const { value, done } = await reader.read(); + if (done) break; + if (value) { + // Concatenate value to readUint8Array + const tempArray = new Uint8Array(readUint8Array.length + value.length); + tempArray.set(readUint8Array, 0); + tempArray.set(value, readUint8Array.length); + readUint8Array = tempArray; } - readUint8Array = new Uint8Array([...readUint8Array, ...value]); - return reader.read().then(processText); - }); + } - return testDone.promise; // Return the promise to properly wait for the test to complete. + expect(readUint8Array).toEqual(inputUint8Array); }); - -tap.test('should handle transform with a write function', async (toolsArg) => { - const testDone = toolsArg.defer(); +tap.test('WebDuplexStream should handle transform with a write function', async () => { const input = [1, 2, 3, 4, 5]; const expectedOutput = [2, 4, 6, 8, 10]; const transformStream = new webstream.WebDuplexStream({ - writeFunction: (chunk, { push }) => { - push(chunk * 2); // Push the doubled number into the stream - return Promise.resolve(); // Resolve the promise immediately + writeFunction: async (chunk, { push }) => { + // Push the doubled number into the stream + push(chunk * 2); }, }); @@ -40,31 +40,28 @@ tap.test('should handle transform with a write function', async (toolsArg) => { const output: number[] = []; - // Process the text and resolve the test once done. - const processText = async ({ done, value }) => { - if (done) { - expect(output).toEqual(expectedOutput); - testDone.resolve(); // Resolve the deferred test once all values have been read. - return; + // Read from the stream asynchronously + const readPromise = (async () => { + while (true) { + const { value, done } = await readableStream.read(); + if (done) break; + if (value !== undefined) { + output.push(value); + } } - if (value !== undefined) { - output.push(value); - } - // Continue reading and processing. - await readableStream.read().then(processText); - }; + })(); - // Start the read process before writing to the stream. - readableStream.read().then(processText); - - // Sequentially write to the stream and close when done. + // Write to the stream for (const num of input) { await writableStream.write(num); } await writableStream.close(); - return testDone.promise; // This will wait until the testDone is resolved before completing the test. + // Wait for the reading to complete + await readPromise; + + // Assert that the output matches the expected transformed data + expect(output).toEqual(expectedOutput); }); - -tap.start(); +tap.start(); \ No newline at end of file diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 6e335f7..562916a 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartstream', - version: '3.0.45', + version: '3.0.46', description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.' } diff --git a/ts_web/00_commitinfo_data.ts b/ts_web/00_commitinfo_data.ts index 6e335f7..562916a 100644 --- a/ts_web/00_commitinfo_data.ts +++ b/ts_web/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartstream', - version: '3.0.45', + version: '3.0.46', description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.' } diff --git a/ts_web/classes.webduplexstream.ts b/ts_web/classes.webduplexstream.ts index d6e7e7e..db06a48 100644 --- a/ts_web/classes.webduplexstream.ts +++ b/ts_web/classes.webduplexstream.ts @@ -63,6 +63,7 @@ export class WebDuplexStream extends TransformStrea options: WebDuplexStreamOptions; constructor(optionsArg: WebDuplexStreamOptions) { + // here we call into the official web stream api super({ async transform(chunk, controller) { // Transformation logic remains unchanged @@ -72,15 +73,14 @@ export class WebDuplexStream extends TransformStrea push: (pushArg: TOutput) => controller.enqueue(pushArg), }; - optionsArg.writeFunction(chunk, tools) - .then(writeReturnChunk => { - // the write return chunk is optional - // just in case the write function returns something other than void. - if (writeReturnChunk) { - controller.enqueue(writeReturnChunk); - } - }) - .catch(err => controller.error(err)); + try { + const writeReturnChunk = await optionsArg.writeFunction(chunk, tools); + if (writeReturnChunk) { // return chunk is optional + controller.enqueue(writeReturnChunk); + } + } catch (err) { + controller.error(err); + } } else { controller.error(new Error('No write function provided')); }