import * as plugins from './taskbuffer.plugins.js'; import { Task } from './taskbuffer.classes.task.js'; import { AbstractDistributedCoordinator, type IDistributedTaskRequestResult } from './taskbuffer.classes.distributedcoordinator.js'; export interface ICronJob { cronString: string; taskName: string; job: any; } export interface ITaskManagerConstructorOptions { distributedCoordinator?: AbstractDistributedCoordinator; } export class TaskManager { public randomId = plugins.smartunique.shortId(); public taskMap = new plugins.lik.ObjectMap(); private cronJobManager = new plugins.smarttime.CronManager(); public options: ITaskManagerConstructorOptions = { distributedCoordinator: null, }; constructor(options: ITaskManagerConstructorOptions = {}) { this.options = Object.assign(this.options, options); } public getTaskByName(taskName: string): Task { return this.taskMap.findSync((task) => task.name === taskName); } public addTask(task: Task): void { if (!task.name) { throw new Error('Task must have a name to be added to taskManager'); } this.taskMap.add(task); } public addAndScheduleTask(task: Task, cronString: string) { this.addTask(task); this.scheduleTaskByName(task.name, cronString); } public async triggerTaskByName(taskName: string): Promise { const taskToTrigger = this.getTaskByName(taskName); if (!taskToTrigger) { throw new Error(`No task with the name ${taskName} found.`); } return taskToTrigger.trigger(); } public async triggerTask(task: Task) { return task.trigger(); } public scheduleTaskByName(taskName: string, cronString: string) { const taskToSchedule = this.getTaskByName(taskName); if (!taskToSchedule) { throw new Error(`No task with the name ${taskName} found.`); } this.handleTaskScheduling(taskToSchedule, cronString); } private handleTaskScheduling(task: Task, cronString: string) { const cronJob = this.cronJobManager.addCronjob( cronString, async (triggerTime: number) => { this.logTaskState(task); if (this.options.distributedCoordinator) { const announcementResult = await this.performDistributedConsultation(task, triggerTime); if (!announcementResult.shouldTrigger) { console.log('Distributed coordinator result: NOT EXECUTING'); return; } else { console.log('Distributed coordinator result: CHOSEN AND EXECUTING'); } } await task.trigger(); } ); task.cronJob = cronJob; } private logTaskState(task: Task) { console.log(`Taskbuffer schedule triggered task >>${task.name}<<`); const bufferState = task.buffered ? `buffered with max ${task.bufferMax} buffered calls` : `unbuffered`; console.log(`Task >>${task.name}<< is ${bufferState}`); } private async performDistributedConsultation(task: Task, triggerTime: number): Promise { console.log('Found a distributed coordinator, performing consultation.'); return this.options.distributedCoordinator.fireDistributedTaskRequest({ submitterId: this.randomId, requestResponseId: plugins.smartunique.shortId(), status: 'requesting', taskExecutionParallel: 1, taskExecutionTime: triggerTime, taskExecutionTimeout: task.timeout, taskName: task.name, taskVersion: task.version, }); } public descheduleTaskByName(taskName: string) { const task = this.getTaskByName(taskName); if (task && task.cronJob) { this.cronJobManager.removeCronjob(task.cronJob); task.cronJob = null; } if (this.cronJobManager.cronjobs.isEmpty) { this.cronJobManager.stop(); } } public async descheduleTask(task: Task) { await this.descheduleTaskByName(task.name); } public getScheduleForTaskName(taskName: string): string | null { const task = this.getTaskByName(taskName); return task && task.cronJob ? task.cronJob.cronExpression : null; } public async start() { if (this.options.distributedCoordinator) { await this.options.distributedCoordinator.start(); } this.cronJobManager.start(); } public async stop() { this.cronJobManager.stop(); if (this.options.distributedCoordinator) { await this.options.distributedCoordinator.stop(); } } }