import * as plugins from './classes.plugins.js'; export class BackpressuredArray { public data: T[]; private highWaterMark: number; public hasSpace = new plugins.smartrx.rxjs.Subject<'hasSpace'>(); private itemsAvailable = new plugins.smartrx.rxjs.Subject<'itemsAvailable'>(); private isDestroyed = false; constructor(highWaterMark: number = 16) { this.data = []; this.highWaterMark = highWaterMark; } push(item: T): boolean { this.data.push(item); this.itemsAvailable.next('itemsAvailable'); const spaceAvailable = this.checkSpaceAvailable(); if (spaceAvailable) { this.hasSpace.next('hasSpace'); } return spaceAvailable; } shift(): T | undefined { const item = this.data.shift(); if (this.checkSpaceAvailable()) { this.hasSpace.next('hasSpace'); } return item; } checkSpaceAvailable(): boolean { return this.data.length < this.highWaterMark; } public checkHasItems(): boolean { return this.data.length > 0; } waitForSpace(): Promise { return new Promise((resolve) => { if (this.checkSpaceAvailable() || this.isDestroyed) { resolve(); } else { const subscription = this.hasSpace.subscribe({ next: () => { subscription.unsubscribe(); resolve(); }, complete: () => { resolve(); }, }); } }); } waitForItems(): Promise { return new Promise((resolve) => { if (this.data.length > 0 || this.isDestroyed) { resolve(); } else { const subscription = this.itemsAvailable.subscribe({ next: () => { subscription.unsubscribe(); resolve(); }, complete: () => { resolve(); }, }); } }); } /** * destroys the BackpressuredArray, completing all subjects */ public destroy() { this.isDestroyed = true; this.hasSpace.complete(); this.itemsAvailable.complete(); } }