import * as plugins from './taskbuffer.plugins.js';
import { Task } from './taskbuffer.classes.task.js';
import { TaskConstraintGroup } from './taskbuffer.classes.taskconstraintgroup.js';
import {
  AbstractDistributedCoordinator,
  type IDistributedTaskRequestResult,
} from './taskbuffer.classes.distributedcoordinator.js';
import type {
  ITaskMetadata,
  ITaskExecutionReport,
  IScheduledTaskInfo,
  ITaskEvent,
  IConstrainedTaskEntry,
} from './taskbuffer.interfaces.js';
import { logger } from './taskbuffer.logging.js';

/**
 * Task type accepted by the manager regardless of its type arguments.
 * NOTE(review): the type arguments were lost from this file; `Task<any, any>` is
 * reconstructed from usage and assumes any further type parameters of `Task`
 * have defaults — confirm against the `Task` declaration.
 */
type AnyTask = Task<any, any>;

/** A constraint group paired with the constraint key it produced for one task/input. */
type TApplicableConstraint = { group: TaskConstraintGroup; key: string };

/** Shape describing a cron job entry (cron expression, owning task name, job handle). */
export interface ICronJob {
  cronString: string;
  taskName: string;
  job: any;
}

/** Options accepted by the {@link TaskManager} constructor. */
export interface ITaskManagerConstructorOptions {
  /** When set, scheduled runs consult this coordinator before triggering. */
  distributedCoordinator?: AbstractDistributedCoordinator;
}

/**
 * Registry and scheduler for tasks.
 *
 * Keeps named tasks, forwards their events on {@link TaskManager.taskSubject},
 * schedules them via cron expressions and routes every trigger through the
 * registered constraint groups (queueing runs that are currently blocked).
 */
export class TaskManager {
  /** Id of this manager instance; sent as `submitterId` in distributed task requests. */
  public randomId = plugins.smartunique.shortId();
  /** All tasks currently added to this manager. */
  public taskMap = new plugins.lik.ObjectMap<AnyTask>();
  /** Re-emits every event published on the `eventSubject` of an added task. */
  public readonly taskSubject = new plugins.smartrx.rxjs.Subject<ITaskEvent>();
  /** Per-task subscription forwarding task events to `taskSubject`. */
  private taskSubscriptions = new Map<AnyTask, plugins.smartrx.rxjs.Subscription>();
  private cronJobManager = new plugins.smarttime.CronManager();
  public options: ITaskManagerConstructorOptions = {
    distributedCoordinator: null,
  };

  // Constraint system
  public constraintGroups: TaskConstraintGroup[] = [];
  /** Triggers that were blocked by a constraint and wait for a later drain. */
  private constraintQueue: IConstrainedTaskEntry[] = [];
  /** Pending timer that re-drains the queue once the shortest cooldown has elapsed. */
  private drainTimer: ReturnType<typeof setTimeout> | null = null;

  /**
   * @param options - Overrides merged onto the default options.
   */
  constructor(options: ITaskManagerConstructorOptions = {}) {
    this.options = Object.assign(this.options, options);
  }

  /**
   * Looks up an added task by its name.
   *
   * @returns The result of the map lookup; callers in this class treat a falsy result as "not found".
   */
  public getTaskByName(taskName: string): AnyTask {
    return this.taskMap.findSync((task) => task.name === taskName);
  }

  /**
   * Adds a task and starts forwarding its events to `taskSubject`.
   *
   * @throws Error when the task has no name.
   */
  public addTask(task: AnyTask): void {
    if (!task.name) {
      throw new Error('Task must have a name to be added to taskManager');
    }
    this.taskMap.add(task);
    const subscription = task.eventSubject.subscribe((event) => {
      this.taskSubject.next(event);
    });
    this.taskSubscriptions.set(task, subscription);
  }

  /**
   * Removes a task and stops forwarding its events.
   * Does not deschedule a cron job attached to the task.
   */
  public removeTask(task: AnyTask): void {
    this.taskMap.remove(task);
    const subscription = this.taskSubscriptions.get(task);
    if (subscription) {
      subscription.unsubscribe();
      this.taskSubscriptions.delete(task);
    }
  }

