168 lines
4.5 KiB
TypeScript
168 lines
4.5 KiB
TypeScript
import * as plugins from './taskbuffer.plugins.js';
|
|
import { Task } from './taskbuffer.classes.task.js';
|
|
import { AbstractDistributedCoordinator } from './taskbuffer.classes.distributedcoordinator.js';
|
|
|
|
/**
 * Describes a cron job entry.
 *
 * NOTE(review): this interface is not referenced anywhere else in this file;
 * the field descriptions below are inferred from the member names only and
 * should be confirmed against the actual consumers.
 */
export interface ICronJob {
  /** Presumably the cron expression the job runs on — TODO confirm. */
  cronString: string;
  /** Presumably the name of the task the job belongs to — TODO confirm. */
  taskNameArg: string;
  /** Untyped handle to the underlying job object. */
  job: any;
}
|
|
|
|
/**
 * Options accepted by the `TaskManager` constructor.
 */
export interface ITaskManagerConstructorOptions {
  /**
   * Optional coordinator that is consulted before every cron-triggered run.
   * When set, the scheduled task only executes if the coordinator's answer
   * has a truthy `shouldTrigger`; when unset, scheduled tasks always execute.
   */
  distributedCoordinator?: AbstractDistributedCoordinator;
}
|
|
|
|
/**
 * Keeps a registry of named tasks and lets callers trigger them directly or
 * run them on a cron schedule, optionally consulting a distributed
 * coordinator before each scheduled run.
 */
export class TaskManager {
  /** Unique id of this manager; sent as `submitterRandomId` in distributed task requests. */
  public randomId = plugins.isounique.uni();

  /** All tasks that were registered through `addTask`. */
  public taskMap = new plugins.lik.ObjectMap<Task>();

  /** Cron engine that owns every job created by `scheduleTaskByName`. */
  private cronJobManager = new plugins.smarttime.CronManager();

  /** Effective options: the defaults below overlaid with the constructor argument. */
  public options: ITaskManagerConstructorOptions = {
    distributedCoordinator: null,
  };

  /**
   * @param optionsArg optional overrides for the default options
   */
  constructor(optionsArg: ITaskManagerConstructorOptions = {}) {
    this.options = Object.assign(this.options, optionsArg);
  }

  /**
   * Looks up a registered task by its name.
   * @param taskNameArg name to match against `task.name`
   * @returns the matching task; when nothing matches, whatever `findSync`
   * yields for "not found" (the callers in this class treat it as falsy)
   */
  public getTaskByName(taskNameArg: string): Task {
    return this.taskMap.findSync((itemArg) => {
      return itemArg.name === taskNameArg;
    });
  }

  /**
   * Adds a task to the TaskManager.
   * @param taskArg task to register; must have a truthy `name`
   * @throws Error when the task has no name
   */
  public addTask(taskArg: Task): void {
    if (!taskArg.name) {
      throw new Error('taskArg needs a name to be added to taskManager');
    }
    this.taskMap.add(taskArg);
  }

  /**
   * Adds a task and schedules it in one step.
   * @param taskArg task to register
   * @param cronStringArg cron expression to schedule the task with
   * @throws Error when the task has no name
   */
  public addAndScheduleTask(taskArg: Task, cronStringArg: string) {
    this.addTask(taskArg);
    this.scheduleTaskByName(taskArg.name, cronStringArg);
  }

  /**
   * Triggers a registered task by name.
   * @param taskNameArg name of the task to trigger
   * @returns the promise returned by the task's `trigger()`
   * @throws Error (synchronously) when no task with that name is registered
   */
  public triggerTaskByName(taskNameArg: string): Promise<any> {
    const taskToTrigger = this.getTaskByName(taskNameArg);
    if (!taskToTrigger) {
      throw new Error(`There is no task with the name of ${taskNameArg}`);
    }
    return taskToTrigger.trigger();
  }

  /**
   * Triggers the given task directly; the task does not need to be registered.
   * @param task task to trigger
   * @returns the result of the task's `trigger()`
   */
  public async triggerTask(task: Task) {
    return task.trigger();
  }

