269 lines
8.1 KiB
TypeScript
269 lines
8.1 KiB
TypeScript
import * as plugins from './taskbuffer.plugins.js';
|
|
import { Task } from './taskbuffer.classes.task.js';
|
|
import {
|
|
AbstractDistributedCoordinator,
|
|
type IDistributedTaskRequestResult,
|
|
} from './taskbuffer.classes.distributedcoordinator.js';
|
|
import type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo } from './taskbuffer.interfaces.js';
|
|
|
|
export interface ICronJob {
|
|
cronString: string;
|
|
taskName: string;
|
|
job: any;
|
|
}
|
|
|
|
export interface ITaskManagerConstructorOptions {
|
|
distributedCoordinator?: AbstractDistributedCoordinator;
|
|
}
|
|
|
|
export class TaskManager {
|
|
public randomId = plugins.smartunique.shortId();
|
|
public taskMap = new plugins.lik.ObjectMap<Task<any, any>>();
|
|
private cronJobManager = new plugins.smarttime.CronManager();
|
|
public options: ITaskManagerConstructorOptions = {
|
|
distributedCoordinator: null,
|
|
};
|
|
|
|
constructor(options: ITaskManagerConstructorOptions = {}) {
|
|
this.options = Object.assign(this.options, options);
|
|
}
|
|
|
|
public getTaskByName(taskName: string): Task<any, any> {
|
|
return this.taskMap.findSync((task) => task.name === taskName);
|
|
}
|
|
|
|
public addTask(task: Task<any, any>): void {
|
|
if (!task.name) {
|
|
throw new Error('Task must have a name to be added to taskManager');
|
|
}
|
|
this.taskMap.add(task);
|
|
}
|
|
|
|
public addAndScheduleTask(task: Task<any, any>, cronString: string) {
|
|
this.addTask(task);
|
|
this.scheduleTaskByName(task.name, cronString);
|
|
}
|
|
|
|
public async triggerTaskByName(taskName: string): Promise<any> {
|
|
const taskToTrigger = this.getTaskByName(taskName);
|
|
if (!taskToTrigger) {
|
|
throw new Error(`No task with the name ${taskName} found.`);
|
|
}
|
|
return taskToTrigger.trigger();
|
|
}
|
|
|
|
public async triggerTask(task: Task<any, any>) {
|
|
return task.trigger();
|
|
}
|
|
|
|
public scheduleTaskByName(taskName: string, cronString: string) {
|
|
const taskToSchedule = this.getTaskByName(taskName);
|
|
if (!taskToSchedule) {
|
|
throw new Error(`No task with the name ${taskName} found.`);
|
|
}
|
|
this.handleTaskScheduling(taskToSchedule, cronString);
|
|
}
|
|
|
|
private handleTaskScheduling(task: Task<any, any>, cronString: string) {
|
|
const cronJob = this.cronJobManager.addCronjob(
|
|
cronString,
|
|
async (triggerTime: number) => {
|
|
this.logTaskState(task);
|
|
if (this.options.distributedCoordinator) {
|
|
const announcementResult = await this.performDistributedConsultation(
|
|
task,
|
|
triggerTime,
|
|
);
|
|
if (!announcementResult.shouldTrigger) {
|
|
console.log('Distributed coordinator result: NOT EXECUTING');
|
|
return;
|
|
} else {
|
|
console.log('Distributed coordinator result: CHOSEN AND EXECUTING');
|
|
}
|
|
}
|
|
await task.trigger();
|
|
},
|
|
);
|
|
task.cronJob = cronJob;
|
|
}
|
|
|
|
private logTaskState(task: Task<any, any>) {
|
|
console.log(`Taskbuffer schedule triggered task >>${task.name}<<`);
|
|
const bufferState = task.buffered
|
|
? `buffered with max ${task.bufferMax} buffered calls`
|
|
: `unbuffered`;
|
|
console.log(`Task >>${task.name}<< is ${bufferState}`);
|
|
}
|
|
|
|
private async performDistributedConsultation(
|
|
task: Task<any, any>,
|
|
triggerTime: number,
|
|
): Promise<IDistributedTaskRequestResult> {
|
|
console.log('Found a distributed coordinator, performing consultation.');
|
|
|
|
return this.options.distributedCoordinator.fireDistributedTaskRequest({
|
|
submitterId: this.randomId,
|
|
requestResponseId: plugins.smartunique.shortId(),
|
|
status: 'requesting',
|
|
taskExecutionParallel: 1,
|
|
taskExecutionTime: triggerTime,
|
|
taskExecutionTimeout: task.timeout,
|
|
taskName: task.name,
|
|
taskVersion: task.version,
|
|
});
|
|
}
|
|
|
|
public descheduleTaskByName(taskName: string) {
|
|
const task = this.getTaskByName(taskName);
|
|
if (task && task.cronJob) {
|
|
this.cronJobManager.removeCronjob(task.cronJob);
|
|
task.cronJob = null;
|
|
}
|
|
if (this.cronJobManager.cronjobs.isEmpty) {
|
|
this.cronJobManager.stop();
|
|
}
|
|
}
|
|
|
|
public async descheduleTask(task: Task<any, any>) {
|
|
await this.descheduleTaskByName(task.name);
|
|
}
|
|
|
|
public getScheduleForTaskName(taskName: string): string | null {
|
|
const task = this.getTaskByName(taskName);
|
|
return task && task.cronJob ? task.cronJob.cronExpression : null;
|
|
}
|
|
|
|
public async start() {
|
|
if (this.options.distributedCoordinator) {
|
|
await this.options.distributedCoordinator.start();
|
|
}
|
|
this.cronJobManager.start();
|
|
}
|
|
|
|
public async stop() {
|
|
this.cronJobManager.stop();
|
|
if (this.options.distributedCoordinator) {
|
|
await this.options.distributedCoordinator.stop();
|
|
}
|
|
}
|
|
|
|
// Get metadata for a specific task
|
|
public getTaskMetadata(taskName: string): ITaskMetadata | null {
|
|
const task = this.getTaskByName(taskName);
|
|
if (!task) return null;
|
|
return task.getMetadata();
|
|
}
|
|
|
|
// Get metadata for all tasks
|
|
public getAllTasksMetadata(): ITaskMetadata[] {
|
|
return this.taskMap.getArray().map(task => task.getMetadata());
|
|
}
|
|
|
|
// Get scheduled tasks with their schedules and next run times
|
|
public getScheduledTasks(): IScheduledTaskInfo[] {
|
|
const scheduledTasks: IScheduledTaskInfo[] = [];
|
|
|
|
for (const task of this.taskMap.getArray()) {
|
|
if (task.cronJob) {
|
|
scheduledTasks.push({
|
|
name: task.name || 'unnamed',
|
|
schedule: task.cronJob.cronExpression,
|
|
nextRun: new Date(task.cronJob.getNextExecutionTime()),
|
|
lastRun: task.lastRun,
|
|
steps: task.getStepsMetadata?.(),
|
|
metadata: task.getMetadata(),
|
|
});
|
|
}
|
|
}
|
|
|
|
return scheduledTasks;
|
|
}
|
|
|
|
// Get next scheduled runs across all tasks
|
|
public getNextScheduledRuns(limit: number = 10): Array<{ taskName: string; nextRun: Date; schedule: string }> {
|
|
const scheduledRuns = this.getScheduledTasks()
|
|
.map(task => ({
|
|
taskName: task.name,
|
|
nextRun: task.nextRun,
|
|
schedule: task.schedule,
|
|
}))
|
|
.sort((a, b) => a.nextRun.getTime() - b.nextRun.getTime())
|
|
.slice(0, limit);
|
|
|
|
return scheduledRuns;
|
|
}
|
|
|
|
// Add, execute, and remove a task while collecting metadata
|
|
public async addExecuteRemoveTask<T, TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }>>(
|
|
task: Task<T, TSteps>,
|
|
options?: {
|
|
schedule?: string;
|
|
trackProgress?: boolean;
|
|
}
|
|
): Promise<ITaskExecutionReport> {
|
|
// Add task to manager
|
|
this.addTask(task);
|
|
|
|
// Optionally schedule it
|
|
if (options?.schedule) {
|
|
this.scheduleTaskByName(task.name!, options.schedule);
|
|
}
|
|
|
|
const startTime = Date.now();
|
|
const progressUpdates: Array<{ stepName: string; timestamp: number }> = [];
|
|
|
|
try {
|
|
// Execute the task
|
|
const result = await task.trigger();
|
|
|
|
// Collect execution report
|
|
const report: ITaskExecutionReport = {
|
|
taskName: task.name || 'unnamed',
|
|
startTime,
|
|
endTime: Date.now(),
|
|
duration: Date.now() - startTime,
|
|
steps: task.getStepsMetadata(),
|
|
stepsCompleted: task.getStepsMetadata()
|
|
.filter(step => step.status === 'completed')
|
|
.map(step => step.name),
|
|
progress: task.getProgress(),
|
|
result,
|
|
};
|
|
|
|
// Remove task from manager
|
|
this.taskMap.remove(task);
|
|
|
|
// Deschedule if it was scheduled
|
|
if (options?.schedule && task.name) {
|
|
this.descheduleTaskByName(task.name);
|
|
}
|
|
|
|
return report;
|
|
} catch (error) {
|
|
// Create error report
|
|
const errorReport: ITaskExecutionReport = {
|
|
taskName: task.name || 'unnamed',
|
|
startTime,
|
|
endTime: Date.now(),
|
|
duration: Date.now() - startTime,
|
|
steps: task.getStepsMetadata(),
|
|
stepsCompleted: task.getStepsMetadata()
|
|
.filter(step => step.status === 'completed')
|
|
.map(step => step.name),
|
|
progress: task.getProgress(),
|
|
error: error as Error,
|
|
};
|
|
|
|
// Remove task from manager even on error
|
|
this.taskMap.remove(task);
|
|
|
|
// Deschedule if it was scheduled
|
|
if (options?.schedule && task.name) {
|
|
this.descheduleTaskByName(task.name);
|
|
}
|
|
|
|
throw errorReport;
|
|
}
|
|
}
|
|
}
|