  /** Adds a task and schedules it with the given cron expression. */
  public addAndScheduleTask(task: AnyTask, cronString: string) {
    this.addTask(task);
    this.scheduleTaskByName(task.name, cronString);
  }

  // Constraint group management

  /** Registers a constraint group that is consulted on every trigger. */
  public addConstraintGroup(group: TaskConstraintGroup): void {
    this.constraintGroups.push(group);
  }

  /** Unregisters every constraint group with the given name. */
  public removeConstraintGroup(name: string): void {
    this.constraintGroups = this.constraintGroups.filter((g) => g.name !== name);
  }

  // Core constraint evaluation

  /**
   * Triggers a task while honoring all registered constraint groups.
   *
   * - No group yields a constraint key: runs immediately (after the shouldExecute check).
   * - Every applicable group allows running: runs with slot tracking.
   * - Otherwise: the trigger is queued and the returned promise settles when
   *   the queued run eventually executes.
   *
   * @param task - Task to trigger.
   * @param input - Optional input handed to the constraint groups and to `task.trigger`.
   * @returns The task result, or `undefined` when a group's shouldExecute check vetoes the run.
   */
  public async triggerTaskConstrained(task: AnyTask, input?: any): Promise<any> {
    const applicableGroups = this.resolveApplicableGroups(task, input);

    // No constraints apply → check shouldExecute then trigger directly
    if (applicableGroups.length === 0) {
      return this.triggerUnconstrained(task, input);
    }

    // Check if all constraints allow running
    const allCanRun = applicableGroups.every(({ group, key }) => group.canRun(key));
    if (allCanRun) {
      return this.executeWithConstraintTracking(task, input, applicableGroups);
    }

    // Blocked → enqueue with deferred promise and cached constraint keys
    const deferred = plugins.smartpromise.defer<any>();
    const constraintKeys = new Map<string, string>();
    for (const { group, key } of applicableGroups) {
      constraintKeys.set(group.name, key);
    }
    this.constraintQueue.push({ task, input, deferred, constraintKeys });

    // The queue is otherwise only drained when a tracked run finishes. If this
    // entry is blocked by a cooldown alone, nothing may be running, so make sure
    // a drain timer is armed or the entry could wait indefinitely.
    if (!this.drainTimer) {
      const cooldown = this.getShortestCooldown(applicableGroups);
      if (cooldown < Infinity) {
        this.armDrainTimer(cooldown);
      }
    }
    return deferred.promise;
  }

  /** Collects every constraint group that returns a non-null constraint key for the task/input. */
  private resolveApplicableGroups(task: AnyTask, input?: any): TApplicableConstraint[] {
    const applicable: TApplicableConstraint[] = [];
    for (const group of this.constraintGroups) {
      const key = group.getConstraintKey(task, input);
      if (key !== null) {
        applicable.push({ group, key });
      }
    }
    return applicable;
  }

  /** Runs a task that no constraint applies to, still honoring the shouldExecute checks. */
  private async triggerUnconstrained(task: AnyTask, input?: any): Promise<any> {
    const shouldRun = await this.checkAllShouldExecute(task, input);
    if (!shouldRun) {
      return undefined;
    }
    return task.trigger(input);
  }

  /**
   * Asks every registered group (in order) whether the task should execute.
   *
   * @returns `false` as soon as one group declines, `true` otherwise.
   */
  private async checkAllShouldExecute(task: AnyTask, input?: any): Promise<boolean> {
    for (const group of this.constraintGroups) {
      const shouldRun = await group.checkShouldExecute(task, input);
      if (!shouldRun) {
        return false;
      }
    }
    return true;
  }

