Compare commits
10 Commits
Author | SHA1 | Date | |
---|---|---|---|
1cba9245fe | |||
4b0db4da21 | |||
a6cd0cb579 | |||
bd6109d5ea | |||
aa632a5294 | |||
6499dd45cf | |||
d04ed21607 | |||
bae776d4e9 | |||
fcd7ea467e | |||
e061b96056 |
12
package.json
12
package.json
@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@push.rocks/taskbuffer",
|
"name": "@push.rocks/taskbuffer",
|
||||||
"version": "3.1.0",
|
"version": "3.1.5",
|
||||||
"private": false,
|
"private": false,
|
||||||
"description": "flexible task management. TypeScript ready!",
|
"description": "flexible task management. TypeScript ready!",
|
||||||
"main": "dist_ts/index.js",
|
"main": "dist_ts/index.js",
|
||||||
@ -29,21 +29,21 @@
|
|||||||
},
|
},
|
||||||
"homepage": "https://gitlab.com/pushrocks/taskbuffer#readme",
|
"homepage": "https://gitlab.com/pushrocks/taskbuffer#readme",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@push.rocks/isounique": "^1.0.5",
|
"@push.rocks/lik": "^6.0.5",
|
||||||
"@push.rocks/lik": "^6.0.3",
|
|
||||||
"@push.rocks/smartdelay": "^3.0.5",
|
"@push.rocks/smartdelay": "^3.0.5",
|
||||||
"@push.rocks/smartlog": "^3.0.3",
|
"@push.rocks/smartlog": "^3.0.3",
|
||||||
"@push.rocks/smartpromise": "^4.0.3",
|
"@push.rocks/smartpromise": "^4.0.3",
|
||||||
"@push.rocks/smartrx": "^3.0.6",
|
"@push.rocks/smartrx": "^3.0.6",
|
||||||
"@push.rocks/smarttime": "^4.0.4"
|
"@push.rocks/smarttime": "^4.0.5",
|
||||||
|
"@push.rocks/smartunique": "^3.0.6"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"@gitzone/tsbuild": "^2.1.66",
|
"@gitzone/tsbuild": "^2.1.66",
|
||||||
"@gitzone/tsbundle": "^2.0.8",
|
"@gitzone/tsbundle": "^2.0.8",
|
||||||
"@gitzone/tsrun": "^1.2.44",
|
"@gitzone/tsrun": "^1.2.44",
|
||||||
"@gitzone/tstest": "^1.0.77",
|
"@gitzone/tstest": "^1.0.77",
|
||||||
"@push.rocks/tapbundle": "^5.0.12",
|
"@push.rocks/tapbundle": "^5.0.15",
|
||||||
"@types/node": "^20.4.5"
|
"@types/node": "^20.5.0"
|
||||||
},
|
},
|
||||||
"files": [
|
"files": [
|
||||||
"ts/**/*",
|
"ts/**/*",
|
||||||
|
1124
pnpm-lock.yaml
generated
1124
pnpm-lock.yaml
generated
File diff suppressed because it is too large
Load Diff
@ -3,6 +3,6 @@
|
|||||||
*/
|
*/
|
||||||
export const commitinfo = {
|
export const commitinfo = {
|
||||||
name: '@push.rocks/taskbuffer',
|
name: '@push.rocks/taskbuffer',
|
||||||
version: '3.1.0',
|
version: '3.1.5',
|
||||||
description: 'flexible task management. TypeScript ready!'
|
description: 'flexible task management. TypeScript ready!'
|
||||||
}
|
}
|
||||||
|
@ -2,13 +2,11 @@ import { Task } from './taskbuffer.classes.task.js';
|
|||||||
import * as plugins from './taskbuffer.plugins.js';
|
import * as plugins from './taskbuffer.plugins.js';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* constains all data for the final coordinator to actually make an informed decision
|
* Contains all data for the final coordinator to make an informed decision.
|
||||||
*/
|
*/
|
||||||
export interface IDistributedTaskRequest {
|
export interface IDistributedTaskRequest {
|
||||||
/**
|
submitterId: string;
|
||||||
* this needs to correlate to the consultationResult
|
requestResponseId: string;
|
||||||
*/
|
|
||||||
submitterRandomId: string;
|
|
||||||
taskName: string;
|
taskName: string;
|
||||||
taskVersion: string;
|
taskVersion: string;
|
||||||
taskExecutionTime: number;
|
taskExecutionTime: number;
|
||||||
@ -18,24 +16,23 @@ export interface IDistributedTaskRequest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export interface IDistributedTaskRequestResult {
|
export interface IDistributedTaskRequestResult {
|
||||||
/**
|
submitterId: string;
|
||||||
* this needs to correlate to the decisionInfoBasis
|
requestResponseId: string;
|
||||||
*/
|
|
||||||
submitterRandomId: string;
|
|
||||||
/**
|
|
||||||
* can be used while debugging
|
|
||||||
*/
|
|
||||||
considered: boolean;
|
considered: boolean;
|
||||||
rank: string;
|
rank: number;
|
||||||
reason: string;
|
reason: string;
|
||||||
shouldTrigger: boolean;
|
shouldTrigger: boolean;
|
||||||
}
|
}
|
||||||
|
|
||||||
export abstract class AbstractDistributedCoordinator {
|
export abstract class AbstractDistributedCoordinator {
|
||||||
public abstract fireDistributedTaskRequest(
|
public abstract fireDistributedTaskRequest(
|
||||||
infoBasisArg: IDistributedTaskRequest
|
infoBasis: IDistributedTaskRequest
|
||||||
): Promise<IDistributedTaskRequestResult>;
|
): Promise<IDistributedTaskRequestResult>;
|
||||||
|
|
||||||
public abstract updateDistributedTaskRequest(
|
public abstract updateDistributedTaskRequest(
|
||||||
infoBasisArg: IDistributedTaskRequest
|
infoBasis: IDistributedTaskRequest
|
||||||
): Promise<void>;
|
): Promise<void>;
|
||||||
|
|
||||||
|
public abstract start(): Promise<void>;
|
||||||
|
public abstract stop(): Promise<void>;
|
||||||
}
|
}
|
||||||
|
@ -1,10 +1,10 @@
|
|||||||
import * as plugins from './taskbuffer.plugins.js';
|
import * as plugins from './taskbuffer.plugins.js';
|
||||||
import { Task } from './taskbuffer.classes.task.js';
|
import { Task } from './taskbuffer.classes.task.js';
|
||||||
import { AbstractDistributedCoordinator } from './taskbuffer.classes.distributedcoordinator.js';
|
import { AbstractDistributedCoordinator, type IDistributedTaskRequestResult } from './taskbuffer.classes.distributedcoordinator.js';
|
||||||
|
|
||||||
export interface ICronJob {
|
export interface ICronJob {
|
||||||
cronString: string;
|
cronString: string;
|
||||||
taskNameArg: string;
|
taskName: string;
|
||||||
job: any;
|
job: any;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -13,57 +13,37 @@ export interface ITaskManagerConstructorOptions {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export class TaskManager {
|
export class TaskManager {
|
||||||
public randomId = plugins.isounique.uni();
|
public randomId = plugins.smartunique.shortId();
|
||||||
public taskMap = new plugins.lik.ObjectMap<Task>();
|
public taskMap = new plugins.lik.ObjectMap<Task>();
|
||||||
private cronJobManager = new plugins.smarttime.CronManager();
|
private cronJobManager = new plugins.smarttime.CronManager();
|
||||||
|
|
||||||
public options: ITaskManagerConstructorOptions = {
|
public options: ITaskManagerConstructorOptions = {
|
||||||
distributedCoordinator: null,
|
distributedCoordinator: null,
|
||||||
};
|
};
|
||||||
|
|
||||||
constructor(optionsArg: ITaskManagerConstructorOptions = {}) {
|
constructor(options: ITaskManagerConstructorOptions = {}) {
|
||||||
this.options = Object.assign(this.options, optionsArg);
|
this.options = Object.assign(this.options, options);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
public getTaskByName(taskName: string): Task {
|
||||||
* checks if a task is already present
|
return this.taskMap.findSync((task) => task.name === taskName);
|
||||||
* @param taskNameArg
|
|
||||||
*/
|
|
||||||
public getTaskByName(taskNameArg: string): Task {
|
|
||||||
return this.taskMap.findSync((itemArg) => {
|
|
||||||
return itemArg.name === taskNameArg;
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
public addTask(task: Task): void {
|
||||||
* adds a Task to the TaskManager
|
if (!task.name) {
|
||||||
* @param taskArg
|
throw new Error('Task must have a name to be added to taskManager');
|
||||||
*/
|
|
||||||
public addTask(taskArg: Task): void {
|
|
||||||
if (!taskArg.name) {
|
|
||||||
throw new Error('taskArg needs a name to be added to taskManager');
|
|
||||||
}
|
}
|
||||||
this.taskMap.add(taskArg);
|
this.taskMap.add(task);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
public addAndScheduleTask(task: Task, cronString: string) {
|
||||||
* adds and schedules a task at once
|
this.addTask(task);
|
||||||
* @param taskArg
|
this.scheduleTaskByName(task.name, cronString);
|
||||||
* @param cronStringArg
|
|
||||||
*/
|
|
||||||
public addAndScheduleTask(taskArg: Task, cronStringArg: string) {
|
|
||||||
this.addTask(taskArg);
|
|
||||||
this.scheduleTaskByName(taskArg.name, cronStringArg);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
public async triggerTaskByName(taskName: string): Promise<any> {
|
||||||
* triggers a task in the TaskManagerByName
|
const taskToTrigger = this.getTaskByName(taskName);
|
||||||
* @param taskNameArg
|
|
||||||
*/
|
|
||||||
public triggerTaskByName(taskNameArg: string): Promise<any> {
|
|
||||||
const taskToTrigger = this.getTaskByName(taskNameArg);
|
|
||||||
if (!taskToTrigger) {
|
if (!taskToTrigger) {
|
||||||
throw new Error(`There is no task with the name of ${taskNameArg}`);
|
throw new Error(`No task with the name ${taskName} found.`);
|
||||||
}
|
}
|
||||||
return taskToTrigger.trigger();
|
return taskToTrigger.trigger();
|
||||||
}
|
}
|
||||||
@ -72,96 +52,88 @@ export class TaskManager {
|
|||||||
return task.trigger();
|
return task.trigger();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
public scheduleTaskByName(taskName: string, cronString: string) {
|
||||||
* schedules the task by name
|
const taskToSchedule = this.getTaskByName(taskName);
|
||||||
* @param taskNameArg
|
if (!taskToSchedule) {
|
||||||
*/
|
throw new Error(`No task with the name ${taskName} found.`);
|
||||||
public scheduleTaskByName(taskNameArg: string, cronStringArg: string) {
|
}
|
||||||
const taskToSchedule = this.getTaskByName(taskNameArg);
|
this.handleTaskScheduling(taskToSchedule, cronString);
|
||||||
const cronJob = this.cronJobManager.addCronjob(
|
|
||||||
cronStringArg,
|
|
||||||
async (triggerTimeArg: number) => {
|
|
||||||
console.log(`taskbuffer schedule triggered task >>${taskToSchedule.name}<<`);
|
|
||||||
console.log(
|
|
||||||
`task >>${taskToSchedule.name}<< is ${
|
|
||||||
taskToSchedule.buffered
|
|
||||||
? `buffered with max ${taskToSchedule.bufferMax} buffered calls`
|
|
||||||
: `unbuffered`
|
|
||||||
}`
|
|
||||||
);
|
|
||||||
if (this.options.distributedCoordinator) {
|
|
||||||
console.log(`Found a distrubuted coordinator, performing distributed consultation.`);
|
|
||||||
const announcementResult =
|
|
||||||
await this.options.distributedCoordinator.fireDistributedTaskRequest({
|
|
||||||
submitterRandomId: this.randomId,
|
|
||||||
status: 'requesting',
|
|
||||||
taskExecutionParallel: 1,
|
|
||||||
taskExecutionTime: triggerTimeArg,
|
|
||||||
taskExecutionTimeout: taskToSchedule.timeout,
|
|
||||||
taskName: taskToSchedule.name,
|
|
||||||
taskVersion: taskToSchedule.version,
|
|
||||||
});
|
|
||||||
|
|
||||||
if (!announcementResult.shouldTrigger) {
|
|
||||||
console.log('distributed coordinator result: NOT EXECUTING');
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
console.log('distributed coordinator result: CHOSEN AND EXECUTING');
|
|
||||||
}
|
|
||||||
}
|
|
||||||
await taskToSchedule.trigger();
|
|
||||||
}
|
|
||||||
);
|
|
||||||
taskToSchedule.cronJob = cronJob;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
private handleTaskScheduling(task: Task, cronString: string) {
|
||||||
* deschedules a task by name
|
const cronJob = this.cronJobManager.addCronjob(
|
||||||
* @param taskNameArg
|
cronString,
|
||||||
*/
|
async (triggerTime: number) => {
|
||||||
public descheduleTaskByName(taskNameArg: string) {
|
this.logTaskState(task);
|
||||||
const taskToDeSchedule = this.getTaskByName(taskNameArg);
|
if (this.options.distributedCoordinator) {
|
||||||
if (taskToDeSchedule.cronJob) {
|
const announcementResult = await this.performDistributedConsultation(task, triggerTime);
|
||||||
this.cronJobManager.removeCronjob(taskToDeSchedule.cronJob);
|
if (!announcementResult.shouldTrigger) {
|
||||||
taskToDeSchedule.cronJob = null;
|
console.log('Distributed coordinator result: NOT EXECUTING');
|
||||||
|
return;
|
||||||
|
} else {
|
||||||
|
console.log('Distributed coordinator result: CHOSEN AND EXECUTING');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
await task.trigger();
|
||||||
|
}
|
||||||
|
);
|
||||||
|
task.cronJob = cronJob;
|
||||||
|
}
|
||||||
|
|
||||||
|
private logTaskState(task: Task) {
|
||||||
|
console.log(`Taskbuffer schedule triggered task >>${task.name}<<`);
|
||||||
|
const bufferState = task.buffered
|
||||||
|
? `buffered with max ${task.bufferMax} buffered calls`
|
||||||
|
: `unbuffered`;
|
||||||
|
console.log(`Task >>${task.name}<< is ${bufferState}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async performDistributedConsultation(task: Task, triggerTime: number): Promise<IDistributedTaskRequestResult> {
|
||||||
|
console.log('Found a distributed coordinator, performing consultation.');
|
||||||
|
|
||||||
|
return this.options.distributedCoordinator.fireDistributedTaskRequest({
|
||||||
|
submitterId: this.randomId,
|
||||||
|
requestResponseId: plugins.smartunique.shortId(),
|
||||||
|
status: 'requesting',
|
||||||
|
taskExecutionParallel: 1,
|
||||||
|
taskExecutionTime: triggerTime,
|
||||||
|
taskExecutionTimeout: task.timeout,
|
||||||
|
taskName: task.name,
|
||||||
|
taskVersion: task.version,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public descheduleTaskByName(taskName: string) {
|
||||||
|
const task = this.getTaskByName(taskName);
|
||||||
|
if (task && task.cronJob) {
|
||||||
|
this.cronJobManager.removeCronjob(task.cronJob);
|
||||||
|
task.cronJob = null;
|
||||||
}
|
}
|
||||||
if (this.cronJobManager.cronjobs.isEmpty) {
|
if (this.cronJobManager.cronjobs.isEmpty) {
|
||||||
this.cronJobManager.stop();
|
this.cronJobManager.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* deschedules a task
|
|
||||||
* @param task
|
|
||||||
*/
|
|
||||||
public async descheduleTask(task: Task) {
|
public async descheduleTask(task: Task) {
|
||||||
await this.descheduleTaskByName(task.name);
|
await this.descheduleTaskByName(task.name);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
public getScheduleForTaskName(taskName: string): string | null {
|
||||||
* returns the schedule of a specific task
|
const task = this.getTaskByName(taskName);
|
||||||
* @param taskNameArg
|
return task && task.cronJob ? task.cronJob.cronExpression : null;
|
||||||
*/
|
|
||||||
public getScheduleForTaskName(taskNameArg: string): string | null {
|
|
||||||
const task = this.getTaskByName(taskNameArg);
|
|
||||||
if (!task || !task.cronJob) {
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
return task.cronJob.cronExpression;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
public async start() {
|
||||||
/**
|
if (this.options.distributedCoordinator) {
|
||||||
* starts the taskmanager
|
await this.options.distributedCoordinator.start();
|
||||||
*/
|
}
|
||||||
public start() {
|
|
||||||
this.cronJobManager.start();
|
this.cronJobManager.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
public async stop() {
|
||||||
* stops the taskmanager
|
|
||||||
*/
|
|
||||||
public stop() {
|
|
||||||
this.cronJobManager.stop();
|
this.cronJobManager.stop();
|
||||||
|
if (this.options.distributedCoordinator) {
|
||||||
|
await this.options.distributedCoordinator.stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,9 +1,9 @@
|
|||||||
import * as isounique from '@push.rocks/isounique';
|
|
||||||
import * as lik from '@push.rocks/lik';
|
import * as lik from '@push.rocks/lik';
|
||||||
import * as smartlog from '@push.rocks/smartlog';
|
import * as smartlog from '@push.rocks/smartlog';
|
||||||
import * as smartpromise from '@push.rocks/smartpromise';
|
import * as smartpromise from '@push.rocks/smartpromise';
|
||||||
import * as smartdelay from '@push.rocks/smartdelay';
|
import * as smartdelay from '@push.rocks/smartdelay';
|
||||||
import * as smartrx from '@push.rocks/smartrx';
|
import * as smartrx from '@push.rocks/smartrx';
|
||||||
import * as smarttime from '@push.rocks/smarttime';
|
import * as smarttime from '@push.rocks/smarttime';
|
||||||
|
import * as smartunique from '@push.rocks/smartunique';
|
||||||
|
|
||||||
export { isounique, lik, smartlog, smartpromise, smartdelay, smartrx, smarttime };
|
export { lik, smartlog, smartpromise, smartdelay, smartrx, smarttime, smartunique };
|
||||||
|
Reference in New Issue
Block a user