Compare commits

...

4 Commits

Author SHA1 Message Date
703dc11c6c 3.0.26 2023-11-13 20:34:22 +01:00
28725d1723 fix(core): update 2023-11-13 20:34:21 +01:00
c77e0f2ba6 3.0.25 2023-11-13 19:12:24 +01:00
196fb6d396 fix(core): update 2023-11-13 19:12:23 +01:00
4 changed files with 46 additions and 7 deletions

View File

@ -1,6 +1,6 @@
{ {
"name": "@push.rocks/smartstream", "name": "@push.rocks/smartstream",
"version": "3.0.24", "version": "3.0.26",
"private": false, "private": false,
"description": "simplifies access to node streams", "description": "simplifies access to node streams",
"main": "dist_ts/index.js", "main": "dist_ts/index.js",

View File

@ -10,3 +10,41 @@ hi+wow
hi+wow hi+wow
hi+wow hi+wow
hi+wow hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow

View File

@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartstream', name: '@push.rocks/smartstream',
version: '3.0.24', version: '3.0.26',
description: 'simplifies access to node streams' description: 'simplifies access to node streams'
} }

View File

@ -35,7 +35,7 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
} }
// INSTANCE // INSTANCE
private backpressuredArray = new plugins.lik.BackpressuredArray<TOutput>(); private backpressuredArray: plugins.lik.BackpressuredArray<TOutput>;
public options: ISmartDuplexOptions<TInput, TOutput>; public options: ISmartDuplexOptions<TInput, TOutput>;
private observableSubscription?: plugins.smartrx.rxjs.Subscription; private observableSubscription?: plugins.smartrx.rxjs.Subscription;
private debugLog(messageArg: string) { private debugLog(messageArg: string) {
@ -45,8 +45,11 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
} }
constructor(optionsArg?: ISmartDuplexOptions<TInput, TOutput>) { constructor(optionsArg?: ISmartDuplexOptions<TInput, TOutput>) {
super(optionsArg); super(Object.assign({
highWaterMark: 1,
}, optionsArg));
this.options = optionsArg; this.options = optionsArg;
this.backpressuredArray = new plugins.lik.BackpressuredArray<TOutput>(this.options.highWaterMark || 1)
} }
public async _read(size: number): Promise<void> { public async _read(size: number): Promise<void> {
@ -59,11 +62,9 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
let canPushMore = true; let canPushMore = true;
while(this.backpressuredArray.data.length > 0 && canPushMore) { while(this.backpressuredArray.data.length > 0 && canPushMore) {
const nextChunk = this.backpressuredArray.shift(); const nextChunk = this.backpressuredArray.shift();
if (nextChunk) {
canPushMore = this.push(nextChunk); canPushMore = this.push(nextChunk);
} }
} }
}
private asyncWritePromiseObjectmap = new plugins.lik.ObjectMap<Promise<any>>(); private asyncWritePromiseObjectmap = new plugins.lik.ObjectMap<Promise<any>>();
// Ensure the _write method types the chunk as TInput and encodes TOutput // Ensure the _write method types the chunk as TInput and encodes TOutput