import * as plugins from './taskbuffer.plugins.js';
import { Task } from './taskbuffer.classes.task.js';
import {
  AbstractDistributedCoordinator,
  type IDistributedTaskRequestResult,
} from './taskbuffer.classes.distributedcoordinator.js';
import type {
  ITaskMetadata,
  ITaskExecutionReport,
  IScheduledTaskInfo,
} from './taskbuffer.interfaces.js';

/**
 * Task type accepted by the manager regardless of its generic arguments.
 *
 * NOTE(review): the generic arguments in this file were reconstructed (the
 * previous text had orphaned `>` tokens where type arguments belong). The
 * arity used here assumes `Task` takes two type parameters — confirm against
 * the declaration in `taskbuffer.classes.task.ts`.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type AnyTask = Task<any, any>;

/** Describes a cron job together with the task name it belongs to. */
export interface ICronJob {
  cronString: string;
  taskName: string;
  job: any;
}

/** Options accepted by the {@link TaskManager} constructor. */
export interface ITaskManagerConstructorOptions {
  /** When set, consulted before every cron-triggered execution. */
  distributedCoordinator?: AbstractDistributedCoordinator;
}

/**
 * Registry of named tasks that can trigger them directly or on a cron
 * schedule, optionally asking a distributed coordinator for permission before
 * each scheduled run.
 */
export class TaskManager {
  /** Identifier of this manager instance; sent as `submitterId` in coordinator requests. */
  public randomId = plugins.smartunique.shortId();

  /** All tasks currently registered with this manager. */
  public taskMap = new plugins.lik.ObjectMap<AnyTask>();

  private cronJobManager = new plugins.smarttime.CronManager();

  public options: ITaskManagerConstructorOptions = {
    distributedCoordinator: null,
  };

  /**
   * @param options - overrides merged onto the default options.
   */
  constructor(options: ITaskManagerConstructorOptions = {}) {
    this.options = Object.assign(this.options, options);
  }

  /**
   * Looks up a registered task by its name.
   *
   * @returns the matching task; presumably `undefined` when nothing matches
   * (callers in this class guard with a falsy check).
   */
  public getTaskByName(taskName: string): AnyTask {
    return this.taskMap.findSync((task) => task.name === taskName);
  }

  /**
   * Registers a task with the manager.
   *
   * @throws Error when the task has no name.
   */
  public addTask(task: AnyTask): void {
    if (!task.name) {
      throw new Error('Task must have a name to be added to taskManager');
    }
    this.taskMap.add(task);
  }

  /** Registers a task and immediately schedules it with the given cron string. */
  public addAndScheduleTask(task: AnyTask, cronString: string) {
    this.addTask(task);
    this.scheduleTaskByName(task.name, cronString);
  }

  /**
   * Triggers the registered task with the given name.
   *
   * @returns whatever the task's `trigger()` resolves to.
   * @throws Error when no task with that name is registered.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  public async triggerTaskByName(taskName: string): Promise<any> {
    const taskToTrigger = this.getTaskByName(taskName);
    if (!taskToTrigger) {
      throw new Error(`No task with the name ${taskName} found.`);
    }
    return taskToTrigger.trigger();
  }

  /** Triggers the given task; the task does not need to be registered. */
  public async triggerTask(task: AnyTask) {
    return task.trigger();
  }

  /**
   * Schedules the registered task with the given name.
   *
   * @throws Error when no task with that name is registered.
   */
  public scheduleTaskByName(taskName: string, cronString: string) {
    const taskToSchedule = this.getTaskByName(taskName);
    if (!taskToSchedule) {
      throw new Error(`No task with the name ${taskName} found.`);
    }
    this.handleTaskScheduling(taskToSchedule, cronString);
  }

  /**
   * Creates the cron job for a task and stores it on `task.cronJob`.
   *
   * On every cron tick the callback logs the task state, consults the
   * distributed coordinator when one is configured, and triggers the task
   * unless the coordinator answered with `shouldTrigger === false`.
   *
   * NOTE(review): if the task already carries a `cronJob`, the reference is
   * overwritten without removing the previous job from the cron manager —
   * confirm whether re-scheduling is meant to replace the old schedule.
   */
  private handleTaskScheduling(task: AnyTask, cronString: string) {
    const cronJob = this.cronJobManager.addCronjob(
      cronString,
      async (triggerTime: number) => {
        this.logTaskState(task);
        if (this.options.distributedCoordinator) {
          const announcementResult = await this.performDistributedConsultation(
            task,
            triggerTime,
          );
          if (!announcementResult.shouldTrigger) {
            console.log('Distributed coordinator result: NOT EXECUTING');
            return;
          } else {
            console.log('Distributed coordinator result: CHOSEN AND EXECUTING');
          }
        }
        await task.trigger();
      },
    );
    task.cronJob = cronJob;
  }

  /** Logs that a schedule fired for the task and whether the task is buffered. */
  private logTaskState(task: AnyTask) {
    console.log(`Taskbuffer schedule triggered task >>${task.name}<<`);
    const bufferState = task.buffered
      ? `buffered with max ${task.bufferMax} buffered calls`
      : `unbuffered`;
    console.log(`Task >>${task.name}<< is ${bufferState}`);
  }

  /**
   * Sends a `requesting` task request for this tick to the configured
   * distributed coordinator and returns its answer.
   *
   * Only called when `options.distributedCoordinator` is set.
   */
  private async performDistributedConsultation(
    task: AnyTask,
    triggerTime: number,
  ): Promise<IDistributedTaskRequestResult> {
    console.log('Found a distributed coordinator, performing consultation.');
    return this.options.distributedCoordinator.fireDistributedTaskRequest({
      submitterId: this.randomId,
      requestResponseId: plugins.smartunique.shortId(),
      status: 'requesting',
      taskExecutionParallel: 1,
      taskExecutionTime: triggerTime,
      taskExecutionTimeout: task.timeout,
      taskName: task.name,
      taskVersion: task.version,
    });
  }

