import * as plugins from './smartdata.plugins.js';
import { SmartdataDb } from './smartdata.classes.db.js';
import { managed, setDefaultManagerForDoc } from './smartdata.classes.collection.js';
import { SmartDataDbDoc, svDb, unI } from './smartdata.classes.doc.js';
import { SmartdataDbWatcher } from './smartdata.classes.watcher.js';

/**
 * Persisted state of a single coordinator instance.
 * Every running SmartdataDistributedCoordinator stores exactly one of these documents
 * (see `init()`) and refreshes it on each heartbeat.
 */
@managed()
export class DistributedClass extends SmartDataDbDoc<DistributedClass, DistributedClass> {
  // INSTANCE
  /**
   * id of the coordinator instance that owns this document
   */
  @unI()
  public id: string;

  @svDb()
  public data: {
    /**
     * - `initializing`: set on creation and at the start of every leader check
     * - `bidding`: the instance currently takes part in an election round
     * - `settled`: the leader check finished (as leader or as follower)
     * - `stopped`: set by `stop()`; the leader deletes stopped documents
     */
    status: 'initializing' | 'bidding' | 'settled' | 'stopped';
    /**
     * random code drawn when bidding; the lowest code among the bidders wins
     */
    biddingShortcode?: string;
    /**
     * timestamp (ms) at which this instance started bidding
     */
    biddingStartTime?: number;
    /**
     * timestamp (ms) of the last heartbeat
     */
    lastUpdated: number;
    /**
     * true while this instance considers itself the leader
     */
    elected: boolean;

    /**
     * used to store request
     */
    taskRequests: plugins.taskbuffer.distributedCoordination.IDistributedTaskRequest[];

    /**
     * only used by the leader to convey consultation results
     */
    taskRequestResults: plugins.taskbuffer.distributedCoordination.IDistributedTaskRequestResult[];
  };
}

/**
 * This file implements a distributed coordinator according to the @pushrocks/taskbuffer standard.
 * You should not set this up yourself. Instead, there is a factory on the SmartdataDb class
 * that will take care of setting this up.
 */