  /**
   * Runs a task while holding one slot in each applicable constraint group.
   *
   * Slots are always released and the queue drained afterwards — also when the
   * shouldExecute check or the task itself throws.
   *
   * @returns The task result, or `undefined` when the shouldExecute check vetoes the run.
   */
  private async executeWithConstraintTracking(
    task: AnyTask,
    input: any,
    groups: TApplicableConstraint[],
  ): Promise<any> {
    // Acquire slots synchronously (before the first await) to prevent race conditions
    for (const { group, key } of groups) {
      group.acquireSlot(key);
    }

    try {
      // Check shouldExecute after acquiring slots
      const shouldRun = await this.checkAllShouldExecute(task, input);
      if (!shouldRun) {
        return undefined;
      }
      return await task.trigger(input);
    } finally {
      // Release slots on every exit path, then give queued entries a chance to run
      for (const { group, key } of groups) {
        group.releaseSlot(key);
      }
      this.drainConstraintQueue();
    }
  }

  /** Returns the smallest positive remaining cooldown among the constraints, or `Infinity` if none. */
  private getShortestCooldown(groups: TApplicableConstraint[]): number {
    let shortest = Infinity;
    for (const { group, key } of groups) {
      const remaining = group.getCooldownRemaining(key);
      if (remaining > 0 && remaining < shortest) {
        shortest = remaining;
      }
    }
    return shortest;
  }

  /** Schedules a queue drain shortly (1 ms) after the given cooldown has elapsed. */
  private armDrainTimer(cooldownMs: number): void {
    this.drainTimer = setTimeout(() => {
      this.drainTimer = null;
      this.drainConstraintQueue();
    }, cooldownMs + 1);
  }

  /**
   * Re-evaluates every queued trigger: starts those that may run now, keeps the
   * rest queued and (re)arms the drain timer for the shortest remaining cooldown.
   * Constraint keys are recomputed from the current groups rather than read
   * from the keys cached on the entry.
   */
  private drainConstraintQueue(): void {
    let shortestCooldown = Infinity;
    const stillQueued: IConstrainedTaskEntry[] = [];

    for (const entry of this.constraintQueue) {
      const settle = (run: Promise<any>): void => {
        run.then(
          (result) => entry.deferred.resolve(result),
          (err) => entry.deferred.reject(err),
        );
      };
      const applicableGroups = this.resolveApplicableGroups(entry.task, entry.input);

      // No constraints apply anymore (group removed?) → check shouldExecute then run.
      // Failures of the check reject the caller's promise instead of being lost.
      if (applicableGroups.length === 0) {
        settle(this.triggerUnconstrained(entry.task, entry.input));
        continue;
      }

      if (applicableGroups.every(({ group, key }) => group.canRun(key))) {
        // executeWithConstraintTracking handles shouldExecute check internally
        settle(this.executeWithConstraintTracking(entry.task, entry.input, applicableGroups));
        continue;
      }

      stillQueued.push(entry);
      // Track shortest cooldown for timer scheduling
      shortestCooldown = Math.min(shortestCooldown, this.getShortestCooldown(applicableGroups));
    }

    this.constraintQueue = stillQueued;

    // Schedule next drain if there are cooldown-blocked entries
    if (this.drainTimer) {
      clearTimeout(this.drainTimer);
      this.drainTimer = null;
    }
    if (stillQueued.length > 0 && shortestCooldown < Infinity) {
      this.armDrainTimer(shortestCooldown);
    }
  }

  /**
   * Triggers the task with the given name through the constraint system.
   *
   * @throws Error (as a rejection) when no task with that name was added.
   */
  public async triggerTaskByName(taskName: string): Promise<any> {
    const taskToTrigger = this.getTaskByName(taskName);
    if (!taskToTrigger) {
      throw new Error(`No task with the name ${taskName} found.`);
    }
    return this.triggerTaskConstrained(taskToTrigger);
  }

  /** Triggers the given task through the constraint system. */
  public async triggerTask(task: AnyTask) {
    return this.triggerTaskConstrained(task);
  }

  /**
   * Schedules an added task with a cron expression.
   *
   * @throws Error when no task with that name was added.
   */
  public scheduleTaskByName(taskName: string, cronString: string) {
    const taskToSchedule = this.getTaskByName(taskName);
    if (!taskToSchedule) {
      throw new Error(`No task with the name ${taskName} found.`);
    }
    this.handleTaskScheduling(taskToSchedule, cronString);
  }

