import * as plugins from './smartrx.plugins.js'; import * as rxjs from './smartrx.plugins.rxjs.js'; /** * ObservableIntake */ export class ObservableIntake { public observable: rxjs.Observable; public completed: Promise; private completedDeffered: plugins.smartpromise.Deferred; private observableFunctions: any = { next: (payloadArg: T) => { // nothing }, complete: (payloadArg: T) => { // nothing }, }; private generator: Generator = null; private buffered = false; private payloadBuffer: any[] = []; constructor() { this.observable = new rxjs.Observable((observerArg: rxjs.Observer) => { this.observableFunctions.next = (...args: any) => { return observerArg.next(args); }; this.observableFunctions.complete = () => { this.completedDeffered.resolve(); return observerArg.complete(); }; }); this.completedDeffered = plugins.smartpromise.defer(); this.completed = this.completedDeffered.promise; } public setObservable(observableFunc: rxjs.Observable) { this.observable = observableFunc; } public push(payloadArg: T) { if (this.buffered) { this.payloadBuffer.push(payloadArg); } else { this.internalPush(payloadArg); } } /** * pushes many payloads as array * @param payloadArgArray */ public pushMany(payloadArgArray: T[]) { for (const item of payloadArgArray) { this.push(item); } } /** * sets a generator to query the next pushed value * @param generatorArg */ public setGenerator(generatorArg: Generator) { this.generator = generatorArg; } public makeBuffered() { this.buffered = true; } public subscribe(...args: any) { return this.observable.subscribe(...args); } /** * request the next values in the quantity specified * @param howManyArg if a generator is set, of a buffer exists, this allows retrieving values */ public request(howManyArg: number) { if (howManyArg === 0) { return; } else { for (let i = 0; i !== howManyArg; i++) { if (this.payloadBuffer.length > 0) { this.internalPush(this.payloadBuffer.shift()); } else { const nextPayload = this.generator.next(); this.internalPush(nextPayload.value); } } } } /** * signals the completion of this observable */ public signalComplete() { this.observableFunctions.complete(); } private internalPush(payloadArg: T) { this.observableFunctions.next(payloadArg); } }