smartrx/ts/smartrx.classes.observableintake.ts

106 lines
2.5 KiB
TypeScript
Raw Permalink Normal View History

2022-08-05 10:50:37 +00:00
import * as plugins from './smartrx.plugins.js';
2017-11-01 13:59:28 +00:00
2022-08-05 10:50:37 +00:00
import * as rxjs from './smartrx.plugins.rxjs.js';
2022-01-24 06:37:52 +00:00
2017-11-01 13:59:28 +00:00
/**
 * ObservableIntake
 *
 * Wraps an rxjs Observable so that values can be fed into it imperatively
 * (`push`, `pushMany`), held back in a buffer (`makeBuffered`) and released
 * on demand (`request`), optionally topping up from a generator
 * (`setGenerator`).
 *
 * Notes on behavior:
 * - The intake is only wired to an observer once `subscribe` has been called
 *   on the observable created in the constructor. Values pushed and
 *   completion signals sent before that point go to no-op handlers.
 * - The wiring is a single pair of handlers that is reassigned on every
 *   subscription, so only the most recently attached observer receives
 *   pushed values.
 */
export class ObservableIntake<T> {
  /** The observable that consumers subscribe to. */
  public observable: rxjs.Observable<T>;

  /** Resolves once `signalComplete` has been relayed to a subscribed observer. */
  public completed: Promise<void>;

  private completedDeffered: plugins.smartpromise.Deferred<void>;

  /** Handlers of the currently wired observer; no-ops until someone subscribes. */
  private observableFunctions: {
    next: (payloadArg: T) => void;
    complete: () => void;
  } = {
    next: () => {
      // nothing: no observer subscribed yet
    },
    complete: () => {
      // nothing: no observer subscribed yet
    },
  };

  private generator: Generator<T> | null = null;
  private buffered = false;
  private payloadBuffer: T[] = [];

  constructor() {
    this.completedDeffered = plugins.smartpromise.defer();
    this.completed = this.completedDeffered.promise;
    this.observable = new rxjs.Observable<T>((observerArg: rxjs.Observer<T>) => {
      // Forward the payload itself. Collecting it through a rest parameter and
      // passing that array on would hand subscribers `[payload]` instead of `payload`.
      this.observableFunctions.next = (payloadArg: T) => {
        observerArg.next(payloadArg);
      };
      this.observableFunctions.complete = () => {
        this.completedDeffered.resolve();
        observerArg.complete();
      };
    });
  }

  /**
   * Replaces the observable exposed by this intake.
   * Only the `observable` reference is swapped; the push/complete handlers are
   * wired exclusively by subscriptions to the observable built in the constructor.
   * @param observableFunc the observable to expose from now on
   */
  public setObservable(observableFunc: rxjs.Observable<any>) {
    this.observable = observableFunc;
  }

  /**
   * Pushes a single payload: buffered when buffering is enabled,
   * otherwise forwarded to the observer right away.
   * @param payloadArg the value to emit
   */
  public push(payloadArg: T) {
    if (this.buffered) {
      this.payloadBuffer.push(payloadArg);
    } else {
      this.internalPush(payloadArg);
    }
  }

  /**
   * pushes many payloads as array
   * @param payloadArgArray the values to push, in order
   */
  public pushMany(payloadArgArray: T[]) {
    for (const item of payloadArgArray) {
      this.push(item);
    }
  }

  /**
   * sets a generator to query the next pushed value
   * @param generatorArg consulted by `request` once the buffer is empty
   */
  public setGenerator(generatorArg: Generator<T>) {
    this.generator = generatorArg;
  }

  /**
   * Switches the intake to buffered mode: subsequent `push` calls are stored
   * until they are released through `request`.
   */
  public makeBuffered() {
    this.buffered = true;
  }

  /**
   * Subscribes to the current observable; all arguments are passed through unchanged.
   * @returns whatever the observable's `subscribe` returns
   */
  public subscribe(...args: any) {
    return this.observable.subscribe(...args);
  }

  /**
   * request the next values in the quantity specified
   *
   * Buffered payloads are released first (oldest first); once the buffer is
   * empty, values are pulled from the generator. The request ends early when
   * no generator is set or the generator is exhausted.
   * @param howManyArg upper bound of values to emit; zero, negative or NaN emits nothing
   */
  public request(howManyArg: number) {
    // `<` instead of `!==`: a negative or fractional count must not loop forever.
    for (let i = 0; i < howManyArg; i++) {
      if (this.payloadBuffer.length > 0) {
        // The length check guarantees an element; the cast only drops the `undefined` of shift().
        this.internalPush(this.payloadBuffer.shift() as T);
        continue;
      }
      if (this.generator === null) {
        // Nothing buffered and no generator configured: nothing left to hand out.
        return;
      }
      const nextPayload = this.generator.next();
      if (nextPayload.done) {
        // An exhausted generator only carries its return value, which is not a payload.
        return;
      }
      this.internalPush(nextPayload.value);
    }
  }

  /**
   * signals the completion of this observable
   */
  public signalComplete() {
    this.observableFunctions.complete();
  }

  /** Hands a payload to the currently wired observer (no-op before the first subscription). */
  private internalPush(payloadArg: T) {
    this.observableFunctions.next(payloadArg);
  }
}