From 84f186924e06b87f7c18cf8025ac9e6930a48e64 Mon Sep 17 00:00:00 2001 From: Philipp Kunz Date: Mon, 14 Nov 2022 14:54:26 +0100 Subject: [PATCH] fix(core): update --- ts/00_commitinfo_data.ts | 2 +- ...skbuffer.classes.distributedcoordinator.ts | 39 ++++++++++++++++++- ts/taskbuffer.classes.task.ts | 7 ++++ ts/taskbuffer.classes.taskmanager.ts | 26 +++++++++++-- 4 files changed, 69 insertions(+), 5 deletions(-) diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 0fe0e23..e0e08f8 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@pushrocks/taskbuffer', - version: '3.0.3', + version: '3.0.4', description: 'flexible task management. TypeScript ready!' } diff --git a/ts/taskbuffer.classes.distributedcoordinator.ts b/ts/taskbuffer.classes.distributedcoordinator.ts index 3746938..69ed85c 100644 --- a/ts/taskbuffer.classes.distributedcoordinator.ts +++ b/ts/taskbuffer.classes.distributedcoordinator.ts @@ -1,5 +1,42 @@ +import { Task } from './taskbuffer.classes.task.js'; import * as plugins from './taskbuffer.plugins.js'; +/** + * constains all data for the final coordinator to actually make an informed decision + */ +export interface IDistributedDecisionInfoBasis { + taskName: string; + taskVersion: string; + taskExecutionTime: number; + taskExecutionTimeout: number; + taskExecutionParallel: number; + status: 'requesting' | 'gotRejected' | 'failed' | 'succeeded'; +} + +export interface ITaskConsultationResult { + considered: boolean; + rank: string; + reason: string; + shouldTrigger: boolean; +} + +export interface IDistributedCoordinatorConstructorOptions { + announceDistributedDecisionInfoBasis: (infoBasisArg: IDistributedDecisionInfoBasis) => Promise + updateDistributedDecisionInfoBasis: (infoBasisArg: IDistributedDecisionInfoBasis) => Promise +} + export class DistributedCoordinator { - + public options: IDistributedCoordinatorConstructorOptions; + + constructor(optionsArg: IDistributedCoordinatorConstructorOptions) { + this.options = optionsArg; + } + + public async announceDistributedDecisionInfoBasis(infoBasisArg: IDistributedDecisionInfoBasis): Promise { + return this.options.announceDistributedDecisionInfoBasis(infoBasisArg); + } + public async updateDistributedDevisionInfoBasis(infoBasisArg: IDistributedDecisionInfoBasis): Promise { + return this.options.updateDistributedDecisionInfoBasis(infoBasisArg) + } + } \ No newline at end of file diff --git a/ts/taskbuffer.classes.task.ts b/ts/taskbuffer.classes.task.ts index db31f57..522410e 100644 --- a/ts/taskbuffer.classes.task.ts +++ b/ts/taskbuffer.classes.task.ts @@ -128,12 +128,19 @@ export class Task { // INSTANCE // mandatory properties public name: string; + /** + * the version of the task + * should follow semver + * might be important for DistributedCoordinator + */ + public version: string; public taskFunction: ITaskFunction; public buffered: boolean; public cronJob: plugins.smarttime.CronJob; public bufferMax: number; public execDelay: number; + public timeout: number; // tasks to run before and after public preTask: Task | TPreOrAfterTaskFunction; diff --git a/ts/taskbuffer.classes.taskmanager.ts b/ts/taskbuffer.classes.taskmanager.ts index 54e65e6..e7167c3 100644 --- a/ts/taskbuffer.classes.taskmanager.ts +++ b/ts/taskbuffer.classes.taskmanager.ts @@ -1,5 +1,6 @@ import * as plugins from './taskbuffer.plugins.js'; import { Task } from './taskbuffer.classes.task.js'; +import { DistributedCoordinator } from './taskbuffer.classes.distributedcoordinator.js'; export interface ICronJob { cronString: string; @@ -7,12 +8,20 @@ export interface ICronJob { job: any; } +export interface ITaskManagerConstructorOptions { + distributedCoordinator?: DistributedCoordinator +} + export class TaskManager { public taskMap = new plugins.lik.ObjectMap(); private cronJobManager = new plugins.smarttime.CronManager(); - constructor() { - // nothing here + public options: ITaskManagerConstructorOptions = { + distributedCoordinator: null + }; + + constructor(optionosArg: ITaskManagerConstructorOptions) { + this.options = Object.assign(this.options, optionosArg); } /** @@ -68,7 +77,7 @@ export class TaskManager { */ public scheduleTaskByName(taskNameArg: string, cronStringArg: string) { const taskToSchedule = this.getTaskByName(taskNameArg); - const cronJob = this.cronJobManager.addCronjob(cronStringArg, async () => { + const cronJob = this.cronJobManager.addCronjob(cronStringArg, async (triggerTimeArg: number) => { console.log(`taskbuffer schedule triggered task >>${taskToSchedule.name}<<`); console.log( `task >>${taskToSchedule.name}<< is ${ @@ -77,6 +86,17 @@ export class TaskManager { : `unbuffered` }` ); + if (this.options.distributedCoordinator) { + console.log(`Found a distrubuted coordinator, performing distributed consultation.`); + const announcementResult = this.options.distributedCoordinator.announceDistributedDecisionInfoBasis({ + status: 'requesting', + taskExecutionParallel: 1, + taskExecutionTime: triggerTimeArg, + taskExecutionTimeout: taskToSchedule.timeout, + taskName: taskToSchedule.name, + taskVersion: taskToSchedule.version, + }) + } await taskToSchedule.trigger(); }); taskToSchedule.cronJob = cronJob;