smartstream/ts_web/classes.webduplexstream.ts

135 lines
4.0 KiB
TypeScript
Raw Permalink Normal View History

2024-06-02 16:42:42 +02:00
import * as plugins from './plugins.js';
// ========================================
// Interfaces for Read functionality
2024-06-02 16:42:42 +02:00
// ========================================
export interface IStreamToolsRead<TInput, TOutput> {
done: () => void;
write: (writeArg: TInput) => Promise<void>;
2024-06-02 16:42:42 +02:00
}
/**
* The read function is called when data needs to be read into the stream.
2024-06-02 16:42:42 +02:00
*/
export interface IStreamReadFunction<TInput, TOutput> {
(toolsArg: IStreamToolsRead<TInput, TOutput>): Promise<void>;
}
// ========================================
// Interfaces for Write functionality
2024-06-02 16:42:42 +02:00
// ========================================
export interface IStreamToolsWrite<TInput, TOutput> {
truncate: () => void;
push: (pushArg: TOutput) => void;
}
/**
* The write function is called whenever a chunk is written to the stream.
2024-06-02 16:42:42 +02:00
*/
export interface IStreamWriteFunction<TInput, TOutput> {
(chunkArg: TInput, toolsArg: IStreamToolsWrite<TInput, TOutput>): Promise<any>;
}
export interface IStreamFinalFunction<TInput, TOutput> {
(toolsArg: IStreamToolsWrite<TInput, TOutput>): Promise<TOutput | void>;
2024-06-02 16:42:42 +02:00
}
export interface WebDuplexStreamOptions<TInput, TOutput> {
readFunction?: IStreamReadFunction<TInput, TOutput>;
writeFunction?: IStreamWriteFunction<TInput, TOutput>;
finalFunction?: IStreamFinalFunction<TInput, TOutput>;
}
export class WebDuplexStream<TInput = any, TOutput = any> extends TransformStream<TInput, TOutput> {
// INSTANCE
options: WebDuplexStreamOptions<TInput, TOutput>;
constructor(optionsArg: WebDuplexStreamOptions<TInput, TOutput>) {
super({
async start(controller) {
// Optionally initialize any state here
},
2024-06-02 16:42:42 +02:00
async transform(chunk, controller) {
if (optionsArg?.writeFunction) {
const tools: IStreamToolsWrite<TInput, TOutput> = {
truncate: () => controller.terminate(),
push: (pushArg: TOutput) => controller.enqueue(pushArg),
};
try {
const writeReturnChunk = await optionsArg.writeFunction(chunk, tools);
if (writeReturnChunk !== undefined && writeReturnChunk !== null) {
controller.enqueue(writeReturnChunk);
}
} catch (err) {
controller.error(err);
}
2024-06-02 16:42:42 +02:00
} else {
// If no writeFunction is provided, pass the chunk through
controller.enqueue(chunk as unknown as TOutput);
2024-06-02 16:42:42 +02:00
}
},
async flush(controller) {
if (optionsArg?.finalFunction) {
const tools: IStreamToolsWrite<TInput, TOutput> = {
truncate: () => controller.terminate(),
push: (pushArg) => controller.enqueue(pushArg),
2024-06-02 16:42:42 +02:00
};
try {
const finalChunk = await optionsArg.finalFunction(tools);
if (finalChunk) {
controller.enqueue(finalChunk);
}
} catch (err) {
controller.error(err);
} finally {
controller.terminate();
}
2024-06-02 16:42:42 +02:00
} else {
controller.terminate();
}
},
2024-06-02 16:42:42 +02:00
});
this.options = optionsArg;
// Start producing data if readFunction is provided
if (this.options.readFunction) {
this._startReading();
}
2024-06-02 16:42:42 +02:00
}
private async _startReading() {
const writable = this.writable;
const writer = writable.getWriter();
const tools: IStreamToolsRead<TInput, TOutput> = {
done: () => writer.close(),
write: async (writeArg) => await writer.write(writeArg),
};
try {
await this.options.readFunction(tools);
} catch (err) {
writer.abort(err);
} finally {
writer.releaseLock();
}
}
// Static method example (adjust as needed)
static fromUInt8Array(uint8Array: Uint8Array): WebDuplexStream<Uint8Array, Uint8Array> {
const stream = new WebDuplexStream<Uint8Array, Uint8Array>({
writeFunction: async (chunk, { push }) => {
push(chunk); // Directly push the chunk as is
return null;
},
2024-06-02 16:42:42 +02:00
});
const writer = stream.writable.getWriter();
writer.write(uint8Array).then(() => writer.close());
return stream;
2024-06-02 16:42:42 +02:00
}
}