import * as plugins from './taskbuffer.plugins.js'; import { Task } from './taskbuffer.classes.task.js'; import { AbstractDistributedCoordinator, type IDistributedTaskRequestResult, } from './taskbuffer.classes.distributedcoordinator.js'; import type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent } from './taskbuffer.interfaces.js'; import { logger } from './taskbuffer.logging.js'; export interface ICronJob { cronString: string; taskName: string; job: any; } export interface ITaskManagerConstructorOptions { distributedCoordinator?: AbstractDistributedCoordinator; } export class TaskManager { public randomId = plugins.smartunique.shortId(); public taskMap = new plugins.lik.ObjectMap>(); public readonly taskSubject = new plugins.smartrx.rxjs.Subject(); private taskSubscriptions = new Map, plugins.smartrx.rxjs.Subscription>(); private cronJobManager = new plugins.smarttime.CronManager(); public options: ITaskManagerConstructorOptions = { distributedCoordinator: null, }; constructor(options: ITaskManagerConstructorOptions = {}) { this.options = Object.assign(this.options, options); } public getTaskByName(taskName: string): Task { return this.taskMap.findSync((task) => task.name === taskName); } public addTask(task: Task): void { if (!task.name) { throw new Error('Task must have a name to be added to taskManager'); } this.taskMap.add(task); const subscription = task.eventSubject.subscribe((event) => { this.taskSubject.next(event); }); this.taskSubscriptions.set(task, subscription); } public removeTask(task: Task): void { this.taskMap.remove(task); const subscription = this.taskSubscriptions.get(task); if (subscription) { subscription.unsubscribe(); this.taskSubscriptions.delete(task); } } public addAndScheduleTask(task: Task, cronString: string) { this.addTask(task); this.scheduleTaskByName(task.name, cronString); } public async triggerTaskByName(taskName: string): Promise { const taskToTrigger = this.getTaskByName(taskName); if (!taskToTrigger) { throw new Error(`No task with the name ${taskName} found.`); } return taskToTrigger.trigger(); } public async triggerTask(task: Task) { return task.trigger(); } public scheduleTaskByName(taskName: string, cronString: string) { const taskToSchedule = this.getTaskByName(taskName); if (!taskToSchedule) { throw new Error(`No task with the name ${taskName} found.`); } this.handleTaskScheduling(taskToSchedule, cronString); } private handleTaskScheduling(task: Task, cronString: string) { const cronJob = this.cronJobManager.addCronjob( cronString, async (triggerTime: number) => { this.logTaskState(task); if (this.options.distributedCoordinator) { const announcementResult = await this.performDistributedConsultation( task, triggerTime, ); if (!announcementResult.shouldTrigger) { logger.log('info', 'Distributed coordinator result: NOT EXECUTING'); return; } else { logger.log('info', 'Distributed coordinator result: CHOSEN AND EXECUTING'); } } try { await task.trigger(); } catch (err) { logger.log('error', `TaskManager: scheduled task "${task.name || 'unnamed'}" failed: ${err instanceof Error ? err.message : String(err)}`); } }, ); task.cronJob = cronJob; } private logTaskState(task: Task) { logger.log('info', `Taskbuffer schedule triggered task >>${task.name}<<`); const bufferState = task.buffered ? `buffered with max ${task.bufferMax} buffered calls` : `unbuffered`; logger.log('info', `Task >>${task.name}<< is ${bufferState}`); } private async performDistributedConsultation( task: Task, triggerTime: number, ): Promise { logger.log('info', 'Found a distributed coordinator, performing consultation.'); return this.options.distributedCoordinator.fireDistributedTaskRequest({ submitterId: this.randomId, requestResponseId: plugins.smartunique.shortId(), status: 'requesting', taskExecutionParallel: 1, taskExecutionTime: triggerTime, taskExecutionTimeout: task.timeout, taskName: task.name, taskVersion: task.version, }); } public descheduleTaskByName(taskName: string) { const task = this.getTaskByName(taskName); if (task && task.cronJob) { this.cronJobManager.removeCronjob(task.cronJob); task.cronJob = null; } if (this.cronJobManager.cronjobs.isEmpty) { this.cronJobManager.stop(); } } public async descheduleTask(task: Task) { await this.descheduleTaskByName(task.name); } public getScheduleForTaskName(taskName: string): string | null { const task = this.getTaskByName(taskName); return task && task.cronJob ? task.cronJob.cronExpression : null; } public async start() { if (this.options.distributedCoordinator) { await this.options.distributedCoordinator.start(); } this.cronJobManager.start(); } public async stop() { this.cronJobManager.stop(); if (this.options.distributedCoordinator) { await this.options.distributedCoordinator.stop(); } for (const [, subscription] of this.taskSubscriptions) { subscription.unsubscribe(); } this.taskSubscriptions.clear(); } // Get metadata for a specific task public getTaskMetadata(taskName: string): ITaskMetadata | null { const task = this.getTaskByName(taskName); if (!task) return null; return task.getMetadata(); } // Get metadata for all tasks public getAllTasksMetadata(): ITaskMetadata[] { return this.taskMap.getArray().map(task => task.getMetadata()); } // Get scheduled tasks with their schedules and next run times public getScheduledTasks(): IScheduledTaskInfo[] { const scheduledTasks: IScheduledTaskInfo[] = []; for (const task of this.taskMap.getArray()) { if (task.cronJob) { scheduledTasks.push({ name: task.name || 'unnamed', schedule: task.cronJob.cronExpression, nextRun: new Date(task.cronJob.getNextExecutionTime()), lastRun: task.lastRun, steps: task.getStepsMetadata?.(), metadata: task.getMetadata(), }); } } return scheduledTasks; } // Get next scheduled runs across all tasks public getNextScheduledRuns(limit: number = 10): Array<{ taskName: string; nextRun: Date; schedule: string }> { const scheduledRuns = this.getScheduledTasks() .map(task => ({ taskName: task.name, nextRun: task.nextRun, schedule: task.schedule, })) .sort((a, b) => a.nextRun.getTime() - b.nextRun.getTime()) .slice(0, limit); return scheduledRuns; } public getTasksByLabel(key: string, value: string): Task[] { return this.taskMap.getArray().filter(task => task.labels[key] === value); } public getTasksMetadataByLabel(key: string, value: string): ITaskMetadata[] { return this.getTasksByLabel(key, value).map(task => task.getMetadata()); } // Add, execute, and remove a task while collecting metadata public async addExecuteRemoveTask>( task: Task, options?: { schedule?: string; trackProgress?: boolean; } ): Promise { // Add task to manager this.addTask(task); // Optionally schedule it if (options?.schedule) { this.scheduleTaskByName(task.name!, options.schedule); } const startTime = Date.now(); const progressUpdates: Array<{ stepName: string; timestamp: number }> = []; try { // Execute the task const result = await task.trigger(); // Collect execution report const report: ITaskExecutionReport = { taskName: task.name || 'unnamed', startTime, endTime: Date.now(), duration: Date.now() - startTime, steps: task.getStepsMetadata(), stepsCompleted: task.getStepsMetadata() .filter(step => step.status === 'completed') .map(step => step.name), progress: task.getProgress(), result, }; // Remove task from manager this.removeTask(task); // Deschedule if it was scheduled if (options?.schedule && task.name) { this.descheduleTaskByName(task.name); } return report; } catch (error) { // Create error report const errorReport: ITaskExecutionReport = { taskName: task.name || 'unnamed', startTime, endTime: Date.now(), duration: Date.now() - startTime, steps: task.getStepsMetadata(), stepsCompleted: task.getStepsMetadata() .filter(step => step.status === 'completed') .map(step => step.name), progress: task.getProgress(), error: error as Error, }; // Remove task from manager even on error this.removeTask(task); // Deschedule if it was scheduled if (options?.schedule && task.name) { this.descheduleTaskByName(task.name); } throw errorReport; } } }