  /**
   * Registers the cron job for a task and stores it on `task.cronJob`.
   * Each tick optionally consults the distributed coordinator, then triggers
   * the task; task failures are logged rather than rethrown.
   */
  private handleTaskScheduling(task: AnyTask, cronString: string) {
    const cronJob = this.cronJobManager.addCronjob(
      cronString,
      async (triggerTime: number) => {
        this.logTaskState(task);
        if (this.options.distributedCoordinator) {
          const announcementResult = await this.performDistributedConsultation(
            task,
            triggerTime,
          );
          if (!announcementResult.shouldTrigger) {
            logger.log('info', 'Distributed coordinator result: NOT EXECUTING');
            return;
          } else {
            logger.log('info', 'Distributed coordinator result: CHOSEN AND EXECUTING');
          }
        }
        try {
          await this.triggerTaskConstrained(task);
        } catch (err) {
          logger.log('error', `TaskManager: scheduled task "${task.name || 'unnamed'}" failed: ${err instanceof Error ? err.message : String(err)}`);
        }
      },
    );
    task.cronJob = cronJob;
  }

  /** Logs that a scheduled run fired and whether the task is buffered. */
  private logTaskState(task: AnyTask) {
    logger.log('info', `Taskbuffer schedule triggered task >>${task.name}<<`);
    const bufferState = task.buffered
      ? `buffered with max ${task.bufferMax} buffered calls`
      : `unbuffered`;
    logger.log('info', `Task >>${task.name}<< is ${bufferState}`);
  }

  /** Sends a `requesting` task request for this run to the distributed coordinator. */
  private async performDistributedConsultation(
    task: AnyTask,
    triggerTime: number,
  ): Promise<IDistributedTaskRequestResult> {
    logger.log('info', 'Found a distributed coordinator, performing consultation.');

    return this.options.distributedCoordinator.fireDistributedTaskRequest({
      submitterId: this.randomId,
      requestResponseId: plugins.smartunique.shortId(),
      status: 'requesting',
      taskExecutionParallel: 1,
      taskExecutionTime: triggerTime,
      taskExecutionTimeout: task.timeout,
      taskName: task.name,
      taskVersion: task.version,
    });
  }

  /** Removes the cron job of the named task, if the task is known and scheduled. */
  public descheduleTaskByName(taskName: string) {
    const task = this.getTaskByName(taskName);
    if (task && task.cronJob) {
      this.cronJobManager.removeCronjob(task.cronJob);
      task.cronJob = null;
    }
    // NOTE(review): `isEmpty` is read as a property here. If it is a method on
    // the cronjobs collection this condition is always truthy and the cron
    // manager is stopped on every deschedule — verify against the collection API.
    if (this.cronJobManager.cronjobs.isEmpty) {
      this.cronJobManager.stop();
    }
  }

  /** Removes the cron job of the given task (looked up by its name). */
  public async descheduleTask(task: AnyTask) {
    await this.descheduleTaskByName(task.name);
  }

  /** Returns the cron expression of the named task, or `null` if unknown or unscheduled. */
  public getScheduleForTaskName(taskName: string): string | null {
    const task = this.getTaskByName(taskName);
    return task && task.cronJob ? task.cronJob.cronExpression : null;
  }

  /** Starts the distributed coordinator (if any) and then the cron manager. */
  public async start() {
    if (this.options.distributedCoordinator) {
      await this.options.distributedCoordinator.start();
    }
    this.cronJobManager.start();
  }

  /**
   * Stops the cron manager and the coordinator, drops all event forwarding
   * subscriptions and cancels a pending drain timer.
   */
  public async stop() {
    this.cronJobManager.stop();
    if (this.options.distributedCoordinator) {
      await this.options.distributedCoordinator.stop();
    }
    for (const [, subscription] of this.taskSubscriptions) {
      subscription.unsubscribe();
    }
    this.taskSubscriptions.clear();
    if (this.drainTimer) {
      clearTimeout(this.drainTimer);
      this.drainTimer = null;
    }
  }

  /** Returns the metadata of the named task, or `null` when no such task was added. */
  public getTaskMetadata(taskName: string): ITaskMetadata | null {
    const task = this.getTaskByName(taskName);
    if (!task) return null;
    return task.getMetadata();
  }

