Compare commits

..

10 Commits

Author SHA1 Message Date
aa632a5294 3.1.3 2023-08-13 09:37:24 +02:00
6499dd45cf fix(core): update 2023-08-13 09:37:23 +02:00
d04ed21607 3.1.2 2023-08-12 12:24:11 +02:00
bae776d4e9 fix(core): update 2023-08-12 12:24:10 +02:00
fcd7ea467e 3.1.1 2023-08-12 12:10:38 +02:00
e061b96056 fix(core): update 2023-08-12 12:10:37 +02:00
c2ce669f0c 3.1.0 2023-08-04 13:03:29 +02:00
05f91c3e35 feat(Task): Tasks can now be blocked by other tasks. 2023-08-04 13:03:28 +02:00
94e327c722 3.0.15 2023-08-04 11:58:54 +02:00
57a27604a7 fix(core): update 2023-08-04 11:58:53 +02:00
7 changed files with 1099 additions and 249 deletions

View File

@ -1,6 +1,6 @@
{ {
"name": "@push.rocks/taskbuffer", "name": "@push.rocks/taskbuffer",
"version": "3.0.14", "version": "3.1.3",
"private": false, "private": false,
"description": "flexible task management. TypeScript ready!", "description": "flexible task management. TypeScript ready!",
"main": "dist_ts/index.js", "main": "dist_ts/index.js",
@ -42,8 +42,8 @@
"@gitzone/tsbundle": "^2.0.8", "@gitzone/tsbundle": "^2.0.8",
"@gitzone/tsrun": "^1.2.44", "@gitzone/tsrun": "^1.2.44",
"@gitzone/tstest": "^1.0.77", "@gitzone/tstest": "^1.0.77",
"@push.rocks/tapbundle": "^5.0.12", "@push.rocks/tapbundle": "^5.0.15",
"@types/node": "^20.4.5" "@types/node": "^20.4.10"
}, },
"files": [ "files": [
"ts/**/*", "ts/**/*",

1071
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

View File

@ -12,13 +12,6 @@ tap.test('new Task() should return a new task', async () => {
expect(testTask).toBeInstanceOf(taskbuffer.Task); expect(testTask).toBeInstanceOf(taskbuffer.Task);
}); });
tap.test('should be able to get the task state', async () => {
const testTask = new taskbuffer.Task({
taskFunction: async () => {},
});
expect(testTask.state).toEqual('ready');
});
tap.test('should have bufferMax set to the provided value', async () => { tap.test('should have bufferMax set to the provided value', async () => {
const task2 = new taskbuffer.Task({ const task2 = new taskbuffer.Task({
taskFunction: async () => {}, taskFunction: async () => {},

View File

@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/taskbuffer', name: '@push.rocks/taskbuffer',
version: '3.0.14', version: '3.1.3',
description: 'flexible task management. TypeScript ready!' description: 'flexible task management. TypeScript ready!'
} }

View File

@ -2,13 +2,10 @@ import { Task } from './taskbuffer.classes.task.js';
import * as plugins from './taskbuffer.plugins.js'; import * as plugins from './taskbuffer.plugins.js';
/** /**
* constains all data for the final coordinator to actually make an informed decision * Contains all data for the final coordinator to make an informed decision.
*/ */
export interface IDistributedTaskRequest { export interface IDistributedTaskRequest {
/** submitterId: string;
* this needs to correlate to the consultationResult
*/
submitterRandomId: string;
taskName: string; taskName: string;
taskVersion: string; taskVersion: string;
taskExecutionTime: number; taskExecutionTime: number;
@ -18,24 +15,22 @@ export interface IDistributedTaskRequest {
} }
export interface IDistributedTaskRequestResult { export interface IDistributedTaskRequestResult {
/** submitterId: string;
* this needs to correlate to the decisionInfoBasis
*/
submitterRandomId: string;
/**
* can be used while debugging
*/
considered: boolean; considered: boolean;
rank: string; rank: number;
reason: string; reason: string;
shouldTrigger: boolean; shouldTrigger: boolean;
} }
export abstract class AbstractDistributedCoordinator { export abstract class AbstractDistributedCoordinator {
public abstract fireDistributedTaskRequest( public abstract fireDistributedTaskRequest(
infoBasisArg: IDistributedTaskRequest infoBasis: IDistributedTaskRequest
): Promise<IDistributedTaskRequestResult>; ): Promise<IDistributedTaskRequestResult>;
public abstract updateDistributedTaskRequest( public abstract updateDistributedTaskRequest(
infoBasisArg: IDistributedTaskRequest infoBasis: IDistributedTaskRequest
): Promise<void>; ): Promise<void>;
public abstract start(): Promise<void>;
public abstract stop(): Promise<void>;
} }

View File

@ -15,8 +15,9 @@ export interface ITaskSetupFunction<T = undefined> {
export type TPreOrAfterTaskFunction = () => Task<any>; export type TPreOrAfterTaskFunction = () => Task<any>;
export class Task<T = undefined> { export class Task<T = undefined> {
// STATIC public static extractTask<T = undefined>(
public static extractTask<T = undefined>(preOrAfterTaskArg: Task<T> | TPreOrAfterTaskFunction): Task<T> { preOrAfterTaskArg: Task<T> | TPreOrAfterTaskFunction
): Task<T> {
switch (true) { switch (true) {
case !preOrAfterTaskArg: case !preOrAfterTaskArg:
return null; return null;
@ -44,7 +45,7 @@ export class Task<T = undefined> {
} }
}; };
public static isTaskTouched<T = undefined> ( public static isTaskTouched<T = undefined>(
taskArg: Task<T> | TPreOrAfterTaskFunction, taskArg: Task<T> | TPreOrAfterTaskFunction,
touchedTasksArray: Task<T>[] touchedTasksArray: Task<T>[]
): boolean { ): boolean {
@ -56,7 +57,7 @@ export class Task<T = undefined> {
} }
} }
return result; return result;
}; }
public static runTask = async <T>( public static runTask = async <T>(
taskArg: Task<T> | TPreOrAfterTaskFunction, taskArg: Task<T> | TPreOrAfterTaskFunction,
@ -65,6 +66,11 @@ export class Task<T = undefined> {
const taskToRun = Task.extractTask(taskArg); const taskToRun = Task.extractTask(taskArg);
const done = plugins.smartpromise.defer(); const done = plugins.smartpromise.defer();
// Wait for all blocking tasks to finish
for (const task of taskToRun.blockingTasks) {
await task.finished;
}
if (!taskToRun.setupValue && taskToRun.taskSetup) { if (!taskToRun.setupValue && taskToRun.taskSetup) {
taskToRun.setupValue = await taskToRun.taskSetup(); taskToRun.setupValue = await taskToRun.taskSetup();
} }
@ -77,6 +83,14 @@ export class Task<T = undefined> {
done.promise.then(async () => { done.promise.then(async () => {
taskToRun.running = false; taskToRun.running = false;
// When the task has finished running, resolve the finished promise
taskToRun.resolveFinished();
// Create a new finished promise for the next run
taskToRun.finished = new Promise((resolve) => {
taskToRun.resolveFinished = resolve;
});
}); });
const options = { const options = {
@ -125,7 +139,6 @@ export class Task<T = undefined> {
return await done.promise; return await done.promise;
}; };
// INSTANCE
public name: string; public name: string;
public version: string; public version: string;
public taskFunction: ITaskFunction<T>; public taskFunction: ITaskFunction<T>;
@ -139,12 +152,20 @@ export class Task<T = undefined> {
public preTask: Task<T> | TPreOrAfterTaskFunction; public preTask: Task<T> | TPreOrAfterTaskFunction;
public afterTask: Task<T> | TPreOrAfterTaskFunction; public afterTask: Task<T> | TPreOrAfterTaskFunction;
// Add a list to store the blocking tasks
public blockingTasks: Task[] = [];
// Add a promise that will resolve when the task has finished
private finished: Promise<void>;
private resolveFinished: () => void;
public running: boolean = false; public running: boolean = false;
public bufferRunner = new BufferRunner(this); public bufferRunner = new BufferRunner(this);
public cycleCounter = new CycleCounter(this); public cycleCounter = new CycleCounter(this);
public idle: boolean = true; public get idle() {
private _state: string = 'ready'; return !this.running;
}
public taskSetup: ITaskSetupFunction<T>; public taskSetup: ITaskSetupFunction<T>;
public setupValue: T; public setupValue: T;
@ -162,12 +183,16 @@ export class Task<T = undefined> {
this.taskFunction = optionsArg.taskFunction; this.taskFunction = optionsArg.taskFunction;
this.preTask = optionsArg.preTask; this.preTask = optionsArg.preTask;
this.afterTask = optionsArg.afterTask; this.afterTask = optionsArg.afterTask;
this.idle = !this.running;
this.buffered = optionsArg.buffered; this.buffered = optionsArg.buffered;
this.bufferMax = optionsArg.bufferMax; this.bufferMax = optionsArg.bufferMax;
this.execDelay = optionsArg.execDelay; this.execDelay = optionsArg.execDelay;
this.name = optionsArg.name; this.name = optionsArg.name;
this.taskSetup = optionsArg.taskSetup; this.taskSetup = optionsArg.taskSetup;
// Create the finished promise
this.finished = new Promise((resolve) => {
this.resolveFinished = resolve;
});
} }
public trigger(x?: any): Promise<any> { public trigger(x?: any): Promise<any> {
@ -185,16 +210,4 @@ export class Task<T = undefined> {
public triggerBuffered(x?: any): Promise<any> { public triggerBuffered(x?: any): Promise<any> {
return this.bufferRunner.trigger(x); return this.bufferRunner.trigger(x);
} }
get state(): string {
return this._state;
}
set state(stateArg: string) {
if (stateArg === 'locked') {
this._state = 'locked';
} else {
logger.log('error', `state type ${stateArg} could not be set`);
}
}
} }

View File

@ -1,10 +1,10 @@
import * as plugins from './taskbuffer.plugins.js'; import * as plugins from './taskbuffer.plugins.js';
import { Task } from './taskbuffer.classes.task.js'; import { Task } from './taskbuffer.classes.task.js';
import { AbstractDistributedCoordinator } from './taskbuffer.classes.distributedcoordinator.js'; import { AbstractDistributedCoordinator, type IDistributedTaskRequestResult } from './taskbuffer.classes.distributedcoordinator.js';
export interface ICronJob { export interface ICronJob {
cronString: string; cronString: string;
taskNameArg: string; taskName: string;
job: any; job: any;
} }
@ -16,54 +16,34 @@ export class TaskManager {
public randomId = plugins.isounique.uni(); public randomId = plugins.isounique.uni();
public taskMap = new plugins.lik.ObjectMap<Task>(); public taskMap = new plugins.lik.ObjectMap<Task>();
private cronJobManager = new plugins.smarttime.CronManager(); private cronJobManager = new plugins.smarttime.CronManager();
public options: ITaskManagerConstructorOptions = { public options: ITaskManagerConstructorOptions = {
distributedCoordinator: null, distributedCoordinator: null,
}; };
constructor(optionosArg: ITaskManagerConstructorOptions = {}) { constructor(options: ITaskManagerConstructorOptions = {}) {
this.options = Object.assign(this.options, optionosArg); this.options = Object.assign(this.options, options);
} }
/** public getTaskByName(taskName: string): Task {
* checks if a task is already present return this.taskMap.findSync((task) => task.name === taskName);
* @param taskNameArg
*/
public getTaskByName(taskNameArg: string): Task {
return this.taskMap.findSync((itemArg) => {
return itemArg.name === taskNameArg;
});
} }
/** public addTask(task: Task): void {
* adds a Task to the TaskManager if (!task.name) {
* @param taskArg throw new Error('Task must have a name to be added to taskManager');
*/
public addTask(taskArg: Task): void {
if (!taskArg.name) {
throw new Error('taskArg needs a name to be added to taskManager');
} }
this.taskMap.add(taskArg); this.taskMap.add(task);
} }
/** public addAndScheduleTask(task: Task, cronString: string) {
* adds and schedules a task at once this.addTask(task);
* @param taskArg this.scheduleTaskByName(task.name, cronString);
* @param cronStringArg
*/
public addAndScheduleTask(taskArg: Task, cronStringArg: string) {
this.addTask(taskArg);
this.scheduleTaskByName(taskArg.name, cronStringArg);
} }
/** public async triggerTaskByName(taskName: string): Promise<any> {
* triggers a task in the TaskManagerByName const taskToTrigger = this.getTaskByName(taskName);
* @param taskNameArg
*/
public triggerTaskByName(taskNameArg: string): Promise<any> {
const taskToTrigger = this.getTaskByName(taskNameArg);
if (!taskToTrigger) { if (!taskToTrigger) {
throw new Error(`There is no task with the name of ${taskNameArg}`); throw new Error(`No task with the name ${taskName} found.`);
} }
return taskToTrigger.trigger(); return taskToTrigger.trigger();
} }
@ -72,88 +52,86 @@ export class TaskManager {
return task.trigger(); return task.trigger();
} }
/** public scheduleTaskByName(taskName: string, cronString: string) {
* schedules the task by name const taskToSchedule = this.getTaskByName(taskName);
* @param taskNameArg if (!taskToSchedule) {
*/ throw new Error(`No task with the name ${taskName} found.`);
public scheduleTaskByName(taskNameArg: string, cronStringArg: string) { }
const taskToSchedule = this.getTaskByName(taskNameArg); this.handleTaskScheduling(taskToSchedule, cronString);
const cronJob = this.cronJobManager.addCronjob( }
cronStringArg,
async (triggerTimeArg: number) => {
console.log(`taskbuffer schedule triggered task >>${taskToSchedule.name}<<`);
console.log(
`task >>${taskToSchedule.name}<< is ${
taskToSchedule.buffered
? `buffered with max ${taskToSchedule.bufferMax} buffered calls`
: `unbuffered`
}`
);
if (this.options.distributedCoordinator) {
console.log(`Found a distrubuted coordinator, performing distributed consultation.`);
const announcementResult =
await this.options.distributedCoordinator.fireDistributedTaskRequest({
submitterRandomId: this.randomId,
status: 'requesting',
taskExecutionParallel: 1,
taskExecutionTime: triggerTimeArg,
taskExecutionTimeout: taskToSchedule.timeout,
taskName: taskToSchedule.name,
taskVersion: taskToSchedule.version,
});
private handleTaskScheduling(task: Task, cronString: string) {
const cronJob = this.cronJobManager.addCronjob(
cronString,
async (triggerTime: number) => {
this.logTaskState(task);
if (this.options.distributedCoordinator) {
const announcementResult = await this.performDistributedConsultation(task, triggerTime);
if (!announcementResult.shouldTrigger) { if (!announcementResult.shouldTrigger) {
console.log('distributed coordinator result: NOT EXECUTING'); console.log('Distributed coordinator result: NOT EXECUTING');
return; return;
} else { } else {
console.log('distributed coordinator result: CHOSEN AND EXECUTING'); console.log('Distributed coordinator result: CHOSEN AND EXECUTING');
} }
} }
await taskToSchedule.trigger(); await task.trigger();
} }
); );
taskToSchedule.cronJob = cronJob; task.cronJob = cronJob;
} }
/** private logTaskState(task: Task) {
* deschedules a task by name console.log(`Taskbuffer schedule triggered task >>${task.name}<<`);
* @param taskNameArg const bufferState = task.buffered
*/ ? `buffered with max ${task.bufferMax} buffered calls`
public descheduleTaskByName(taskNameArg: string) { : `unbuffered`;
const taskToDeSchedule = this.getTaskByName(taskNameArg); console.log(`Task >>${task.name}<< is ${bufferState}`);
if (taskToDeSchedule.cronJob) { }
this.cronJobManager.removeCronjob(taskToDeSchedule.cronJob);
taskToDeSchedule.cronJob = null; private async performDistributedConsultation(task: Task, triggerTime: number): Promise<IDistributedTaskRequestResult> {
console.log('Found a distributed coordinator, performing consultation.');
return this.options.distributedCoordinator.fireDistributedTaskRequest({
submitterId: this.randomId,
status: 'requesting',
taskExecutionParallel: 1,
taskExecutionTime: triggerTime,
taskExecutionTimeout: task.timeout,
taskName: task.name,
taskVersion: task.version,
});
}
public descheduleTaskByName(taskName: string) {
const task = this.getTaskByName(taskName);
if (task && task.cronJob) {
this.cronJobManager.removeCronjob(task.cronJob);
task.cronJob = null;
} }
if (this.cronJobManager.cronjobs.isEmpty) { if (this.cronJobManager.cronjobs.isEmpty) {
this.cronJobManager.stop(); this.cronJobManager.stop();
} }
} }
/**
* deschedules a task
* @param task
*/
public async descheduleTask(task: Task) { public async descheduleTask(task: Task) {
await this.descheduleTaskByName(task.name); await this.descheduleTaskByName(task.name);
} }
/**
* returns all schedules of a specific task
* @param taskNameArg
*/
public getSchedulesForTaskName(taskNameArg: string) {}
/** public getScheduleForTaskName(taskName: string): string | null {
* starts the taskmanager const task = this.getTaskByName(taskName);
*/ return task && task.cronJob ? task.cronJob.cronExpression : null;
public start() { }
public async start() {
if (this.options.distributedCoordinator) {
await this.options.distributedCoordinator.start();
}
this.cronJobManager.start(); this.cronJobManager.start();
} }
/** public async stop() {
* stops the taskmanager
*/
public stop() {
this.cronJobManager.stop(); this.cronJobManager.stop();
if (this.options.distributedCoordinator) {
await this.options.distributedCoordinator.stop();
}
} }
} }