  /**
   * Removes the cron job of the registered task with the given name, if any,
   * and stops the cron manager once it has no cron jobs left.
   *
   * The task is resolved through the task map, so it must still be registered
   * for its cron job to be removed.
   */
  public descheduleTaskByName(taskName: string) {
    const task = this.getTaskByName(taskName);
    if (task && task.cronJob) {
      this.cronJobManager.removeCronjob(task.cronJob);
      task.cronJob = null;
    }
    if (this.cronJobManager.cronjobs.isEmpty) {
      this.cronJobManager.stop();
    }
  }

  /** Deschedules the given task by delegating to {@link descheduleTaskByName}. */
  public async descheduleTask(task: AnyTask) {
    await this.descheduleTaskByName(task.name);
  }

  /**
   * @returns the cron expression of the named task's cron job, or `null` when
   * the task is unknown or not scheduled.
   */
  public getScheduleForTaskName(taskName: string): string | null {
    const task = this.getTaskByName(taskName);
    return task && task.cronJob ? task.cronJob.cronExpression : null;
  }

  /** Starts the distributed coordinator (when configured) and then the cron manager. */
  public async start() {
    if (this.options.distributedCoordinator) {
      await this.options.distributedCoordinator.start();
    }
    this.cronJobManager.start();
  }

  /** Stops the cron manager and then the distributed coordinator (when configured). */
  public async stop() {
    this.cronJobManager.stop();
    if (this.options.distributedCoordinator) {
      await this.options.distributedCoordinator.stop();
    }
  }

  /**
   * Gets the metadata of a specific task.
   *
   * @returns the task's metadata, or `null` when no such task is registered.
   */
  public getTaskMetadata(taskName: string): ITaskMetadata | null {
    const task = this.getTaskByName(taskName);
    if (!task) return null;
    return task.getMetadata();
  }

  /** Gets the metadata of every registered task. */
  public getAllTasksMetadata(): ITaskMetadata[] {
    return this.taskMap.getArray().map((task) => task.getMetadata());
  }

  /**
   * Lists every registered task that currently has a cron job, together with
   * its schedule, next run time, last run, steps and metadata.
   */
  public getScheduledTasks(): IScheduledTaskInfo[] {
    const scheduledTasks: IScheduledTaskInfo[] = [];
    for (const task of this.taskMap.getArray()) {
      if (task.cronJob) {
        scheduledTasks.push({
          name: task.name || 'unnamed',
          schedule: task.cronJob.cronExpression,
          nextRun: new Date(task.cronJob.getNextExecutionTime()),
          lastRun: task.lastRun,
          steps: task.getStepsMetadata?.(),
          metadata: task.getMetadata(),
        });
      }
    }
    return scheduledTasks;
  }

  /**
   * Gets the upcoming runs across all scheduled tasks, soonest first.
   *
   * @param limit - maximum number of entries to return.
   */
  public getNextScheduledRuns(
    limit: number = 10,
  ): Array<{ taskName: string; nextRun: Date; schedule: string }> {
    const scheduledRuns = this.getScheduledTasks()
      .map((task) => ({
        taskName: task.name,
        nextRun: task.nextRun,
        schedule: task.schedule,
      }))
      .sort((a, b) => a.nextRun.getTime() - b.nextRun.getTime())
      .slice(0, limit);
    return scheduledRuns;
  }

  /**
   * Adds a task, executes it once, and removes it again while collecting an
   * execution report.
   *
   * NOTE(review): the type parameter list was reconstructed; confirm the
   * `TSteps` constraint against the `Task` class declaration.
   *
   * @param task - the task to run; it must have a name.
   * @param options - `schedule` additionally schedules the task for the
   * duration of the run. `trackProgress` is accepted but not read here.
   * @returns the execution report including the task's result.
   * @throws Error when the task has no name (raised while adding it).
   * @throws an `ITaskExecutionReport` object (not an `Error` instance) whose
   * `error` field holds the original failure when execution fails.
   */
  public async addExecuteRemoveTask<
    T,
    TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }>,
  >(
    task: Task<T, TSteps>,
    options?: {
      schedule?: string;
      trackProgress?: boolean;
    },
  ): Promise<ITaskExecutionReport> {
    this.addTask(task);

    if (options?.schedule) {
      this.scheduleTaskByName(task.name!, options.schedule);
    }

    const startTime = Date.now();

    try {
      const result = await task.trigger();
      const report: ITaskExecutionReport = {
        ...this.collectExecutionSnapshot(task, startTime),
        result,
      };
      return report;
    } catch (error) {
      const errorReport: ITaskExecutionReport = {
        ...this.collectExecutionSnapshot(task, startTime),
        error: error as Error,
      };
      throw errorReport;
    } finally {
      // Deschedule BEFORE removing the task: descheduleTaskByName resolves the
      // task through taskMap, so removing it first would leave its cron job
      // registered and firing.
      if (options?.schedule && task.name) {
        this.descheduleTaskByName(task.name);
      }
      this.taskMap.remove(task);
    }
  }

  /** Builds the report fields shared by the success and the failure report. */
  private collectExecutionSnapshot(task: AnyTask, startTime: number) {
    // Single clock reading so that endTime - startTime always equals duration.
    const endTime = Date.now();
    const steps = task.getStepsMetadata();
    return {
      taskName: task.name || 'unnamed',
      startTime,
      endTime,
      duration: endTime - startTime,
      steps,
      stepsCompleted: steps
        .filter((step) => step.status === 'completed')
        .map((step) => step.name),
      progress: task.getProgress(),
    };
  }
}