fix(core): update
This commit is contained in:
@ -23,11 +23,11 @@ export class BufferRunner {
|
||||
}
|
||||
|
||||
private _run(x) {
|
||||
const recursiveBufferRunner = x => {
|
||||
const recursiveBufferRunner = (x) => {
|
||||
if (this.bufferCounter >= 0) {
|
||||
this.running = true;
|
||||
this.task.running = true;
|
||||
Task.runTask(this.task, { x: x }).then(x => {
|
||||
Task.runTask(this.task, { x: x }).then((x) => {
|
||||
this.bufferCounter--;
|
||||
this.task.cycleCounter.informOfCycle(x);
|
||||
recursiveBufferRunner(x);
|
||||
|
@ -16,14 +16,14 @@ export class CycleCounter {
|
||||
const done = plugins.smartpromise.defer();
|
||||
const cycleObject: ICycleObject = {
|
||||
cycleCounter: cycleCountArg,
|
||||
deferred: done
|
||||
deferred: done,
|
||||
};
|
||||
this.cycleObjectArray.push(cycleObject);
|
||||
return done.promise;
|
||||
}
|
||||
public informOfCycle(x) {
|
||||
const newCycleObjectArray: ICycleObject[] = [];
|
||||
this.cycleObjectArray.forEach(cycleObjectArg => {
|
||||
this.cycleObjectArray.forEach((cycleObjectArg) => {
|
||||
cycleObjectArg.cycleCounter--;
|
||||
if (cycleObjectArg.cycleCounter <= 0) {
|
||||
cycleObjectArg.deferred.resolve(x);
|
||||
|
@ -1,2 +0,0 @@
|
||||
import plugins = require('./taskbuffer.plugins');
|
||||
import { Task, ITaskFunction } from './taskbuffer.classes.task';
|
@ -1,8 +1,9 @@
|
||||
import * as plugins from './taskbuffer.plugins';
|
||||
import * as helpers from './taskbuffer.classes.helpers';
|
||||
import { BufferRunner } from './taskbuffer.classes.bufferrunner';
|
||||
import { CycleCounter } from './taskbuffer.classes.cyclecounter';
|
||||
|
||||
import { logger } from './taskbuffer.logging';
|
||||
|
||||
export interface ITaskFunction {
|
||||
(x?: any): PromiseLike<any>;
|
||||
}
|
||||
@ -25,7 +26,7 @@ export class Task {
|
||||
}
|
||||
}
|
||||
|
||||
public static emptyTaskFunction: ITaskFunction = function(x) {
|
||||
public static emptyTaskFunction: ITaskFunction = function (x) {
|
||||
const done = plugins.smartpromise.defer();
|
||||
done.resolve();
|
||||
return done.promise;
|
||||
@ -75,7 +76,7 @@ export class Task {
|
||||
// handle options
|
||||
const options = {
|
||||
...{ x: undefined, touchedTasksArray: [] },
|
||||
...optionsArg
|
||||
...optionsArg,
|
||||
};
|
||||
const x = options.x;
|
||||
const touchedTasksArray: Task[] = options.touchedTasksArray;
|
||||
@ -96,7 +97,7 @@ export class Task {
|
||||
return done2.promise;
|
||||
}
|
||||
})
|
||||
.then(async x => {
|
||||
.then(async (x) => {
|
||||
// lets run the main task
|
||||
try {
|
||||
return await taskToRun.taskFunction(x);
|
||||
@ -104,7 +105,7 @@ export class Task {
|
||||
console.log(e);
|
||||
}
|
||||
})
|
||||
.then(x => {
|
||||
.then((x) => {
|
||||
if (taskToRun.afterTask && !Task.isTaskTouched(taskToRun.afterTask, touchedTasksArray)) {
|
||||
return Task.runTask(taskToRun.afterTask, { x: x, touchedTasksArray: touchedTasksArray });
|
||||
} else {
|
||||
@ -113,10 +114,10 @@ export class Task {
|
||||
return done2.promise;
|
||||
}
|
||||
})
|
||||
.then(x => {
|
||||
.then((x) => {
|
||||
done.resolve(x);
|
||||
})
|
||||
.catch(err => {
|
||||
.catch((err) => {
|
||||
console.log(err);
|
||||
});
|
||||
localDeferred.resolve();
|
||||
@ -128,6 +129,7 @@ export class Task {
|
||||
public name: string;
|
||||
public taskFunction: ITaskFunction;
|
||||
public buffered: boolean;
|
||||
public cronJob: plugins.smarttime.CronJob;
|
||||
|
||||
public bufferMax: number;
|
||||
public execDelay: number;
|
||||
@ -218,7 +220,7 @@ export class Task {
|
||||
if (stateArg === 'locked') {
|
||||
this._state = 'locked';
|
||||
} else {
|
||||
plugins.smartlog.defaultLogger.log('error', 'state type ' + stateArg + ' could not be set');
|
||||
logger.log('error', `state type ${stateArg} could not be set`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3,7 +3,7 @@
|
||||
|
||||
import * as plugins from './taskbuffer.plugins';
|
||||
import { Task } from './taskbuffer.classes.task';
|
||||
import helpers = require('./taskbuffer.classes.helpers');
|
||||
import { logger } from './taskbuffer.logging';
|
||||
|
||||
export class Taskchain extends Task {
|
||||
taskArray: Task[];
|
||||
@ -14,22 +14,22 @@ export class Taskchain extends Task {
|
||||
buffered?: boolean;
|
||||
bufferMax?: number;
|
||||
}) {
|
||||
let options = {
|
||||
const options = {
|
||||
...{
|
||||
name: 'unnamed Taskchain',
|
||||
log: false
|
||||
log: false,
|
||||
},
|
||||
...optionsArg,
|
||||
...{
|
||||
taskFunction: (x: any) => {
|
||||
// this is the function that gets executed when TaskChain is triggered
|
||||
let done = plugins.smartpromise.defer(); // this is the starting Deferred object
|
||||
const done = plugins.smartpromise.defer(); // this is the starting Deferred object
|
||||
let taskCounter = 0; // counter for iterating async over the taskArray
|
||||
let iterateTasks = x => {
|
||||
const iterateTasks = (x) => {
|
||||
if (typeof this.taskArray[taskCounter] !== 'undefined') {
|
||||
console.log(this.name + ' running: Task' + this.taskArray[taskCounter].name);
|
||||
this.taskArray[taskCounter].trigger(x).then(x => {
|
||||
plugins.smartlog.defaultLogger.log('info', this.taskArray[taskCounter].name);
|
||||
this.taskArray[taskCounter].trigger(x).then((x) => {
|
||||
logger.log('info', this.taskArray[taskCounter].name);
|
||||
taskCounter++;
|
||||
iterateTasks(x);
|
||||
});
|
||||
@ -40,8 +40,8 @@ export class Taskchain extends Task {
|
||||
};
|
||||
iterateTasks(x);
|
||||
return done.promise;
|
||||
}
|
||||
}
|
||||
},
|
||||
},
|
||||
};
|
||||
super(options);
|
||||
this.taskArray = optionsArg.taskArray;
|
||||
|
@ -1,5 +1,6 @@
|
||||
import * as plugins from './taskbuffer.plugins';
|
||||
import { Task } from './taskbuffer.classes.task';
|
||||
import { threadId } from 'worker_threads';
|
||||
|
||||
export interface ICronJob {
|
||||
cronString: string;
|
||||
@ -8,8 +9,8 @@ export interface ICronJob {
|
||||
}
|
||||
|
||||
export class TaskManager {
|
||||
public taskMap = new plugins.lik.Objectmap<Task>();
|
||||
private cronJobMap = new plugins.lik.Objectmap<ICronJob>();
|
||||
public taskMap = new plugins.lik.ObjectMap<Task>();
|
||||
private cronJobManager = new plugins.smarttime.CronManager();
|
||||
|
||||
constructor() {
|
||||
// nothing here
|
||||
@ -20,7 +21,7 @@ export class TaskManager {
|
||||
* @param taskNameArg
|
||||
*/
|
||||
public getTaskByName(taskNameArg): Task {
|
||||
return this.taskMap.find(itemArg => {
|
||||
return this.taskMap.find((itemArg) => {
|
||||
return itemArg.name === taskNameArg;
|
||||
});
|
||||
}
|
||||
@ -69,18 +70,11 @@ export class TaskManager {
|
||||
*/
|
||||
public scheduleTaskByName(taskNameArg: string, cronStringArg: string) {
|
||||
const taskToSchedule = this.getTaskByName(taskNameArg);
|
||||
const job = new plugins.cron.CronJob({
|
||||
cronTime: cronStringArg,
|
||||
onTick: () => {
|
||||
this.triggerTaskByName(taskNameArg);
|
||||
},
|
||||
start: true
|
||||
});
|
||||
this.cronJobMap.add({
|
||||
taskNameArg: taskToSchedule.name,
|
||||
cronString: cronStringArg,
|
||||
job
|
||||
const cronJob = this.cronJobManager.addCronjob(cronStringArg, async () => {
|
||||
await taskToSchedule.triggerBuffered();
|
||||
});
|
||||
taskToSchedule.cronJob = cronJob;
|
||||
this.cronJobManager.start();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -88,10 +82,14 @@ export class TaskManager {
|
||||
* @param taskNameArg
|
||||
*/
|
||||
public descheduleTaskByName(taskNameArg: string) {
|
||||
const descheduledCron = this.cronJobMap.findOneAndRemove(itemArg => {
|
||||
return itemArg.taskNameArg === taskNameArg;
|
||||
});
|
||||
descheduledCron.job.stop();
|
||||
const taskToDeSchedule = this.getTaskByName(taskNameArg);
|
||||
if (taskToDeSchedule.cronJob) {
|
||||
this.cronJobManager.removeCronjob(taskToDeSchedule.cronJob);
|
||||
taskToDeSchedule.cronJob = null;
|
||||
}
|
||||
if (this.cronJobManager.cronjobs.isEmpty) {
|
||||
this.cronJobManager.stop();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -15,7 +15,7 @@ export class TaskOnce extends Task {
|
||||
this.hasTriggered = true;
|
||||
await optionsArg.taskFunction();
|
||||
}
|
||||
}
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,4 @@
|
||||
import * as plugins from './taskbuffer.plugins';
|
||||
import * as helpers from './taskbuffer.classes.helpers';
|
||||
import { Task } from './taskbuffer.classes.task';
|
||||
|
||||
export class Taskparallel extends Task {
|
||||
@ -11,13 +10,13 @@ export class Taskparallel extends Task {
|
||||
taskFunction: () => {
|
||||
const done = plugins.smartpromise.defer();
|
||||
const promiseArray: Promise<any>[] = []; // stores promises of all tasks, since they run in parallel
|
||||
this.taskArray.forEach(function(taskArg) {
|
||||
this.taskArray.forEach(function (taskArg) {
|
||||
promiseArray.push(taskArg.trigger());
|
||||
});
|
||||
Promise.all(promiseArray).then(done.resolve);
|
||||
return done.promise;
|
||||
}
|
||||
}
|
||||
},
|
||||
},
|
||||
};
|
||||
super(options);
|
||||
this.taskArray = optionsArg.taskArray;
|
||||
|
@ -5,11 +5,11 @@ import { Task } from './taskbuffer.classes.task';
|
||||
export class TaskRunner {
|
||||
public maxParrallelJobs: number = 1;
|
||||
public status: 'stopped' | 'running' = 'stopped';
|
||||
public runningTasks: plugins.lik.Objectmap<Task> = new plugins.lik.Objectmap<Task>();
|
||||
public runningTasks: plugins.lik.ObjectMap<Task> = new plugins.lik.ObjectMap<Task>();
|
||||
public qeuedTasks: Task[] = [];
|
||||
|
||||
constructor() {
|
||||
this.runningTasks.eventSubject.subscribe(async eventArg => {
|
||||
this.runningTasks.eventSubject.subscribe(async (eventArg) => {
|
||||
this.checkExecution();
|
||||
});
|
||||
}
|
||||
|
3
ts/taskbuffer.logging.ts
Normal file
3
ts/taskbuffer.logging.ts
Normal file
@ -0,0 +1,3 @@
|
||||
import * as plugins from './taskbuffer.plugins';
|
||||
|
||||
export const logger = new plugins.smartlog.ConsoleLog();
|
@ -1,8 +1,7 @@
|
||||
import * as smartlog from '@pushrocks/smartlog';
|
||||
import cron from 'cron';
|
||||
import * as lik from '@pushrocks/lik';
|
||||
import * as rxjs from 'rxjs';
|
||||
import * as smartpromise from '@pushrocks/smartpromise';
|
||||
import * as smartdelay from '@pushrocks/smartdelay';
|
||||
import * as smarttime from '@pushrocks/smarttime';
|
||||
|
||||
export { smartlog, cron, lik, rxjs, smartpromise, smartdelay };
|
||||
export { smartlog, lik, smartpromise, smartdelay, smarttime };
|
||||
|
Reference in New Issue
Block a user