fix(core): update

This commit is contained in:
Philipp Kunz 2023-11-11 19:47:20 +01:00
parent 12c9d8cc9d
commit 7b678cc856
2 changed files with 9 additions and 14 deletions

View File

@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartstream', name: '@push.rocks/smartstream',
version: '3.0.14', version: '3.0.15',
description: 'simplifies access to node streams' description: 'simplifies access to node streams'
} }

View File

@ -1,4 +1,5 @@
import { Transform, type TransformCallback, type TransformOptions } from 'stream'; import { Transform, type TransformCallback, type TransformOptions } from 'stream';
import { SmartDuplex } from './smartstream.classes.smartduplex.js';
export interface AsyncTransformFunction<TInput, TOutput> { export interface AsyncTransformFunction<TInput, TOutput> {
(chunkArg: TInput): Promise<TOutput>; (chunkArg: TInput): Promise<TOutput>;
@ -7,20 +8,14 @@ export interface AsyncTransformFunction<TInput, TOutput> {
export function createTransformFunction<TInput, TOutput>( export function createTransformFunction<TInput, TOutput>(
asyncFunction: AsyncTransformFunction<TInput, TOutput>, asyncFunction: AsyncTransformFunction<TInput, TOutput>,
options?: TransformOptions options?: TransformOptions
): Transform { ): SmartDuplex {
const transformStream = new Transform({ const smartDuplexStream = new SmartDuplex({
...options, ...options,
objectMode: true, // Ensure we operate in object mode writeFunction: async (chunkArg, toolsArg) => {
async transform(chunk: TInput, encoding: string, callback: TransformCallback) { const result = await asyncFunction(chunkArg);
try { return result;
const transformed = await asyncFunction(chunk);
this.push(transformed);
callback();
} catch (error) {
callback(error instanceof Error ? error : new Error(String(error)));
}
} }
}); })
return transformStream; return smartDuplexStream;
} }