diff --git a/dist/smartrx.classes.observableintake.d.ts b/dist/smartrx.classes.observableintake.d.ts index 2173586..8accc13 100644 --- a/dist/smartrx.classes.observableintake.d.ts +++ b/dist/smartrx.classes.observableintake.d.ts @@ -7,9 +7,32 @@ export declare class ObservableIntake { completed: Promise; private completedDeffered; private observableFunctions; + private generator; + private buffered; + private payloadBuffer; constructor(); setObservable(observableFunc: any): void; push(payloadArg: T): void; + /** + * pushes many payloads as array + * @param payloadArgArray + */ + pushMany(payloadArgArray: T[]): void; + /** + * sets a generator to query the next pushed value + * @param generatorArg + */ + setGenerator(generatorArg: any): void; + makeBuffered(): void; subscribe(...args: any[]): Subscription; + /** + * request the next values in the quantity specified + * @param howManyArg if a generator is set, of a buffer exists, this allows retrieving values + */ + request(howManyArg: number): void; + /** + * signals the completion of this observable + */ signalComplete(): void; + private internalPush(payloadArg); } diff --git a/dist/smartrx.classes.observableintake.js b/dist/smartrx.classes.observableintake.js index efb1f7b..d3e4cbd 100644 --- a/dist/smartrx.classes.observableintake.js +++ b/dist/smartrx.classes.observableintake.js @@ -15,6 +15,9 @@ class ObservableIntake { // nothing } }; + this.generator = null; + this.buffered = false; + this.payloadBuffer = []; this.observable = rxjs_1.Observable.create((observerArg) => { this.observableFunctions.next = (...args) => { return observerArg.next(...args); @@ -31,14 +34,64 @@ class ObservableIntake { this.observable = observableFunc(); } push(payloadArg) { - this.observableFunctions.next(payloadArg); + if (this.buffered) { + this.payloadBuffer.push(payloadArg); + } + else { + this.internalPush(payloadArg); + } + } + /** + * pushes many payloads as array + * @param payloadArgArray + */ + pushMany(payloadArgArray) { + for (let item of payloadArgArray) { + this.push(item); + } + } + /** + * sets a generator to query the next pushed value + * @param generatorArg + */ + setGenerator(generatorArg) { + this.generator = generatorArg; + } + makeBuffered() { + this.buffered = true; } subscribe(...args) { return this.observable.subscribe(...args); } + /** + * request the next values in the quantity specified + * @param howManyArg if a generator is set, of a buffer exists, this allows retrieving values + */ + request(howManyArg) { + if (howManyArg === 0) { + return; + } + else { + for (let i = 0; i !== howManyArg; i++) { + if (this.payloadBuffer.length > 0) { + this.internalPush(this.payloadBuffer.shift()); + } + else { + const nextPayload = this.generator(); + this.internalPush(nextPayload); + } + } + } + } + /** + * signals the completion of this observable + */ signalComplete() { this.observableFunctions.complete(); } + internalPush(payloadArg) { + this.observableFunctions.next(payloadArg); + } } exports.ObservableIntake = ObservableIntake; -//# sourceMappingURL=data:application/json;base64,eyJ2ZXJzaW9uIjozLCJmaWxlIjoic21hcnRyeC5jbGFzc2VzLm9ic2VydmFibGVpbnRha2UuanMiLCJzb3VyY2VSb290IjoiIiwic291cmNlcyI6WyIuLi90cy9zbWFydHJ4LmNsYXNzZXMub2JzZXJ2YWJsZWludGFrZS50cyJdLCJuYW1lcyI6W10sIm1hcHBpbmdzIjoiOztBQUFBLDZDQUE0QztBQUM1QywrQkFBK0M7QUFHL0M7O0dBRUc7QUFDSDtJQWFFO1FBVFEsd0JBQW1CLEdBQVE7WUFDakMsSUFBSSxFQUFFLENBQUMsVUFBVSxFQUFFLEVBQUU7Z0JBQ25CLFVBQVU7WUFDWixDQUFDO1lBQ0QsUUFBUSxFQUFFLENBQUMsVUFBVSxFQUFFLEVBQUU7Z0JBQ3ZCLFVBQVU7WUFDWixDQUFDO1NBQ0YsQ0FBQTtRQUdDLElBQUksQ0FBQyxVQUFVLEdBQUcsaUJBQVUsQ0FBQyxNQUFNLENBQUMsQ0FBQyxXQUFXLEVBQUUsRUFBRTtZQUNsRCxJQUFJLENBQUMsbUJBQW1CLENBQUMsSUFBSSxHQUFHLENBQUMsR0FBRyxJQUFJLEVBQUUsRUFBRTtnQkFDMUMsTUFBTSxDQUFDLFdBQVcsQ0FBQyxJQUFJLENBQUMsR0FBRyxJQUFJLENBQUMsQ0FBQTtZQUNsQyxDQUFDLENBQUE7WUFDRCxJQUFJLENBQUMsbUJBQW1CLENBQUMsUUFBUSxHQUFHLENBQUMsR0FBRyxJQUFJLEVBQUUsRUFBRTtnQkFDOUMsSUFBSSxDQUFDLGlCQUFpQixDQUFDLE9BQU8sRUFBRSxDQUFBO2dCQUNoQyxNQUFNLENBQUMsV0FBVyxDQUFDLFFBQVEsQ0FBQyxHQUFHLElBQUksQ0FBQyxDQUFBO1lBQ3RDLENBQUMsQ0FBQTtRQUNILENBQUMsQ0FBQyxDQUFBO1FBQ0YsSUFBSSxDQUFDLGlCQUFpQixHQUFHLE9BQU8sQ0FBQyxNQUFNLENBQUMsS0FBSyxFQUFFLENBQUE7UUFDL0MsSUFBSSxDQUFDLFNBQVMsR0FBRyxJQUFJLENBQUMsaUJBQWlCLENBQUMsT0FBTyxDQUFBO0lBQ2pELENBQUM7SUFFRCxhQUFhLENBQUUsY0FBYztRQUMzQixJQUFJLENBQUMsVUFBVSxHQUFHLGNBQWMsRUFBRSxDQUFBO0lBQ3BDLENBQUM7SUFFRCxJQUFJLENBQUUsVUFBYTtRQUNqQixJQUFJLENBQUMsbUJBQW1CLENBQUMsSUFBSSxDQUFDLFVBQVUsQ0FBQyxDQUFBO0lBQzNDLENBQUM7SUFFRCxTQUFTLENBQUUsR0FBRyxJQUFJO1FBQ2hCLE1BQU0sQ0FBQyxJQUFJLENBQUMsVUFBVSxDQUFDLFNBQVMsQ0FBQyxHQUFHLElBQUksQ0FBQyxDQUFBO0lBQzNDLENBQUM7SUFFRCxjQUFjO1FBQ1osSUFBSSxDQUFDLG1CQUFtQixDQUFDLFFBQVEsRUFBRSxDQUFBO0lBQ3JDLENBQUM7Q0FDRjtBQTFDRCw0Q0EwQ0MifQ== \ No newline at end of file +//# sourceMappingURL=data:application/json;base64,eyJ2ZXJzaW9uIjozLCJmaWxlIjoic21hcnRyeC5jbGFzc2VzLm9ic2VydmFibGVpbnRha2UuanMiLCJzb3VyY2VSb290IjoiIiwic291cmNlcyI6WyIuLi90cy9zbWFydHJ4LmNsYXNzZXMub2JzZXJ2YWJsZWludGFrZS50cyJdLCJuYW1lcyI6W10sIm1hcHBpbmdzIjoiOztBQUFBLDZDQUE0QztBQUM1QywrQkFBK0M7QUFHL0M7O0dBRUc7QUFDSDtJQWdCRTtRQVpRLHdCQUFtQixHQUFRO1lBQ2pDLElBQUksRUFBRSxDQUFDLFVBQVUsRUFBRSxFQUFFO2dCQUNuQixVQUFVO1lBQ1osQ0FBQztZQUNELFFBQVEsRUFBRSxDQUFDLFVBQVUsRUFBRSxFQUFFO2dCQUN2QixVQUFVO1lBQ1osQ0FBQztTQUNGLENBQUE7UUFDTyxjQUFTLEdBQUcsSUFBSSxDQUFBO1FBQ2hCLGFBQVEsR0FBRyxLQUFLLENBQUE7UUFDaEIsa0JBQWEsR0FBRyxFQUFFLENBQUE7UUFHeEIsSUFBSSxDQUFDLFVBQVUsR0FBRyxpQkFBVSxDQUFDLE1BQU0sQ0FBQyxDQUFDLFdBQVcsRUFBRSxFQUFFO1lBQ2xELElBQUksQ0FBQyxtQkFBbUIsQ0FBQyxJQUFJLEdBQUcsQ0FBQyxHQUFHLElBQUksRUFBRSxFQUFFO2dCQUMxQyxNQUFNLENBQUMsV0FBVyxDQUFDLElBQUksQ0FBQyxHQUFHLElBQUksQ0FBQyxDQUFBO1lBQ2xDLENBQUMsQ0FBQTtZQUNELElBQUksQ0FBQyxtQkFBbUIsQ0FBQyxRQUFRLEdBQUcsQ0FBQyxHQUFHLElBQUksRUFBRSxFQUFFO2dCQUM5QyxJQUFJLENBQUMsaUJBQWlCLENBQUMsT0FBTyxFQUFFLENBQUE7Z0JBQ2hDLE1BQU0sQ0FBQyxXQUFXLENBQUMsUUFBUSxDQUFDLEdBQUcsSUFBSSxDQUFDLENBQUE7WUFDdEMsQ0FBQyxDQUFBO1FBQ0gsQ0FBQyxDQUFDLENBQUE7UUFDRixJQUFJLENBQUMsaUJBQWlCLEdBQUcsT0FBTyxDQUFDLE1BQU0sQ0FBQyxLQUFLLEVBQUUsQ0FBQTtRQUMvQyxJQUFJLENBQUMsU0FBUyxHQUFHLElBQUksQ0FBQyxpQkFBaUIsQ0FBQyxPQUFPLENBQUE7SUFDakQsQ0FBQztJQUVELGFBQWEsQ0FBRSxjQUFjO1FBQzNCLElBQUksQ0FBQyxVQUFVLEdBQUcsY0FBYyxFQUFFLENBQUE7SUFDcEMsQ0FBQztJQUVELElBQUksQ0FBRSxVQUFhO1FBQ2pCLEVBQUUsQ0FBQyxDQUFDLElBQUksQ0FBQyxRQUFRLENBQUMsQ0FBQyxDQUFDO1lBQ2xCLElBQUksQ0FBQyxhQUFhLENBQUMsSUFBSSxDQUFDLFVBQVUsQ0FBQyxDQUFBO1FBQ3JDLENBQUM7UUFBQyxJQUFJLENBQUMsQ0FBQztZQUNOLElBQUksQ0FBQyxZQUFZLENBQUMsVUFBVSxDQUFDLENBQUE7UUFDL0IsQ0FBQztJQUNILENBQUM7SUFFRDs7O09BR0c7SUFDSCxRQUFRLENBQUUsZUFBb0I7UUFDNUIsR0FBRyxDQUFDLENBQUMsSUFBSSxJQUFJLElBQUksZUFBZSxDQUFDLENBQUMsQ0FBQztZQUNqQyxJQUFJLENBQUMsSUFBSSxDQUFDLElBQUksQ0FBQyxDQUFBO1FBQ2pCLENBQUM7SUFDSCxDQUFDO0lBRUQ7OztPQUdHO0lBQ0gsWUFBWSxDQUFFLFlBQVk7UUFDeEIsSUFBSSxDQUFDLFNBQVMsR0FBRyxZQUFZLENBQUE7SUFDL0IsQ0FBQztJQUVELFlBQVk7UUFDVixJQUFJLENBQUMsUUFBUSxHQUFHLElBQUksQ0FBQTtJQUN0QixDQUFDO0lBRUQsU0FBUyxDQUFFLEdBQUcsSUFBSTtRQUNoQixNQUFNLENBQUMsSUFBSSxDQUFDLFVBQVUsQ0FBQyxTQUFTLENBQUMsR0FBRyxJQUFJLENBQUMsQ0FBQTtJQUMzQyxDQUFDO0lBRUQ7OztPQUdHO0lBQ0gsT0FBTyxDQUFFLFVBQWtCO1FBQ3pCLEVBQUUsQ0FBQyxDQUFDLFVBQVUsS0FBSyxDQUFDLENBQUMsQ0FBQyxDQUFDO1lBQ3JCLE1BQU0sQ0FBQTtRQUNSLENBQUM7UUFBQyxJQUFJLENBQUMsQ0FBQztZQUNOLEdBQUcsQ0FBQyxDQUFDLElBQUksQ0FBQyxHQUFHLENBQUMsRUFBRSxDQUFDLEtBQUssVUFBVSxFQUFFLENBQUMsRUFBRSxFQUFFLENBQUM7Z0JBQ3RDLEVBQUUsQ0FBQyxDQUFDLElBQUksQ0FBQyxhQUFhLENBQUMsTUFBTSxHQUFHLENBQUMsQ0FBQyxDQUFDLENBQUM7b0JBQ2xDLElBQUksQ0FBQyxZQUFZLENBQUMsSUFBSSxDQUFDLGFBQWEsQ0FBQyxLQUFLLEVBQUUsQ0FBQyxDQUFBO2dCQUMvQyxDQUFDO2dCQUFDLElBQUksQ0FBQyxDQUFDO29CQUNOLE1BQU0sV0FBVyxHQUFHLElBQUksQ0FBQyxTQUFTLEVBQUUsQ0FBQTtvQkFDcEMsSUFBSSxDQUFDLFlBQVksQ0FBQyxXQUFXLENBQUMsQ0FBQTtnQkFDaEMsQ0FBQztZQUNILENBQUM7UUFDSCxDQUFDO0lBQ0gsQ0FBQztJQUVEOztPQUVHO0lBQ0gsY0FBYztRQUNaLElBQUksQ0FBQyxtQkFBbUIsQ0FBQyxRQUFRLEVBQUUsQ0FBQTtJQUNyQyxDQUFDO0lBRU8sWUFBWSxDQUFFLFVBQVU7UUFDOUIsSUFBSSxDQUFDLG1CQUFtQixDQUFDLElBQUksQ0FBQyxVQUFVLENBQUMsQ0FBQTtJQUMzQyxDQUFDO0NBQ0Y7QUFqR0QsNENBaUdDIn0= \ No newline at end of file diff --git a/test/test.observableintake.ts b/test/test.observableintake.ts index 9f7d0c1..7f2c6b7 100644 --- a/test/test.observableintake.ts +++ b/test/test.observableintake.ts @@ -7,15 +7,34 @@ tap.test('should create a valid instance of observableinstake', async () => { expect(testObservableIntake).to.be.instanceOf(smartrx.ObservableIntake) }) -tap.test('expect testObserservableIntake to be lazy', async (tools) => { +tap.test('expect testObserservableIntake to push things', async (tools) => { const testObserservableIntake = new smartrx.ObservableIntake() testObserservableIntake.subscribe(value => { console.log(value) }) testObserservableIntake.push('hi') + testObserservableIntake.push('wow') testObserservableIntake.signalComplete() await testObserservableIntake.completed }) +tap.test('expect testObserservableIntake to push things', async (tools) => { + const testObserservableIntake = new smartrx.ObservableIntake() + testObserservableIntake.push('hi') + testObserservableIntake.push('wow') + testObserservableIntake.makeBuffered() + testObserservableIntake.push('jo') + testObserservableIntake.subscribe(value => { + console.log(value) + testObserservableIntake.signalComplete() + }) + testObserservableIntake.request(1) + await testObserservableIntake.completed +}) + +tap.test('', async () => { + +}) + tap.start() diff --git a/ts/smartrx.classes.observableintake.ts b/ts/smartrx.classes.observableintake.ts index 2801ac9..a614a19 100644 --- a/ts/smartrx.classes.observableintake.ts +++ b/ts/smartrx.classes.observableintake.ts @@ -17,6 +17,9 @@ export class ObservableIntake { // nothing } } + private generator = null + private buffered = false + private payloadBuffer = [] constructor () { this.observable = Observable.create((observerArg) => { @@ -37,14 +40,66 @@ export class ObservableIntake { } push (payloadArg: T) { - this.observableFunctions.next(payloadArg) + if (this.buffered) { + this.payloadBuffer.push(payloadArg) + } else { + this.internalPush(payloadArg) + } + } + + /** + * pushes many payloads as array + * @param payloadArgArray + */ + pushMany (payloadArgArray: T[]) { + for (let item of payloadArgArray) { + this.push(item) + } + } + + /** + * sets a generator to query the next pushed value + * @param generatorArg + */ + setGenerator (generatorArg) { + this.generator = generatorArg + } + + makeBuffered() { + this.buffered = true } subscribe (...args) { return this.observable.subscribe(...args) } + /** + * request the next values in the quantity specified + * @param howManyArg if a generator is set, of a buffer exists, this allows retrieving values + */ + request (howManyArg: number) { + if (howManyArg === 0) { + return + } else { + for (let i = 0; i !== howManyArg; i++) { + if (this.payloadBuffer.length > 0) { + this.internalPush(this.payloadBuffer.shift()) + } else { + const nextPayload = this.generator() + this.internalPush(nextPayload) + } + } + } + } + + /** + * signals the completion of this observable + */ signalComplete () { this.observableFunctions.complete() } + + private internalPush (payloadArg) { + this.observableFunctions.next(payloadArg) + } }