fix(streams): tighten stream typings and guard optional runtime paths for duplex and wrapper utilities
This commit is contained in:
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartstream',
|
||||
version: '3.4.1',
|
||||
version: '3.4.2',
|
||||
description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.'
|
||||
}
|
||||
|
||||
@@ -7,11 +7,11 @@ export interface IStreamTools {
|
||||
}
|
||||
|
||||
export interface IStreamWriteFunction<T, rT> {
|
||||
(chunkArg: T, toolsArg: IStreamTools): Promise<rT>;
|
||||
(chunkArg: T, toolsArg: IStreamTools): Promise<rT | void | null | undefined>;
|
||||
}
|
||||
|
||||
export interface IStreamFinalFunction<rT> {
|
||||
(toolsArg: IStreamTools): Promise<rT>;
|
||||
(toolsArg: IStreamTools): Promise<rT | void | null | undefined>;
|
||||
}
|
||||
|
||||
export interface ISmartDuplexOptions<TInput, TOutput> extends DuplexOptions {
|
||||
@@ -92,7 +92,7 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
|
||||
}
|
||||
|
||||
// INSTANCE
|
||||
private backpressuredArray: plugins.lik.BackpressuredArray<TOutput>;
|
||||
private backpressuredArray: plugins.lik.BackpressuredArray<TOutput | null>;
|
||||
public options: ISmartDuplexOptions<TInput, TOutput>;
|
||||
private _consumerWantsData = false;
|
||||
private _readFunctionRunning = false;
|
||||
@@ -114,7 +114,7 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
|
||||
)
|
||||
);
|
||||
this.options = safeOptions;
|
||||
this.backpressuredArray = new plugins.lik.BackpressuredArray<TOutput>(
|
||||
this.backpressuredArray = new plugins.lik.BackpressuredArray<TOutput | null>(
|
||||
this.options.highWaterMark || 1
|
||||
);
|
||||
}
|
||||
|
||||
@@ -59,7 +59,7 @@ export class StreamWrapper {
|
||||
}
|
||||
|
||||
// combine the stream
|
||||
let finalStream = null;
|
||||
let finalStream: plugins.stream.Duplex | null = null;
|
||||
let firstIteration: boolean = true;
|
||||
for (const stream of streamExecutionArray) {
|
||||
if (firstIteration === true) {
|
||||
@@ -71,12 +71,17 @@ export class StreamWrapper {
|
||||
for (const customEventObject of this.customEventObjectArray) {
|
||||
stream.on(customEventObject.eventName, customEventObject.eventFunction);
|
||||
}
|
||||
if (!firstIteration) {
|
||||
if (!firstIteration && finalStream) {
|
||||
finalStream = finalStream.pipe(stream);
|
||||
}
|
||||
firstIteration = false;
|
||||
}
|
||||
|
||||
if (!finalStream) {
|
||||
done.resolve();
|
||||
return done.promise;
|
||||
}
|
||||
|
||||
this.streamStartedDeferred.resolve();
|
||||
|
||||
let resolved = false;
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { type TransformOptions } from 'stream';
|
||||
import { type DuplexOptions } from 'stream';
|
||||
import { SmartDuplex } from './smartstream.classes.smartduplex.js';
|
||||
|
||||
export interface AsyncTransformFunction<TInput, TOutput> {
|
||||
@@ -7,7 +7,7 @@ export interface AsyncTransformFunction<TInput, TOutput> {
|
||||
|
||||
export function createTransformFunction<TInput, TOutput>(
|
||||
asyncFunction: AsyncTransformFunction<TInput, TOutput>,
|
||||
options?: TransformOptions
|
||||
options?: DuplexOptions
|
||||
): SmartDuplex {
|
||||
const smartDuplexStream = new SmartDuplex({
|
||||
...options,
|
||||
|
||||
Reference in New Issue
Block a user