Compare commits

..

20 Commits

Author SHA1 Message Date
1cba9245fe 3.1.5 2023-08-15 12:02:45 +02:00
4b0db4da21 fix(core): update 2023-08-15 12:02:44 +02:00
a6cd0cb579 3.1.4 2023-08-15 11:39:32 +02:00
bd6109d5ea fix(core): update 2023-08-15 11:39:31 +02:00
aa632a5294 3.1.3 2023-08-13 09:37:24 +02:00
6499dd45cf fix(core): update 2023-08-13 09:37:23 +02:00
d04ed21607 3.1.2 2023-08-12 12:24:11 +02:00
bae776d4e9 fix(core): update 2023-08-12 12:24:10 +02:00
fcd7ea467e 3.1.1 2023-08-12 12:10:38 +02:00
e061b96056 fix(core): update 2023-08-12 12:10:37 +02:00
c2ce669f0c 3.1.0 2023-08-04 13:03:29 +02:00
05f91c3e35 feat(Task): Tasks can now be blocked by other tasks. 2023-08-04 13:03:28 +02:00
94e327c722 3.0.15 2023-08-04 11:58:54 +02:00
57a27604a7 fix(core): update 2023-08-04 11:58:53 +02:00
b077bd7a1b 3.0.14 2023-08-02 02:30:14 +02:00
f2c2dab782 fix(core): update 2023-08-02 02:30:13 +02:00
53a67c0ebe 3.0.13 2023-08-02 00:51:44 +02:00
5240a80cb3 fix(core): update 2023-08-02 00:51:43 +02:00
fa8be6b6d3 3.0.12 2023-08-02 00:07:22 +02:00
b5981d67cf fix(core): update 2023-08-02 00:07:21 +02:00
13 changed files with 2456 additions and 1381 deletions

View File

