BREAKING CHANGE(taskbuffer): Change default Task error handling: trigger() now rejects when taskFunction throws; add catchErrors option (default false) to preserve previous swallow behavior; track errors (lastError, errorCount) and expose them in metadata; improve error propagation and logging across runners, chains, parallels and debounced tasks; add tests and documentation for new behavior.

This commit is contained in:
2026-01-25 23:29:00 +00:00
parent 905ca97b6a
commit 248383aab1
16 changed files with 575 additions and 63 deletions

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/taskbuffer',
version: '3.5.0',
version: '4.0.0',
description: 'A flexible task management library supporting TypeScript, allowing for task buffering, scheduling, and execution with dependency management.'
}

View File

@@ -1,4 +1,5 @@
import { Task } from './taskbuffer.classes.task.js';
import { logger } from './taskbuffer.logging.js';
export class BufferRunner {
public task: Task;
@@ -24,9 +25,19 @@ export class BufferRunner {
private async _run(x: any) {
this.task.running = true;
while (this.bufferCounter > 0) {
const result = await Task.runTask(this.task, { x: x });
this.bufferCounter--;
this.task.cycleCounter.informOfCycle(result);
try {
const result = await Task.runTask(this.task, { x: x });
this.bufferCounter--;
this.task.cycleCounter.informOfCycle(result);
} catch (err) {
logger.log('error', `BufferRunner: task "${this.task.name || 'unnamed'}" failed: ${err instanceof Error ? err.message : String(err)}`);
this.bufferCounter--;
if (this.task.catchErrors) {
this.task.cycleCounter.informOfCycle(undefined);
} else {
this.task.cycleCounter.informOfCycleError(err instanceof Error ? err : new Error(String(err)));
}
}
}
this.task.running = false;
}

View File

@@ -33,4 +33,16 @@ export class CycleCounter {
});
this.cycleObjectArray = newCycleObjectArray;
}
public informOfCycleError(err: Error) {
const newCycleObjectArray: ICycleObject[] = [];
this.cycleObjectArray.forEach((cycleObjectArg) => {
cycleObjectArg.cycleCounter--;
if (cycleObjectArg.cycleCounter <= 0) {
cycleObjectArg.deferred.reject(err);
} else {
newCycleObjectArray.push(cycleObjectArg);
}
});
this.cycleObjectArray = newCycleObjectArray;
}
}

View File

