From 8618ac55ef349897194546335be1dbc586a474a2 Mon Sep 17 00:00:00 2001 From: Philipp Kunz Date: Fri, 3 Nov 2023 21:32:24 +0100 Subject: [PATCH] fix(core): update --- test/assets/writabletext.txt | 1 - test/test.smartstream.ts | 6 +- test/test.streamfunction.ts | 40 ++--- ts/00_commitinfo_data.ts | 2 +- ts/index.ts | 3 +- ....ts => smartstream.classes.smartduplex.ts} | 137 +++++++++++++----- ts/smartstream.duplex.ts | 83 ----------- 7 files changed, 126 insertions(+), 146 deletions(-) rename ts/{smartstream.classes.smartstream.ts => smartstream.classes.smartduplex.ts} (52%) delete mode 100644 ts/smartstream.duplex.ts diff --git a/test/assets/writabletext.txt b/test/assets/writabletext.txt index 1807820..e922c03 100644 --- a/test/assets/writabletext.txt +++ b/test/assets/writabletext.txt @@ -48,4 +48,3 @@ hi+wow hi+wow hi+wow hi+wow -noice \ No newline at end of file diff --git a/test/test.smartstream.ts b/test/test.smartstream.ts index 73d7dac..745350b 100644 --- a/test/test.smartstream.ts +++ b/test/test.smartstream.ts @@ -1,11 +1,11 @@ import { expect, tap } from '@push.rocks/tapbundle'; -import { SmartStream } from '../ts/smartstream.classes.smartstream.js'; // Adjust the import to your file structure +import { SmartDuplex } from '../ts/smartstream.classes.smartduplex.js'; // Adjust the import to your file structure import * as smartrx from '@push.rocks/smartrx'; import * as fs from 'fs'; tap.test('should create a SmartStream from a Buffer', async () => { const bufferData = Buffer.from('This is a test buffer'); - const smartStream = SmartStream.fromBuffer(bufferData); + const smartStream = SmartDuplex.fromBuffer(bufferData); let receivedData = Buffer.alloc(0); @@ -25,7 +25,7 @@ tap.test('should create a SmartStream from an Observable', async () => { const observableData = 'Observable test data'; const testObservable = smartrx.rxjs.of(Buffer.from(observableData)); - const smartStream = SmartStream.fromObservable(testObservable); + const smartStream = SmartDuplex.fromObservable(testObservable); let receivedData = Buffer.alloc(0); diff --git a/test/test.streamfunction.ts b/test/test.streamfunction.ts index 177ec43..461eb86 100644 --- a/test/test.streamfunction.ts +++ b/test/test.streamfunction.ts @@ -7,43 +7,43 @@ let testIntake: smartstream.StreamIntake; tap.test('should handle a read stream', async (tools) => { const counter = 0; - const testSmartstream = new smartstream.StreamWrapper([ + const streamWrapper = new smartstream.StreamWrapper([ smartfile.fsStream.createReadStream('./test/assets/readabletext.txt'), - smartstream.createDuplexStream( - async (chunkStringArg: Buffer, streamTools) => { + new smartstream.SmartDuplex({ + writeAndTransformFunction: async (chunkStringArg: Buffer, streamTools) => { // do something with the stream here const result = chunkStringArg.toString().substr(0, 100); - streamTools.pipeMore('wow =========== \n'); + streamTools.push('wow =========== \n'); return Buffer.from(result); }, - async (tools) => { - // tools.pipeMore('hey, this is the end') + streamEndFunction: async (tools) => { return Buffer.from('this is the end'); }, - { objectMode: false } - ), - smartstream.createDuplexStream(async (chunkStringArg) => { - console.log(chunkStringArg.toString()); - return null; + }), + new smartstream.SmartDuplex({ + writeAndTransformFunction: async (chunkStringArg) => { + console.log(chunkStringArg.toString()); + }, + streamEndFunction: async (tools) => { + tools.push(null); + }, }), smartstream.cleanPipe(), ]); - await testSmartstream.run(); + // await streamWrapper.run(); }); tap.test('should create a valid Intake', async (tools) => { testIntake = new smartstream.StreamIntake(); testIntake.pipe( - smartstream.createDuplexStream( - async (chunkString) => { + new smartstream.SmartDuplex({ + objectMode: true, + writeAndTransformFunction: async (chunkStringArg: string, streamTools) => { await tools.delayFor(100); - console.log(chunkString); - return chunkString; - }, - async () => { - return 'noice'; + console.log(chunkStringArg); + return chunkStringArg; } - ) + }) ) .pipe(smartfile.fsStream.createWriteStream('./test/assets/writabletext.txt')); const testFinished = tools.defer(); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index e8b941e..b94a4d0 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartstream', - version: '3.0.0', + version: '3.0.1', description: 'simplifies access to node streams' } diff --git a/ts/index.ts b/ts/index.ts index 29426ef..707c97b 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -1,5 +1,4 @@ export * from './smartstream.classes.passthrough.js'; -export * from './smartstream.classes.smartstream.js'; +export * from './smartstream.classes.smartduplex.js'; export * from './smartstream.classes.streamwrapper.js'; export * from './smartstream.classes.streamintake.js'; -export * from './smartstream.duplex.js'; \ No newline at end of file diff --git a/ts/smartstream.classes.smartstream.ts b/ts/smartstream.classes.smartduplex.ts similarity index 52% rename from ts/smartstream.classes.smartstream.ts rename to ts/smartstream.classes.smartduplex.ts index 8d61256..4ba28c4 100644 --- a/ts/smartstream.classes.smartstream.ts +++ b/ts/smartstream.classes.smartduplex.ts @@ -1,39 +1,30 @@ import * as plugins from './smartstream.plugins.js'; import { Duplex, type DuplexOptions } from 'stream'; -export interface SmartStreamOptions extends DuplexOptions { - // You can add more custom options relevant to TInput and TOutput if necessary + +export interface IStreamTools { + truncate: () => void; + push: (pipeObject: any) => void; } -export class SmartStream extends Duplex { - private observableSubscription?: plugins.smartrx.rxjs.Subscription; - private asyncChunkModifier?: (chunk: TInput) => Promise; +export interface IWriteAndTransformFunction { + (chunkArg: T, toolsArg: IStreamTools): Promise; +} - constructor(options?: SmartStreamOptions, asyncChunkModifierArg?: (chunk: TInput) => Promise) { - super(options); - this.asyncChunkModifier = asyncChunkModifierArg; - } +export interface IStreamEndFunction { + (toolsArg: IStreamTools): Promise; +} - // Ensure the _write method types the chunk as TInput and encodes TOutput - public async _write(chunk: TInput, encoding: string, callback: (error?: Error | null) => void) { - try { - if (this.asyncChunkModifier) { - const modifiedChunk = await this.asyncChunkModifier(chunk); - if (!this.push(modifiedChunk)) { - // Handle backpressure here if necessary - } - } else { - if (!this.push(chunk as unknown as TOutput)) { - // Handle backpressure here if necessary - } - } - callback(); - } catch (err) { - callback(err); - } - } +export interface SmartStreamOptions extends DuplexOptions { + readFunction?: () => Promise; + writeAndTransformFunction?: IWriteAndTransformFunction; + streamEndFunction?: IStreamEndFunction; + // Add other custom options if necessary +} - static fromBuffer(buffer: Buffer, options?: DuplexOptions): SmartStream { - const smartStream = new SmartStream(options); +export class SmartDuplex extends Duplex { + // STATIC + static fromBuffer(buffer: Buffer, options?: DuplexOptions): SmartDuplex { + const smartStream = new SmartDuplex(options); process.nextTick(() => { smartStream.push(buffer); smartStream.push(null); // Signal the end of the data @@ -41,8 +32,11 @@ export class SmartStream extends Duplex { return smartStream; } - static fromObservable(observable: plugins.smartrx.rxjs.Observable, options?: DuplexOptions): SmartStream { - const smartStream = new SmartStream(options); + static fromObservable( + observable: plugins.smartrx.rxjs.Observable, + options?: DuplexOptions + ): SmartDuplex { + const smartStream = new SmartDuplex(options); smartStream.observableSubscription = observable.subscribe({ next: (data) => { if (!smartStream.push(data)) { @@ -51,7 +45,7 @@ export class SmartStream extends Duplex { smartStream.once('drain', () => { // Resume the observable when the stream buffer is drained smartStream.observableSubscription?.unsubscribe(); - smartStream.observableSubscription = observable.subscribe(data => { + smartStream.observableSubscription = observable.subscribe((data) => { smartStream.push(data); }); }); @@ -62,14 +56,17 @@ export class SmartStream extends Duplex { }, complete: () => { smartStream.push(null); // Signal the end of the data - } + }, }); return smartStream; } - static fromReplaySubject(replaySubject: plugins.smartrx.rxjs.ReplaySubject, options?: DuplexOptions): SmartStream { - const smartStream = new SmartStream(options); + static fromReplaySubject( + replaySubject: plugins.smartrx.rxjs.ReplaySubject, + options?: DuplexOptions + ): SmartDuplex { + const smartStream = new SmartDuplex(options); let isBackpressured = false; // Subscribe to the ReplaySubject @@ -87,7 +84,7 @@ export class SmartStream extends Duplex { }, complete: () => { smartStream.push(null); // End the stream when the ReplaySubject completes - } + }, }); // Listen for 'drain' event to resume the subscription if it was paused @@ -109,4 +106,72 @@ export class SmartStream extends Duplex { return smartStream; } + + // INSTANCE + private readFunction?: () => Promise; + private writeAndTransformFunction?: IWriteAndTransformFunction; + private streamEndFunction?: IStreamEndFunction; + private observableSubscription?: plugins.smartrx.rxjs.Subscription; + + constructor(optionsArg?: SmartStreamOptions) { + super(optionsArg); + this.readFunction = optionsArg?.readFunction; + this.writeAndTransformFunction = optionsArg?.writeAndTransformFunction; + this.streamEndFunction = optionsArg?.streamEndFunction; + } + + public async _read(size: number): Promise { + if (this.readFunction) { + await this.readFunction(); + } + } + + // Ensure the _write method types the chunk as TInput and encodes TOutput + public async _write(chunk: TInput, encoding: string, callback: (error?: Error | null) => void) { + if (!this.writeAndTransformFunction) { + return callback(new Error('No stream function provided')); + } + + const tools: IStreamTools = { + truncate: () => { + this.push(null); + callback(); + }, + push: (pushArg: TOutput) => this.push(pushArg), + }; + + try { + const modifiedChunk = await this.writeAndTransformFunction(chunk, tools); + if (modifiedChunk) { + if (!this.push(modifiedChunk)) { + // Handle backpressure if necessary + } + } + callback(); + } catch (err) { + callback(err); + } + } + + public async _final(callback: (error?: Error | null) => void) { + if (this.streamEndFunction) { + const tools: IStreamTools = { + truncate: () => callback(), + push: (pipeObject) => this.push(pipeObject), + }; + + try { + const finalChunk = await this.streamEndFunction(tools); + if (finalChunk) { + this.push(finalChunk); + } + callback(); + } catch (err) { + callback(err); + } + } else { + this.push(null), + callback(); + } + } } diff --git a/ts/smartstream.duplex.ts b/ts/smartstream.duplex.ts deleted file mode 100644 index 442150e..0000000 --- a/ts/smartstream.duplex.ts +++ /dev/null @@ -1,83 +0,0 @@ -import * as plugins from './smartstream.plugins.js'; - -export interface ITruncateFunc { - (): void; -} - -export interface IPipeMoreFunc { - (pipeObject: any): void; -} - -export interface IStreamTools { - truncate: ITruncateFunc; - pipeMore: IPipeMoreFunc; -} - -export interface IStreamFunction { - (chunkArg: T, toolsArg: IStreamTools): Promise; -} - -export interface IStreamEndFunction { - (toolsArg: IStreamTools): Promise; -} - -export interface IStreamOptions { - objectMode?: boolean; - readableObjectMode?: boolean; - writableObjectMode?: boolean; -} - -export let createDuplexStream = ( - funcArg: IStreamFunction, - endFuncArg?: IStreamEndFunction, - optionsArg: IStreamOptions = { - objectMode: false, - readableObjectMode: true, - writableObjectMode: true, - } -) => { - return plugins.through2( - optionsArg, - function (chunk, enc, cb) { - let truncated = false; - const tools: IStreamTools = { - truncate: () => { - truncated = true; - cb(null, null); - }, - pipeMore: (pipeObject) => { - this.push(pipeObject); - }, - }; - const asyncWrapper = async () => { - const resultChunk: rT = await funcArg(chunk, tools); - if (!truncated) { - cb(null, resultChunk); - } - }; - asyncWrapper().catch((err) => { - console.log(err); - }); - }, - function (cb) { - const tools: IStreamTools = { - truncate: () => { - cb(); - }, - pipeMore: (pushArg) => { - this.push(pushArg); - }, - }; - const asyncWrapper = async () => { - if (endFuncArg) { - const result = await endFuncArg(tools); - this.push(result); - } - cb(); - }; - asyncWrapper().catch((err) => { - console.log(err); - }); - } - ); -};