Compare commits
10 Commits
Author | SHA1 | Date | |
---|---|---|---|
d04ed21607 | |||
bae776d4e9 | |||
fcd7ea467e | |||
e061b96056 | |||
c2ce669f0c | |||
05f91c3e35 | |||
94e327c722 | |||
57a27604a7 | |||
b077bd7a1b | |||
f2c2dab782 |
@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@push.rocks/taskbuffer",
|
||||
"version": "3.0.13",
|
||||
"version": "3.1.2",
|
||||
"private": false,
|
||||
"description": "flexible task management. TypeScript ready!",
|
||||
"main": "dist_ts/index.js",
|
||||
@ -42,8 +42,8 @@
|
||||
"@gitzone/tsbundle": "^2.0.8",
|
||||
"@gitzone/tsrun": "^1.2.44",
|
||||
"@gitzone/tstest": "^1.0.77",
|
||||
"@push.rocks/tapbundle": "^5.0.12",
|
||||
"@types/node": "^20.4.5"
|
||||
"@push.rocks/tapbundle": "^5.0.15",
|
||||
"@types/node": "^20.4.10"
|
||||
},
|
||||
"files": [
|
||||
"ts/**/*",
|
||||
|
1071
pnpm-lock.yaml
generated
1071
pnpm-lock.yaml
generated
File diff suppressed because it is too large
Load Diff
@ -12,13 +12,6 @@ tap.test('new Task() should return a new task', async () => {
|
||||
expect(testTask).toBeInstanceOf(taskbuffer.Task);
|
||||
});
|
||||
|
||||
tap.test('should be able to get the task state', async () => {
|
||||
const testTask = new taskbuffer.Task({
|
||||
taskFunction: async () => {},
|
||||
});
|
||||
expect(testTask.state).toEqual('ready');
|
||||
});
|
||||
|
||||
tap.test('should have bufferMax set to the provided value', async () => {
|
||||
const task2 = new taskbuffer.Task({
|
||||
taskFunction: async () => {},
|
||||
|
@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/taskbuffer',
|
||||
version: '3.0.13',
|
||||
version: '3.1.2',
|
||||
description: 'flexible task management. TypeScript ready!'
|
||||
}
|
||||
|
@ -5,7 +5,7 @@ export class BufferRunner {
|
||||
// initialize by default
|
||||
public bufferCounter: number = 0;
|
||||
|
||||
constructor(taskArg: Task) {
|
||||
constructor(taskArg: Task<any>) {
|
||||
this.task = taskArg;
|
||||
}
|
||||
|
||||
|
@ -9,7 +9,7 @@ export interface ICycleObject {
|
||||
export class CycleCounter {
|
||||
public task: Task;
|
||||
public cycleObjectArray: ICycleObject[] = [];
|
||||
constructor(taskArg: Task) {
|
||||
constructor(taskArg: Task<any>) {
|
||||
this.task = taskArg;
|
||||
}
|
||||
public getPromiseForCycle(cycleCountArg: number) {
|
||||
|
@ -2,13 +2,10 @@ import { Task } from './taskbuffer.classes.task.js';
|
||||
import * as plugins from './taskbuffer.plugins.js';
|
||||
|
||||
/**
|
||||
* constains all data for the final coordinator to actually make an informed decision
|
||||
* Contains all data for the final coordinator to make an informed decision.
|
||||
*/
|
||||
export interface IDistributedTaskRequest {
|
||||
/**
|
||||
* this needs to correlate to the consultationResult
|
||||
*/
|
||||
submitterRandomId: string;
|
||||
submitterId: string;
|
||||
taskName: string;
|
||||
taskVersion: string;
|
||||
taskExecutionTime: number;
|
||||
@ -18,24 +15,19 @@ export interface IDistributedTaskRequest {
|
||||
}
|
||||
|
||||
export interface IDistributedTaskRequestResult {
|
||||
/**
|
||||
* this needs to correlate to the decisionInfoBasis
|
||||
*/
|
||||
submitterRandomId: string;
|
||||
/**
|
||||
* can be used while debugging
|
||||
*/
|
||||
submitterId: string;
|
||||
considered: boolean;
|
||||
rank: string;
|
||||
rank: number;
|
||||
reason: string;
|
||||
shouldTrigger: boolean;
|
||||
}
|
||||
|
||||
export abstract class AbstractDistributedCoordinator {
|
||||
public abstract fireDistributedTaskRequest(
|
||||
infoBasisArg: IDistributedTaskRequest
|
||||
infoBasis: IDistributedTaskRequest
|
||||
): Promise<IDistributedTaskRequestResult>;
|
||||
|
||||
public abstract updateDistributedTaskRequest(
|
||||
infoBasisArg: IDistributedTaskRequest
|
||||
infoBasis: IDistributedTaskRequest
|
||||
): Promise<void>;
|
||||
}
|
||||
|
@ -15,8 +15,9 @@ export interface ITaskSetupFunction<T = undefined> {
|
||||
export type TPreOrAfterTaskFunction = () => Task<any>;
|
||||
|
||||
export class Task<T = undefined> {
|
||||
// STATIC
|
||||
public static extractTask<T = undefined>(preOrAfterTaskArg: Task<T> | TPreOrAfterTaskFunction): Task<T> {
|
||||
public static extractTask<T = undefined>(
|
||||
preOrAfterTaskArg: Task<T> | TPreOrAfterTaskFunction
|
||||
): Task<T> {
|
||||
switch (true) {
|
||||
case !preOrAfterTaskArg:
|
||||
return null;
|
||||
@ -44,7 +45,7 @@ export class Task<T = undefined> {
|
||||
}
|
||||
};
|
||||
|
||||
public static isTaskTouched<T = undefined> (
|
||||
public static isTaskTouched<T = undefined>(
|
||||
taskArg: Task<T> | TPreOrAfterTaskFunction,
|
||||
touchedTasksArray: Task<T>[]
|
||||
): boolean {
|
||||
@ -56,7 +57,7 @@ export class Task<T = undefined> {
|
||||
}
|
||||
}
|
||||
return result;
|
||||
};
|
||||
}
|
||||
|
||||
public static runTask = async <T>(
|
||||
taskArg: Task<T> | TPreOrAfterTaskFunction,
|
||||
@ -65,6 +66,11 @@ export class Task<T = undefined> {
|
||||
const taskToRun = Task.extractTask(taskArg);
|
||||
const done = plugins.smartpromise.defer();
|
||||
|
||||
// Wait for all blocking tasks to finish
|
||||
for (const task of taskToRun.blockingTasks) {
|
||||
await task.finished;
|
||||
}
|
||||
|
||||
if (!taskToRun.setupValue && taskToRun.taskSetup) {
|
||||
taskToRun.setupValue = await taskToRun.taskSetup();
|
||||
}
|
||||
@ -77,6 +83,14 @@ export class Task<T = undefined> {
|
||||
|
||||
done.promise.then(async () => {
|
||||
taskToRun.running = false;
|
||||
|
||||
// When the task has finished running, resolve the finished promise
|
||||
taskToRun.resolveFinished();
|
||||
|
||||
// Create a new finished promise for the next run
|
||||
taskToRun.finished = new Promise((resolve) => {
|
||||
taskToRun.resolveFinished = resolve;
|
||||
});
|
||||
});
|
||||
|
||||
const options = {
|
||||
@ -125,7 +139,6 @@ export class Task<T = undefined> {
|
||||
return await done.promise;
|
||||
};
|
||||
|
||||
// INSTANCE
|
||||
public name: string;
|
||||
public version: string;
|
||||
public taskFunction: ITaskFunction<T>;
|
||||
@ -139,12 +152,20 @@ export class Task<T = undefined> {
|
||||
public preTask: Task<T> | TPreOrAfterTaskFunction;
|
||||
public afterTask: Task<T> | TPreOrAfterTaskFunction;
|
||||
|
||||
// Add a list to store the blocking tasks
|
||||
public blockingTasks: Task[] = [];
|
||||
|
||||
// Add a promise that will resolve when the task has finished
|
||||
private finished: Promise<void>;
|
||||
private resolveFinished: () => void;
|
||||
|
||||
public running: boolean = false;
|
||||
public bufferRunner = new BufferRunner(this);
|
||||
public cycleCounter = new CycleCounter(this);
|
||||
|
||||
public idle: boolean = true;
|
||||
private _state: string = 'ready';
|
||||
public get idle() {
|
||||
return !this.running;
|
||||
}
|
||||
|
||||
public taskSetup: ITaskSetupFunction<T>;
|
||||
public setupValue: T;
|
||||
@ -162,12 +183,16 @@ export class Task<T = undefined> {
|
||||
this.taskFunction = optionsArg.taskFunction;
|
||||
this.preTask = optionsArg.preTask;
|
||||
this.afterTask = optionsArg.afterTask;
|
||||
this.idle = !this.running;
|
||||
this.buffered = optionsArg.buffered;
|
||||
this.bufferMax = optionsArg.bufferMax;
|
||||
this.execDelay = optionsArg.execDelay;
|
||||
this.name = optionsArg.name;
|
||||
this.taskSetup = optionsArg.taskSetup;
|
||||
|
||||
// Create the finished promise
|
||||
this.finished = new Promise((resolve) => {
|
||||
this.resolveFinished = resolve;
|
||||
});
|
||||
}
|
||||
|
||||
public trigger(x?: any): Promise<any> {
|
||||
@ -185,16 +210,4 @@ export class Task<T = undefined> {
|
||||
public triggerBuffered(x?: any): Promise<any> {
|
||||
return this.bufferRunner.trigger(x);
|
||||
}
|
||||
|
||||
get state(): string {
|
||||
return this._state;
|
||||
}
|
||||
|
||||
set state(stateArg: string) {
|
||||
if (stateArg === 'locked') {
|
||||
this._state = 'locked';
|
||||
} else {
|
||||
logger.log('error', `state type ${stateArg} could not be set`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,10 +1,10 @@
|
||||
import * as plugins from './taskbuffer.plugins.js';
|
||||
import { Task } from './taskbuffer.classes.task.js';
|
||||
import { AbstractDistributedCoordinator } from './taskbuffer.classes.distributedcoordinator.js';
|
||||
import { AbstractDistributedCoordinator, type IDistributedTaskRequestResult } from './taskbuffer.classes.distributedcoordinator.js';
|
||||
|
||||
export interface ICronJob {
|
||||
cronString: string;
|
||||
taskNameArg: string;
|
||||
taskName: string;
|
||||
job: any;
|
||||
}
|
||||
|
||||
@ -16,54 +16,34 @@ export class TaskManager {
|
||||
public randomId = plugins.isounique.uni();
|
||||
public taskMap = new plugins.lik.ObjectMap<Task>();
|
||||
private cronJobManager = new plugins.smarttime.CronManager();
|
||||
|
||||
public options: ITaskManagerConstructorOptions = {
|
||||
distributedCoordinator: null,
|
||||
};
|
||||
|
||||
constructor(optionosArg: ITaskManagerConstructorOptions = {}) {
|
||||
this.options = Object.assign(this.options, optionosArg);
|
||||
constructor(options: ITaskManagerConstructorOptions = {}) {
|
||||
this.options = Object.assign(this.options, options);
|
||||
}
|
||||
|
||||
/**
|
||||
* checks if a task is already present
|
||||
* @param taskNameArg
|
||||
*/
|
||||
public getTaskByName(taskNameArg: string): Task {
|
||||
return this.taskMap.findSync((itemArg) => {
|
||||
return itemArg.name === taskNameArg;
|
||||
});
|
||||
public getTaskByName(taskName: string): Task {
|
||||
return this.taskMap.findSync((task) => task.name === taskName);
|
||||
}
|
||||
|
||||
/**
|
||||
* adds a Task to the TaskManager
|
||||
* @param taskArg
|
||||
*/
|
||||
public addTask(taskArg: Task): void {
|
||||
if (!taskArg.name) {
|
||||
throw new Error('taskArg needs a name to be added to taskManager');
|
||||
public addTask(task: Task): void {
|
||||
if (!task.name) {
|
||||
throw new Error('Task must have a name to be added to taskManager');
|
||||
}
|
||||
this.taskMap.add(taskArg);
|
||||
this.taskMap.add(task);
|
||||
}
|
||||
|
||||
/**
|
||||
* adds and schedules a task at once
|
||||
* @param taskArg
|
||||
* @param cronStringArg
|
||||
*/
|
||||
public addAndScheduleTask(taskArg: Task, cronStringArg: string) {
|
||||
this.addTask(taskArg);
|
||||
this.scheduleTaskByName(taskArg.name, cronStringArg);
|
||||
public addAndScheduleTask(task: Task, cronString: string) {
|
||||
this.addTask(task);
|
||||
this.scheduleTaskByName(task.name, cronString);
|
||||
}
|
||||
|
||||
/**
|
||||
* triggers a task in the TaskManagerByName
|
||||
* @param taskNameArg
|
||||
*/
|
||||
public triggerTaskByName(taskNameArg: string): Promise<any> {
|
||||
const taskToTrigger = this.getTaskByName(taskNameArg);
|
||||
public async triggerTaskByName(taskName: string): Promise<any> {
|
||||
const taskToTrigger = this.getTaskByName(taskName);
|
||||
if (!taskToTrigger) {
|
||||
throw new Error(`There is no task with the name of ${taskNameArg}`);
|
||||
throw new Error(`No task with the name ${taskName} found.`);
|
||||
}
|
||||
return taskToTrigger.trigger();
|
||||
}
|
||||
@ -72,87 +52,79 @@ export class TaskManager {
|
||||
return task.trigger();
|
||||
}
|
||||
|
||||
/**
|
||||
* schedules the task by name
|
||||
* @param taskNameArg
|
||||
*/
|
||||
public scheduleTaskByName(taskNameArg: string, cronStringArg: string) {
|
||||
const taskToSchedule = this.getTaskByName(taskNameArg);
|
||||
const cronJob = this.cronJobManager.addCronjob(
|
||||
cronStringArg,
|
||||
async (triggerTimeArg: number) => {
|
||||
console.log(`taskbuffer schedule triggered task >>${taskToSchedule.name}<<`);
|
||||
console.log(
|
||||
`task >>${taskToSchedule.name}<< is ${
|
||||
taskToSchedule.buffered
|
||||
? `buffered with max ${taskToSchedule.bufferMax} buffered calls`
|
||||
: `unbuffered`
|
||||
}`
|
||||
);
|
||||
if (this.options.distributedCoordinator) {
|
||||
console.log(`Found a distrubuted coordinator, performing distributed consultation.`);
|
||||
const announcementResult =
|
||||
await this.options.distributedCoordinator.fireDistributedTaskRequest({
|
||||
submitterRandomId: this.randomId,
|
||||
status: 'requesting',
|
||||
taskExecutionParallel: 1,
|
||||
taskExecutionTime: triggerTimeArg,
|
||||
taskExecutionTimeout: taskToSchedule.timeout,
|
||||
taskName: taskToSchedule.name,
|
||||
taskVersion: taskToSchedule.version,
|
||||
});
|
||||
|
||||
if (!announcementResult.shouldTrigger) {
|
||||
console.log('distributed coordinator result: NOT EXECUTING');
|
||||
return;
|
||||
} else {
|
||||
console.log('distributed coordinator result: CHOSEN AND EXECUTING');
|
||||
}
|
||||
}
|
||||
await taskToSchedule.trigger();
|
||||
}
|
||||
);
|
||||
taskToSchedule.cronJob = cronJob;
|
||||
public scheduleTaskByName(taskName: string, cronString: string) {
|
||||
const taskToSchedule = this.getTaskByName(taskName);
|
||||
if (!taskToSchedule) {
|
||||
throw new Error(`No task with the name ${taskName} found.`);
|
||||
}
|
||||
this.handleTaskScheduling(taskToSchedule, cronString);
|
||||
}
|
||||
|
||||
/**
|
||||
* deschedules a task by name
|
||||
* @param taskNameArg
|
||||
*/
|
||||
public descheduleTaskByName(taskNameArg: string) {
|
||||
const taskToDeSchedule = this.getTaskByName(taskNameArg);
|
||||
if (taskToDeSchedule.cronJob) {
|
||||
this.cronJobManager.removeCronjob(taskToDeSchedule.cronJob);
|
||||
taskToDeSchedule.cronJob = null;
|
||||
private handleTaskScheduling(task: Task, cronString: string) {
|
||||
const cronJob = this.cronJobManager.addCronjob(
|
||||
cronString,
|
||||
async (triggerTime: number) => {
|
||||
this.logTaskState(task);
|
||||
if (this.options.distributedCoordinator) {
|
||||
const announcementResult = await this.performDistributedConsultation(task, triggerTime);
|
||||
if (!announcementResult.shouldTrigger) {
|
||||
console.log('Distributed coordinator result: NOT EXECUTING');
|
||||
return;
|
||||
} else {
|
||||
console.log('Distributed coordinator result: CHOSEN AND EXECUTING');
|
||||
}
|
||||
}
|
||||
await task.trigger();
|
||||
}
|
||||
);
|
||||
task.cronJob = cronJob;
|
||||
}
|
||||
|
||||
private logTaskState(task: Task) {
|
||||
console.log(`Taskbuffer schedule triggered task >>${task.name}<<`);
|
||||
const bufferState = task.buffered
|
||||
? `buffered with max ${task.bufferMax} buffered calls`
|
||||
: `unbuffered`;
|
||||
console.log(`Task >>${task.name}<< is ${bufferState}`);
|
||||
}
|
||||
|
||||
private async performDistributedConsultation(task: Task, triggerTime: number): Promise<IDistributedTaskRequestResult> {
|
||||
console.log('Found a distributed coordinator, performing consultation.');
|
||||
return this.options.distributedCoordinator.fireDistributedTaskRequest({
|
||||
submitterId: this.randomId,
|
||||
status: 'requesting',
|
||||
taskExecutionParallel: 1,
|
||||
taskExecutionTime: triggerTime,
|
||||
taskExecutionTimeout: task.timeout,
|
||||
taskName: task.name,
|
||||
taskVersion: task.version,
|
||||
});
|
||||
}
|
||||
|
||||
public descheduleTaskByName(taskName: string) {
|
||||
const task = this.getTaskByName(taskName);
|
||||
if (task && task.cronJob) {
|
||||
this.cronJobManager.removeCronjob(task.cronJob);
|
||||
task.cronJob = null;
|
||||
}
|
||||
if (this.cronJobManager.cronjobs.isEmpty) {
|
||||
this.cronJobManager.stop();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* deschedules a task
|
||||
* @param task
|
||||
*/
|
||||
public async descheduleTask(task: Task) {
|
||||
await this.descheduleTaskByName(task.name);
|
||||
}
|
||||
/**
|
||||
* returns all schedules of a specific task
|
||||
* @param taskNameArg
|
||||
*/
|
||||
public getSchedulesForTaskName(taskNameArg: string) {}
|
||||
|
||||
/**
|
||||
* starts the taskmanager
|
||||
*/
|
||||
public getScheduleForTaskName(taskName: string): string | null {
|
||||
const task = this.getTaskByName(taskName);
|
||||
return task && task.cronJob ? task.cronJob.cronExpression : null;
|
||||
}
|
||||
|
||||
public start() {
|
||||
this.cronJobManager.start();
|
||||
}
|
||||
|
||||
/**
|
||||
* stops the taskmanager
|
||||
*/
|
||||
public stop() {
|
||||
this.cronJobManager.stop();
|
||||
}
|
||||
|
Reference in New Issue
Block a user