2022-05-17 21:54:26 +00:00
|
|
|
import { SmartDataDbDoc } from './smartdata.classes.doc.js';
|
2022-05-16 22:33:44 +00:00
|
|
|
import * as plugins from './smartdata.plugins.js';
|
|
|
|
|
|
|
|
/**
 * Wraps a native MongoDB change stream and republishes its `change` events
 * through an RxJS subject.
 *
 * For every `change` event the watcher emits on `changeSubject`:
 * - `null` when the event carries no `fullDocument`,
 * - otherwise the result of
 *   `smartdataDbDocArg.createInstanceFromMongoDbNativeDoc(fullDocument)`.
 *
 * Call `close()` when done: it closes the underlying change stream and
 * completes `changeSubject`.
 */
export class SmartdataDbWatcher<T = any> {
  // INSTANCE

  /**
   * Deferred that the constructor resolves once `smartdelay.delayFor(0)` has
   * settled, i.e. after the `change` listener has been registered.
   */
  public readyDeferred = plugins.smartpromise.defer();

  /** The native change stream this watcher listens to and closes. */
  private changeStream: plugins.mongodb.ChangeStream<T>;

  /**
   * Emits one value per `change` event (see class description).
   * Completed by `close()`.
   */
  public changeSubject = new plugins.smartrx.rxjs.Subject<T>();

  /**
   * @param changeStreamArg the native change stream to listen on
   * @param smartdataDbDocArg document class whose
   *   `createInstanceFromMongoDbNativeDoc` turns a raw document into the
   *   value that is emitted on `changeSubject`
   */
  constructor(
    changeStreamArg: plugins.mongodb.ChangeStream<T>,
    smartdataDbDocArg: typeof SmartDataDbDoc
  ) {
    this.changeStream = changeStreamArg;
    this.changeStream.on('change', async (item: any) => {
      // Events without a full document are still surfaced, as `null`,
      // so subscribers learn that something changed.
      if (!item.fullDocument) {
        this.changeSubject.next(null);
        return;
      }
      this.changeSubject.next(
        smartdataDbDocArg.createInstanceFromMongoDbNativeDoc(item.fullDocument) as any as T
      );
    });
    // NOTE(review): presumably deferred so callers can obtain the instance
    // before `readyDeferred` settles — confirm against smartdelay semantics.
    plugins.smartdelay.delayFor(0).then(() => {
      this.readyDeferred.resolve();
    });
  }

  /**
   * Closes the underlying change stream and completes `changeSubject`.
   *
   * The subject is completed even when closing the stream rejects, so
   * subscribers are always released; the rejection still propagates to the
   * caller.
   */
  public async close(): Promise<void> {
    try {
      await this.changeStream.close();
    } finally {
      // Without this, subscribers never receive a completion signal and
      // their subscriptions stay registered on the subject indefinitely.
      this.changeSubject.complete();
    }
  }
}
|