26 lines
815 B
TypeScript
26 lines
815 B
TypeScript
import { Transform, type TransformCallback, type TransformOptions } from 'stream';
|
|
|
|
/**
 * Shape of the asynchronous mapping function applied to each chunk.
 *
 * @typeParam TInput - Type of the chunk handed to the function.
 * @typeParam TOutput - Type of the value the returned promise resolves to.
 */
export interface AsyncTransformFunction<TInput, TOutput> {
  /**
   * Converts a single input chunk into its output value.
   *
   * @param chunkArg - The chunk to convert.
   * @returns A promise resolving to the converted value.
   */
  (chunkArg: TInput): Promise<TOutput>;
}
|
|
|
|
/**
 * Wraps an async mapping function in an object-mode `Transform` stream.
 *
 * Each written chunk is passed to `asyncFunction`; the resolved value is
 * pushed to the readable side of the stream.
 *
 * @param asyncFunction - Async function applied to every incoming chunk.
 * @param options - Optional stream options. They are spread first, so any
 *   `objectMode` or `transform` entry they contain is overridden here.
 * @returns The configured `Transform` stream.
 *
 * @remarks
 * If `asyncFunction` throws or rejects (or `push` throws), the failure is
 * reported through the transform callback. Values that are not `Error`
 * instances are wrapped as `new Error(String(value))`.
 */
export function createTransformFunction<TInput, TOutput>(
  asyncFunction: AsyncTransformFunction<TInput, TOutput>,
  options?: TransformOptions
): Transform {
  return new Transform({
    ...options,
    objectMode: true, // Ensure we operate in object mode
    async transform(
      chunk: TInput,
      _encoding: string,
      callback: TransformCallback
    ): Promise<void> {
      try {
        this.push(await asyncFunction(chunk));
      } catch (error) {
        callback(error instanceof Error ? error : new Error(String(error)));
        return;
      }
      // Signal completion outside the try block: if `callback` itself throws
      // synchronously, it must not be invoked a second time for this chunk.
      callback();
    },
  });
}