export class SmartdataDistributedCoordinator extends plugins.taskbuffer.distributedCoordination
  .AbstractDistributedCoordinator {
  /**
   * resolves once the database reports that it is connected
   */
  public readyPromise: Promise<any>;
  public db: SmartdataDb;
  /**
   * all reads/writes of `ownInstance` are funneled through exclusive slots of this stack
   */
  private asyncExecutionStack = new plugins.lik.AsyncExecutionStack();
  /**
   * the document representing this coordinator; only set after `start()`
   */
  public ownInstance: DistributedClass;
  /**
   * only set while/after this instance has been leading (see `leadFunction()`)
   */
  public distributedWatcher: SmartdataDbWatcher<DistributedClass>;

  constructor(dbArg: SmartdataDb) {
    super();
    this.db = dbArg;
    setDefaultManagerForDoc(this, DistributedClass);
    this.readyPromise = this.db.statusConnectedDeferred.promise;
  }

  // smartdata specific stuff
  /**
   * creates the own instance document, starts the heartbeat and runs a first leader check
   */
  public async start() {
    await this.init();
  }

  /**
   * closes the watcher (if any) and marks the own instance as stopped.
   * Safe to call before `start()`: in that case there is nothing to persist.
   */
  public async stop() {
    await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
      if (this.distributedWatcher) {
        await this.distributedWatcher.close();
      }
      if (!this.ownInstance) {
        // stop() before start(): there is no document to update
        console.log(`stopping a distributed instance that has not been started yet.`);
        return;
      }
      if (this.ownInstance.data.elected) {
        this.ownInstance.data.elected = false;
      }
      if (this.ownInstance.data.status === 'stopped') {
        console.log(`stopping a distributed instance that has not been started yet.`);
      }
      this.ownInstance.data.status = 'stopped';
      await this.ownInstance.save();
      console.log(`stopped ${this.ownInstance.id}`);
    });
  }

  public id = plugins.smartunique.uni('distributedInstance');

  /**
   * sends heartbeats in random 5-10 second intervals until the instance is stopped
   */
  private startHeartbeat = async () => {
    while (this.ownInstance.data.status !== 'stopped') {
      await this.sendHeartbeat();
      await plugins.smartdelay.delayForRandom(5000, 10000);
    }
  };

  /**
   * refreshes `lastUpdated` of the own instance and, if no eligible leader
   * is around anymore, triggers a new leader check
   */
  public async sendHeartbeat() {
    await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
      if (this.ownInstance.data.status === 'stopped') {
        console.log(`aborted sending heartbeat because status is stopped`);
        return;
      }
      await this.ownInstance.updateFromDb();
      this.ownInstance.data.lastUpdated = Date.now();
      await this.ownInstance.save();
      console.log(`sent heartbeat for ${this.ownInstance.id}`);
    });
    // stop() may have run while we were waiting for the slot above
    if (this.ownInstance.data.status === 'stopped') {
      console.log(`aborted sending heartbeat because status is stopped`);
      return;
    }
    const eligibleLeader = await this.getEligibleLeader();
    // not awaiting here because we don't want to block the heartbeat
    void this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
      if (!eligibleLeader && this.ownInstance.data.status === 'settled') {
        // NOTE(review): intentionally not awaited. checkAndMaybeLead() requests
        // exclusive slots itself and would presumably wait on this very slot.
        void this.checkAndMaybeLead();
      }
    });
  }

  /**
   * waits for the db connection, persists the own instance (once),
   * kicks off the heartbeat loop and performs the initial leader check
   */
  private async init() {
    await this.readyPromise;
    if (!this.ownInstance) {
      await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
        this.ownInstance = new DistributedClass();
        this.ownInstance.id = this.id;
        this.ownInstance.data = {
          elected: false,
          lastUpdated: Date.now(),
          status: 'initializing',
          taskRequests: [],
          taskRequestResults: [],
        };
        await this.ownInstance.save();
      });
    } else {
      console.warn(`distributed instance already initialized`);
    }

    // lets enable the heartbeat (runs in the background until stopped)
    void this.startHeartbeat();

    // lets do a leader check
    await this.checkAndMaybeLead();

    return this.ownInstance;
  }

  /**
   * @returns an instance that is marked as elected and whose last heartbeat
   * is at most 20 seconds old, or undefined if there is none
   */
  public async getEligibleLeader() {
    return this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
      const allInstances = await DistributedClass.getInstances({});
      const leaders = allInstances.filter((instanceArg) => instanceArg.data.elected === true);
      const eligibleLeader = leaders.find(
        (leader) =>
          leader.data.lastUpdated >=
          Date.now() - plugins.smarttime.getMilliSecondsFromUnits({ seconds: 20 })
      );
      return eligibleLeader;
    });
  }

  // --> leader election
  /**
   * Leader check:
   * 1. if an eligible leader exists -> settle as follower
   * 2. if another instance started bidding between 4 and 30 seconds ago -> too late
   *    for this round; settle so that the heartbeat retries later
   * 3. otherwise bid: store a random shortcode, wait 5 seconds, and win if no other
   *    recent bidder has a smaller shortcode
   */
  public async checkAndMaybeLead() {
    await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
      this.ownInstance.data.status = 'initializing';
      await this.ownInstance.save();
    });

    if (await this.getEligibleLeader()) {
      await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
        await this.ownInstance.updateFromDb();
        this.ownInstance.data.status = 'settled';
        await this.ownInstance.save();
        console.log(`${this.ownInstance.id} settled as follower`);
      });
      return;
    }

    const now = Date.now();
    const runningBid = (await DistributedClass.getInstances({})).find((instanceArg) => {
      const biddingStartTime = instanceArg.data.biddingStartTime;
      return (
        instanceArg.data.status === 'bidding' &&
        biddingStartTime !== undefined &&
        biddingStartTime <= now - 4000 &&
        biddingStartTime >= now - 30000
      );
    });
    if (runningBid) {
      console.log('too late to the bidding party... waiting for next round.');
      // The heartbeat only re-triggers the leader check for 'settled' instances,
      // so settle here; staying in 'initializing' would mean never retrying.
      await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
        await this.ownInstance.updateFromDb();
        this.ownInstance.data.status = 'settled';
        await this.ownInstance.save();
      });
      return;
    }

    await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
      await this.ownInstance.updateFromDb();
      this.ownInstance.data.status = 'bidding';
      this.ownInstance.data.biddingStartTime = Date.now();
      this.ownInstance.data.biddingShortcode = plugins.smartunique.shortId();
      await this.ownInstance.save();
      console.log('bidding code stored.');
    });
    console.log(`bidding for leadership...`);
    // give other instances time to place their bids
    await plugins.smartdelay.delayFor(plugins.smarttime.getMilliSecondsFromUnits({ seconds: 5 }));
    await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
      let biddingInstances = await DistributedClass.getInstances({});
      biddingInstances = biddingInstances.filter(
        (instanceArg) =>
          instanceArg.data.status === 'bidding' &&
          instanceArg.data.lastUpdated >=
            Date.now() - plugins.smarttime.getMilliSecondsFromUnits({ seconds: 10 })
      );
      console.log(`found ${biddingInstances.length} bidding instances...`);
      // the bidder with the smallest shortcode wins
      this.ownInstance.data.elected = true;
      for (const biddingInstance of biddingInstances) {
        if (biddingInstance.data.biddingShortcode < this.ownInstance.data.biddingShortcode) {
          this.ownInstance.data.elected = false;
        }
      }
      await plugins.smartdelay.delayFor(5000);
      console.log(`settling with status elected = ${this.ownInstance.data.elected}`);
      this.ownInstance.data.status = 'settled';
      await this.ownInstance.save();
    });
    if (this.ownInstance.data.elected) {
      // runs until this instance is stopped or loses leadership; not awaited on purpose
      void this.leadFunction();
    }
  }

  /**
   * when it has been determined
   * that this instance is leading
   * the leading is implemented here
   *
   * Currently the leader watches all instance documents (changes are only logged)
   * and deletes documents of stopped instances every 10 seconds.
   */
  public async leadFunction() {
    this.distributedWatcher = await DistributedClass.watch({});

    this.distributedWatcher.changeSubject.subscribe({
      next: async (distributedDoc) => {
        if (!distributedDoc) {
          console.log(`registered deletion of instance...`);
          return;
        }
        console.log(distributedDoc);
        console.log(`registered change for ${distributedDoc.id}`);
      },
    });

    while (this.ownInstance.data.status !== 'stopped' && this.ownInstance.data.elected) {
      const allInstances = await DistributedClass.getInstances({});
      for (const instance of allInstances) {
        if (instance.data.status === 'stopped') {
          await instance.delete();
        }
      }
      await plugins.smartdelay.delayFor(10000);
    }
  }

  // abstract implemented methods
  /**
   * Stores the task request on the own instance, waits 10 seconds and then looks
   * for a result with a matching `requestResponseId` on the own instance.
   *
   * @param taskRequestArg the request to publish
   * @returns the matching result, or null if the coordinator has not been started
   * or no result was found
   */
  public async fireDistributedTaskRequest(
    taskRequestArg: plugins.taskbuffer.distributedCoordination.IDistributedTaskRequest
  ): Promise<plugins.taskbuffer.distributedCoordination.IDistributedTaskRequestResult> {
    if (!this.ownInstance) {
      // without an own instance there is nowhere to store the request
      console.error('instance need to be started first...');
      return null;
    }
    await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
      await this.ownInstance.updateFromDb();
      this.ownInstance.data.taskRequests.push(taskRequestArg);
      await this.ownInstance.save();
    });
    // wait for a result to be written to the own instance
    await plugins.smartdelay.delayFor(10000);
    const result = await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
      await this.ownInstance.updateFromDb();
      const taskRequestResult = this.ownInstance.data.taskRequestResults.find((resultItem) => {
        return resultItem.requestResponseId === taskRequestArg.requestResponseId;
      });
      return taskRequestResult;
    });
    if (!result) {
      console.warn('no result found for task request...');
      return null;
    }
    return result;
  }

  /**
   * Merges `infoBasisArg` into the stored task request with the same `taskName` and
   * `taskExecutionTime`, persists it and schedules its removal 60 seconds later.
   *
   * @param infoBasisArg the updated task request
   */
  public async updateDistributedTaskRequest(
    infoBasisArg: plugins.taskbuffer.distributedCoordination.IDistributedTaskRequest
  ): Promise<void> {
    if (!this.ownInstance) {
      console.error('instance need to be started first...');
      return;
    }
    await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
      const existingInfoBasis = this.ownInstance.data.taskRequests.find((infoBasisItem) => {
        return (
          infoBasisItem.taskName === infoBasisArg.taskName &&
          infoBasisItem.taskExecutionTime === infoBasisArg.taskExecutionTime
        );
      });
      if (!existingInfoBasis) {
        console.warn('trying to update a non existing task request... aborting!');
        return;
      }
      Object.assign(existingInfoBasis, infoBasisArg);
      await this.ownInstance.save();
      // cleanup happens in the background; callers do not wait for it
      void this.removeTaskRequestAfterDelay(existingInfoBasis, 60000);
    });
  }

  /**
   * Removes exactly the given task request from the own instance after `delayMsArg`.
   * Does nothing if the request is no longer part of the stored list.
   */
  private async removeTaskRequestAfterDelay(
    taskRequestArg: plugins.taskbuffer.distributedCoordination.IDistributedTaskRequest,
    delayMsArg: number
  ): Promise<void> {
    await plugins.smartdelay.delayFor(delayMsArg);
    await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
      const indexToRemove = this.ownInstance.data.taskRequests.indexOf(taskRequestArg);
      if (indexToRemove === -1) {
        return;
      }
      // second argument of splice is the delete count, not an end index
      this.ownInstance.data.taskRequests.splice(indexToRemove, 1);
      await this.ownInstance.save();
    });
  }
}