@@ -87,24 +87,37 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
taskToRun.running = true;
taskToRun.runCount++;
taskToRun.lastRun = new Date();
// Reset steps at the beginning of task execution
// Reset steps and error state at the beginning of task execution
taskToRun.resetSteps();
taskToRun.lastError = undefined;
done.promise.then(async () => {
taskToRun.running = false;
// Complete all steps when task finishes
taskToRun.completeAllSteps();
done.promise
.then(async () => {
taskToRun.running = false;
// When the task has finished running, resolve the finished promise
taskToRun.resolveFinished();
// Complete all steps when task finishes
taskToRun.completeAllSteps();
// Create a new finished promise for the next run
taskToRun.finished = new Promise((resolve) => {
taskToRun.resolveFinished = resolve;
// When the task has finished running, resolve the finished promise
taskToRun.resolveFinished();
// Create a new finished promise for the next run
taskToRun.finished = new Promise((resolve) => {
taskToRun.resolveFinished = resolve;
});
})
.catch((err) => {
taskToRun.running = false;
// Resolve finished so blocking dependants don't hang
taskToRun.resolveFinished();
// Create a new finished promise for the next run
taskToRun.finished = new Promise((resolve) => {
taskToRun.resolveFinished = resolve;
});
});
});
const options = {
...{ x: undefined, touchedTasksArray: [] },
@@ -133,7 +146,13 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
try {
return await taskToRun.taskFunction(x, taskToRun.setupValue);
} catch (e) {
console.log(e);
taskToRun.lastError = e instanceof Error ? e : new Error(String(e));
taskToRun.errorCount++;
logger.log('error', `Task "${taskToRun.name || 'unnamed'}" failed: ${taskToRun.lastError.message}`);
if (taskToRun.catchErrors) {
return undefined;
}
throw e;
}
})
.then((x) => {
@@ -155,10 +174,18 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
done.resolve(x);
})
.catch((err) => {
console.log(err);
done.reject(err);
});
localDeferred.resolve();
return await done.promise;
try {
return await done.promise;
} catch (err) {
if (taskToRun.catchErrors) {
return undefined;
}
throw err;
}
};
public name: string;
@@ -187,10 +214,19 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
public lastRun?: Date;
public runCount: number = 0;
// Error handling
public catchErrors: boolean = false;
public lastError?: Error;
public errorCount: number = 0;
public get idle() {
return !this.running;
}
public clearError(): void {
this.lastError = undefined;
}
public taskSetup: ITaskSetupFunction<T>;
public setupValue: T;
@@ -210,6 +246,7 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
name?: string;
taskSetup?: ITaskSetupFunction<T>;
steps?: TSteps;
catchErrors?: boolean;
}) {
this.taskFunction = optionsArg.taskFunction;
this.preTask = optionsArg.preTask;
@@ -219,6 +256,7 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
this.execDelay = optionsArg.execDelay;
this.name = optionsArg.name;
this.taskSetup = optionsArg.taskSetup;
this.catchErrors = optionsArg.catchErrors ?? false;
// Initialize steps if provided
if (optionsArg.steps) {
@@ -306,10 +344,21 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
// Get task metadata
public getMetadata(): ITaskMetadata {
let status: 'idle' | 'running' | 'completed' | 'failed';
if (this.running) {
status = 'running';
} else if (this.lastError) {
status = 'failed';
} else if (this.runCount > 0) {
status = 'completed';
} else {
status = 'idle';
}
return {
name: this.name || 'unnamed',
version: this.version,
status: this.running ? 'running' : 'idle',
status,
steps: this.getStepsMetadata(),
currentStep: this.currentStepName,
currentProgress: this.getProgress(),
@@ -318,6 +367,8 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
bufferMax: this.bufferMax,
timeout: this.timeout,
cronSchedule: this.cronJob?.cronExpression,
lastError: this.lastError?.message,
errorCount: this.errorCount,
};
}

View File

@@ -27,18 +27,20 @@ export class Taskchain extends Task {
let taskCounter = 0; // counter for iterating async over the taskArray
const iterateTasks = (x: any) => {
if (typeof this.taskArray[taskCounter] !== 'undefined') {
console.log(
this.name + ' running: Task' + this.taskArray[taskCounter].name,
);
logger.log('info', `${this.name} running: Task ${this.taskArray[taskCounter].name}`);
this.taskArray[taskCounter].trigger(x).then((x) => {
logger.log('info', this.taskArray[taskCounter].name);
taskCounter++;
iterateTasks(x);
}).catch((err) => {
const chainError = new Error(
`Taskchain "${this.name}": task "${this.taskArray[taskCounter].name || 'unnamed'}" (index ${taskCounter}) failed: ${err instanceof Error ? err.message : String(err)}`
);
(chainError as any).cause = err;
done.reject(chainError);
});
} else {
console.log(
'Taskchain "' + this.name + '" completed successfully',
);
logger.log('info', `Taskchain "${this.name}" completed successfully`);
done.resolve(x);
}
};
@@ -53,10 +55,15 @@ export class Taskchain extends Task {
addTask(taskArg: Task) {
this.taskArray.push(taskArg);
}
removeTask(taskArg: Task) {
// TODO:
removeTask(taskArg: Task): boolean {
const index = this.taskArray.indexOf(taskArg);
if (index === -1) {
return false;
}
this.taskArray.splice(index, 1);
return true;
}
shiftTask() {
// TODO:
shiftTask(): Task | undefined {
return this.taskArray.shift();
}
}

View File

@@ -1,6 +1,7 @@
import * as plugins from './taskbuffer.plugins.js';
import { Task, type ITaskFunction } from './taskbuffer.classes.task.js';
import { logger } from './taskbuffer.logging.js';
export class TaskDebounced<T = unknown> extends Task {
private _debouncedTaskFunction: ITaskFunction;
@@ -22,8 +23,17 @@ export class TaskDebounced<T = unknown> extends Task {
.pipe(
plugins.smartrx.rxjs.ops.debounceTime(optionsArg.debounceTimeInMillis),
)
.subscribe((x) => {
this.taskFunction(x);
.subscribe({
next: async (x) => {
try {
await this.taskFunction(x);
} catch (err) {
logger.log('error', `TaskDebounced "${this.name || 'unnamed'}" failed: ${err instanceof Error ? err.message : String(err)}`);
}
},
error: (err) => {
logger.log('error', `TaskDebounced "${this.name || 'unnamed'}" observable error: ${err instanceof Error ? err.message : String(err)}`);
},
});
}
}

View File

@@ -5,6 +5,7 @@ import {
type IDistributedTaskRequestResult,
} from './taskbuffer.classes.distributedcoordinator.js';
import type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo } from './taskbuffer.interfaces.js';
import { logger } from './taskbuffer.logging.js';
export interface ICronJob {
cronString: string;
@@ -75,31 +76,35 @@ export class TaskManager {
triggerTime,
);
if (!announcementResult.shouldTrigger) {
console.log('Distributed coordinator result: NOT EXECUTING');
logger.log('info', 'Distributed coordinator result: NOT EXECUTING');
return;
} else {
console.log('Distributed coordinator result: CHOSEN AND EXECUTING');
logger.log('info', 'Distributed coordinator result: CHOSEN AND EXECUTING');
}
}
await task.trigger();
try {
await task.trigger();
} catch (err) {
logger.log('error', `TaskManager: scheduled task "${task.name || 'unnamed'}" failed: ${err instanceof Error ? err.message : String(err)}`);
}
},
);
task.cronJob = cronJob;
}
private logTaskState(task: Task<any, any>) {
console.log(`Taskbuffer schedule triggered task >>${task.name}<<`);
logger.log('info', `Taskbuffer schedule triggered task >>${task.name}<<`);
const bufferState = task.buffered
? `buffered with max ${task.bufferMax} buffered calls`
: `unbuffered`;
console.log(`Task >>${task.name}<< is ${bufferState}`);
logger.log('info', `Task >>${task.name}<< is ${bufferState}`);
}
private async performDistributedConsultation(
task: Task<any, any>,
triggerTime: number,
): Promise<IDistributedTaskRequestResult> {
console.log('Found a distributed coordinator, performing consultation.');
logger.log('info', 'Found a distributed coordinator, performing consultation.');
return this.options.distributedCoordinator.fireDistributedTaskRequest({
submitterId: this.randomId,

View File

@@ -13,7 +13,9 @@ export class Taskparallel extends Task {
this.taskArray.forEach(function (taskArg) {
promiseArray.push(taskArg.trigger());
});
Promise.all(promiseArray).then(done.resolve);
Promise.all(promiseArray)
.then((results) => done.resolve(results))
.catch((err) => done.reject(err));
return done.promise;
},
},

View File

@@ -1,13 +1,14 @@
import * as plugins from './taskbuffer.plugins.js';
import { Task } from './taskbuffer.classes.task.js';
import { logger } from './taskbuffer.logging.js';
export class TaskRunner {
public maxParrallelJobs: number = 1;
public maxParallelJobs: number = 1;
public status: 'stopped' | 'running' = 'stopped';
public runningTasks: plugins.lik.ObjectMap<Task> =
new plugins.lik.ObjectMap<Task>();
public qeuedTasks: Task[] = [];
public queuedTasks: Task[] = [];
constructor() {
this.runningTasks.eventSubject.subscribe(async (eventArg) => {
@@ -16,19 +17,19 @@ export class TaskRunner {
}
/**
* adds a task to the qeue
* adds a task to the queue
*/
public addTask(taskArg: Task) {
this.qeuedTasks.push(taskArg);
this.queuedTasks.push(taskArg);
this.checkExecution();
}
/**
* set amount of parallel tasks
* be careful, you might loose dependability of tasks
* be careful, you might lose dependability of tasks
*/
public setMaxParallelJobs(maxParrallelJobsArg: number) {
this.maxParrallelJobs = maxParrallelJobsArg;
public setMaxParallelJobs(maxParallelJobsArg: number) {
this.maxParallelJobs = maxParallelJobsArg;
}
/**
@@ -39,17 +40,21 @@ export class TaskRunner {
}
/**
* checks wether execution is on point
* checks whether execution is on point
*/
public async checkExecution() {
if (
this.runningTasks.getArray().length < this.maxParrallelJobs &&
this.runningTasks.getArray().length < this.maxParallelJobs &&
this.status === 'running' &&
this.qeuedTasks.length > 0
this.queuedTasks.length > 0
) {
const nextJob = this.qeuedTasks.shift();
const nextJob = this.queuedTasks.shift();
this.runningTasks.add(nextJob);
await nextJob.trigger();
try {
await nextJob.trigger();
} catch (err) {
logger.log('error', `TaskRunner: task "${nextJob.name || 'unnamed'}" failed: ${err instanceof Error ? err.message : String(err)}`);
}
this.runningTasks.remove(nextJob);
this.checkExecution();
}

View File

@@ -15,6 +15,8 @@ export interface ITaskMetadata {
buffered?: boolean;
bufferMax?: number;
timeout?: number;
lastError?: string;
errorCount?: number;
}
export interface ITaskExecutionReport {