import * as plugins from './plugins.js';

// ========================================
// READ
// ========================================

/**
 * Tools handed to a read function by `WebDuplexStream.getCustomReadableStream()`.
 *
 * The type parameters default to `any` so the interface can be referenced
 * with or without explicit type arguments.
 */
export interface IStreamToolsRead<TInput = any, TOutput = any> {
  /** Closes the custom readable stream. */
  done: () => void;
  /** Enqueues `writeArg` onto the custom readable stream. */
  write: (writeArg: TInput) => void;
}

/**
 * The read function is invoked by the stream returned from
 * `WebDuplexStream.getCustomReadableStream()` when a read from the underlying
 * readable side completed without a truthy value, i.e. when there is nothing
 * to forward to the consumer.
 */
export interface IStreamReadFunction<TInput = any, TOutput = any> {
  (toolsArg: IStreamToolsRead<TInput, TOutput>): Promise<void>;
}

// ========================================
// WRITE
// ========================================

/**
 * Tools handed to write and final functions.
 */
export interface IStreamToolsWrite<TInput = any, TOutput = any> {
  /** Terminates the transform stream (calls `controller.terminate()`). */
  truncate: () => void;
  /** Enqueues `pushArg` on the readable side of the stream. */
  push: (pushArg: TOutput) => void;
}

/**
 * The write function can return something.
 * It is called anytime a chunk is written to the stream.
 * If the returned promise resolves to a truthy value, that value is enqueued
 * as output in addition to anything passed to `toolsArg.push`.
 */
export interface IStreamWriteFunction<TInput = any, TOutput = any> {
  // The resolved value is intentionally loosely typed: write functions may
  // resolve to nothing, `null`, or an output chunk.
  (chunkArg: TInput, toolsArg: IStreamToolsWrite<TInput, TOutput>): Promise<any>;
}

/**
 * The final function is called once when the stream is flushed.
 * If the returned promise resolves to a truthy value, that value is enqueued
 * as the last output chunk before the stream is terminated.
 */
export interface IStreamFinalFunction<TInput = any, TOutput = any> {
  (toolsArg: IStreamToolsWrite<TInput, TOutput>): Promise<TOutput | null | undefined | void>;
}

/**
 * Options accepted by the `WebDuplexStream` constructor.
 */
export interface WebDuplexStreamOptions<TInput = any, TOutput = any> {
  /** Used only by `getCustomReadableStream()`. */
  readFunction?: IStreamReadFunction<TInput, TOutput>;
  /** Invoked for every chunk written to the stream. */
  writeFunction?: IStreamWriteFunction<TInput, TOutput>;
  /** Invoked once when the stream is flushed. */
  finalFunction?: IStreamFinalFunction<TInput, TOutput>;
}

/**
 * A `TransformStream` whose transform and flush steps are driven by
 * user-supplied async functions.
 */
export class WebDuplexStream<TInput = any, TOutput = any> extends TransformStream<TInput, TOutput> {
  /**
   * Creates a pass-through stream, writes `uint8Array` to its writable side
   * and closes the writable side once that write has settled.
   *
   * @param uint8Array the single chunk to feed into the stream
   * @returns the stream; the write and close happen asynchronously
   */
  static fromUInt8Array(uint8Array: Uint8Array): WebDuplexStream<Uint8Array, Uint8Array> {
    const stream = new WebDuplexStream<Uint8Array, Uint8Array>({
      writeFunction: async (chunk, { push }) => {
        push(chunk); // Directly push the chunk as is
        return null;
      },
    });

    const writer = stream.writable.getWriter();
    writer
      .write(uint8Array)
      .then(() => writer.close())
      .catch(() => {
        // This chain is fire-and-forget: nobody can await it, so a rejection
        // is ignored here instead of surfacing as an unhandled rejection.
      });

    return stream;
  }

  // INSTANCE
  options: WebDuplexStreamOptions<TInput, TOutput>;

  /**
   * @param optionsArg the read/write/final functions driving this stream.
   *   Writing a chunk without a `writeFunction` errors the stream with
   *   `No write function provided`.
   */
  constructor(optionsArg: WebDuplexStreamOptions<TInput, TOutput>) {
    super({
      async transform(chunk, controller) {
        if (!optionsArg?.writeFunction) {
          controller.error(new Error('No write function provided'));
          return;
        }

        const tools: IStreamToolsWrite<TInput, TOutput> = {
          truncate: () => controller.terminate(),
          push: (pushArg: TOutput) => controller.enqueue(pushArg),
        };

        try {
          // Await the write function so this transform step only settles once
          // the chunk has been fully processed. Otherwise later chunks and the
          // flush step could run while this one is still producing output.
          const writeReturnChunk = await optionsArg.writeFunction(chunk, tools);
          // the write return chunk is optional
          // just in case the write function returns something other than void.
          if (writeReturnChunk) {
            controller.enqueue(writeReturnChunk);
          }
        } catch (err) {
          controller.error(err);
        }
      },

      async flush(controller) {
        if (!optionsArg?.finalFunction) {
          controller.terminate();
          return;
        }

        const tools: IStreamToolsWrite<TInput, TOutput> = {
          truncate: () => controller.terminate(),
          push: (pipeObject: TOutput) => controller.enqueue(pipeObject),
        };

        try {
          // Await the final function so its output is enqueued before the
          // flush step settles and the stream is terminated.
          const finalChunk = await optionsArg.finalFunction(tools);
          if (finalChunk) {
            controller.enqueue(finalChunk);
          }
        } catch (err) {
          controller.error(err);
        } finally {
          controller.terminate();
        }
      },
    });

    this.options = optionsArg;
  }

  /**
   * Creates a custom readable stream that integrates the `readFunction`.
   *
   * Each pull reads one result from this stream's readable side:
   * - if that side is done, the custom stream is closed;
   * - if a truthy value was read, it is enqueued and `readFunction` is not run;
   * - otherwise, if a `readFunction` was configured, it is awaited and the
   *   readable side is read once more; that second result is enqueued, or the
   *   custom stream is closed if the readable side is done.
   *
   * A reader lock on the readable side is taken only for the duration of each
   * individual read.
   *
   * @returns the custom readable stream
   */
  getCustomReadableStream(): ReadableStream<any> {
    const readableStream = this.readable;
    const options = this.options;

    // Reads a single result and always gives the lock back, even when the
    // read rejects, so a failed read cannot leave the readable side locked.
    const readOnce = async () => {
      const reader = readableStream.getReader();
      try {
        return await reader.read();
      } finally {
        reader.releaseLock();
      }
    };

    return new ReadableStream({
      async pull(controller) {
        const { value, done } = await readOnce();

        if (done) {
          // If the original stream is done, close the custom readable stream
          controller.close();
          return;
        }

        if (value) {
          // There is data in the original stream: forward it and skip the readFunction
          controller.enqueue(value);
          return;
        }

        if (!options?.readFunction) {
          return;
        }

        // Nothing to forward: execute the readFunction and read again
        await options.readFunction({
          done: () => controller.close(),
          write: (writeArg) => controller.enqueue(writeArg),
        });

        const next = await readOnce();
        if (next.done) {
          controller.close();
        } else {
          controller.enqueue(next.value);
        }
      },
    });
  }
}