import * as plugins from './plugins.js';

// ========================================
// Interfaces for Read functionality
// ========================================

/**
 * Tools handed to a read function so it can feed data into the stream.
 */
export interface IStreamToolsRead<TInput, TOutput> {
  /**
   * Signals that no more data will be written. The writable side is closed
   * once the read function has settled (or immediately, if it already has).
   */
  done: () => void;
  /** Writes one chunk into the writable side; resolves when the write settles. */
  write: (writeArg: TInput) => Promise<void>;
}

/**
 * The read function is called when data needs to be read into the stream.
 */
export interface IStreamReadFunction<TInput, TOutput> {
  (toolsArg: IStreamToolsRead<TInput, TOutput>): Promise<void>;
}

// ========================================
// Interfaces for Write functionality
// ========================================

/**
 * Tools handed to write/final functions so they can emit output.
 */
export interface IStreamToolsWrite<TInput, TOutput> {
  /** Terminates the transform via the controller's `terminate()`. */
  truncate: () => void;
  /** Enqueues one chunk on the readable side. */
  push: (pushArg: TOutput) => void;
}

/**
 * The write function is called whenever a chunk is written to the stream.
 * A resolved value other than `null`/`undefined` is enqueued as output.
 */
export interface IStreamWriteFunction<TInput, TOutput> {
  // The resolved value is enqueued without a cast, so it is deliberately loose.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  (chunkArg: TInput, toolsArg: IStreamToolsWrite<TInput, TOutput>): Promise<any>;
}

/**
 * The final function is called once when the writable side is closed.
 * A resolved value other than `null`/`undefined` is enqueued as the last chunk.
 */
export interface IStreamFinalFunction<TInput, TOutput> {
  (toolsArg: IStreamToolsWrite<TInput, TOutput>): Promise<TOutput | null | void>;
}

/**
 * Optional hooks that customise a {@link WebDuplexStream}.
 */
export interface WebDuplexStreamOptions<TInput, TOutput> {
  readFunction?: IStreamReadFunction<TInput, TOutput>;
  writeFunction?: IStreamWriteFunction<TInput, TOutput>;
  finalFunction?: IStreamFinalFunction<TInput, TOutput>;
}

/**
 * Builds the `truncate`/`push` tool pair bound to a transform controller.
 */
function createWriteTools<TInput, TOutput>(
  controller: TransformStreamDefaultController<TOutput>,
): IStreamToolsWrite<TInput, TOutput> {
  return {
    truncate: () => controller.terminate(),
    push: (pushArg: TOutput) => controller.enqueue(pushArg),
  };
}

/**
 * A `TransformStream` whose behaviour is supplied through optional
 * read / write / final callbacks.
 *
 * - Without a `writeFunction`, chunks are passed through unchanged.
 * - With a `readFunction`, the stream takes a writer on its own writable side
 *   at construction time and lets the callback feed data in.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export class WebDuplexStream<TInput = any, TOutput = any> extends TransformStream<TInput, TOutput> {
  // INSTANCE
  options: WebDuplexStreamOptions<TInput, TOutput>;

  /**
   * @param optionsArg - callbacks customising the stream; defaults to none,
   *   which yields a plain pass-through stream.
   */
  constructor(optionsArg: WebDuplexStreamOptions<TInput, TOutput> = {}) {
    super({
      async transform(chunk, controller) {
        if (!optionsArg?.writeFunction) {
          // If no writeFunction is provided, pass the chunk through
          controller.enqueue(chunk as unknown as TOutput);
          return;
        }
        const tools = createWriteTools<TInput, TOutput>(controller);
        try {
          const writeReturnChunk = await optionsArg.writeFunction(chunk, tools);
          if (writeReturnChunk !== undefined && writeReturnChunk !== null) {
            controller.enqueue(writeReturnChunk);
          }
        } catch (err) {
          controller.error(err);
        }
      },
      async flush(controller) {
        if (!optionsArg?.finalFunction) {
          return;
        }
        const tools = createWriteTools<TInput, TOutput>(controller);
        try {
          const finalChunk = await optionsArg.finalFunction(tools);
          if (finalChunk !== undefined && finalChunk !== null) {
            controller.enqueue(finalChunk as TOutput);
          }
        } catch (err) {
          controller.error(err);
        }
        // TransformStream auto-closes readable after flush resolves — no terminate() needed
      },
    });

    // Normalise so a missing/null options object behaves like "no callbacks".
    this.options = optionsArg ?? {};

    // Start producing data if readFunction is provided
    if (this.options.readFunction) {
      void this._startReading().catch(() => {
        // Last-resort guard against an unhandled rejection; _startReading
        // already routes failures into the writer via abort().
      });
    }
  }

  /**
   * Runs the configured read function against a writer on this stream's
   * writable side.
   *
   * The writer is closed when `done()` has been signalled: after the read
   * function resolves if `done()` was called while it was running, or at the
   * moment of the call if `done()` arrives later (e.g. from a timer or event
   * callback). If the read function rejects, the writer is aborted with the
   * rejection reason and later `done()` calls are ignored.
   */
  private async _startReading(): Promise<void> {
    if (!this.options.readFunction) {
      return;
    }
    const writer = this.writable.getWriter();
    let doneSignaled = false;
    let readCompleted = false;
    let closeStarted = false;

    const abortQuietly = async (reason: unknown): Promise<void> => {
      try {
        await writer.abort(reason);
      } catch {
        // Writer may already be in error state
      }
    };

    // Closes the writer at most once; a failing close aborts the writer.
    const closeOnce = async (): Promise<void> => {
      if (closeStarted) {
        return;
      }
      closeStarted = true;
      try {
        await writer.close();
      } catch (err) {
        await abortQuietly(err);
      }
    };

    const tools: IStreamToolsRead<TInput, TOutput> = {
      done: () => {
        doneSignaled = true;
        if (readCompleted) {
          // The read function already settled, so nobody else will act on
          // the flag — close right away instead of leaving the stream open.
          void closeOnce();
        }
      },
      write: async (writeArg) => await writer.write(writeArg),
    };

    try {
      await this.options.readFunction(tools);
    } catch (err) {
      await abortQuietly(err);
      return;
    }

    readCompleted = true;
    if (doneSignaled) {
      await closeOnce();
    }
  }

  /**
   * Creates a stream that emits the given byte array as a single chunk and
   * then closes.
   *
   * @param uint8Array - the bytes to emit
   * @returns a stream whose readable side yields `uint8Array`
   */
  static fromUInt8Array(uint8Array: Uint8Array): WebDuplexStream<Uint8Array, Uint8Array> {
    const stream = new WebDuplexStream<Uint8Array, Uint8Array>({
      writeFunction: async (chunk, { push }) => {
        push(chunk); // Directly push the chunk as is
        return null;
      },
    });

    const writer = stream.writable.getWriter();
    // Fire-and-forget: rejections are swallowed here only to avoid an
    // unhandled rejection.
    writer
      .write(uint8Array)
      .then(() => writer.close())
      .catch(() => {});

    return stream;
  }
}