From c37f62abec7f06335723088523e3f6031b9fe4a4 Mon Sep 17 00:00:00 2001 From: Philipp Kunz Date: Fri, 3 Nov 2023 13:55:56 +0100 Subject: [PATCH] BREAKING CHANGE(core): update --- test/test.streamfunction.ts | 4 +- ts/00_commitinfo_data.ts | 2 +- ts/smartstream.classes.smartstream.ts | 75 ++++++++++++++++++++++---- ts/smartstream.classes.streamintake.ts | 72 ++++++++++--------------- 4 files changed, 97 insertions(+), 56 deletions(-) diff --git a/test/test.streamfunction.ts b/test/test.streamfunction.ts index 86066bf..177ec43 100644 --- a/test/test.streamfunction.ts +++ b/test/test.streamfunction.ts @@ -33,9 +33,7 @@ tap.test('should handle a read stream', async (tools) => { tap.test('should create a valid Intake', async (tools) => { testIntake = new smartstream.StreamIntake(); - testIntake - .getReadable() - .pipe( + testIntake.pipe( smartstream.createDuplexStream( async (chunkString) => { await tools.delayFor(100); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index a2b025d..e8b941e 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartstream', - version: '2.0.8', + version: '3.0.0', description: 'simplifies access to node streams' } diff --git a/ts/smartstream.classes.smartstream.ts b/ts/smartstream.classes.smartstream.ts index 32d72fc..8d61256 100644 --- a/ts/smartstream.classes.smartstream.ts +++ b/ts/smartstream.classes.smartstream.ts @@ -1,20 +1,35 @@ import * as plugins from './smartstream.plugins.js'; import { Duplex, type DuplexOptions } from 'stream'; +export interface SmartStreamOptions extends DuplexOptions { + // You can add more custom options relevant to TInput and TOutput if necessary +} -export class SmartStream extends Duplex { +export class SmartStream extends Duplex { private observableSubscription?: plugins.smartrx.rxjs.Subscription; + private asyncChunkModifier?: (chunk: TInput) => Promise; - constructor(options?: DuplexOptions) { + constructor(options?: SmartStreamOptions, asyncChunkModifierArg?: (chunk: TInput) => Promise) { super(options); + this.asyncChunkModifier = asyncChunkModifierArg; } - _read(size: number) { - // Implement if you need custom behavior, otherwise leave it empty - } - - _write(chunk: any, encoding: string, callback: (error?: Error | null) => void) { - // Implement if you need custom behavior - callback(); + // Ensure the _write method types the chunk as TInput and encodes TOutput + public async _write(chunk: TInput, encoding: string, callback: (error?: Error | null) => void) { + try { + if (this.asyncChunkModifier) { + const modifiedChunk = await this.asyncChunkModifier(chunk); + if (!this.push(modifiedChunk)) { + // Handle backpressure here if necessary + } + } else { + if (!this.push(chunk as unknown as TOutput)) { + // Handle backpressure here if necessary + } + } + callback(); + } catch (err) { + callback(err); + } } static fromBuffer(buffer: Buffer, options?: DuplexOptions): SmartStream { @@ -52,4 +67,46 @@ export class SmartStream extends Duplex { return smartStream; } + + static fromReplaySubject(replaySubject: plugins.smartrx.rxjs.ReplaySubject, options?: DuplexOptions): SmartStream { + const smartStream = new SmartStream(options); + let isBackpressured = false; + + // Subscribe to the ReplaySubject + const subscription = replaySubject.subscribe({ + next: (data) => { + const canPush = smartStream.push(data); + if (!canPush) { + // If push returns false, pause the subscription because of backpressure + isBackpressured = true; + subscription.unsubscribe(); + } + }, + error: (err) => { + smartStream.emit('error', err); + }, + complete: () => { + smartStream.push(null); // End the stream when the ReplaySubject completes + } + }); + + // Listen for 'drain' event to resume the subscription if it was paused + smartStream.on('drain', () => { + if (isBackpressured) { + isBackpressured = false; + // Resubscribe to the ReplaySubject since we previously paused + smartStream.observableSubscription = replaySubject.subscribe({ + next: (data) => { + if (!smartStream.push(data)) { + smartStream.observableSubscription?.unsubscribe(); + isBackpressured = true; + } + }, + // No need to repeat error and complete handling here because it's already set up above + }); + } + }); + + return smartStream; + } } diff --git a/ts/smartstream.classes.streamintake.ts b/ts/smartstream.classes.streamintake.ts index 34d92c8..2feb5e9 100644 --- a/ts/smartstream.classes.streamintake.ts +++ b/ts/smartstream.classes.streamintake.ts @@ -1,56 +1,42 @@ import * as plugins from './smartstream.plugins.js'; -export class StreamIntake { +export class StreamIntake extends plugins.stream.Readable { private signalEndBoolean = false; private chunkStore: T[] = []; - public pushNextObservable = new plugins.smartrx.ObservableIntake(); - private pushedNextDeferred = plugins.smartpromise.defer(); - private readableStream = plugins.from2.obj(async (size, next) => { + constructor(options?: plugins.stream.ReadableOptions) { + super({ ...options, objectMode: true }); // Ensure that we are in object mode. + this.pushNextObservable.push('please push next'); + } + + _read(size: number): void { // console.log('get next'); - // execute without backpressure - while (this.chunkStore.length > 0) { - next(null, this.chunkStore.shift()); - } - if (this.signalEndBoolean) { - next(null, null); - } + const pushChunk = (): void => { + if (this.chunkStore.length > 0) { + // If push returns false, then we should stop reading + if (!this.push(this.chunkStore.shift())) { + return; + } + } - // lets trigger backpressure handling - this.pushNextObservable.push('please push next'); - await this.pushedNextDeferred.promise; - this.pushedNextDeferred = plugins.smartpromise.defer(); + if (this.chunkStore.length === 0) { + if (this.signalEndBoolean) { + // If we're done, push null to signal the end of the stream + this.push(null); + } else { + // Ask for more data and wait + this.pushNextObservable.push('please push next'); + this.pushedNextDeferred.promise.then(() => { + this.pushedNextDeferred = plugins.smartpromise.defer(); // Reset the deferred + pushChunk(); // Try pushing the next chunk + }); + } + } + }; - // execute with backpressure - while (this.chunkStore.length > 0) { - next(null, this.chunkStore.shift()); - } - if (this.signalEndBoolean) { - next(null, null); - } - }); - - constructor() { - this.pushNextObservable.push('please push next'); - } - - /** - * returns a new style readble stream - */ - public getReadable() { - const readable = new plugins.stream.Readable({ - objectMode: true, - }); - return readable.wrap(this.readableStream); - } - - /** - * returns an oldstyle readble stream - */ - public getReadableStream() { - return this.readableStream; + pushChunk(); } public pushData(chunkData: T) {