Compare commits

...

24 Commits

Author SHA1 Message Date
6115bae66b 3.1.7 2023-10-20 13:05:40 +02:00
1be83af475 fix(core): update 2023-10-20 13:05:39 +02:00
bd14c41edc 3.1.6 2023-08-21 12:40:24 +02:00
d61b79282d fix(core): update 2023-08-21 12:40:24 +02:00
1cba9245fe 3.1.5 2023-08-15 12:02:45 +02:00
4b0db4da21 fix(core): update 2023-08-15 12:02:44 +02:00
a6cd0cb579 3.1.4 2023-08-15 11:39:32 +02:00
bd6109d5ea fix(core): update 2023-08-15 11:39:31 +02:00
aa632a5294 3.1.3 2023-08-13 09:37:24 +02:00
6499dd45cf fix(core): update 2023-08-13 09:37:23 +02:00
d04ed21607 3.1.2 2023-08-12 12:24:11 +02:00
bae776d4e9 fix(core): update 2023-08-12 12:24:10 +02:00
fcd7ea467e 3.1.1 2023-08-12 12:10:38 +02:00
e061b96056 fix(core): update 2023-08-12 12:10:37 +02:00
c2ce669f0c 3.1.0 2023-08-04 13:03:29 +02:00
05f91c3e35 feat(Task): Tasks can now be blocked by other tasks. 2023-08-04 13:03:28 +02:00
94e327c722 3.0.15 2023-08-04 11:58:54 +02:00
57a27604a7 fix(core): update 2023-08-04 11:58:53 +02:00
b077bd7a1b 3.0.14 2023-08-02 02:30:14 +02:00
f2c2dab782 fix(core): update 2023-08-02 02:30:13 +02:00
53a67c0ebe 3.0.13 2023-08-02 00:51:44 +02:00
5240a80cb3 fix(core): update 2023-08-02 00:51:43 +02:00
fa8be6b6d3 3.0.12 2023-08-02 00:07:22 +02:00
b5981d67cf fix(core): update 2023-08-02 00:07:21 +02:00
14 changed files with 2635 additions and 1563 deletions

View File

@ -119,6 +119,6 @@ jobs:
run: | run: |
npmci node install stable npmci node install stable
npmci npm install npmci npm install
pnpm install -g @gitzone/tsdoc pnpm install -g @git.zone/tsdoc
npmci command tsdoc npmci command tsdoc
continue-on-error: true continue-on-error: true

View File