  /** Returns the metadata of every added task. */
  public getAllTasksMetadata(): ITaskMetadata[] {
    return this.taskMap.getArray().map((task) => task.getMetadata());
  }

  /** Returns schedule, next run time and metadata for every task that has a cron job. */
  public getScheduledTasks(): IScheduledTaskInfo[] {
    const scheduledTasks: IScheduledTaskInfo[] = [];

    for (const task of this.taskMap.getArray()) {
      if (task.cronJob) {
        scheduledTasks.push({
          name: task.name || 'unnamed',
          schedule: task.cronJob.cronExpression,
          nextRun: new Date(task.cronJob.getNextExecutionTime()),
          lastRun: task.lastRun,
          steps: task.getStepsMetadata?.(),
          metadata: task.getMetadata(),
        });
      }
    }

    return scheduledTasks;
  }

  /**
   * Returns the upcoming runs of all scheduled tasks, soonest first.
   *
   * @param limit - Maximum number of entries to return.
   */
  public getNextScheduledRuns(limit: number = 10): Array<{ taskName: string; nextRun: Date; schedule: string }> {
    const scheduledRuns = this.getScheduledTasks()
      .map((task) => ({
        taskName: task.name,
        nextRun: task.nextRun,
        schedule: task.schedule,
      }))
      .sort((a, b) => a.nextRun.getTime() - b.nextRun.getTime())
      .slice(0, limit);

    return scheduledRuns;
  }

  /** Returns every added task whose label `key` equals `value`. */
  public getTasksByLabel(key: string, value: string): AnyTask[] {
    return this.taskMap.getArray().filter((task) => task.labels[key] === value);
  }

  /** Returns the metadata of every added task whose label `key` equals `value`. */
  public getTasksMetadataByLabel(key: string, value: string): ITaskMetadata[] {
    return this.getTasksByLabel(key, value).map((task) => task.getMetadata());
  }

  /**
   * Adds a task, runs it once through the constraint system, removes it again
   * and reports on the execution.
   *
   * NOTE(review): the type parameter list was lost from this file and is
   * reconstructed here — confirm the `TSteps` constraint against `Task`.
   *
   * @param task - Task to run; must have a name.
   * @param options - `schedule` additionally schedules the task for the duration
   *   of the run; `trackProgress` is accepted but not read by this method.
   * @returns The execution report including the task result.
   * @throws An `ITaskExecutionReport` carrying `error` (not an `Error` instance) when the run fails.
   */
  public async addExecuteRemoveTask<
    T,
    TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }>,
  >(
    task: Task<T, TSteps>,
    options?: {
      schedule?: string;
      trackProgress?: boolean;
    }
  ): Promise<ITaskExecutionReport> {
    // Add task to manager
    this.addTask(task);

    // Optionally schedule it
    if (options?.schedule) {
      this.scheduleTaskByName(task.name!, options.schedule);
    }

    const startTime = Date.now();

    // Builds the report from one end timestamp so that duration === endTime - startTime.
    const buildReport = (outcome: { result?: any; error?: Error }): ITaskExecutionReport => {
      const endTime = Date.now();
      const steps = task.getStepsMetadata();
      return {
        taskName: task.name || 'unnamed',
        startTime,
        endTime,
        duration: endTime - startTime,
        steps,
        stepsCompleted: steps
          .filter((step) => step.status === 'completed')
          .map((step) => step.name),
        progress: task.getProgress(),
        ...outcome,
      };
    };

    // Deschedule BEFORE removing: descheduling looks the task up by name in the
    // task map, so removing first would leave the cron job registered.
    const cleanup = (): void => {
      if (options?.schedule && task.name) {
        this.descheduleTaskByName(task.name);
      }
      this.removeTask(task);
    };

    try {
      // Execute the task through constraints
      const result = await this.triggerTaskConstrained(task);
      const report = buildReport({ result });
      cleanup();
      return report;
    } catch (error) {
      const errorReport = buildReport({ error: error as Error });
      // Remove task from manager even on error
      cleanup();
      // Callers receive the report object itself as the rejection value.
      throw errorReport;
    }
  }
}