import * as plugins from './smartdata.plugins.js'; import { SmartdataDb } from './smartdata.classes.db.js'; import { Manager, setDefaultManagerForDoc } from './smartdata.classes.collection.js'; import { SmartDataDbDoc, svDb, unI } from './smartdata.classes.doc.js'; @Manager() class DistributedClass extends SmartDataDbDoc { // INSTANCE @unI() public id: string; @svDb() public data: { status: 'bidding' | 'settled' | 'initializing' | 'stopped'; biddingShortcode?: string; lastUpdated: number; elected: boolean; /** * used to store request */ taskRequests: plugins.taskbuffer.distributedCoordination.IDistributedTaskRequest[]; /** * only used by the leader to convey consultation results */ taskRequestResults: plugins.taskbuffer.distributedCoordination.IDistributedTaskRequestResult[]; }; } /** * This file implements a distributed coordinator according to the @pushrocks/taskbuffer standard. * you should not set up this yourself. Instead, there is a factory on the SmartdataDb class * that will take care of setting this up. */ export class SmartdataDistributedCoordinator extends plugins.taskbuffer.distributedCoordination .AbstractDistributedCoordinator { public readyPromise: Promise; public db: SmartdataDb; private asyncExecutionStack = new plugins.lik.AsyncExecutionStack(); private ownInstance: DistributedClass; constructor(dbArg?: SmartdataDb) { super(); this.db = dbArg; setDefaultManagerForDoc(this, DistributedClass); this.readyPromise = this.db.statusConnectedDeferred.promise; this.init(); } // smartdata specific stuff public async start() { await this.init(); } public async stop() { if (this.ownInstance?.data.elected) { this.ownInstance.data.elected = false; } else { console.log(`can't stop a distributed instance that has not been started yet.`); } } public id = plugins.smartunique.uni('distributedInstance'); private startHeartbeat = async () => { while (this.ownInstance.data.status !== 'stopped') { await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => { await this.ownInstance.updateFromDb(); this.ownInstance.data.lastUpdated = Date.now(); await this.ownInstance.save(); }); await plugins.smartdelay.delayFor(10000); } }; public async init() { await this.readyPromise; if (!this.ownInstance) { await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => { this.ownInstance = new DistributedClass(); this.ownInstance.id = this.id; this.ownInstance.data = { elected: false, lastUpdated: Date.now(), status: 'initializing', taskRequests: [], taskRequestResults: [], }; await this.ownInstance.save(); }); } // lets enable the heartbeat this.startHeartbeat(); // lets do a leader check await this.checkAndMaybeLead(); return this.ownInstance; } // --> leader election public async checkAndMaybeLead() { const allInstances = await DistributedClass.getInstances({}); let leader = allInstances.find((instanceArg) => instanceArg.data.elected === true); if ( leader && leader.data.lastUpdated >= Date.now() - plugins.smarttime.getMilliSecondsFromUnits({ minutes: 1 }) ) { await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => { await this.ownInstance.updateFromDb(); this.ownInstance.data.status = 'settled'; await this.ownInstance.save(); }); return; } else { await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => { this.ownInstance.data.status = 'bidding'; this.ownInstance.data.biddingShortcode = plugins.smartunique.shortId(); await this.ownInstance.save(); }); await plugins.smartdelay.delayFor(plugins.smarttime.getMilliSecondsFromUnits({ minutes: 2 })); await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => { let biddingInstances = await DistributedClass.getInstances({}); biddingInstances = biddingInstances.filter( (instanceArg) => !instanceArg.data.elected && instanceArg.data.lastUpdated >= Date.now() - plugins.smarttime.getMilliSecondsFromUnits({ minutes: 1 }) ); this.ownInstance.data.elected = true; for (const biddingInstance of biddingInstances) { if (biddingInstance.data.biddingShortcode < this.ownInstance.data.biddingShortcode) { this.ownInstance.data.elected = false; } } await this.ownInstance.save(); }); if (this.ownInstance.data.elected) { this.leadFunction(); } } } /** * when it has been determined * that this instance is leading * the leading is implemented here */ public async leadFunction() { const ownInstance = await this.init(); const watcher = await DistributedClass.watch({}); /** * this function is started once per unique job request */ const startResultTimer = async () => {}; watcher.changeSubject.subscribe({ next: async (distributedDoc) => { distributedDoc; }, }); while (this.ownInstance.data.status !== 'stopped') { await plugins.smartdelay.delayFor(1000); } } // abstract implemented methods public async fireDistributedTaskRequest( taskRequestArg: plugins.taskbuffer.distributedCoordination.IDistributedTaskRequest ): Promise { const ownInstance = await this.init(); await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => { this.ownInstance.data.taskRequests.push(taskRequestArg); await this.ownInstance.save(); }); return null; } public async updateDistributedTaskRequest( infoBasisArg: plugins.taskbuffer.distributedCoordination.IDistributedTaskRequest ): Promise { await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => { const existingInfoBasis = this.ownInstance.data.taskRequests.find((infoBasisItem) => { return ( infoBasisItem.taskName === infoBasisArg.taskName && infoBasisItem.taskExecutionTime === infoBasisArg.taskExecutionTime ); }); Object.assign(existingInfoBasis, infoBasisArg); await this.ownInstance.save(); plugins.smartdelay.delayFor(60000).then(() => { this.asyncExecutionStack.getExclusiveExecutionSlot(async () => { const indexToRemove = this.ownInstance.data.taskRequests.indexOf(existingInfoBasis); this.ownInstance.data.taskRequests.splice(indexToRemove, indexToRemove); await this.ownInstance.save(); }); }); }); } }