@ -1,6 +1,6 @@
{ {
"name": "@push.rocks/taskbuffer", "name": "@push.rocks/taskbuffer",
"version": "3.0.11", "version": "3.1.7",
"private": false, "private": false,
"description": "flexible task management. TypeScript ready!", "description": "flexible task management. TypeScript ready!",
"main": "dist_ts/index.js", "main": "dist_ts/index.js",
@ -29,21 +29,21 @@
}, },
"homepage": "https://gitlab.com/pushrocks/taskbuffer#readme", "homepage": "https://gitlab.com/pushrocks/taskbuffer#readme",
"dependencies": { "dependencies": {
"@push.rocks/isounique": "^1.0.5", "@push.rocks/lik": "^6.0.5",
"@push.rocks/lik": "^6.0.0",
"@push.rocks/smartdelay": "^3.0.5", "@push.rocks/smartdelay": "^3.0.5",
"@push.rocks/smartlog": "^3.0.1", "@push.rocks/smartlog": "^3.0.3",
"@push.rocks/smartpromise": "^4.0.3", "@push.rocks/smartpromise": "^4.0.3",
"@push.rocks/smartrx": "^3.0.0", "@push.rocks/smartrx": "^3.0.6",
"@push.rocks/smarttime": "^4.0.1" "@push.rocks/smarttime": "^4.0.6",
"@push.rocks/smartunique": "^3.0.6"
}, },
"devDependencies": { "devDependencies": {
"@gitzone/tsbuild": "^2.1.63", "@git.zone/tsbuild": "^2.1.66",
"@gitzone/tsbundle": "^2.0.6", "@git.zone/tsbundle": "^2.0.8",
"@gitzone/tsrun": "^1.2.39", "@git.zone/tsrun": "^1.2.44",
"@gitzone/tstest": "^1.0.72", "@git.zone/tstest": "^1.0.77",
"@push.rocks/tapbundle": "^5.0.4", "@push.rocks/tapbundle": "^5.0.15",
"@types/node": "^18.11.18" "@types/node": "^20.8.7"
}, },
"files": [ "files": [
"ts/**/*", "ts/**/*",

3596
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

View File

@ -1,116 +1,89 @@
import { expect, tap } from '@push.rocks/tapbundle'; import { expect, tap } from '@push.rocks/tapbundle';
import * as taskbuffer from '../ts/index.js'; import * as taskbuffer from '../ts/index.js';
import * as smartpromise from '@push.rocks/smartpromise'; import * as smartpromise from '@push.rocks/smartpromise';
import * as smartdelay from '@push.rocks/smartdelay'; import * as smartdelay from '@push.rocks/smartdelay';
// setup some testData to work with
let testTask: taskbuffer.Task;
let testPreTask = new taskbuffer.Task({
taskFunction: async () => {
console.log('preTask executed');
},
preTask: testTask,
});
// some more tasks to test with
let task1Counter = 0; // how often task 1 is being executed
let task1 = new taskbuffer.Task({
name: 'Task 1',
taskFunction: () => {
let done = smartpromise.defer();
console.log('Task1 started');
setTimeout(() => {
task1Counter++;
console.log('Task1 executed');
done.resolve();
}, 5000);
return done.promise;
},
});
let task2 = new taskbuffer.Task({
name: 'Task 1',
taskFunction: async () => {
const done = smartpromise.defer();
console.log('Task2 started');
setTimeout(() => {
console.log('Task2 executed');
done.resolve();
}, 5000);
await done.promise;
},
});
let task3 = new taskbuffer.Task({
name: 'Task 3',
taskFunction: () => {
let done = smartpromise.defer();
console.log('Task3 started');
setTimeout(() => {
console.log('Task3 executed');
done.resolve();
}, 5000);
return done.promise;
},
});
tap.test('new Task() should return a new task', async () => { tap.test('new Task() should return a new task', async () => {
testTask = new taskbuffer.Task({ const testTask = new taskbuffer.Task({
taskFunction: async () => { taskFunction: async () => {
console.log('executed twice'); console.log('executed twice');
}, },
preTask: testPreTask,
}); });
});
tap.test('expect testTask to be an instance of Task', async () => {
expect(testTask).toBeInstanceOf(taskbuffer.Task); expect(testTask).toBeInstanceOf(taskbuffer.Task);
}); });
tap.test('expect testTask.idle is true', async () => { tap.test('should have bufferMax set to the provided value', async () => {
if (!testTask.idle) { const task2 = new taskbuffer.Task({
throw new Error('testTask.idle is not true'); taskFunction: async () => {},
} });
expect(task2.bufferMax).toBeUndefined(); // test for a task without bufferMax set
const bufferedTask = new taskbuffer.Task({
taskFunction: async () => {},
buffered: true,
bufferMax: 3,
});
expect(bufferedTask.bufferMax).toEqual(3);
}); });
tap.test('testTask.running should be of type boolean and initially false', async () => { tap.test('should be able to trigger tasks multiple times', async () => {
expect(testTask.running).toBeTypeofBoolean(); let task1Counter = 0;
// tslint:disable-next-line:no-unused-expression const task1 = new taskbuffer.Task({
expect(testTask.running).toBeFalse(); name: 'Task 1',
}); taskFunction: () => {
let done = smartpromise.defer();
tap.test('testTask.trigger() should return Promise', async () => { console.log('Task1 started');
expect(testTask.trigger()).toBeInstanceOf(Promise); setTimeout(() => {
}); task1Counter++;
console.log('Task1 executed');
tap.test('testTask.trigger() returned Promise should be fullfilled', async () => { done.resolve();
await testTask.trigger(); }, 5000);
}); return done.promise;
tap.test('expect to run a task without pre and afterTask errorless', async () => {
let localTestTask = new taskbuffer.Task({
taskFunction: async () => {
console.log('only once');
}, },
}); });
await localTestTask.trigger(); await task1.trigger();
await task1.trigger();
expect(task1Counter).toEqual(2);
}); });
tap.test('expect task to run in buffered mode', async () => { tap.test('should execute setup function before the task function', async () => {
let localTestTask = new taskbuffer.Task({ const task2 = new taskbuffer.Task({
name: 'Task 2',
taskSetup: async () => {
console.log('this is the setup function for task 2. It should only run once.')
return {
nice: 'yes',
}
},
taskFunction: async (before, setupArg) => {
expect(setupArg).toEqual({ nice: 'yes' });
const done = smartpromise.defer();
console.log('Task2 started');
setTimeout(() => {
console.log('Task2 executed');
done.resolve();
}, 5000);
await done.promise;
},
});
await task2.trigger();
});
tap.test('should not exceed bufferMax when task is buffered', async () => {
let counter = 0;
const bufferedTask = new taskbuffer.Task({
taskFunction: async () => { taskFunction: async () => {
await smartdelay.delayFor(3000); counter++;
await smartdelay.delayFor(2000);
counter--;
}, },
buffered: true, buffered: true,
bufferMax: 2, bufferMax: 2,
}); });
localTestTask.trigger(); bufferedTask.trigger();
localTestTask.trigger(); bufferedTask.trigger();
localTestTask.trigger(); bufferedTask.trigger();
await localTestTask.trigger(); await smartdelay.delayFor(100);
expect(counter <= bufferedTask.bufferMax).toBeTrue();
}); });
tap.start(); tap.start();

