feat(task): add task labels and push-based task events

This commit is contained in:
2026-01-26 00:39:30 +00:00
parent 9a3a3e3eab
commit 6030fb2805
9 changed files with 360 additions and 9 deletions

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/taskbuffer',
version: '4.0.0',
version: '4.1.0',
description: 'A flexible task management library supporting TypeScript, allowing for task buffering, scheduling, and execution with dependency management.'
}

View File

@@ -12,7 +12,7 @@ export { TaskStep } from './taskbuffer.classes.taskstep.js';
export type { ITaskStep } from './taskbuffer.classes.taskstep.js';
// Metadata interfaces
export type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo } from './taskbuffer.interfaces.js';
export type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent, TTaskEventType } from './taskbuffer.interfaces.js';
import * as distributedCoordination from './taskbuffer.classes.distributedcoordinator.js';
export { distributedCoordination };

View File

@@ -2,7 +2,7 @@ import * as plugins from './taskbuffer.plugins.js';
import { BufferRunner } from './taskbuffer.classes.bufferrunner.js';
import { CycleCounter } from './taskbuffer.classes.cyclecounter.js';
import { TaskStep, type ITaskStep } from './taskbuffer.classes.taskstep.js';
import type { ITaskMetadata } from './taskbuffer.interfaces.js';
import type { ITaskMetadata, ITaskEvent, TTaskEventType } from './taskbuffer.interfaces.js';
import { logger } from './taskbuffer.logging.js';
@@ -91,6 +91,7 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
// Reset steps and error state at the beginning of task execution
taskToRun.resetSteps();
taskToRun.lastError = undefined;
taskToRun.emitEvent('started');
done.promise
.then(async () => {
@@ -98,6 +99,7 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
// Complete all steps when task finishes
taskToRun.completeAllSteps();
taskToRun.emitEvent(taskToRun.lastError ? 'failed' : 'completed');
// When the task has finished running, resolve the finished promise
taskToRun.resolveFinished();
@@ -109,6 +111,7 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
})
.catch((err) => {
taskToRun.running = false;
taskToRun.emitEvent('failed', { error: err instanceof Error ? err.message : String(err) });
// Resolve finished so blocking dependants don't hang
taskToRun.resolveFinished();
@@ -218,6 +221,8 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
public catchErrors: boolean = false;
public lastError?: Error;
public errorCount: number = 0;
public labels: Record<string, string> = {};
public readonly eventSubject = new plugins.smartrx.rxjs.Subject<ITaskEvent>();
public get idle() {
return !this.running;
@@ -227,6 +232,38 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
this.lastError = undefined;
}
public setLabel(key: string, value: string): void {
this.labels[key] = value;
}
public getLabel(key: string): string | undefined {
return this.labels[key];
}
public removeLabel(key: string): boolean {
if (key in this.labels) {
delete this.labels[key];
return true;
}
return false;
}
public hasLabel(key: string, value?: string): boolean {
if (value !== undefined) {
return this.labels[key] === value;
}
return key in this.labels;
}
private emitEvent(type: TTaskEventType, extra?: Partial<ITaskEvent>): void {
this.eventSubject.next({
type,
task: this.getMetadata(),
timestamp: Date.now(),
...extra,
});
}
public taskSetup: ITaskSetupFunction<T>;
public setupValue: T;
@@ -247,6 +284,7 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
taskSetup?: ITaskSetupFunction<T>;
steps?: TSteps;
catchErrors?: boolean;
labels?: Record<string, string>;
}) {
this.taskFunction = optionsArg.taskFunction;
this.preTask = optionsArg.preTask;
@@ -257,6 +295,7 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
this.name = optionsArg.name;
this.taskSetup = optionsArg.taskSetup;
this.catchErrors = optionsArg.catchErrors ?? false;
this.labels = optionsArg.labels ? { ...optionsArg.labels } : {};
// Initialize steps if provided
if (optionsArg.steps) {
@@ -309,8 +348,8 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
if (step) {
step.start();
this.currentStepName = stepName as string;
// Emit event for frontend updates (could be enhanced with event emitter)
this.emitEvent('step', { stepName: stepName as string });
if (this.name) {
logger.log('info', `Task ${this.name}: Starting step "${stepName}" - ${step.description}`);
}
@@ -369,6 +408,7 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
cronSchedule: this.cronJob?.cronExpression,
lastError: this.lastError?.message,
errorCount: this.errorCount,
labels: { ...this.labels },
};
}

View File

@@ -4,7 +4,7 @@ import {
AbstractDistributedCoordinator,
type IDistributedTaskRequestResult,
} from './taskbuffer.classes.distributedcoordinator.js';
import type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo } from './taskbuffer.interfaces.js';
import type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent } from './taskbuffer.interfaces.js';
import { logger } from './taskbuffer.logging.js';
export interface ICronJob {
@@ -20,6 +20,8 @@ export interface ITaskManagerConstructorOptions {
export class TaskManager {
public randomId = plugins.smartunique.shortId();
public taskMap = new plugins.lik.ObjectMap<Task<any, any>>();
public readonly taskSubject = new plugins.smartrx.rxjs.Subject<ITaskEvent>();
private taskSubscriptions = new Map<Task<any, any>, plugins.smartrx.rxjs.Subscription>();
private cronJobManager = new plugins.smarttime.CronManager();
public options: ITaskManagerConstructorOptions = {
distributedCoordinator: null,
@@ -38,6 +40,19 @@ export class TaskManager {
throw new Error('Task must have a name to be added to taskManager');
}
this.taskMap.add(task);
const subscription = task.eventSubject.subscribe((event) => {
this.taskSubject.next(event);
});
this.taskSubscriptions.set(task, subscription);
}
public removeTask(task: Task<any, any>): void {
this.taskMap.remove(task);
const subscription = this.taskSubscriptions.get(task);
if (subscription) {
subscription.unsubscribe();
this.taskSubscriptions.delete(task);
}
}
public addAndScheduleTask(task: Task<any, any>, cronString: string) {
@@ -150,6 +165,10 @@ export class TaskManager {
if (this.options.distributedCoordinator) {
await this.options.distributedCoordinator.stop();
}
for (const [, subscription] of this.taskSubscriptions) {
subscription.unsubscribe();
}
this.taskSubscriptions.clear();
}
// Get metadata for a specific task
@@ -198,6 +217,14 @@ export class TaskManager {
return scheduledRuns;
}
public getTasksByLabel(key: string, value: string): Task<any, any>[] {
return this.taskMap.getArray().filter(task => task.labels[key] === value);
}
public getTasksMetadataByLabel(key: string, value: string): ITaskMetadata[] {
return this.getTasksByLabel(key, value).map(task => task.getMetadata());
}
// Add, execute, and remove a task while collecting metadata
public async addExecuteRemoveTask<T, TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }>>(
task: Task<T, TSteps>,
@@ -236,7 +263,7 @@ export class TaskManager {
};
// Remove task from manager
this.taskMap.remove(task);
this.removeTask(task);
// Deschedule if it was scheduled
if (options?.schedule && task.name) {
@@ -260,7 +287,7 @@ export class TaskManager {
};
// Remove task from manager even on error
this.taskMap.remove(task);
this.removeTask(task);
// Deschedule if it was scheduled
if (options?.schedule && task.name) {

View File

@@ -17,6 +17,7 @@ export interface ITaskMetadata {
timeout?: number;
lastError?: string;
errorCount?: number;
labels?: Record<string, string>;
}
export interface ITaskExecutionReport {
@@ -38,4 +39,14 @@ export interface IScheduledTaskInfo {
lastRun?: Date;
steps?: ITaskStep[];
metadata?: ITaskMetadata;
}
export type TTaskEventType = 'started' | 'step' | 'completed' | 'failed';
export interface ITaskEvent {
type: TTaskEventType;
task: ITaskMetadata;
timestamp: number;
stepName?: string; // present when type === 'step'
error?: string; // present when type === 'failed'
}