26 lines
815 B
TypeScript
26 lines
815 B
TypeScript
|
import { Transform, type TransformCallback, type TransformOptions } from 'stream';
|
||
|
|
||
|
export interface AsyncTransformFunction<TInput, TOutput> {
|
||
|
(chunkArg: TInput): Promise<TOutput>;
|
||
|
}
|
||
|
|
||
|
export function createTransformFunction<TInput, TOutput>(
|
||
|
asyncFunction: AsyncTransformFunction<TInput, TOutput>,
|
||
|
options?: TransformOptions
|
||
|
): Transform {
|
||
|
const transformStream = new Transform({
|
||
|
...options,
|
||
|
objectMode: true, // Ensure we operate in object mode
|
||
|
async transform(chunk: TInput, encoding: string, callback: TransformCallback) {
|
||
|
try {
|
||
|
const transformed = await asyncFunction(chunk);
|
||
|
this.push(transformed);
|
||
|
callback();
|
||
|
} catch (error) {
|
||
|
callback(error instanceof Error ? error : new Error(String(error)));
|
||
|
}
|
||
|
}
|
||
|
});
|
||
|
|
||
|
return transformStream;
|
||
|
}
|