Compare commits

..

2 Commits

Author SHA1 Message Date
d6b3896dd3 5.0.32 2023-08-16 13:16:40 +02:00
49b11b17ce fix(core): update 2023-08-16 13:16:39 +02:00
4 changed files with 17 additions and 6 deletions

View File

@ -1,6 +1,6 @@
{ {
"name": "@push.rocks/smartdata", "name": "@push.rocks/smartdata",
"version": "5.0.31", "version": "5.0.32",
"private": false, "private": false,
"description": "do more with data", "description": "do more with data",
"main": "dist_ts/index.js", "main": "dist_ts/index.js",

View File

@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartdata', name: '@push.rocks/smartdata',
version: '5.0.31', version: '5.0.32',
description: 'do more with data' description: 'do more with data'
} }

View File

@ -56,8 +56,10 @@ export class SmartdataDistributedCoordinator extends plugins.taskbuffer.distribu
public async stop() { public async stop() {
await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => { await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
if (this.ownInstance?.data.elected) { if (this.distributedWatcher) {
await this.distributedWatcher.close(); await this.distributedWatcher.close();
}
if (this.ownInstance?.data.elected) {
this.ownInstance.data.elected = false; this.ownInstance.data.elected = false;
} }
if (this.ownInstance?.data.status === 'stopped') { if (this.ownInstance?.data.status === 'stopped') {
@ -225,7 +227,12 @@ export class SmartdataDistributedCoordinator extends plugins.taskbuffer.distribu
this.distributedWatcher.changeSubject.subscribe({ this.distributedWatcher.changeSubject.subscribe({
next: async (distributedDoc) => { next: async (distributedDoc) => {
console.log(`registered change for distributed doc ${distributedDoc.id}`); if (!distributedDoc) {
console.log(`registered deletion of instance...`);
return;
}
console.log(distributedDoc);
console.log(`registered change for ${distributedDoc.id}`);
distributedDoc; distributedDoc;
}, },
}); });

View File

@ -17,9 +17,13 @@ export class SmartdataDbWatcher<T = any> {
smartdataDbDocArg: typeof SmartDataDbDoc smartdataDbDocArg: typeof SmartDataDbDoc
) { ) {
this.changeStream = changeStreamArg; this.changeStream = changeStreamArg;
this.changeStream.on('change', async (item: T) => { this.changeStream.on('change', async (item: any) => {
if (!item.fullDocument) {
this.changeSubject.next(null);
return;
}
this.changeSubject.next( this.changeSubject.next(
smartdataDbDocArg.createInstanceFromMongoDbNativeDoc(item) as any as T smartdataDbDocArg.createInstanceFromMongoDbNativeDoc(item.fullDocument) as any as T
); );
}); });
plugins.smartdelay.delayFor(0).then(() => { plugins.smartdelay.delayFor(0).then(() => {