import * as plugins from './lik.plugins.js';

/**
 * Array-backed buffer that reports backpressure against a high-water mark.
 *
 * `push` appends to the end of `data` and `shift` removes from the front.
 * The buffer never rejects an item: `push` always stores it and merely
 * reports, via its return value, whether the buffer is still below the
 * high-water mark. Producers can await `waitForSpace()` and consumers can
 * await `waitForItems()` to coordinate.
 *
 * @typeParam T - Type of the buffered items.
 */
export class BackpressuredArray<T> {
  /** The buffered items. */
  public data: T[];

  /** Item count at which the buffer is considered full. */
  private readonly highWaterMark: number;

  /** Emits `'hasSpace'` whenever a push or shift leaves the buffer below the high-water mark. */
  public hasSpace = new plugins.smartrx.rxjs.Subject<'hasSpace'>();

  /** Emits `'itemsAvailable'` on every push. */
  private itemsAvailable = new plugins.smartrx.rxjs.Subject<'itemsAvailable'>();

  /**
   * @param highWaterMark - Number of items at which the buffer counts as full (default 16).
   */
  constructor(highWaterMark: number = 16) {
    this.data = [];
    this.highWaterMark = highWaterMark;
  }

  /**
   * Appends an item and notifies anyone waiting for items.
   *
   * @param item - The item to append.
   * @returns `true` if the buffer is still below the high-water mark after
   *   the push, `false` otherwise. The item is stored in either case.
   */
  push(item: T): boolean {
    this.data.push(item);
    this.itemsAvailable.next('itemsAvailable');

    const spaceAvailable = this.checkSpaceAvailable();
    if (spaceAvailable) {
      this.hasSpace.next('hasSpace');
    }
    return spaceAvailable;
  }

  /**
   * Removes and returns the first item, notifying waiters if the buffer is
   * below the high-water mark afterwards.
   *
   * @returns The removed item, or `undefined` if the buffer was empty.
   */
  shift(): T | undefined {
    const item = this.data.shift();
    if (this.checkSpaceAvailable()) {
      this.hasSpace.next('hasSpace');
    }
    return item;
  }

  /**
   * @returns `true` while the number of buffered items is below the high-water mark.
   */
  checkSpaceAvailable(): boolean {
    return this.data.length < this.highWaterMark;
  }

  /**
   * Resolves once the buffer is below the high-water mark.
   *
   * Resolves right away if there is space already; otherwise resolves on the
   * next `hasSpace` emission.
   */
  waitForSpace(): Promise<void> {
    return new Promise<void>((resolve) => {
      if (this.checkSpaceAvailable()) {
        resolve();
      } else {
        // One-shot subscription: unsubscribe on the first emission.
        const subscription = this.hasSpace.subscribe(() => {
          subscription.unsubscribe();
          resolve();
        });
      }
    });
  }

  /**
   * Resolves once the buffer holds at least one item.
   *
   * Resolves right away if `data` is non-empty; otherwise resolves on the
   * next push.
   */
  waitForItems(): Promise<void> {
    return new Promise<void>((resolve) => {
      if (this.data.length > 0) {
        resolve();
      } else {
        // One-shot subscription: unsubscribe on the first emission.
        const subscription = this.itemsAvailable.subscribe(() => {
          subscription.unsubscribe();
          resolve();
        });
      }
    });
  }
}