@ -1,6 +1,6 @@
{
"name": "@push.rocks/taskbuffer",
"version": "3.0.11",
"version": "3.1.5",
"private": false,
"description": "flexible task management. TypeScript ready!",
"main": "dist_ts/index.js",
@ -29,21 +29,21 @@
},
"homepage": "https://gitlab.com/pushrocks/taskbuffer#readme",
"dependencies": {
"@push.rocks/isounique": "^1.0.5",
"@push.rocks/lik": "^6.0.0",
"@push.rocks/lik": "^6.0.5",
"@push.rocks/smartdelay": "^3.0.5",
"@push.rocks/smartlog": "^3.0.1",
"@push.rocks/smartlog": "^3.0.3",
"@push.rocks/smartpromise": "^4.0.3",
"@push.rocks/smartrx": "^3.0.0",
"@push.rocks/smarttime": "^4.0.1"
"@push.rocks/smartrx": "^3.0.6",
"@push.rocks/smarttime": "^4.0.5",
"@push.rocks/smartunique": "^3.0.6"
},
"devDependencies": {
"@gitzone/tsbuild": "^2.1.63",
"@gitzone/tsbundle": "^2.0.6",
"@gitzone/tsrun": "^1.2.39",
"@gitzone/tstest": "^1.0.72",
"@push.rocks/tapbundle": "^5.0.4",
"@types/node": "^18.11.18"
"@gitzone/tsbuild": "^2.1.66",
"@gitzone/tsbundle": "^2.0.8",
"@gitzone/tsrun": "^1.2.44",
"@gitzone/tstest": "^1.0.77",
"@push.rocks/tapbundle": "^5.0.15",
"@types/node": "^20.5.0"
},
"files": [
"ts/**/*",

3246
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

View File

@ -1,116 +1,89 @@
import { expect, tap } from '@push.rocks/tapbundle';
import * as taskbuffer from '../ts/index.js';
import * as smartpromise from '@push.rocks/smartpromise';
import * as smartdelay from '@push.rocks/smartdelay';
// setup some testData to work with
let testTask: taskbuffer.Task;
let testPreTask = new taskbuffer.Task({
taskFunction: async () => {
console.log('preTask executed');
},
preTask: testTask,
});
// some more tasks to test with
let task1Counter = 0; // how often task 1 is being executed
let task1 = new taskbuffer.Task({
name: 'Task 1',
taskFunction: () => {
let done = smartpromise.defer();
console.log('Task1 started');
setTimeout(() => {
task1Counter++;
console.log('Task1 executed');
done.resolve();
}, 5000);
return done.promise;
},
});
let task2 = new taskbuffer.Task({
name: 'Task 1',
taskFunction: async () => {
const done = smartpromise.defer();
console.log('Task2 started');
setTimeout(() => {
console.log('Task2 executed');
done.resolve();
}, 5000);
await done.promise;
},
});
let task3 = new taskbuffer.Task({
name: 'Task 3',
taskFunction: () => {
let done = smartpromise.defer();
console.log('Task3 started');
setTimeout(() => {
console.log('Task3 executed');
done.resolve();
}, 5000);
return done.promise;
},
});
tap.test('new Task() should return a new task', async () => {
testTask = new taskbuffer.Task({
const testTask = new taskbuffer.Task({
taskFunction: async () => {
console.log('executed twice');
},
preTask: testPreTask,
});
});
tap.test('expect testTask to be an instance of Task', async () => {
expect(testTask).toBeInstanceOf(taskbuffer.Task);
});
tap.test('expect testTask.idle is true', async () => {
if (!testTask.idle) {
throw new Error('testTask.idle is not true');
}
tap.test('should have bufferMax set to the provided value', async () => {
const task2 = new taskbuffer.Task({
taskFunction: async () => {},
});
expect(task2.bufferMax).toBeUndefined(); // test for a task without bufferMax set
const bufferedTask = new taskbuffer.Task({
taskFunction: async () => {},
buffered: true,
bufferMax: 3,
});
expect(bufferedTask.bufferMax).toEqual(3);
});
tap.test('testTask.running should be of type boolean and initially false', async () => {
expect(testTask.running).toBeTypeofBoolean();
// tslint:disable-next-line:no-unused-expression
expect(testTask.running).toBeFalse();
});
tap.test('testTask.trigger() should return Promise', async () => {
expect(testTask.trigger()).toBeInstanceOf(Promise);
});
tap.test('testTask.trigger() returned Promise should be fullfilled', async () => {
await testTask.trigger();
});
tap.test('expect to run a task without pre and afterTask errorless', async () => {
let localTestTask = new taskbuffer.Task({
taskFunction: async () => {
console.log('only once');
tap.test('should be able to trigger tasks multiple times', async () => {
let task1Counter = 0;
const task1 = new taskbuffer.Task({
name: 'Task 1',
taskFunction: () => {
let done = smartpromise.defer();
console.log('Task1 started');
setTimeout(() => {
task1Counter++;
console.log('Task1 executed');
done.resolve();
}, 5000);
return done.promise;
},
});
await localTestTask.trigger();
await task1.trigger();
await task1.trigger();
expect(task1Counter).toEqual(2);
});
tap.test('expect task to run in buffered mode', async () => {
let localTestTask = new taskbuffer.Task({
tap.test('should execute setup function before the task function', async () => {
const task2 = new taskbuffer.Task({
name: 'Task 2',
taskSetup: async () => {
console.log('this is the setup function for task 2. It should only run once.')
return {
nice: 'yes',
}
},
taskFunction: async (before, setupArg) => {
expect(setupArg).toEqual({ nice: 'yes' });
const done = smartpromise.defer();
console.log('Task2 started');
setTimeout(() => {
console.log('Task2 executed');
done.resolve();
}, 5000);
await done.promise;
},
});
await task2.trigger();
});
tap.test('should not exceed bufferMax when task is buffered', async () => {
let counter = 0;
const bufferedTask = new taskbuffer.Task({
taskFunction: async () => {
await smartdelay.delayFor(3000);
counter++;
await smartdelay.delayFor(2000);
counter--;
},
buffered: true,
bufferMax: 2,
});
localTestTask.trigger();
localTestTask.trigger();
localTestTask.trigger();
await localTestTask.trigger();
bufferedTask.trigger();
bufferedTask.trigger();
bufferedTask.trigger();
await smartdelay.delayFor(100);
expect(counter <= bufferedTask.bufferMax).toBeTrue();
});
tap.start();

View File

@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/taskbuffer',
version: '3.0.11',
version: '3.1.5',
description: 'flexible task management. TypeScript ready!'
}

View File

@ -2,9 +2,10 @@ import { Task } from './taskbuffer.classes.task.js';
export class BufferRunner {
public task: Task;
// initialze by default
// initialize by default
public bufferCounter: number = 0;
constructor(taskArg: Task) {
constructor(taskArg: Task<any>) {
this.task = taskArg;
}
@ -13,7 +14,7 @@ export class BufferRunner {
this.bufferCounter++;
}
const returnPromise: Promise<any> = this.task.cycleCounter.getPromiseForCycle(
this.bufferCounter + 1
this.bufferCounter
);
if (!this.task.running) {
this._run(x);
@ -21,19 +22,13 @@ export class BufferRunner {
return returnPromise;
}
private _run(x: any) {
const recursiveBufferRunner = (x: any) => {
if (this.bufferCounter >= 0) {
this.task.running = true;
Task.runTask(this.task, { x: x }).then((x) => {
this.bufferCounter--; // this.bufferCounter drops below 0, the recursion stops.
this.task.cycleCounter.informOfCycle(x);
recursiveBufferRunner(x);
});
} else {
this.task.running = false;
}
};
recursiveBufferRunner(x);
private async _run(x: any) {
this.task.running = true;
while (this.bufferCounter > 0) {
const result = await Task.runTask(this.task, { x: x });
this.bufferCounter--;
this.task.cycleCounter.informOfCycle(result);
}
this.task.running = false;
}
}

View File

@ -9,7 +9,7 @@ export interface ICycleObject {
export class CycleCounter {
public task: Task;
public cycleObjectArray: ICycleObject[] = [];
constructor(taskArg: Task) {
constructor(taskArg: Task<any>) {
this.task = taskArg;
}
public getPromiseForCycle(cycleCountArg: number) {

View File

@ -2,13 +2,11 @@ import { Task } from './taskbuffer.classes.task.js';
import * as plugins from './taskbuffer.plugins.js';
/**
* constains all data for the final coordinator to actually make an informed decision
* Contains all data for the final coordinator to make an informed decision.
*/
export interface IDistributedTaskRequest {
/**
* this needs to correlate to the consultationResult
*/
submitterRandomId: string;
submitterId: string;
requestResponseId: string;
taskName: string;
taskVersion: string;
taskExecutionTime: number;
@ -18,24 +16,23 @@ export interface IDistributedTaskRequest {
}
export interface IDistributedTaskRequestResult {
/**
* this needs to correlate to the decisionInfoBasis
*/
submitterRandomId: string;
/**
* can be used while debugging
*/
submitterId: string;
requestResponseId: string;
considered: boolean;
rank: string;
rank: number;
reason: string;
shouldTrigger: boolean;
}
export abstract class AbstractDistributedCoordinator {
public abstract fireDistributedTaskRequest(
infoBasisArg: IDistributedTaskRequest
infoBasis: IDistributedTaskRequest
): Promise<IDistributedTaskRequestResult>;
public abstract updateDistributedTaskRequest(
infoBasisArg: IDistributedTaskRequest
infoBasis: IDistributedTaskRequest
): Promise<void>;
public abstract start(): Promise<void>;
public abstract stop(): Promise<void>;
}

View File

@ -4,20 +4,25 @@ import { CycleCounter } from './taskbuffer.classes.cyclecounter.js';
import { logger } from './taskbuffer.logging.js';
export interface ITaskFunction {
(x?: any): PromiseLike<any>;
export interface ITaskFunction<T = undefined> {
(x?: any, setupValue?: T): PromiseLike<any>;
}
export type TPreOrAfterTaskFunction = () => Task;
export interface ITaskSetupFunction<T = undefined> {
(): Promise<T>;
}
export class Task {
// STATIC
public static extractTask(preOrAfterTaskArg: Task | TPreOrAfterTaskFunction): Task {
export type TPreOrAfterTaskFunction = () => Task<any>;
export class Task<T = undefined> {
public static extractTask<T = undefined>(
preOrAfterTaskArg: Task<T> | TPreOrAfterTaskFunction
): Task<T> {
switch (true) {
case !preOrAfterTaskArg:
return null;
case preOrAfterTaskArg instanceof Task:
return preOrAfterTaskArg as Task;
return preOrAfterTaskArg as Task<T>;
case typeof preOrAfterTaskArg === 'function':
const taskFunction = preOrAfterTaskArg as TPreOrAfterTaskFunction;
return taskFunction();
@ -32,7 +37,7 @@ export class Task {
return done.promise;
};
public static isTask = (taskArg: Task): boolean => {
public static isTask = (taskArg: Task<any>): boolean => {
if (taskArg instanceof Task && typeof taskArg.taskFunction === 'function') {
return true;
} else {
@ -40,10 +45,10 @@ export class Task {
}
};
public static isTaskTouched = (
taskArg: Task | TPreOrAfterTaskFunction,
touchedTasksArray: Task[]
): boolean => {
public static isTaskTouched<T = undefined>(
taskArg: Task<T> | TPreOrAfterTaskFunction,
touchedTasksArray: Task<T>[]
): boolean {
const taskToCheck = Task.extractTask(taskArg);
let result = false;
for (const keyArg in touchedTasksArray) {
@ -52,44 +57,54 @@ export class Task {
}
}
return result;
};
}
public static runTask = async (
taskArg: Task | TPreOrAfterTaskFunction,
optionsArg: { x?: any; touchedTasksArray?: Task[] }
public static runTask = async <T>(
taskArg: Task<T> | TPreOrAfterTaskFunction,
optionsArg: { x?: any; touchedTasksArray?: Task<T>[] }
) => {
// extracts the task in case it is specified as a return value of a function
const taskToRun = Task.extractTask(taskArg);
const done = plugins.smartpromise.defer();
// pay respect to execDelay
// Wait for all blocking tasks to finish
for (const task of taskToRun.blockingTasks) {
await task.finished;
}
if (!taskToRun.setupValue && taskToRun.taskSetup) {
taskToRun.setupValue = await taskToRun.taskSetup();
}
if (taskToRun.execDelay) {
await plugins.smartdelay.delayFor(taskToRun.execDelay);
}
// set running params
taskToRun.running = true;
done.promise.then(async () => {
taskToRun.running = false;
// When the task has finished running, resolve the finished promise
taskToRun.resolveFinished();
// Create a new finished promise for the next run
taskToRun.finished = new Promise((resolve) => {
taskToRun.resolveFinished = resolve;
});
});
// handle options
const options = {
...{ x: undefined, touchedTasksArray: [] },
...optionsArg,
};
const x = options.x;
const touchedTasksArray: Task[] = options.touchedTasksArray;
const touchedTasksArray: Task<T>[] = options.touchedTasksArray;
touchedTasksArray.push(taskToRun);
// run the task cascade
const localDeferred = plugins.smartpromise.defer();
localDeferred.promise
.then(() => {
// lets run any preTask
if (taskToRun.preTask && !Task.isTaskTouched(taskToRun.preTask, touchedTasksArray)) {
return Task.runTask(taskToRun.preTask, { x, touchedTasksArray });
} else {
@ -99,9 +114,8 @@ export class Task {
}
})
.then(async (x) => {
// lets run the main task
try {
return await taskToRun.taskFunction(x);
return await taskToRun.taskFunction(x, taskToRun.setupValue);
} catch (e) {
console.log(e);
}
@ -125,16 +139,9 @@ export class Task {
return await done.promise;
};
// INSTANCE
// mandatory properties
public name: string;
/**
* the version of the task
* should follow semver
* might be important for DistributedCoordinator
*/
public version: string;
public taskFunction: ITaskFunction;
public taskFunction: ITaskFunction<T>;
public buffered: boolean;
public cronJob: plugins.smarttime.CronJob;
@ -142,62 +149,52 @@ export class Task {
public execDelay: number;
public timeout: number;
// tasks to run before and after
public preTask: Task | TPreOrAfterTaskFunction;
public afterTask: Task | TPreOrAfterTaskFunction;
public preTask: Task<T> | TPreOrAfterTaskFunction;
public afterTask: Task<T> | TPreOrAfterTaskFunction;
// Add a list to store the blocking tasks
public blockingTasks: Task[] = [];
// Add a promise that will resolve when the task has finished
private finished: Promise<void>;
private resolveFinished: () => void;
// initialize by default
public running: boolean = false;
public bufferRunner = new BufferRunner(this);
public cycleCounter = new CycleCounter(this);
public idle: boolean = true;
private _state: string = 'ready';
public get idle() {
return !this.running;
}
public taskSetup: ITaskSetupFunction<T>;
public setupValue: T;
constructor(optionsArg: {
/**
* the task function to run, must return promise
*/
taskFunction: ITaskFunction;
/**
* any other task to run before
*/
preTask?: Task | TPreOrAfterTaskFunction;
/**
* any other task to run after
*/
afterTask?: Task | TPreOrAfterTaskFunction;
/**
* wether this task should run buffered
*/
taskFunction: ITaskFunction<T>;
preTask?: Task<T> | TPreOrAfterTaskFunction;
afterTask?: Task<T> | TPreOrAfterTaskFunction;
buffered?: boolean;
/**
* the maximum buffer
*/
bufferMax?: number;
/**
* the execution delay, before the task is executed
* only makes sense when running in buffered mode
*/
execDelay?: number;
/**
* the name of the task
*/
name?: string;
taskSetup?: ITaskSetupFunction<T>;
}) {
this.taskFunction = optionsArg.taskFunction;
this.preTask = optionsArg.preTask;
this.afterTask = optionsArg.afterTask;
this.idle = !this.running;
this.buffered = optionsArg.buffered;
this.bufferMax = optionsArg.bufferMax;
this.execDelay = optionsArg.execDelay;
this.name = optionsArg.name;
this.taskSetup = optionsArg.taskSetup;
// Create the finished promise
this.finished = new Promise((resolve) => {
this.resolveFinished = resolve;
});
}
/**
* trigger the task. Will trigger buffered if this.buffered is true
*/
public trigger(x?: any): Promise<any> {
if (this.buffered) {
return this.triggerBuffered(x);
@ -206,31 +203,11 @@ export class Task {
}
}
/**
* trigger task unbuffered.
* will actually run the task, not considering any buffered limits.
*/
public triggerUnBuffered(x?: any): Promise<any> {
return Task.runTask(this, { x: x });
return Task.runTask<T>(this, { x: x });
}
/**
* trigger task buffered.
* note: .trigger() also calls this function
*/
public triggerBuffered(x?: any): Promise<any> {
return this.bufferRunner.trigger(x);
}
get state(): string {
return this._state;
}
set state(stateArg: string) {
if (stateArg === 'locked') {
this._state = 'locked';
} else {
logger.log('error', `state type ${stateArg} could not be set`);
}
}
}

View File

@ -1,6 +1,6 @@
import * as plugins from './taskbuffer.plugins.js';
import { Task, ITaskFunction } from './taskbuffer.classes.task.js';
import { Task, type ITaskFunction } from './taskbuffer.classes.task.js';
export class TaskDebounced<T = unknown> extends Task {
private _debouncedTaskFunction: ITaskFunction;

View File

@ -1,10 +1,10 @@
import * as plugins from './taskbuffer.plugins.js';
import { Task } from './taskbuffer.classes.task.js';
import { AbstractDistributedCoordinator } from './taskbuffer.classes.distributedcoordinator.js';
import { AbstractDistributedCoordinator, type IDistributedTaskRequestResult } from './taskbuffer.classes.distributedcoordinator.js';
export interface ICronJob {
cronString: string;
taskNameArg: string;
taskName: string;
job: any;
}
@ -13,57 +13,37 @@ export interface ITaskManagerConstructorOptions {
}
export class TaskManager {
public randomId = plugins.isounique.uni();
public randomId = plugins.smartunique.shortId();
public taskMap = new plugins.lik.ObjectMap<Task>();
private cronJobManager = new plugins.smarttime.CronManager();
public options: ITaskManagerConstructorOptions = {
distributedCoordinator: null,
};
constructor(optionosArg: ITaskManagerConstructorOptions = {}) {
this.options = Object.assign(this.options, optionosArg);
constructor(options: ITaskManagerConstructorOptions = {}) {
this.options = Object.assign(this.options, options);
}
/**
* checks if a task is already present
* @param taskNameArg
*/
public getTaskByName(taskNameArg: string): Task {
return this.taskMap.findSync((itemArg) => {
return itemArg.name === taskNameArg;
});
public getTaskByName(taskName: string): Task {
return this.taskMap.findSync((task) => task.name === taskName);
}
/**
* adds a Task to the TaskManager
* @param taskArg
*/
public addTask(taskArg: Task): void {
if (!taskArg.name) {
throw new Error('taskArg needs a name to be added to taskManager');
public addTask(task: Task): void {
if (!task.name) {
throw new Error('Task must have a name to be added to taskManager');
}
this.taskMap.add(taskArg);
this.taskMap.add(task);
}
/**
* adds and schedules a task at once
* @param taskArg
* @param cronStringArg
*/
public addAndScheduleTask(taskArg: Task, cronStringArg: string) {
this.addTask(taskArg);
this.scheduleTaskByName(taskArg.name, cronStringArg);
public addAndScheduleTask(task: Task, cronString: string) {
this.addTask(task);
this.scheduleTaskByName(task.name, cronString);
}
/**
* triggers a task in the TaskManagerByName
* @param taskNameArg
*/
public triggerTaskByName(taskNameArg: string): Promise<any> {
const taskToTrigger = this.getTaskByName(taskNameArg);
public async triggerTaskByName(taskName: string): Promise<any> {
const taskToTrigger = this.getTaskByName(taskName);
if (!taskToTrigger) {
throw new Error(`There is no task with the name of ${taskNameArg}`);
throw new Error(`No task with the name ${taskName} found.`);
}
return taskToTrigger.trigger();
}
@ -72,88 +52,88 @@ export class TaskManager {
return task.trigger();
}
/**
* schedules the task by name
* @param taskNameArg
*/
public scheduleTaskByName(taskNameArg: string, cronStringArg: string) {
const taskToSchedule = this.getTaskByName(taskNameArg);
const cronJob = this.cronJobManager.addCronjob(
cronStringArg,
async (triggerTimeArg: number) => {
console.log(`taskbuffer schedule triggered task >>${taskToSchedule.name}<<`);
console.log(
`task >>${taskToSchedule.name}<< is ${
taskToSchedule.buffered
? `buffered with max ${taskToSchedule.bufferMax} buffered calls`
: `unbuffered`
}`
);
if (this.options.distributedCoordinator) {
console.log(`Found a distrubuted coordinator, performing distributed consultation.`);
const announcementResult =
await this.options.distributedCoordinator.fireDistributedTaskRequest({
submitterRandomId: this.randomId,
status: 'requesting',
taskExecutionParallel: 1,
taskExecutionTime: triggerTimeArg,
taskExecutionTimeout: taskToSchedule.timeout,
taskName: taskToSchedule.name,
taskVersion: taskToSchedule.version,
});
if (!announcementResult.shouldTrigger) {
console.log('distributed coordinator result: NOT EXECUTING');
return;
} else {
console.log('distributed coordinator result: CHOSEN AND EXECUTING');
}
}
await taskToSchedule.trigger();
}
);
taskToSchedule.cronJob = cronJob;
public scheduleTaskByName(taskName: string, cronString: string) {
const taskToSchedule = this.getTaskByName(taskName);
if (!taskToSchedule) {
throw new Error(`No task with the name ${taskName} found.`);
}
this.handleTaskScheduling(taskToSchedule, cronString);
}
/**
* deschedules a task by name
* @param taskNameArg
*/
public descheduleTaskByName(taskNameArg: string) {
const taskToDeSchedule = this.getTaskByName(taskNameArg);
if (taskToDeSchedule.cronJob) {
this.cronJobManager.removeCronjob(taskToDeSchedule.cronJob);
taskToDeSchedule.cronJob = null;
private handleTaskScheduling(task: Task, cronString: string) {
const cronJob = this.cronJobManager.addCronjob(
cronString,
async (triggerTime: number) => {
this.logTaskState(task);
if (this.options.distributedCoordinator) {
const announcementResult = await this.performDistributedConsultation(task, triggerTime);
if (!announcementResult.shouldTrigger) {
console.log('Distributed coordinator result: NOT EXECUTING');
return;
} else {
console.log('Distributed coordinator result: CHOSEN AND EXECUTING');
}
}
await task.trigger();
}
);
task.cronJob = cronJob;
}
private logTaskState(task: Task) {
console.log(`Taskbuffer schedule triggered task >>${task.name}<<`);
const bufferState = task.buffered
? `buffered with max ${task.bufferMax} buffered calls`
: `unbuffered`;
console.log(`Task >>${task.name}<< is ${bufferState}`);
}
private async performDistributedConsultation(task: Task, triggerTime: number): Promise<IDistributedTaskRequestResult> {
console.log('Found a distributed coordinator, performing consultation.');
return this.options.distributedCoordinator.fireDistributedTaskRequest({
submitterId: this.randomId,
requestResponseId: plugins.smartunique.shortId(),
status: 'requesting',
taskExecutionParallel: 1,
taskExecutionTime: triggerTime,
taskExecutionTimeout: task.timeout,
taskName: task.name,
taskVersion: task.version,
});
}
public descheduleTaskByName(taskName: string) {
const task = this.getTaskByName(taskName);
if (task && task.cronJob) {
this.cronJobManager.removeCronjob(task.cronJob);
task.cronJob = null;
}
if (this.cronJobManager.cronjobs.isEmpty) {
this.cronJobManager.stop();
}
}
/**
* deschedules a task
* @param task
*/
public async descheduleTask(task: Task) {
await this.descheduleTaskByName(task.name);
}
/**
* returns all schedules of a specific task
* @param taskNameArg
*/
public getSchedulesForTaskName(taskNameArg: string) {}
/**
* starts the taskmanager
*/
public start() {
public getScheduleForTaskName(taskName: string): string | null {
const task = this.getTaskByName(taskName);
return task && task.cronJob ? task.cronJob.cronExpression : null;
}
public async start() {
if (this.options.distributedCoordinator) {
await this.options.distributedCoordinator.start();
}
this.cronJobManager.start();
}
/**
* stops the taskmanager
*/
public stop() {
public async stop() {
this.cronJobManager.stop();
if (this.options.distributedCoordinator) {
await this.options.distributedCoordinator.stop();
}
}
}

View File

@ -1,6 +1,6 @@
import * as plugins from './taskbuffer.plugins.js';
import { Task, ITaskFunction } from './taskbuffer.classes.task.js';
import { Task, type ITaskFunction } from './taskbuffer.classes.task.js';
/**
* TaskOnce is run exactly once, no matter how often it is triggered

View File

@ -1,9 +1,9 @@
import * as isounique from '@push.rocks/isounique';
import * as lik from '@push.rocks/lik';
import * as smartlog from '@push.rocks/smartlog';
import * as smartpromise from '@push.rocks/smartpromise';
import * as smartdelay from '@push.rocks/smartdelay';
import * as smartrx from '@push.rocks/smartrx';
import * as smarttime from '@push.rocks/smarttime';
import * as smartunique from '@push.rocks/smartunique';
export { isounique, lik, smartlog, smartpromise, smartdelay, smartrx, smarttime };
export { lik, smartlog, smartpromise, smartdelay, smartrx, smarttime, smartunique };

View File

@ -5,6 +5,7 @@
"target": "ES2022",
"module": "ES2022",
"moduleResolution": "nodenext",
"esModuleInterop": true
"esModuleInterop": true,
"verbatimModuleSyntax": true,
}
}