2018-10-10 14:44:44 +00:00
|
|
|
import * as plugins from './smartrx.plugins';
|
2017-11-01 13:59:28 +00:00
|
|
|
|
|
|
|
/**
 * ObservableIntake
 *
 * Lets imperative code feed values into an rxjs Observable. Payloads are
 * either forwarded immediately (`push`) or, in buffered mode, held back until
 * they are explicitly requested (`request`). A generator can be registered as
 * an additional, pull-based source of payloads.
 *
 * Payloads are only delivered while an observer is subscribed: the hooks that
 * forward to the observer are installed by the subscribe callback, and are
 * no-ops before that. Every new subscription re-installs the hooks, so they
 * always point at the most recently subscribed observer.
 */
export class ObservableIntake<T> {
  /** The observable that emits the payloads handed to this intake. */
  public observable: plugins.rxjs.Observable<T>;

  /**
   * Resolves once `signalComplete()` has been called while an observer is
   * subscribed.
   */
  public completed: Promise<void>;

  private completedDeffered: plugins.smartpromise.Deferred<void>;

  /**
   * Forwarding hooks for the currently subscribed observer.
   * They start out as no-ops and are replaced on subscription.
   */
  private observableFunctions: {
    next: (payloadArg: T) => void;
    complete: () => void;
  } = {
    next: () => {
      // nothing: no observer subscribed yet
    },
    complete: () => {
      // nothing: no observer subscribed yet
    },
  };

  /** Optional pull source queried by `request()` when the buffer is empty. */
  private generator: Generator<T> | null = null;

  /** When true, `push()` stores payloads instead of emitting them. */
  private buffered = false;

  /** Payloads waiting for `request()` while in buffered mode. */
  private payloadBuffer: T[] = [];

  constructor() {
    this.completedDeffered = plugins.smartpromise.defer();
    this.completed = this.completedDeffered.promise;

    this.observable = new plugins.rxjs.Observable<T>((observerArg: plugins.rxjs.Observer<T>) => {
      // Forward the payload itself. Passing the rest-args array here would
      // make subscribers receive `[payload]` instead of `payload`.
      this.observableFunctions.next = (payloadArg: T) => {
        observerArg.next(payloadArg);
      };
      this.observableFunctions.complete = () => {
        this.completedDeffered.resolve();
        observerArg.complete();
      };
    });
  }

  /**
   * replaces the observable exposed by this intake
   * @param observableFunc the observable to expose from now on
   */
  public setObservable(observableFunc: plugins.rxjs.Observable<any>) {
    this.observable = observableFunc;
  }

  /**
   * pushes a single payload
   * In buffered mode the payload is queued until it is requested,
   * otherwise it is forwarded to the observer right away.
   * @param payloadArg
   */
  public push(payloadArg: T) {
    if (this.buffered) {
      this.payloadBuffer.push(payloadArg);
    } else {
      this.internalPush(payloadArg);
    }
  }

  /**
   * pushes many payloads as array
   * @param payloadArgArray
   */
  public pushMany(payloadArgArray: T[]) {
    for (const item of payloadArgArray) {
      this.push(item);
    }
  }

  /**
   * sets a generator to query the next pushed value
   * @param generatorArg
   */
  public setGenerator(generatorArg: Generator<T>) {
    this.generator = generatorArg;
  }

  /**
   * switches the intake to buffered mode:
   * subsequent pushes are queued until `request()` asks for them
   */
  public makeBuffered() {
    this.buffered = true;
  }

  /**
   * subscribes to the underlying observable
   * @param args forwarded unchanged to `observable.subscribe`
   */
  public subscribe(...args: any) {
    return this.observable.subscribe(...args);
  }

  /**
   * request the next values in the quantity specified
   * Buffered payloads are emitted first; once the buffer is empty the
   * generator (if one is set) is queried. The request ends early when
   * neither source can supply another value.
   * @param howManyArg if a generator is set, or a buffer exists, this allows retrieving values
   */
  public request(howManyArg: number) {
    // `<` instead of `!==` so that zero, negative and fractional counts
    // terminate instead of looping forever.
    for (let i = 0; i < howManyArg; i++) {
      if (this.payloadBuffer.length > 0) {
        // The length check guarantees an element; the cast keeps payloads
        // that are legitimately `undefined` intact.
        this.internalPush(this.payloadBuffer.shift() as T);
        continue;
      }

      if (this.generator === null) {
        // nothing buffered and no generator to ask
        return;
      }

      const nextPayload = this.generator.next();
      if (nextPayload.done) {
        // An exhausted generator only carries its return value, not a payload.
        return;
      }
      this.internalPush(nextPayload.value);
    }
  }

  /**
   * signals the completion of this observable
   */
  public signalComplete() {
    this.observableFunctions.complete();
  }

  /**
   * hands a payload to the currently installed `next` hook
   * @param payloadArg
   */
  private internalPush(payloadArg: T) {
    this.observableFunctions.next(payloadArg);
  }
}
|