From 7f9983382a8df7574597bdd3aeaddd3057ad12ca Mon Sep 17 00:00:00 2001 From: Philipp Kunz Date: Wed, 16 Aug 2023 12:08:27 +0200 Subject: [PATCH] fix(core): update --- test/test.distributedcoordinator.ts | 3 +- ts/00_commitinfo_data.ts | 2 +- ...martdata.classes.distributedcoordinator.ts | 34 ++++++++++++++++--- 3 files changed, 33 insertions(+), 6 deletions(-) diff --git a/test/test.distributedcoordinator.ts b/test/test.distributedcoordinator.ts index 2a70fdf..1014a06 100644 --- a/test/test.distributedcoordinator.ts +++ b/test/test.distributedcoordinator.ts @@ -55,6 +55,7 @@ tap.test('SmartdataDistributedCoordinator should handle distributed task request const mockTaskRequest: taskbuffer.distributedCoordination.IDistributedTaskRequest = { submitterId: "mockSubmitter12345", // Some unique mock submitter ID + requestResponseId: 'uni879873462hjhfkjhsdf', // Some unique ID for the request-response taskName: "SampleTask", taskVersion: "1.0.0", // Assuming it's a version string taskExecutionTime: Date.now(), @@ -105,7 +106,7 @@ tap.test('should elect only one leader amongst multiple instances', async (tools tap.test('should clean up', async () => { await smartmongoInstance.stopAndDumpToDir(`.nogit/testdata/`); - setTimeout(() => process.exit(0), 1000); + setTimeout(() => process.exit(), 2000); }) tap.start({ throwOnError: true }); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 8ce9e9f..76f90ad 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartdata', - version: '5.0.30', + version: '5.0.31', description: 'do more with data' } diff --git a/ts/smartdata.classes.distributedcoordinator.ts b/ts/smartdata.classes.distributedcoordinator.ts index 6c338ef..fb2995b 100644 --- a/ts/smartdata.classes.distributedcoordinator.ts +++ b/ts/smartdata.classes.distributedcoordinator.ts @@ -2,6 +2,7 @@ import * as plugins from './smartdata.plugins.js'; import { SmartdataDb } from './smartdata.classes.db.js'; import { Manager, setDefaultManagerForDoc } from './smartdata.classes.collection.js'; import { SmartDataDbDoc, svDb, unI } from './smartdata.classes.doc.js'; +import { SmartdataDbWatcher } from './smartdata.classes.watcher.js'; @Manager() export class DistributedClass extends SmartDataDbDoc { @@ -39,6 +40,7 @@ export class SmartdataDistributedCoordinator extends plugins.taskbuffer.distribu public db: SmartdataDb; private asyncExecutionStack = new plugins.lik.AsyncExecutionStack(); public ownInstance: DistributedClass; + public distributedWatcher: SmartdataDbWatcher; constructor(dbArg: SmartdataDb) { super(); @@ -55,6 +57,7 @@ export class SmartdataDistributedCoordinator extends plugins.taskbuffer.distribu public async stop() { await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => { if (this.ownInstance?.data.elected) { + await this.distributedWatcher.close(); this.ownInstance.data.elected = false; } if (this.ownInstance?.data.status === 'stopped') { @@ -208,12 +211,25 @@ export class SmartdataDistributedCoordinator extends plugins.taskbuffer.distribu * the leading is implemented here */ public async leadFunction() { - const watcher = await DistributedClass.watch({}); - watcher.changeSubject.subscribe({ + this.distributedWatcher = await DistributedClass.watch({}); + + const currentTaskRequests: Array<{ + taskName: string; + taskExecutionTime: number; + /** + * all instances that requested this task + */ + requestingDistibutedInstanceIds: string[]; + responseTimeout: plugins.smartdelay.Timeout; + }> = []; + + this.distributedWatcher.changeSubject.subscribe({ next: async (distributedDoc) => { + console.log(`registered change for distributed doc ${distributedDoc.id}`); distributedDoc; }, }); + while (this.ownInstance.data.status !== 'stopped' && this.ownInstance.data.elected) { const allInstances = await DistributedClass.getInstances({}); for (const instance of allInstances) { @@ -239,8 +255,18 @@ export class SmartdataDistributedCoordinator extends plugins.taskbuffer.distribu await this.ownInstance.save(); }); await plugins.smartdelay.delayFor(10000); - - return null; + const result = await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => { + await this.ownInstance.updateFromDb(); + const taskRequestResult = this.ownInstance.data.taskRequestResults.find((resultItem) => { + return resultItem.requestResponseId === taskRequestArg.requestResponseId; + }); + return taskRequestResult; + }); + if (!result) { + console.warn('no result found for task request...'); + return null; + } + return result; } public async updateDistributedTaskRequest(