diff --git a/package.json b/package.json index 8416193..4bfa24e 100644 --- a/package.json +++ b/package.json @@ -31,7 +31,7 @@ "@types/node": "^20.9.0" }, "dependencies": { - "@push.rocks/lik": "^6.0.6", + "@push.rocks/lik": "^6.0.12", "@push.rocks/smartpromise": "^4.0.3", "@push.rocks/smartrx": "^3.0.7" }, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index dc599a2..ac925c6 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -6,8 +6,8 @@ settings: dependencies: '@push.rocks/lik': - specifier: ^6.0.6 - version: 6.0.6 + specifier: ^6.0.12 + version: 6.0.12 '@push.rocks/smartpromise': specifier: ^4.0.3 version: 4.0.3 @@ -49,7 +49,7 @@ packages: dependencies: '@api.global/typedrequest-interfaces': 3.0.1 '@push.rocks/isounique': 1.0.5 - '@push.rocks/lik': 6.0.6 + '@push.rocks/lik': 6.0.12 '@push.rocks/smartdelay': 3.0.5 '@push.rocks/smartpromise': 4.0.3 '@push.rocks/webrequest': 3.0.34 @@ -61,7 +61,7 @@ packages: '@api.global/typedrequest': 3.0.2 '@api.global/typedrequest-interfaces': 3.0.1 '@api.global/typedsocket': 3.0.0 - '@push.rocks/lik': 6.0.6 + '@push.rocks/lik': 6.0.12 '@push.rocks/smartchok': 1.0.23 '@push.rocks/smartdelay': 3.0.5 '@push.rocks/smartenv': 5.0.12 @@ -579,8 +579,8 @@ packages: resolution: {integrity: sha512-Z0BVqZZOCif1THTbIKWMgg0wxCzt9CyBtBBqQJiZ+jJ0KlQFrQHNHrPt81/LXe/L4x0cxWsn0bpL6W5DNSvNLw==} dev: true - /@push.rocks/lik@6.0.6: - resolution: {integrity: sha512-5NVFbtR2XsGHxNqUbjFPp91J7mCWFPUc8bVfXIY2YgzhK/I5XI7yT+QBM+Bc2IvGLxjvQPkueRfPKl1IEnegeA==} + /@push.rocks/lik@6.0.12: + resolution: {integrity: sha512-/vzlOZ26gCmXZz67LeM2hJ+aNM49Jxvf3FKxLMXHhJwffd3LcV96MYbMfKzKR/za/bh5Itf3a6UjLL5mmN6Pew==} dependencies: '@push.rocks/smartdelay': 3.0.5 '@push.rocks/smartmatch': 2.0.0 @@ -628,7 +628,7 @@ packages: /@push.rocks/smartcli@4.0.8: resolution: {integrity: sha512-B4F3nqq7ko8tev1wxGdFnh/zSDDP8Q9LpEOb3wTf0jayyhYetFQ7n6zi4J9fhXYBKPkJSyQEBoOfRmgJyeLHkA==} dependencies: - '@push.rocks/lik': 6.0.6 + '@push.rocks/lik': 6.0.12 '@push.rocks/smartlog': 3.0.3 '@push.rocks/smartparam': 1.1.10 '@push.rocks/smartpromise': 4.0.3 @@ -676,7 +676,7 @@ packages: /@push.rocks/smartfile@10.0.41: resolution: {integrity: sha512-xOOy0duI34M2qrJZggpk51EHGXmg9+mBL1Q55tNiQKXzfx89P3coY1EAZG8tvmep3qB712QEKe7T+u04t42Kjg==} dependencies: - '@push.rocks/lik': 6.0.6 + '@push.rocks/lik': 6.0.12 '@push.rocks/smartdelay': 3.0.5 '@push.rocks/smartfile-interfaces': 1.0.7 '@push.rocks/smarthash': 3.0.4 @@ -697,7 +697,7 @@ packages: /@push.rocks/smartfile@11.0.0: resolution: {integrity: sha512-S2JyuRiBu6tocKGTbdLgz0NcGuJ2fP7mD+NHryqdnvXzIwapE1jZ1XAF8/xiLuB9rmIs7agKFm+2Jj3/PtfIWw==} dependencies: - '@push.rocks/lik': 6.0.6 + '@push.rocks/lik': 6.0.12 '@push.rocks/smartdelay': 3.0.5 '@push.rocks/smartfile-interfaces': 1.0.7 '@push.rocks/smarthash': 3.0.4 @@ -899,7 +899,7 @@ packages: '@api.global/typedserver': 3.0.9 '@push.rocks/isohash': 2.0.1 '@push.rocks/isounique': 1.0.5 - '@push.rocks/lik': 6.0.6 + '@push.rocks/lik': 6.0.12 '@push.rocks/smartdelay': 3.0.5 '@push.rocks/smartenv': 5.0.12 '@push.rocks/smartjson': 5.0.10 @@ -963,7 +963,7 @@ packages: /@push.rocks/smarttime@4.0.6: resolution: {integrity: sha512-1whOow0YJw/TbN758TedRRxApoZbsvyxCVpoGjXh7DE/fEEgs7RCr4vVF5jYpyXNQuNMLpKJcTsSfyQ6RvH4Aw==} dependencies: - '@push.rocks/lik': 6.0.6 + '@push.rocks/lik': 6.0.12 '@push.rocks/smartdelay': 3.0.5 '@push.rocks/smartpromise': 4.0.3 croner: 7.0.4 @@ -1026,7 +1026,7 @@ packages: resolution: {integrity: sha512-w5Q3g1TT5SDIXukAAoYVuWud+Y5ysS8qiBqPU00/re895VVZhUOSNJMNU6jyneZigmbWtwSLsxDkZHlsHWpfuA==} dependencies: '@apiglobal/typedrequest-interfaces': 2.0.1 - '@push.rocks/lik': 6.0.6 + '@push.rocks/lik': 6.0.12 '@push.rocks/smartenv': 5.0.12 '@push.rocks/smartjson': 5.0.10 '@push.rocks/smartpromise': 4.0.3 diff --git a/test/assets/writabletext.txt b/test/assets/writabletext.txt index e922c03..a3a236a 100644 --- a/test/assets/writabletext.txt +++ b/test/assets/writabletext.txt @@ -46,5 +46,3 @@ hi+wow hi+wow hi+wow hi+wow -hi+wow -hi+wow diff --git a/test/backpressure.ts b/test/backpressure.ts index 0312d50..c5c7812 100644 --- a/test/backpressure.ts +++ b/test/backpressure.ts @@ -4,6 +4,7 @@ import * as smartdelay from '@push.rocks/smartdelay'; async function testBackpressure() { const stream1 = new SmartDuplex({ + name: 'stream1', objectMode: true, handleBackpressure: true, writeFunction: async (chunk, tools) => { @@ -12,25 +13,29 @@ async function testBackpressure() { } }); const stream2 = new SmartDuplex({ + name: 'stream2', objectMode: true, handleBackpressure: true, writeFunction: async (chunk, tools) => { - await new Promise(resolve => setTimeout(resolve, 100)); // Slow processing + await new Promise(resolve => setTimeout(resolve, 1)); // Slow processing console.log(`processed chunk ${chunk} in stream 2`); return chunk; } }); // This stream processes data more slowly const stream3 = new SmartDuplex({ - handleBackpressure: false, + objectMode: true, + name: 'stream3', + handleBackpressure: true, writeFunction: async (chunk, tools) => { - console.log(`finished chunk ${chunk} in stream 3`); + await new Promise(resolve => setTimeout(resolve, 200)); // Slow processing + console.log(`processed chunk ${chunk} in stream 3`); } }); stream1.pipe(stream2).pipe(stream3); let backpressured = false; - for (let i = 1; i < 100; i++) { + for (let i = 0; i < 1000; i++) { const canContinue = stream1.write(`Chunk ${i}`, 'utf8'); if (!canContinue) { backpressured = true; diff --git a/test/test.smartstream.ts b/test/test.smartstream.ts index 5e31fe7..e89ce27 100644 --- a/test/test.smartstream.ts +++ b/test/test.smartstream.ts @@ -23,26 +23,4 @@ tap.test('should create a SmartStream from a Buffer', async () => { }); }); -tap.test('should create a SmartStream from an Observable', async () => { - const observableData = 'Observable test data'; - const testObservable = smartrx.rxjs.of(Buffer.from(observableData)); - - const smartStream = SmartDuplex.fromObservable(testObservable, { - handleBackpressure: false, - }); - - let receivedData = Buffer.alloc(0); - - return new Promise((resolve) => { - smartStream.on('data', (chunk: Buffer) => { - receivedData = Buffer.concat([receivedData, chunk]); - }); - - smartStream.on('end', () => { - expect(receivedData.toString()).toEqual(observableData); - resolve(); - }); - }); -}); - tap.start(); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index e25bff0..37e9019 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartstream', - version: '3.0.19', + version: '3.0.20', description: 'simplifies access to node streams' } diff --git a/ts/smartstream.classes.smartduplex.ts b/ts/smartstream.classes.smartduplex.ts index b7b63d3..357617b 100644 --- a/ts/smartstream.classes.smartduplex.ts +++ b/ts/smartstream.classes.smartduplex.ts @@ -15,6 +15,8 @@ export interface IStreamFinalFunction { } export interface ISmartDuplexOptions extends DuplexOptions { + debug?: boolean; + name?: string; handleBackpressure?: boolean; readFunction?: () => Promise; writeFunction?: IStreamWriteFunction; @@ -33,141 +35,71 @@ export class SmartDuplex extends Duplex { return smartDuplex; } - static fromObservable( - observable: plugins.smartrx.rxjs.Observable, - options?: ISmartDuplexOptions - ): SmartDuplex { - const smartStream = new SmartDuplex(options); - smartStream.observableSubscription = observable.subscribe({ - next: (data) => { - if (!smartStream.push(data) && smartStream.handleBackpressure) { - // Pause the observable if the stream buffer is full - smartStream.observableSubscription?.unsubscribe(); - smartStream.once('drain', () => { - // Resume the observable when the stream buffer is drained - smartStream.observableSubscription?.unsubscribe(); - smartStream.observableSubscription = observable.subscribe((data) => { - smartStream.push(data); - }); - }); - } - }, - error: (err) => { - smartStream.emit('error', err); - }, - complete: () => { - smartStream.push(null); // Signal the end of the data - }, - }); - - return smartStream; - } - - static fromReplaySubject( - replaySubject: plugins.smartrx.rxjs.ReplaySubject, - options?: DuplexOptions - ): SmartDuplex { - const smartStream = new SmartDuplex(options); - let isBackpressured = false; - - // Subscribe to the ReplaySubject - const subscription = replaySubject.subscribe({ - next: (data) => { - const canPush = smartStream.push(data); - if (!canPush) { - // If push returns false, pause the subscription because of backpressure - isBackpressured = true; - subscription.unsubscribe(); - } - }, - error: (err) => { - smartStream.emit('error', err); - }, - complete: () => { - smartStream.push(null); // End the stream when the ReplaySubject completes - }, - }); - - // Listen for 'drain' event to resume the subscription if it was paused - smartStream.on('drain', () => { - if (isBackpressured) { - isBackpressured = false; - // Resubscribe to the ReplaySubject since we previously paused - smartStream.observableSubscription = replaySubject.subscribe({ - next: (data) => { - if (!smartStream.push(data)) { - smartStream.observableSubscription?.unsubscribe(); - isBackpressured = true; - } - }, - // No need to repeat error and complete handling here because it's already set up above - }); - } - }); - - return smartStream; - } - // INSTANCE - private readFunction?: () => Promise; - private handleBackpressure: boolean; - private writeFunction?: IStreamWriteFunction; - private finalFunction?: IStreamFinalFunction; + private backpressuredArray = new plugins.lik.BackpressuredArray(); + public options: ISmartDuplexOptions; private observableSubscription?: plugins.smartrx.rxjs.Subscription; + private debugLog(messageArg: string) { + if (this.options.debug) { + console.log(messageArg); + } + } constructor(optionsArg?: ISmartDuplexOptions) { super(optionsArg); - this.readFunction = optionsArg?.readFunction; - this.writeFunction = optionsArg?.writeFunction; - this.finalFunction = optionsArg?.finalFunction; - this.handleBackpressure = optionsArg?.handleBackpressure ?? true; + this.options = optionsArg; } public async _read(size: number): Promise { - if (this.readFunction) { - await this.readFunction(); + await this.backpressuredArray.waitForItems(); + this.debugLog(`${this.options.name}: read was called`); + if (this.options.readFunction) { + await this.options.readFunction(); + } + let canPushMore = true; + while(this.backpressuredArray.data.length > 0 && canPushMore) { + const nextChunk = this.backpressuredArray.shift(); + if (nextChunk) { + canPushMore = this.push(nextChunk); + } } } private asyncWritePromiseObjectmap = new plugins.lik.ObjectMap>(); - // Ensure the _write method types the chunk as TInput and encodes TOutput public async _write(chunk: TInput, encoding: string, callback: (error?: Error | null) => void) { - if (!this.writeFunction) { + if (!this.options.writeFunction) { return callback(new Error('No stream function provided')); } + let isTruncated = false; const tools: IStreamTools = { truncate: () => { this.push(null); + isTruncated = true; callback(); }, - push: (pushArg: TOutput) => this.push(pushArg), + push: (pushArg: TOutput) => { + this.backpressuredArray.push(pushArg); + }, }; try { const writeDeferred = plugins.smartpromise.defer(); this.asyncWritePromiseObjectmap.add(writeDeferred.promise); - const modifiedChunk = await this.writeFunction(chunk, tools); - if (modifiedChunk) { - const drainDeferred = plugins.smartpromise.defer(); - this.once('drain', () => { - drainDeferred.resolve(); - }); - const canPushMore = this.push(modifiedChunk); - if (!canPushMore) { - await drainDeferred.promise; - console.log('jojojo'); - callback(); - writeDeferred.resolve(); - } else { - callback(); - writeDeferred.resolve(); - } - } else { - callback(); - writeDeferred.resolve(); + const modifiedChunk = await this.options.writeFunction(chunk, tools); + if (isTruncated) { + return; } + if (modifiedChunk) { + const canPushMore = this.backpressuredArray.push(modifiedChunk); + if (!canPushMore) { + this.debugLog(`${this.options.name}: cannot push more`); + await this.backpressuredArray.waitForSpace(); + this.debugLog(`${this.options.name}: can push more again`); + } + } + callback(); writeDeferred.resolve(); writeDeferred.promise.then(() => { this.asyncWritePromiseObjectmap.remove(writeDeferred.promise); @@ -179,14 +111,14 @@ export class SmartDuplex extends Duplex { public async _final(callback: (error?: Error | null) => void) { await Promise.all(this.asyncWritePromiseObjectmap.getArray()); - if (this.finalFunction) { + if (this.options.finalFunction) { const tools: IStreamTools = { truncate: () => callback(), push: (pipeObject) => this.push(pipeObject), }; try { - const finalChunk = await this.finalFunction(tools); + const finalChunk = await this.options.finalFunction(tools); if (finalChunk) { this.push(finalChunk); }