From a9230ca7901ad331582d96b04472b9effc80c815 Mon Sep 17 00:00:00 2001 From: Philipp Kunz Date: Sat, 11 Nov 2023 20:30:42 +0100 Subject: [PATCH] fix(core): update --- test/test.smartstream.ts | 8 ++++-- ts/00_commitinfo_data.ts | 2 +- ts/smartstream.classes.smartduplex.ts | 38 ++++++++++++++++----------- ts/smartstream.functions.ts | 2 +- 4 files changed, 30 insertions(+), 20 deletions(-) diff --git a/test/test.smartstream.ts b/test/test.smartstream.ts index 745350b..5e31fe7 100644 --- a/test/test.smartstream.ts +++ b/test/test.smartstream.ts @@ -5,7 +5,9 @@ import * as fs from 'fs'; tap.test('should create a SmartStream from a Buffer', async () => { const bufferData = Buffer.from('This is a test buffer'); - const smartStream = SmartDuplex.fromBuffer(bufferData); + const smartStream = SmartDuplex.fromBuffer(bufferData, { + handleBackpressure: false, + }); let receivedData = Buffer.alloc(0); @@ -25,7 +27,9 @@ tap.test('should create a SmartStream from an Observable', async () => { const observableData = 'Observable test data'; const testObservable = smartrx.rxjs.of(Buffer.from(observableData)); - const smartStream = SmartDuplex.fromObservable(testObservable); + const smartStream = SmartDuplex.fromObservable(testObservable, { + handleBackpressure: false, + }); let receivedData = Buffer.alloc(0); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 7f92f82..098c582 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartstream', - version: '3.0.15', + version: '3.0.16', description: 'simplifies access to node streams' } diff --git a/ts/smartstream.classes.smartduplex.ts b/ts/smartstream.classes.smartduplex.ts index 9f770b1..739ceb7 100644 --- a/ts/smartstream.classes.smartduplex.ts +++ b/ts/smartstream.classes.smartduplex.ts @@ -14,7 +14,7 @@ export interface IStreamFinalFunction { (toolsArg: IStreamTools): Promise; } -export interface SmartStreamOptions extends DuplexOptions { +export interface ISmartDuplexOptions extends DuplexOptions { handleBackpressure?: boolean; readFunction?: () => Promise; writeFunction?: IStreamWriteFunction; @@ -24,23 +24,23 @@ export interface SmartStreamOptions extends DuplexOptions { export class SmartDuplex extends Duplex { // STATIC - static fromBuffer(buffer: Buffer, options?: DuplexOptions): SmartDuplex { - const smartStream = new SmartDuplex(options); + static fromBuffer(buffer: Buffer, options?: ISmartDuplexOptions): SmartDuplex { + const smartDuplex = new SmartDuplex(options); process.nextTick(() => { - smartStream.push(buffer); - smartStream.push(null); // Signal the end of the data + smartDuplex.push(buffer); + smartDuplex.push(null); // Signal the end of the data }); - return smartStream; + return smartDuplex; } static fromObservable( observable: plugins.smartrx.rxjs.Observable, - options?: DuplexOptions + options?: ISmartDuplexOptions ): SmartDuplex { const smartStream = new SmartDuplex(options); smartStream.observableSubscription = observable.subscribe({ next: (data) => { - if (!smartStream.push(data)) { + if (!smartStream.push(data) && smartStream.handleBackpressure) { // Pause the observable if the stream buffer is full smartStream.observableSubscription?.unsubscribe(); smartStream.once('drain', () => { @@ -115,7 +115,7 @@ export class SmartDuplex extends Duplex { private finalFunction?: IStreamFinalFunction; private observableSubscription?: plugins.smartrx.rxjs.Subscription; - constructor(optionsArg?: SmartStreamOptions) { + constructor(optionsArg?: ISmartDuplexOptions) { super(optionsArg); this.readFunction = optionsArg?.readFunction; this.writeFunction = optionsArg?.writeFunction; @@ -129,6 +129,18 @@ export class SmartDuplex extends Duplex { } } + public push(chunkArg?: TOutput | null): boolean { + const result = super.push(chunkArg); + if (!result && this.handleBackpressure) { + this.pause(); + // Listen for 'drain' event to resume + this.once('drain', () => { + this.resume(); // Resume the source of data + }); + } + return result; + } + // Ensure the _write method types the chunk as TInput and encodes TOutput public async _write(chunk: TInput, encoding: string, callback: (error?: Error | null) => void) { if (!this.writeFunction) { @@ -146,13 +158,7 @@ export class SmartDuplex extends Duplex { try { const modifiedChunk = await this.writeFunction(chunk, tools); if (modifiedChunk) { - if (!this.push(modifiedChunk) && this.handleBackpressure) { - this.pause(); - // Listen for 'drain' event to resume - this.once('drain', () => { - this.resume(); // Resume the source of data - }); - } + this.push(modifiedChunk); } callback(); } catch (err) { diff --git a/ts/smartstream.functions.ts b/ts/smartstream.functions.ts index d7ff9ac..88afca7 100644 --- a/ts/smartstream.functions.ts +++ b/ts/smartstream.functions.ts @@ -15,7 +15,7 @@ export function createTransformFunction( const result = await asyncFunction(chunkArg); return result; } - }) + }); return smartDuplexStream; } \ No newline at end of file