import * as plugins from './taskbuffer.plugins.js';
import { Task } from './taskbuffer.classes.task.js';
import { AbstractDistributedCoordinator } from './taskbuffer.classes.distributedcoordinator.js';

/**
 * Describes a cron job associated with a task name.
 */
export interface ICronJob {
  cronString: string;
  taskNameArg: string;
  job: any;
}

/**
 * Options accepted by the TaskManager constructor.
 */
export interface ITaskManagerConstructorOptions {
  /**
   * Optional coordinator that is consulted before a scheduled task is
   * executed; when absent, every scheduled trigger runs the task.
   */
  distributedCoordinator?: AbstractDistributedCoordinator;
}

/**
 * Keeps a registry of named tasks and schedules them via cron expressions.
 * When a distributed coordinator is configured, it is asked before every
 * scheduled execution whether this instance should run the task.
 */
export class TaskManager {
  /** id sent to the distributed coordinator as `submitterRandomId` */
  public randomId = plugins.isounique.uni();

  /** all tasks that were added to this manager */
  public taskMap = new plugins.lik.ObjectMap<Task>();

  private cronJobManager = new plugins.smarttime.CronManager();

  public options: ITaskManagerConstructorOptions = {
    distributedCoordinator: null,
  };

  /**
   * @param optionsArg options that are merged over the defaults
   */
  constructor(optionsArg: ITaskManagerConstructorOptions = {}) {
    this.options = { ...this.options, ...optionsArg };
  }

  /**
   * looks up a previously added task by its name
   * @param taskNameArg the name of the task to find
   * @returns the result of the lookup in the task map; callers in this class
   * treat a falsy result as "no such task"
   */
  public getTaskByName(taskNameArg: string): Task {
    return this.taskMap.findSync((itemArg) => itemArg.name === taskNameArg);
  }

  /**
   * adds a Task to the TaskManager
   * @param taskArg the task to add; it needs a name
   * @throws Error when the task has no name
   */
  public addTask(taskArg: Task): void {
    if (!taskArg.name) {
      throw new Error('taskArg needs a name to be added to taskManager');
    }
    this.taskMap.add(taskArg);
  }

  /**
   * adds and schedules a task at once
   * @param taskArg the task to add
   * @param cronStringArg the cron expression to schedule the task with
   */
  public addAndScheduleTask(taskArg: Task, cronStringArg: string): void {
    this.addTask(taskArg);
    this.scheduleTaskByName(taskArg.name, cronStringArg);
  }

  /**
   * triggers a task of this TaskManager by its name
   * @param taskNameArg the name of the task to trigger
   * @returns whatever the trigger() call of the task returns
   * @throws Error when no task with the given name was added
   */
  public triggerTaskByName(taskNameArg: string): Promise<any> {
    const taskToTrigger = this.getTaskByName(taskNameArg);
    if (!taskToTrigger) {
      throw new Error(`There is no task with the name of ${taskNameArg}`);
    }
    return taskToTrigger.trigger();
  }

  /**
   * triggers the given task directly
   * @param task the task to trigger
   */
  public async triggerTask(task: Task) {
    return task.trigger();
  }

  /**
   * schedules the task by name
   * @param taskNameArg the name of a task that was added before
   * @param cronStringArg the cron expression that determines when the task runs
   * @throws Error when no task with the given name was added
   */
  public scheduleTaskByName(taskNameArg: string, cronStringArg: string): void {
    const taskToSchedule = this.getTaskByName(taskNameArg);
    // Fail before registering anything: otherwise a cron job would be left
    // behind whose callback can only crash on the missing task.
    if (!taskToSchedule) {
      throw new Error(`There is no task with the name of ${taskNameArg}`);
    }

    const cronJob = this.cronJobManager.addCronjob(
      cronStringArg,
      async (triggerTimeArg: number) => {
        console.log(`taskbuffer schedule triggered task >>${taskToSchedule.name}<<`);
        console.log(
          `task >>${taskToSchedule.name}<< is ${
            taskToSchedule.buffered
              ? `buffered with max ${taskToSchedule.bufferMax} buffered calls`
              : `unbuffered`
          }`
        );

        const coordinator = this.options.distributedCoordinator;
        if (coordinator) {
          console.log(`Found a distrubuted coordinator, performing distributed consultation.`);
          const announcementResult = await coordinator.fireDistributedTaskRequest({
            submitterRandomId: this.randomId,
            status: 'requesting',
            taskExecutionParallel: 1,
            taskExecutionTime: triggerTimeArg,
            taskExecutionTimeout: taskToSchedule.timeout,
            taskName: taskToSchedule.name,
            taskVersion: taskToSchedule.version,
          });

          // the coordinator decides whether this instance executes the task
          if (!announcementResult.shouldTrigger) {
            console.log('distributed coordinator result: NOT EXECUTING');
            return;
          }
          console.log('distributed coordinator result: CHOSEN AND EXECUTING');
        }

        await taskToSchedule.trigger();
      }
    );

    // remembered on the task so that it can be descheduled later
    taskToSchedule.cronJob = cronJob;
  }

  /**
   * deschedules a task by name and stops the cron manager
   * once no cron jobs are left
   * @param taskNameArg the name of the task to deschedule
   * @throws Error when no task with the given name was added
   */
  public descheduleTaskByName(taskNameArg: string): void {
    const taskToDeSchedule = this.getTaskByName(taskNameArg);
    if (!taskToDeSchedule) {
      throw new Error(`There is no task with the name of ${taskNameArg}`);
    }

    if (taskToDeSchedule.cronJob) {
      this.cronJobManager.removeCronjob(taskToDeSchedule.cronJob);
      taskToDeSchedule.cronJob = null;
    }

    // NOTE(review): isEmpty is a method on lik's ObjectMap. Referencing it
    // without calling it is always truthy, which stopped the cron manager
    // even while other jobs were still scheduled.
    if (this.cronJobManager.cronjobs.isEmpty()) {
      this.cronJobManager.stop();
    }
  }

  /**
   * deschedules a task
   * @param task the task to deschedule; it is looked up by its name
   */
  public async descheduleTask(task: Task): Promise<void> {
    this.descheduleTaskByName(task.name);
  }

  /**
   * returns the schedule of a specific task
   * @param taskNameArg the name of the task
   * @returns the cron expression of the task's cron job, or null when the
   * task is unknown or not scheduled
   */
  public getScheduleForTaskName(taskNameArg: string): string | null {
    const task = this.getTaskByName(taskNameArg);
    if (!task || !task.cronJob) {
      return null;
    }
    return task.cronJob.cronExpression;
  }

  /**
   * starts the taskmanager by starting its cron manager
   */
  public start(): void {
    this.cronJobManager.start();
  }

  /**
   * stops the taskmanager by stopping its cron manager
   */
  public stop(): void {
    this.cronJobManager.stop();
  }
}