import * as plugins from './plugins.js'; // ======================================== // Interfaces for Read functionality // ======================================== export interface IStreamToolsRead { done: () => void; write: (writeArg: TInput) => Promise; } /** * The read function is called when data needs to be read into the stream. */ export interface IStreamReadFunction { (toolsArg: IStreamToolsRead): Promise; } // ======================================== // Interfaces for Write functionality // ======================================== export interface IStreamToolsWrite { truncate: () => void; push: (pushArg: TOutput) => void; } /** * The write function is called whenever a chunk is written to the stream. */ export interface IStreamWriteFunction { (chunkArg: TInput, toolsArg: IStreamToolsWrite): Promise; } export interface IStreamFinalFunction { (toolsArg: IStreamToolsWrite): Promise; } export interface WebDuplexStreamOptions { readFunction?: IStreamReadFunction; writeFunction?: IStreamWriteFunction; finalFunction?: IStreamFinalFunction; } export class WebDuplexStream extends TransformStream { // INSTANCE options: WebDuplexStreamOptions; constructor(optionsArg: WebDuplexStreamOptions) { super({ async start(controller) { // Optionally initialize any state here }, async transform(chunk, controller) { if (optionsArg?.writeFunction) { const tools: IStreamToolsWrite = { truncate: () => controller.terminate(), push: (pushArg: TOutput) => controller.enqueue(pushArg), }; try { const writeReturnChunk = await optionsArg.writeFunction(chunk, tools); if (writeReturnChunk !== undefined && writeReturnChunk !== null) { controller.enqueue(writeReturnChunk); } } catch (err) { controller.error(err); } } else { // If no writeFunction is provided, pass the chunk through controller.enqueue(chunk as unknown as TOutput); } }, async flush(controller) { if (optionsArg?.finalFunction) { const tools: IStreamToolsWrite = { truncate: () => controller.terminate(), push: (pushArg) => controller.enqueue(pushArg), }; try { const finalChunk = await optionsArg.finalFunction(tools); if (finalChunk) { controller.enqueue(finalChunk); } } catch (err) { controller.error(err); } finally { controller.terminate(); } } else { controller.terminate(); } }, }); this.options = optionsArg; // Start producing data if readFunction is provided if (this.options.readFunction) { this._startReading(); } } private async _startReading() { const writable = this.writable; const writer = writable.getWriter(); const tools: IStreamToolsRead = { done: () => writer.close(), write: async (writeArg) => await writer.write(writeArg), }; try { await this.options.readFunction(tools); } catch (err) { writer.abort(err); } finally { writer.releaseLock(); } } // Static method example (adjust as needed) static fromUInt8Array(uint8Array: Uint8Array): WebDuplexStream { const stream = new WebDuplexStream({ writeFunction: async (chunk, { push }) => { push(chunk); // Directly push the chunk as is return null; }, }); const writer = stream.writable.getWriter(); writer.write(uint8Array).then(() => writer.close()); return stream; } }