From bae776d4e957cc539aaa7f7f3bf6cd1c0a8f6a6b Mon Sep 17 00:00:00 2001 From: Philipp Kunz Date: Sat, 12 Aug 2023 12:24:10 +0200 Subject: [PATCH] fix(core): update --- ts/00_commitinfo_data.ts | 2 +- ...skbuffer.classes.distributedcoordinator.ts | 22 +-- ts/taskbuffer.classes.taskmanager.ts | 178 +++++++----------- 3 files changed, 79 insertions(+), 123 deletions(-) diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 888b1b5..183754b 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/taskbuffer', - version: '3.1.1', + version: '3.1.2', description: 'flexible task management. TypeScript ready!' } diff --git a/ts/taskbuffer.classes.distributedcoordinator.ts b/ts/taskbuffer.classes.distributedcoordinator.ts index 55cbb70..2d20c78 100644 --- a/ts/taskbuffer.classes.distributedcoordinator.ts +++ b/ts/taskbuffer.classes.distributedcoordinator.ts @@ -2,13 +2,10 @@ import { Task } from './taskbuffer.classes.task.js'; import * as plugins from './taskbuffer.plugins.js'; /** - * constains all data for the final coordinator to actually make an informed decision + * Contains all data for the final coordinator to make an informed decision. */ export interface IDistributedTaskRequest { - /** - * this needs to correlate to the consultationResult - */ - submitterRandomId: string; + submitterId: string; taskName: string; taskVersion: string; taskExecutionTime: number; @@ -18,24 +15,19 @@ export interface IDistributedTaskRequest { } export interface IDistributedTaskRequestResult { - /** - * this needs to correlate to the decisionInfoBasis - */ - submitterRandomId: string; - /** - * can be used while debugging - */ + submitterId: string; considered: boolean; - rank: string; + rank: number; reason: string; shouldTrigger: boolean; } export abstract class AbstractDistributedCoordinator { public abstract fireDistributedTaskRequest( - infoBasisArg: IDistributedTaskRequest + infoBasis: IDistributedTaskRequest ): Promise; + public abstract updateDistributedTaskRequest( - infoBasisArg: IDistributedTaskRequest + infoBasis: IDistributedTaskRequest ): Promise; } diff --git a/ts/taskbuffer.classes.taskmanager.ts b/ts/taskbuffer.classes.taskmanager.ts index 46ff0e6..ed66224 100644 --- a/ts/taskbuffer.classes.taskmanager.ts +++ b/ts/taskbuffer.classes.taskmanager.ts @@ -1,10 +1,10 @@ import * as plugins from './taskbuffer.plugins.js'; import { Task } from './taskbuffer.classes.task.js'; -import { AbstractDistributedCoordinator } from './taskbuffer.classes.distributedcoordinator.js'; +import { AbstractDistributedCoordinator, type IDistributedTaskRequestResult } from './taskbuffer.classes.distributedcoordinator.js'; export interface ICronJob { cronString: string; - taskNameArg: string; + taskName: string; job: any; } @@ -16,54 +16,34 @@ export class TaskManager { public randomId = plugins.isounique.uni(); public taskMap = new plugins.lik.ObjectMap(); private cronJobManager = new plugins.smarttime.CronManager(); - public options: ITaskManagerConstructorOptions = { distributedCoordinator: null, }; - constructor(optionsArg: ITaskManagerConstructorOptions = {}) { - this.options = Object.assign(this.options, optionsArg); + constructor(options: ITaskManagerConstructorOptions = {}) { + this.options = Object.assign(this.options, options); } - /** - * checks if a task is already present - * @param taskNameArg - */ - public getTaskByName(taskNameArg: string): Task { - return this.taskMap.findSync((itemArg) => { - return itemArg.name === taskNameArg; - }); + public getTaskByName(taskName: string): Task { + return this.taskMap.findSync((task) => task.name === taskName); } - /** - * adds a Task to the TaskManager - * @param taskArg - */ - public addTask(taskArg: Task): void { - if (!taskArg.name) { - throw new Error('taskArg needs a name to be added to taskManager'); + public addTask(task: Task): void { + if (!task.name) { + throw new Error('Task must have a name to be added to taskManager'); } - this.taskMap.add(taskArg); + this.taskMap.add(task); } - /** - * adds and schedules a task at once - * @param taskArg - * @param cronStringArg - */ - public addAndScheduleTask(taskArg: Task, cronStringArg: string) { - this.addTask(taskArg); - this.scheduleTaskByName(taskArg.name, cronStringArg); + public addAndScheduleTask(task: Task, cronString: string) { + this.addTask(task); + this.scheduleTaskByName(task.name, cronString); } - /** - * triggers a task in the TaskManagerByName - * @param taskNameArg - */ - public triggerTaskByName(taskNameArg: string): Promise { - const taskToTrigger = this.getTaskByName(taskNameArg); + public async triggerTaskByName(taskName: string): Promise { + const taskToTrigger = this.getTaskByName(taskName); if (!taskToTrigger) { - throw new Error(`There is no task with the name of ${taskNameArg}`); + throw new Error(`No task with the name ${taskName} found.`); } return taskToTrigger.trigger(); } @@ -72,95 +52,79 @@ export class TaskManager { return task.trigger(); } - /** - * schedules the task by name - * @param taskNameArg - */ - public scheduleTaskByName(taskNameArg: string, cronStringArg: string) { - const taskToSchedule = this.getTaskByName(taskNameArg); - const cronJob = this.cronJobManager.addCronjob( - cronStringArg, - async (triggerTimeArg: number) => { - console.log(`taskbuffer schedule triggered task >>${taskToSchedule.name}<<`); - console.log( - `task >>${taskToSchedule.name}<< is ${ - taskToSchedule.buffered - ? `buffered with max ${taskToSchedule.bufferMax} buffered calls` - : `unbuffered` - }` - ); - if (this.options.distributedCoordinator) { - console.log(`Found a distrubuted coordinator, performing distributed consultation.`); - const announcementResult = - await this.options.distributedCoordinator.fireDistributedTaskRequest({ - submitterRandomId: this.randomId, - status: 'requesting', - taskExecutionParallel: 1, - taskExecutionTime: triggerTimeArg, - taskExecutionTimeout: taskToSchedule.timeout, - taskName: taskToSchedule.name, - taskVersion: taskToSchedule.version, - }); - - if (!announcementResult.shouldTrigger) { - console.log('distributed coordinator result: NOT EXECUTING'); - return; - } else { - console.log('distributed coordinator result: CHOSEN AND EXECUTING'); - } - } - await taskToSchedule.trigger(); - } - ); - taskToSchedule.cronJob = cronJob; + public scheduleTaskByName(taskName: string, cronString: string) { + const taskToSchedule = this.getTaskByName(taskName); + if (!taskToSchedule) { + throw new Error(`No task with the name ${taskName} found.`); + } + this.handleTaskScheduling(taskToSchedule, cronString); } - /** - * deschedules a task by name - * @param taskNameArg - */ - public descheduleTaskByName(taskNameArg: string) { - const taskToDeSchedule = this.getTaskByName(taskNameArg); - if (taskToDeSchedule.cronJob) { - this.cronJobManager.removeCronjob(taskToDeSchedule.cronJob); - taskToDeSchedule.cronJob = null; + private handleTaskScheduling(task: Task, cronString: string) { + const cronJob = this.cronJobManager.addCronjob( + cronString, + async (triggerTime: number) => { + this.logTaskState(task); + if (this.options.distributedCoordinator) { + const announcementResult = await this.performDistributedConsultation(task, triggerTime); + if (!announcementResult.shouldTrigger) { + console.log('Distributed coordinator result: NOT EXECUTING'); + return; + } else { + console.log('Distributed coordinator result: CHOSEN AND EXECUTING'); + } + } + await task.trigger(); + } + ); + task.cronJob = cronJob; + } + + private logTaskState(task: Task) { + console.log(`Taskbuffer schedule triggered task >>${task.name}<<`); + const bufferState = task.buffered + ? `buffered with max ${task.bufferMax} buffered calls` + : `unbuffered`; + console.log(`Task >>${task.name}<< is ${bufferState}`); + } + + private async performDistributedConsultation(task: Task, triggerTime: number): Promise { + console.log('Found a distributed coordinator, performing consultation.'); + return this.options.distributedCoordinator.fireDistributedTaskRequest({ + submitterId: this.randomId, + status: 'requesting', + taskExecutionParallel: 1, + taskExecutionTime: triggerTime, + taskExecutionTimeout: task.timeout, + taskName: task.name, + taskVersion: task.version, + }); + } + + public descheduleTaskByName(taskName: string) { + const task = this.getTaskByName(taskName); + if (task && task.cronJob) { + this.cronJobManager.removeCronjob(task.cronJob); + task.cronJob = null; } if (this.cronJobManager.cronjobs.isEmpty) { this.cronJobManager.stop(); } } - /** - * deschedules a task - * @param task - */ public async descheduleTask(task: Task) { await this.descheduleTaskByName(task.name); } - /** - * returns the schedule of a specific task - * @param taskNameArg - */ -public getScheduleForTaskName(taskNameArg: string): string | null { - const task = this.getTaskByName(taskNameArg); - if (!task || !task.cronJob) { - return null; + public getScheduleForTaskName(taskName: string): string | null { + const task = this.getTaskByName(taskName); + return task && task.cronJob ? task.cronJob.cronExpression : null; } - return task.cronJob.cronExpression; -} - - /** - * starts the taskmanager - */ public start() { this.cronJobManager.start(); } - /** - * stops the taskmanager - */ public stop() { this.cronJobManager.stop(); }