View File

@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/taskbuffer', name: '@push.rocks/taskbuffer',
version: '3.0.11', version: '3.1.7',
description: 'flexible task management. TypeScript ready!' description: 'flexible task management. TypeScript ready!'
} }

View File

@ -2,9 +2,10 @@ import { Task } from './taskbuffer.classes.task.js';
export class BufferRunner { export class BufferRunner {
public task: Task; public task: Task;
// initialze by default // initialize by default
public bufferCounter: number = 0; public bufferCounter: number = 0;
constructor(taskArg: Task) {
constructor(taskArg: Task<any>) {
this.task = taskArg; this.task = taskArg;
} }
@ -13,7 +14,7 @@ export class BufferRunner {
this.bufferCounter++; this.bufferCounter++;
} }
const returnPromise: Promise<any> = this.task.cycleCounter.getPromiseForCycle( const returnPromise: Promise<any> = this.task.cycleCounter.getPromiseForCycle(
this.bufferCounter + 1 this.bufferCounter
); );
if (!this.task.running) { if (!this.task.running) {
this._run(x); this._run(x);
@ -21,19 +22,13 @@ export class BufferRunner {
return returnPromise; return returnPromise;
} }
private _run(x: any) { private async _run(x: any) {
const recursiveBufferRunner = (x: any) => { this.task.running = true;
if (this.bufferCounter >= 0) { while (this.bufferCounter > 0) {
this.task.running = true; const result = await Task.runTask(this.task, { x: x });
Task.runTask(this.task, { x: x }).then((x) => { this.bufferCounter--;
this.bufferCounter--; // this.bufferCounter drops below 0, the recursion stops. this.task.cycleCounter.informOfCycle(result);
this.task.cycleCounter.informOfCycle(x); }
recursiveBufferRunner(x); this.task.running = false;
});
} else {
this.task.running = false;
}
};
recursiveBufferRunner(x);
} }
} }

View File

