156 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			TypeScript
		
	
	
	
	
	
			
		
		
	
	
			156 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			TypeScript
		
	
	
	
	
	
| import * as plugins from './taskbuffer.plugins.js';
 | |
| import { Task } from './taskbuffer.classes.task.js';
 | |
| import { AbstractDistributedCoordinator } from './taskbuffer.classes.distributedcoordinator.js';
 | |
| 
 | |
| export interface ICronJob {
 | |
|   cronString: string;
 | |
|   taskNameArg: string;
 | |
|   job: any;
 | |
| }
 | |
| 
 | |
| export interface ITaskManagerConstructorOptions {
 | |
|   distributedCoordinator?: AbstractDistributedCoordinator
 | |
| }
 | |
| 
 | |
| export class TaskManager {
 | |
|   public randomId = plugins.isounique.uni();
 | |
|   public taskMap = new plugins.lik.ObjectMap<Task>();
 | |
|   private cronJobManager = new plugins.smarttime.CronManager();
 | |
| 
 | |
|   public options: ITaskManagerConstructorOptions = {
 | |
|     distributedCoordinator: null
 | |
|   };
 | |
| 
 | |
|   constructor(optionosArg: ITaskManagerConstructorOptions = {}) {
 | |
|     this.options = Object.assign(this.options, optionosArg);
 | |
|   }
 | |
| 
 | |
|   /**
 | |
|    * checks if a task is already present
 | |
|    * @param taskNameArg
 | |
|    */
 | |
|   public getTaskByName(taskNameArg: string): Task {
 | |
|     return this.taskMap.findSync((itemArg) => {
 | |
|       return itemArg.name === taskNameArg;
 | |
|     });
 | |
|   }
 | |
| 
 | |
|   /**
 | |
|    * adds a Task to the TaskManager
 | |
|    * @param taskArg
 | |
|    */
 | |
|   public addTask(taskArg: Task): void {
 | |
|     if (!taskArg.name) {
 | |
|       throw new Error('taskArg needs a name to be added to taskManager');
 | |
|     }
 | |
|     this.taskMap.add(taskArg);
 | |
|   }
 | |
| 
 | |
|   /**
 | |
|    * adds and schedules a task at once
 | |
|    * @param taskArg
 | |
|    * @param cronStringArg
 | |
|    */
 | |
|   public addAndScheduleTask(taskArg: Task, cronStringArg: string) {
 | |
|     this.addTask(taskArg);
 | |
|     this.scheduleTaskByName(taskArg.name, cronStringArg);
 | |
|   }
 | |
| 
 | |
|   /**
 | |
|    * triggers a task in the TaskManagerByName
 | |
|    * @param taskNameArg
 | |
|    */
 | |
|   public triggerTaskByName(taskNameArg: string): Promise<any> {
 | |
|     const taskToTrigger = this.getTaskByName(taskNameArg);
 | |
|     if (!taskToTrigger) {
 | |
|       throw new Error(`There is no task with the name of ${taskNameArg}`);
 | |
|     }
 | |
|     return taskToTrigger.trigger();
 | |
|   }
 | |
| 
 | |
|   public async triggerTask(task: Task) {
 | |
|     return task.trigger();
 | |
|   }
 | |
| 
 | |
|   /**
 | |
|    * schedules the task by name
 | |
|    * @param taskNameArg
 | |
|    */
 | |
|   public scheduleTaskByName(taskNameArg: string, cronStringArg: string) {
 | |
|     const taskToSchedule = this.getTaskByName(taskNameArg);
 | |
|     const cronJob = this.cronJobManager.addCronjob(cronStringArg, async (triggerTimeArg: number) => {
 | |
|       console.log(`taskbuffer schedule triggered task >>${taskToSchedule.name}<<`);
 | |
|       console.log(
 | |
|         `task >>${taskToSchedule.name}<< is ${
 | |
|           taskToSchedule.buffered
 | |
|             ? `buffered with max ${taskToSchedule.bufferMax} buffered calls`
 | |
|             : `unbuffered`
 | |
|         }`
 | |
|       );
 | |
|       if (this.options.distributedCoordinator) {
 | |
|         console.log(`Found a distrubuted coordinator, performing distributed consultation.`);
 | |
|         const announcementResult = await this.options.distributedCoordinator.announceDistributedDecisionInfoBasis({
 | |
|           submitterRandomId: this.randomId,
 | |
|           status: 'requesting',
 | |
|           taskExecutionParallel: 1,
 | |
|           taskExecutionTime: triggerTimeArg,
 | |
|           taskExecutionTimeout: taskToSchedule.timeout,
 | |
|           taskName: taskToSchedule.name,
 | |
|           taskVersion: taskToSchedule.version,
 | |
|         });
 | |
| 
 | |
|         if (!announcementResult.shouldTrigger) {
 | |
|           console.log('distributed coordinator result: NOT EXECUTING')
 | |
|           return;
 | |
|         } else {
 | |
|           console.log('distributed coordinator result: CHOSEN AND EXECUTING')
 | |
|         }
 | |
|       }
 | |
|       await taskToSchedule.trigger();
 | |
|     });
 | |
|     taskToSchedule.cronJob = cronJob;
 | |
|   }
 | |
| 
 | |
|   /**
 | |
|    * deschedules a task by name
 | |
|    * @param taskNameArg
 | |
|    */
 | |
|   public descheduleTaskByName(taskNameArg: string) {
 | |
|     const taskToDeSchedule = this.getTaskByName(taskNameArg);
 | |
|     if (taskToDeSchedule.cronJob) {
 | |
|       this.cronJobManager.removeCronjob(taskToDeSchedule.cronJob);
 | |
|       taskToDeSchedule.cronJob = null;
 | |
|     }
 | |
|     if (this.cronJobManager.cronjobs.isEmpty) {
 | |
|       this.cronJobManager.stop();
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   /**
 | |
|    * deschedules a task
 | |
|    * @param task
 | |
|    */
 | |
|   public async descheduleTask(task: Task) {
 | |
|     await this.descheduleTaskByName(task.name);
 | |
|   }
 | |
|   /**
 | |
|    * returns all schedules of a specific task
 | |
|    * @param taskNameArg
 | |
|    */
 | |
|   public getSchedulesForTaskName(taskNameArg: string) {}
 | |
| 
 | |
|   /**
 | |
|    * starts the taskmanager
 | |
|    */
 | |
|   public start() {
 | |
|     this.cronJobManager.start();
 | |
|   }
 | |
| 
 | |
|   /**
 | |
|    * stops the taskmanager
 | |
|    */
 | |
|   public stop() {
 | |
|     this.cronJobManager.stop();
 | |
|   }
 | |
| }
 |