  /**
   * Schedules a registered task by name.
   *
   * On every cron tick the task is triggered. If a distributed coordinator is
   * configured, it is consulted first and the task only runs when the
   * coordinator's result has a truthy `shouldTrigger`.
   *
   * @param taskNameArg name of the task to schedule
   * @param cronStringArg cron expression handed to the cron manager
   * @throws Error when no task with that name is registered
   */
  public scheduleTaskByName(taskNameArg: string, cronStringArg: string) {
    const taskToSchedule = this.getTaskByName(taskNameArg);
    // Fail before registering anything: otherwise a cron job would be created
    // whose callback can only crash, and the assignment below would throw an
    // opaque TypeError while leaving that job behind.
    if (!taskToSchedule) {
      throw new Error(`There is no task with the name of ${taskNameArg}`);
    }
    // NOTE(review): if the task already has a cronJob it is overwritten below
    // without being removed from the cron manager — confirm whether scheduling
    // the same task twice is intended to keep both jobs running.
    const cronJob = this.cronJobManager.addCronjob(
      cronStringArg,
      async (triggerTimeArg: number) => {
        console.log(`taskbuffer schedule triggered task >>${taskToSchedule.name}<<`);
        console.log(
          `task >>${taskToSchedule.name}<< is ${
            taskToSchedule.buffered
              ? `buffered with max ${taskToSchedule.bufferMax} buffered calls`
              : `unbuffered`
          }`
        );
        if (this.options.distributedCoordinator) {
          console.log(`Found a distrubuted coordinator, performing distributed consultation.`);
          const announcementResult =
            await this.options.distributedCoordinator.fireDistributedTaskRequest({
              submitterRandomId: this.randomId,
              status: 'requesting',
              taskExecutionParallel: 1,
              taskExecutionTime: triggerTimeArg,
              taskExecutionTimeout: taskToSchedule.timeout,
              taskName: taskToSchedule.name,
              taskVersion: taskToSchedule.version,
            });

          // The coordinator decides whether this instance runs the task.
          if (!announcementResult.shouldTrigger) {
            console.log('distributed coordinator result: NOT EXECUTING');
            return;
          } else {
            console.log('distributed coordinator result: CHOSEN AND EXECUTING');
          }
        }
        await taskToSchedule.trigger();
      }
    );
    // Remember the job on the task so it can be descheduled and inspected later.
    taskToSchedule.cronJob = cronJob;
  }

  /**
   * Deschedules a registered task by name. A registered task that has no
   * cron job is left untouched.
   * @param taskNameArg name of the task to deschedule
   * @throws Error when no task with that name is registered
   */
  public descheduleTaskByName(taskNameArg: string) {
    const taskToDeSchedule = this.getTaskByName(taskNameArg);
    if (!taskToDeSchedule) {
      throw new Error(`There is no task with the name of ${taskNameArg}`);
    }
    if (taskToDeSchedule.cronJob) {
      this.cronJobManager.removeCronjob(taskToDeSchedule.cronJob);
      taskToDeSchedule.cronJob = null;
    }
    // NOTE(review): `isEmpty` is read as a property here; if it is a method on
    // the underlying collection this condition is always truthy and the cron
    // manager is stopped after every deschedule — confirm against the library.
    if (this.cronJobManager.cronjobs.isEmpty) {
      this.cronJobManager.stop();
    }
  }

  /**
   * Deschedules the given task, looked up by its name.
   * @param task task to deschedule
   * @throws rejects with an Error when the task is not registered
   */
  public async descheduleTask(task: Task) {
    await this.descheduleTaskByName(task.name);
  }

  /**
   * Returns the schedule of a specific task.
   * @param taskNameArg name of the task to inspect
   * @returns the cron expression of the task's cron job, or `null` when the
   * task is unknown or not scheduled
   */
  public getScheduleForTaskName(taskNameArg: string): string | null {
    const task = this.getTaskByName(taskNameArg);
    if (!task || !task.cronJob) {
      return null;
    }
    return task.cronJob.cronExpression;
  }

  /**
   * Starts the taskmanager by starting the underlying cron manager.
   */
  public start() {
    this.cronJobManager.start();
  }

  /**
   * Stops the taskmanager by stopping the underlying cron manager.
   */
  public stop() {
    this.cronJobManager.stop();
  }
}
|