From 7b1d2199e9e3a5eb88a72482fa3569fae6b3dce9 Mon Sep 17 00:00:00 2001 From: Philipp Kunz Date: Sun, 13 Oct 2024 20:20:31 +0200 Subject: [PATCH] fix(WebDuplexStream): Improved read/write interface and error handling in WebDuplexStream --- changelog.md | 8 ++ ts/00_commitinfo_data.ts | 2 +- ts_web/00_commitinfo_data.ts | 2 +- ts_web/classes.webduplexstream.ts | 193 +++++++++++++----------------- 4 files changed, 96 insertions(+), 109 deletions(-) diff --git a/changelog.md b/changelog.md index 1f0e2f2..4492c31 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,13 @@ # Changelog +## 2024-10-13 - 3.1.1 - fix(WebDuplexStream) +Improved read/write interface and error handling in WebDuplexStream + +- Enhanced the IStreamToolsRead and IStreamToolsWrite interfaces for better Promise handling +- Refined readFunction and writeFunction handling to accommodate asynchronous data processing and error propagation +- Added internal _startReading method to facilitate initial data handling if readFunction is present +- Maintained backward compatibility while ensuring data continuity when no writeFunction is specified + ## 2024-10-13 - 3.1.0 - feat(core) Add support for creating Web ReadableStream from a file diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 2853522..3de2185 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartstream', - version: '3.1.0', + version: '3.1.1', description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.' } diff --git a/ts_web/00_commitinfo_data.ts b/ts_web/00_commitinfo_data.ts index 2853522..3de2185 100644 --- a/ts_web/00_commitinfo_data.ts +++ b/ts_web/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartstream', - version: '3.1.0', + version: '3.1.1', description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.' } diff --git a/ts_web/classes.webduplexstream.ts b/ts_web/classes.webduplexstream.ts index db06a48..9a62ccc 100644 --- a/ts_web/classes.webduplexstream.ts +++ b/ts_web/classes.webduplexstream.ts @@ -1,25 +1,22 @@ import * as plugins from './plugins.js'; - // ======================================== -// READ +// Interfaces for Read functionality // ======================================== export interface IStreamToolsRead { done: () => void; - write: (writeArg: TInput) => void; + write: (writeArg: TInput) => Promise; } /** - * the read function is called anytime - * -> the WebDuplexStream is being read from - * and at the same time if nothing is enqueued + * The read function is called when data needs to be read into the stream. */ export interface IStreamReadFunction { (toolsArg: IStreamToolsRead): Promise; } // ======================================== -// WRITE +// Interfaces for Write functionality // ======================================== export interface IStreamToolsWrite { truncate: () => void; @@ -27,15 +24,14 @@ export interface IStreamToolsWrite { } /** - * the write function can return something. - * It is called anytime a chunk is written to the stream. + * The write function is called whenever a chunk is written to the stream. */ export interface IStreamWriteFunction { (chunkArg: TInput, toolsArg: IStreamToolsWrite): Promise; } export interface IStreamFinalFunction { - (toolsArg: IStreamToolsWrite): Promise; + (toolsArg: IStreamToolsWrite): Promise; } export interface WebDuplexStreamOptions { @@ -45,12 +41,90 @@ export interface WebDuplexStreamOptions { } export class WebDuplexStream extends TransformStream { + // INSTANCE + options: WebDuplexStreamOptions; + + constructor(optionsArg: WebDuplexStreamOptions) { + super({ + async start(controller) { + // Optionally initialize any state here + }, + async transform(chunk, controller) { + if (optionsArg?.writeFunction) { + const tools: IStreamToolsWrite = { + truncate: () => controller.terminate(), + push: (pushArg: TOutput) => controller.enqueue(pushArg), + }; + + try { + const writeReturnChunk = await optionsArg.writeFunction(chunk, tools); + if (writeReturnChunk !== undefined && writeReturnChunk !== null) { + controller.enqueue(writeReturnChunk); + } + } catch (err) { + controller.error(err); + } + } else { + // If no writeFunction is provided, pass the chunk through + controller.enqueue(chunk as unknown as TOutput); + } + }, + async flush(controller) { + if (optionsArg?.finalFunction) { + const tools: IStreamToolsWrite = { + truncate: () => controller.terminate(), + push: (pushArg) => controller.enqueue(pushArg), + }; + + try { + const finalChunk = await optionsArg.finalFunction(tools); + if (finalChunk) { + controller.enqueue(finalChunk); + } + } catch (err) { + controller.error(err); + } finally { + controller.terminate(); + } + } else { + controller.terminate(); + } + }, + }); + + this.options = optionsArg; + + // Start producing data if readFunction is provided + if (this.options.readFunction) { + this._startReading(); + } + } + + private async _startReading() { + const writable = this.writable; + const writer = writable.getWriter(); + + const tools: IStreamToolsRead = { + done: () => writer.close(), + write: async (writeArg) => await writer.write(writeArg), + }; + + try { + await this.options.readFunction(tools); + } catch (err) { + writer.abort(err); + } finally { + writer.releaseLock(); + } + } + + // Static method example (adjust as needed) static fromUInt8Array(uint8Array: Uint8Array): WebDuplexStream { const stream = new WebDuplexStream({ writeFunction: async (chunk, { push }) => { push(chunk); // Directly push the chunk as is return null; - } + }, }); const writer = stream.writable.getWriter(); @@ -58,99 +132,4 @@ export class WebDuplexStream extends TransformStrea return stream; } - - // INSTANCE - options: WebDuplexStreamOptions; - - constructor(optionsArg: WebDuplexStreamOptions) { - // here we call into the official web stream api - super({ - async transform(chunk, controller) { - // Transformation logic remains unchanged - if (optionsArg?.writeFunction) { - const tools: IStreamToolsWrite = { - truncate: () => controller.terminate(), - push: (pushArg: TOutput) => controller.enqueue(pushArg), - }; - - try { - const writeReturnChunk = await optionsArg.writeFunction(chunk, tools); - if (writeReturnChunk) { // return chunk is optional - controller.enqueue(writeReturnChunk); - } - } catch (err) { - controller.error(err); - } - } else { - controller.error(new Error('No write function provided')); - } - }, - async flush(controller) { - // Flush logic remains unchanged - if (optionsArg?.finalFunction) { - const tools: IStreamToolsWrite = { - truncate: () => controller.terminate(), - push: (pipeObject) => controller.enqueue(pipeObject), - }; - - optionsArg.finalFunction(tools) - .then(finalChunk => { - if (finalChunk) { - controller.enqueue(finalChunk); - } - }) - .catch(err => controller.error(err)) - .finally(() => controller.terminate()); - } else { - controller.terminate(); - } - } - }); - - this.options = optionsArg; - } - - // Method to create a custom readable stream that integrates the readFunction - // readFunction is executed whenever the stream is being read from and nothing is enqueued - getCustomReadableStream() { - const readableStream = this.readable; - const options = this.options; - const customReadable = new ReadableStream({ - async pull(controller) { - const reader = readableStream.getReader(); - - // Check the current state of the original stream - const { value, done } = await reader.read(); - reader.releaseLock(); - - if (done) { - // If the original stream is done, close the custom readable stream - controller.close(); - } else { - if (value) { - // If there is data in the original stream, enqueue it and do not execute the readFunction - controller.enqueue(value); - } else if (options.readFunction) { - // If the original stream is empty, execute the readFunction and read again - await options.readFunction({ - done: () => controller.close(), - write: (writeArg) => controller.enqueue(writeArg), - }); - - const newReader = readableStream.getReader(); - const { value: newValue, done: newDone } = await newReader.read(); - newReader.releaseLock(); - - if (newDone) { - controller.close(); - } else { - controller.enqueue(newValue); - } - } - } - } - }); - - return customReadable; - } -} +} \ No newline at end of file