@ -9,7 +9,7 @@ export interface ICycleObject {
export class CycleCounter { export class CycleCounter {
public task: Task; public task: Task;
public cycleObjectArray: ICycleObject[] = []; public cycleObjectArray: ICycleObject[] = [];
constructor(taskArg: Task) { constructor(taskArg: Task<any>) {
this.task = taskArg; this.task = taskArg;
} }
public getPromiseForCycle(cycleCountArg: number) { public getPromiseForCycle(cycleCountArg: number) {

View File

@ -2,13 +2,11 @@ import { Task } from './taskbuffer.classes.task.js';
import * as plugins from './taskbuffer.plugins.js'; import * as plugins from './taskbuffer.plugins.js';
/** /**
* constains all data for the final coordinator to actually make an informed decision * Contains all data for the final coordinator to make an informed decision.
*/ */
export interface IDistributedTaskRequest { export interface IDistributedTaskRequest {
/** submitterId: string;
* this needs to correlate to the consultationResult requestResponseId: string;
*/
submitterRandomId: string;
taskName: string; taskName: string;
taskVersion: string; taskVersion: string;
taskExecutionTime: number; taskExecutionTime: number;
@ -18,24 +16,23 @@ export interface IDistributedTaskRequest {
} }
export interface IDistributedTaskRequestResult { export interface IDistributedTaskRequestResult {
/** submitterId: string;
* this needs to correlate to the decisionInfoBasis requestResponseId: string;
*/
submitterRandomId: string;
/**
* can be used while debugging
*/
considered: boolean; considered: boolean;
rank: string; rank: number;
reason: string; reason: string;
shouldTrigger: boolean; shouldTrigger: boolean;
} }
export abstract class AbstractDistributedCoordinator { export abstract class AbstractDistributedCoordinator {
public abstract fireDistributedTaskRequest( public abstract fireDistributedTaskRequest(
infoBasisArg: IDistributedTaskRequest infoBasis: IDistributedTaskRequest
): Promise<IDistributedTaskRequestResult>; ): Promise<IDistributedTaskRequestResult>;
public abstract updateDistributedTaskRequest( public abstract updateDistributedTaskRequest(
infoBasisArg: IDistributedTaskRequest infoBasis: IDistributedTaskRequest
): Promise<void>; ): Promise<void>;
public abstract start(): Promise<void>;
public abstract stop(): Promise<void>;
} }

View File

@ -4,20 +4,25 @@ import { CycleCounter } from './taskbuffer.classes.cyclecounter.js';
import { logger } from './taskbuffer.logging.js'; import { logger } from './taskbuffer.logging.js';
export interface ITaskFunction { export interface ITaskFunction<T = undefined> {
(x?: any): PromiseLike<any>; (x?: any, setupValue?: T): PromiseLike<any>;
} }
export type TPreOrAfterTaskFunction = () => Task; export interface ITaskSetupFunction<T = undefined> {
(): Promise<T>;
}
export class Task { export type TPreOrAfterTaskFunction = () => Task<any>;
// STATIC
public static extractTask(preOrAfterTaskArg: Task | TPreOrAfterTaskFunction): Task { export class Task<T = undefined> {
public static extractTask<T = undefined>(
preOrAfterTaskArg: Task<T> | TPreOrAfterTaskFunction
): Task<T> {
switch (true) { switch (true) {
case !preOrAfterTaskArg: case !preOrAfterTaskArg:
return null; return null;
case preOrAfterTaskArg instanceof Task: case preOrAfterTaskArg instanceof Task:
return preOrAfterTaskArg as Task; return preOrAfterTaskArg as Task<T>;
case typeof preOrAfterTaskArg === 'function': case typeof preOrAfterTaskArg === 'function':
const taskFunction = preOrAfterTaskArg as TPreOrAfterTaskFunction; const taskFunction = preOrAfterTaskArg as TPreOrAfterTaskFunction;
return taskFunction(); return taskFunction();
@ -32,7 +37,7 @@ export class Task {
return done.promise; return done.promise;
}; };
public static isTask = (taskArg: Task): boolean => { public static isTask = (taskArg: Task<any>): boolean => {
if (taskArg instanceof Task && typeof taskArg.taskFunction === 'function') { if (taskArg instanceof Task && typeof taskArg.taskFunction === 'function') {
return true; return true;
} else { } else {
@ -40,10 +45,10 @@ export class Task {
} }
}; };
public static isTaskTouched = ( public static isTaskTouched<T = undefined>(
taskArg: Task | TPreOrAfterTaskFunction, taskArg: Task<T> | TPreOrAfterTaskFunction,
touchedTasksArray: Task[] touchedTasksArray: Task<T>[]
): boolean => { ): boolean {
const taskToCheck = Task.extractTask(taskArg); const taskToCheck = Task.extractTask(taskArg);
let result = false; let result = false;
for (const keyArg in touchedTasksArray) { for (const keyArg in touchedTasksArray) {
@ -52,44 +57,54 @@ export class Task {
} }
} }
return result; return result;
}; }
public static runTask = async ( public static runTask = async <T>(
taskArg: Task | TPreOrAfterTaskFunction, taskArg: Task<T> | TPreOrAfterTaskFunction,
optionsArg: { x?: any; touchedTasksArray?: Task[] } optionsArg: { x?: any; touchedTasksArray?: Task<T>[] }
) => { ) => {
// extracts the task in case it is specified as a return value of a function
const taskToRun = Task.extractTask(taskArg); const taskToRun = Task.extractTask(taskArg);
const done = plugins.smartpromise.defer(); const done = plugins.smartpromise.defer();
// pay respect to execDelay // Wait for all blocking tasks to finish
for (const task of taskToRun.blockingTasks) {
await task.finished;
}
if (!taskToRun.setupValue && taskToRun.taskSetup) {
taskToRun.setupValue = await taskToRun.taskSetup();
}
if (taskToRun.execDelay) { if (taskToRun.execDelay) {
await plugins.smartdelay.delayFor(taskToRun.execDelay); await plugins.smartdelay.delayFor(taskToRun.execDelay);
} }
// set running params
taskToRun.running = true; taskToRun.running = true;
done.promise.then(async () => { done.promise.then(async () => {
taskToRun.running = false; taskToRun.running = false;
// When the task has finished running, resolve the finished promise
taskToRun.resolveFinished();
// Create a new finished promise for the next run
taskToRun.finished = new Promise((resolve) => {
taskToRun.resolveFinished = resolve;
});
}); });
// handle options
const options = { const options = {
...{ x: undefined, touchedTasksArray: [] }, ...{ x: undefined, touchedTasksArray: [] },
...optionsArg, ...optionsArg,
}; };
const x = options.x; const x = options.x;
const touchedTasksArray: Task[] = options.touchedTasksArray; const touchedTasksArray: Task<T>[] = options.touchedTasksArray;
touchedTasksArray.push(taskToRun); touchedTasksArray.push(taskToRun);
// run the task cascade
const localDeferred = plugins.smartpromise.defer(); const localDeferred = plugins.smartpromise.defer();
localDeferred.promise localDeferred.promise
.then(() => { .then(() => {
// lets run any preTask
if (taskToRun.preTask && !Task.isTaskTouched(taskToRun.preTask, touchedTasksArray)) { if (taskToRun.preTask && !Task.isTaskTouched(taskToRun.preTask, touchedTasksArray)) {
return Task.runTask(taskToRun.preTask, { x, touchedTasksArray }); return Task.runTask(taskToRun.preTask, { x, touchedTasksArray });
} else { } else {
@ -99,9 +114,8 @@ export class Task {
} }
}) })
.then(async (x) => { .then(async (x) => {
// lets run the main task
try { try {
return await taskToRun.taskFunction(x); return await taskToRun.taskFunction(x, taskToRun.setupValue);
} catch (e) { } catch (e) {
console.log(e); console.log(e);
} }
@ -125,16 +139,9 @@ export class Task {
return await done.promise; return await done.promise;
}; };
// INSTANCE
// mandatory properties
public name: string; public name: string;
/**
* the version of the task
* should follow semver
* might be important for DistributedCoordinator
*/
public version: string; public version: string;
public taskFunction: ITaskFunction; public taskFunction: ITaskFunction<T>;
public buffered: boolean; public buffered: boolean;
public cronJob: plugins.smarttime.CronJob; public cronJob: plugins.smarttime.CronJob;
@ -142,62 +149,52 @@ export class Task {
public execDelay: number; public execDelay: number;
public timeout: number; public timeout: number;
// tasks to run before and after public preTask: Task<T> | TPreOrAfterTaskFunction;
public preTask: Task | TPreOrAfterTaskFunction; public afterTask: Task<T> | TPreOrAfterTaskFunction;
public afterTask: Task | TPreOrAfterTaskFunction;
// Add a list to store the blocking tasks
public blockingTasks: Task[] = [];
// Add a promise that will resolve when the task has finished
private finished: Promise<void>;
private resolveFinished: () => void;
// initialize by default
public running: boolean = false; public running: boolean = false;
public bufferRunner = new BufferRunner(this); public bufferRunner = new BufferRunner(this);
public cycleCounter = new CycleCounter(this); public cycleCounter = new CycleCounter(this);
public idle: boolean = true; public get idle() {
private _state: string = 'ready'; return !this.running;
}
public taskSetup: ITaskSetupFunction<T>;
public setupValue: T;
constructor(optionsArg: { constructor(optionsArg: {
/** taskFunction: ITaskFunction<T>;
* the task function to run, must return promise preTask?: Task<T> | TPreOrAfterTaskFunction;
*/ afterTask?: Task<T> | TPreOrAfterTaskFunction;
taskFunction: ITaskFunction;
/**
* any other task to run before
*/
preTask?: Task | TPreOrAfterTaskFunction;
/**
* any other task to run after
*/
afterTask?: Task | TPreOrAfterTaskFunction;
/**
* wether this task should run buffered
*/
buffered?: boolean; buffered?: boolean;
/**
* the maximum buffer
*/
bufferMax?: number; bufferMax?: number;
/**
* the execution delay, before the task is executed
* only makes sense when running in buffered mode
*/
execDelay?: number; execDelay?: number;
/**
* the name of the task
*/
name?: string; name?: string;
taskSetup?: ITaskSetupFunction<T>;
}) { }) {
this.taskFunction = optionsArg.taskFunction; this.taskFunction = optionsArg.taskFunction;
this.preTask = optionsArg.preTask; this.preTask = optionsArg.preTask;
this.afterTask = optionsArg.afterTask; this.afterTask = optionsArg.afterTask;
this.idle = !this.running;
this.buffered = optionsArg.buffered; this.buffered = optionsArg.buffered;
this.bufferMax = optionsArg.bufferMax; this.bufferMax = optionsArg.bufferMax;
this.execDelay = optionsArg.execDelay; this.execDelay = optionsArg.execDelay;
this.name = optionsArg.name; this.name = optionsArg.name;
this.taskSetup = optionsArg.taskSetup;
// Create the finished promise
this.finished = new Promise((resolve) => {
this.resolveFinished = resolve;
});
} }
/**
* trigger the task. Will trigger buffered if this.buffered is true
*/
public trigger(x?: any): Promise<any> { public trigger(x?: any): Promise<any> {
if (this.buffered) { if (this.buffered) {
return this.triggerBuffered(x); return this.triggerBuffered(x);
@ -206,31 +203,11 @@ export class Task {
} }
} }
/**
* trigger task unbuffered.
* will actually run the task, not considering any buffered limits.
*/
public triggerUnBuffered(x?: any): Promise<any> { public triggerUnBuffered(x?: any): Promise<any> {
return Task.runTask(this, { x: x }); return Task.runTask<T>(this, { x: x });
} }
/**
* trigger task buffered.
* note: .trigger() also calls this function
*/
public triggerBuffered(x?: any): Promise<any> { public triggerBuffered(x?: any): Promise<any> {
return this.bufferRunner.trigger(x); return this.bufferRunner.trigger(x);
} }
get state(): string {
return this._state;
}
set state(stateArg: string) {
if (stateArg === 'locked') {
this._state = 'locked';
} else {
logger.log('error', `state type ${stateArg} could not be set`);
}
}
} }

View File

@ -1,6 +1,6 @@
import * as plugins from './taskbuffer.plugins.js'; import * as plugins from './taskbuffer.plugins.js';
import { Task, ITaskFunction } from './taskbuffer.classes.task.js'; import { Task, type ITaskFunction } from './taskbuffer.classes.task.js';
export class TaskDebounced<T = unknown> extends Task { export class TaskDebounced<T = unknown> extends Task {
private _debouncedTaskFunction: ITaskFunction; private _debouncedTaskFunction: ITaskFunction;

View File

@ -1,10 +1,10 @@
import * as plugins from './taskbuffer.plugins.js'; import * as plugins from './taskbuffer.plugins.js';
import { Task } from './taskbuffer.classes.task.js'; import { Task } from './taskbuffer.classes.task.js';
import { AbstractDistributedCoordinator } from './taskbuffer.classes.distributedcoordinator.js'; import { AbstractDistributedCoordinator, type IDistributedTaskRequestResult } from './taskbuffer.classes.distributedcoordinator.js';
export interface ICronJob { export interface ICronJob {
cronString: string; cronString: string;
taskNameArg: string; taskName: string;
job: any; job: any;
} }
@ -13,57 +13,37 @@ export interface ITaskManagerConstructorOptions {
} }
export class TaskManager { export class TaskManager {
public randomId = plugins.isounique.uni(); public randomId = plugins.smartunique.shortId();
public taskMap = new plugins.lik.ObjectMap<Task>(); public taskMap = new plugins.lik.ObjectMap<Task>();
private cronJobManager = new plugins.smarttime.CronManager(); private cronJobManager = new plugins.smarttime.CronManager();
public options: ITaskManagerConstructorOptions = { public options: ITaskManagerConstructorOptions = {
distributedCoordinator: null, distributedCoordinator: null,
}; };
constructor(optionosArg: ITaskManagerConstructorOptions = {}) { constructor(options: ITaskManagerConstructorOptions = {}) {
this.options = Object.assign(this.options, optionosArg); this.options = Object.assign(this.options, options);
} }
/** public getTaskByName(taskName: string): Task {
* checks if a task is already present return this.taskMap.findSync((task) => task.name === taskName);
* @param taskNameArg
*/
public getTaskByName(taskNameArg: string): Task {
return this.taskMap.findSync((itemArg) => {
return itemArg.name === taskNameArg;
});
} }
/** public addTask(task: Task): void {
* adds a Task to the TaskManager if (!task.name) {
* @param taskArg throw new Error('Task must have a name to be added to taskManager');
*/
public addTask(taskArg: Task): void {
if (!taskArg.name) {
throw new Error('taskArg needs a name to be added to taskManager');
} }
this.taskMap.add(taskArg); this.taskMap.add(task);
} }
/** public addAndScheduleTask(task: Task, cronString: string) {
* adds and schedules a task at once this.addTask(task);
* @param taskArg this.scheduleTaskByName(task.name, cronString);
* @param cronStringArg
*/
public addAndScheduleTask(taskArg: Task, cronStringArg: string) {
this.addTask(taskArg);
this.scheduleTaskByName(taskArg.name, cronStringArg);
} }
/** public async triggerTaskByName(taskName: string): Promise<any> {
* triggers a task in the TaskManagerByName const taskToTrigger = this.getTaskByName(taskName);
* @param taskNameArg
*/
public triggerTaskByName(taskNameArg: string): Promise<any> {
const taskToTrigger = this.getTaskByName(taskNameArg);
if (!taskToTrigger) { if (!taskToTrigger) {
throw new Error(`There is no task with the name of ${taskNameArg}`); throw new Error(`No task with the name ${taskName} found.`);
} }
return taskToTrigger.trigger(); return taskToTrigger.trigger();
} }
@ -72,88 +52,88 @@ export class TaskManager {
return task.trigger(); return task.trigger();
} }
/** public scheduleTaskByName(taskName: string, cronString: string) {
* schedules the task by name const taskToSchedule = this.getTaskByName(taskName);
* @param taskNameArg if (!taskToSchedule) {
*/ throw new Error(`No task with the name ${taskName} found.`);
public scheduleTaskByName(taskNameArg: string, cronStringArg: string) { }
const taskToSchedule = this.getTaskByName(taskNameArg); this.handleTaskScheduling(taskToSchedule, cronString);
const cronJob = this.cronJobManager.addCronjob(
cronStringArg,
async (triggerTimeArg: number) => {
console.log(`taskbuffer schedule triggered task >>${taskToSchedule.name}<<`);
console.log(
`task >>${taskToSchedule.name}<< is ${
taskToSchedule.buffered
? `buffered with max ${taskToSchedule.bufferMax} buffered calls`
: `unbuffered`
}`
);
if (this.options.distributedCoordinator) {
console.log(`Found a distrubuted coordinator, performing distributed consultation.`);
const announcementResult =
await this.options.distributedCoordinator.fireDistributedTaskRequest({
submitterRandomId: this.randomId,
status: 'requesting',
taskExecutionParallel: 1,
taskExecutionTime: triggerTimeArg,
taskExecutionTimeout: taskToSchedule.timeout,
taskName: taskToSchedule.name,
taskVersion: taskToSchedule.version,
});
if (!announcementResult.shouldTrigger) {
console.log('distributed coordinator result: NOT EXECUTING');
return;
} else {
console.log('distributed coordinator result: CHOSEN AND EXECUTING');
}
}
await taskToSchedule.trigger();
}
);
taskToSchedule.cronJob = cronJob;
} }
/** private handleTaskScheduling(task: Task, cronString: string) {
* deschedules a task by name const cronJob = this.cronJobManager.addCronjob(
* @param taskNameArg cronString,
*/ async (triggerTime: number) => {
public descheduleTaskByName(taskNameArg: string) { this.logTaskState(task);
const taskToDeSchedule = this.getTaskByName(taskNameArg); if (this.options.distributedCoordinator) {
if (taskToDeSchedule.cronJob) { const announcementResult = await this.performDistributedConsultation(task, triggerTime);
this.cronJobManager.removeCronjob(taskToDeSchedule.cronJob); if (!announcementResult.shouldTrigger) {
taskToDeSchedule.cronJob = null; console.log('Distributed coordinator result: NOT EXECUTING');
return;
} else {
console.log('Distributed coordinator result: CHOSEN AND EXECUTING');
}
}
await task.trigger();
}
);
task.cronJob = cronJob;
}
private logTaskState(task: Task) {
console.log(`Taskbuffer schedule triggered task >>${task.name}<<`);
const bufferState = task.buffered
? `buffered with max ${task.bufferMax} buffered calls`
: `unbuffered`;
console.log(`Task >>${task.name}<< is ${bufferState}`);
}
private async performDistributedConsultation(task: Task, triggerTime: number): Promise<IDistributedTaskRequestResult> {
console.log('Found a distributed coordinator, performing consultation.');
return this.options.distributedCoordinator.fireDistributedTaskRequest({
submitterId: this.randomId,
requestResponseId: plugins.smartunique.shortId(),
status: 'requesting',
taskExecutionParallel: 1,
taskExecutionTime: triggerTime,
taskExecutionTimeout: task.timeout,
taskName: task.name,
taskVersion: task.version,
});
}
public descheduleTaskByName(taskName: string) {
const task = this.getTaskByName(taskName);
if (task && task.cronJob) {
this.cronJobManager.removeCronjob(task.cronJob);
task.cronJob = null;
} }
if (this.cronJobManager.cronjobs.isEmpty) { if (this.cronJobManager.cronjobs.isEmpty) {
this.cronJobManager.stop(); this.cronJobManager.stop();
} }
} }
/**
* deschedules a task
* @param task
*/
public async descheduleTask(task: Task) { public async descheduleTask(task: Task) {
await this.descheduleTaskByName(task.name); await this.descheduleTaskByName(task.name);
} }
/**
* returns all schedules of a specific task
* @param taskNameArg
*/
public getSchedulesForTaskName(taskNameArg: string) {}
/** public getScheduleForTaskName(taskName: string): string | null {
* starts the taskmanager const task = this.getTaskByName(taskName);
*/ return task && task.cronJob ? task.cronJob.cronExpression : null;
public start() { }
public async start() {
if (this.options.distributedCoordinator) {
await this.options.distributedCoordinator.start();
}
this.cronJobManager.start(); this.cronJobManager.start();
} }
/** public async stop() {
* stops the taskmanager
*/
public stop() {
this.cronJobManager.stop(); this.cronJobManager.stop();
if (this.options.distributedCoordinator) {
await this.options.distributedCoordinator.stop();
}
} }
} }

View File

@ -1,6 +1,6 @@
import * as plugins from './taskbuffer.plugins.js'; import * as plugins from './taskbuffer.plugins.js';
import { Task, ITaskFunction } from './taskbuffer.classes.task.js'; import { Task, type ITaskFunction } from './taskbuffer.classes.task.js';
/** /**
* TaskOnce is run exactly once, no matter how often it is triggered * TaskOnce is run exactly once, no matter how often it is triggered

View File

@ -1,9 +1,9 @@
import * as isounique from '@push.rocks/isounique';
import * as lik from '@push.rocks/lik'; import * as lik from '@push.rocks/lik';
import * as smartlog from '@push.rocks/smartlog'; import * as smartlog from '@push.rocks/smartlog';
import * as smartpromise from '@push.rocks/smartpromise'; import * as smartpromise from '@push.rocks/smartpromise';
import * as smartdelay from '@push.rocks/smartdelay'; import * as smartdelay from '@push.rocks/smartdelay';
import * as smartrx from '@push.rocks/smartrx'; import * as smartrx from '@push.rocks/smartrx';
import * as smarttime from '@push.rocks/smarttime'; import * as smarttime from '@push.rocks/smarttime';
import * as smartunique from '@push.rocks/smartunique';
export { isounique, lik, smartlog, smartpromise, smartdelay, smartrx, smarttime }; export { lik, smartlog, smartpromise, smartdelay, smartrx, smarttime, smartunique };

View File

@ -3,8 +3,12 @@
"experimentalDecorators": true, "experimentalDecorators": true,
"useDefineForClassFields": false, "useDefineForClassFields": false,
"target": "ES2022", "target": "ES2022",
"module": "ES2022", "module": "NodeNext",
"moduleResolution": "nodenext", "moduleResolution": "NodeNext",
"esModuleInterop": true "esModuleInterop": true,
} "verbatimModuleSyntax": true
},
"exclude": [
"dist_*/**/*.d.ts"
]
} }