diff --git a/changelog.md b/changelog.md index 4f36a62..8d98c57 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,15 @@ # Changelog +## 2026-01-26 - 4.1.0 - feat(task) +add task labels and push-based task events + +- Introduce Task labels: Task accepts labels in constructor and exposes setLabel/getLabel/removeLabel/hasLabel; labels are included (shallow copy) in getMetadata(). +- Add push-based events: Task.eventSubject (rxjs Subject) emits 'started','step','completed','failed' with timestamp; 'step' includes stepName and 'failed' includes error string. +- Task now emits events during lifecycle: emits 'started' at run start, 'step' on notifyStep, and 'completed' or 'failed' when finished or errored. getMetadata() now includes labels. +- TaskManager aggregates task events into taskSubject, subscribes on addTask and unsubscribes on removeTask/stop; includes helper methods getTasksByLabel and getTasksMetadataByLabel. +- Public API updated: exported ITaskEvent and TTaskEventType in ts/index.ts and interfaces updated (labels in metadata, new event types). +- Tests and docs: added test/test.12.labels-and-events.ts and updated readme.hints.md to document labels and push-based events. + ## 2026-01-25 - 4.0.0 - BREAKING CHANGE(taskbuffer) Change default Task error handling: trigger() now rejects when taskFunction throws; add catchErrors option (default false) to preserve previous swallow behavior; track errors (lastError, errorCount) and expose them in metadata; improve error propagation and logging across runners, chains, parallels and debounced tasks; add tests and documentation for new behavior. diff --git a/readme.hints.md b/readme.hints.md index b9e6906..a384ffd 100644 --- a/readme.hints.md +++ b/readme.hints.md @@ -20,6 +20,20 @@ - **BufferRunner**: When `catchErrors: false`, buffered task errors now reject the trigger promise (via `CycleCounter.informOfCycleError`) instead of silently resolving with `undefined` - **TaskChain stubs completed**: `removeTask(task)` returns `boolean`, `shiftTask()` returns `Task | undefined` +## Task Labels (v4.1.0+) +- `Task` constructor accepts optional `labels?: Record` +- Helper methods: `setLabel(key, value)`, `getLabel(key)`, `removeLabel(key)`, `hasLabel(key, value?)` +- `getMetadata()` includes `labels` (shallow copy) +- `TaskManager.getTasksByLabel(key, value)` returns matching `Task[]` +- `TaskManager.getTasksMetadataByLabel(key, value)` returns matching `ITaskMetadata[]` + +## Push-Based Events (v4.1.0+) +- `Task.eventSubject`: rxjs `Subject` emitting `'started'`, `'step'`, `'completed'`, `'failed'` events +- `TaskManager.taskSubject`: aggregated `Subject` from all added tasks +- `TaskManager.removeTask(task)` unsubscribes and removes from map +- `TaskManager.stop()` cleans up all event subscriptions +- Exported types: `ITaskEvent`, `TTaskEventType` + ## Project Structure - Source in `ts/`, web components in `ts_web/` - Tests in `test/` - naming: `*.node.ts`, `*.browser.ts`, `*.both.ts` diff --git a/test/test.12.labels-and-events.ts b/test/test.12.labels-and-events.ts new file mode 100644 index 0000000..2aef6f5 --- /dev/null +++ b/test/test.12.labels-and-events.ts @@ -0,0 +1,249 @@ +import { expect, tap } from '@git.zone/tstest/tapbundle'; +import * as taskbuffer from '../ts/index.js'; +import type { ITaskEvent } from '../ts/index.js'; + +// ─── Labels ─── + +tap.test('should accept labels in constructor', async () => { + const task = new taskbuffer.Task({ + name: 'labelled-task', + taskFunction: async () => 'ok', + labels: { userId: 'u1', tenantId: 't1' }, + }); + expect(task.labels).toEqual({ userId: 'u1', tenantId: 't1' }); +}); + +tap.test('should default labels to empty object', async () => { + const task = new taskbuffer.Task({ + name: 'no-labels-task', + taskFunction: async () => 'ok', + }); + expect(task.labels).toEqual({}); +}); + +tap.test('setLabel / getLabel / removeLabel / hasLabel should work', async () => { + const task = new taskbuffer.Task({ + name: 'label-helpers-task', + taskFunction: async () => 'ok', + }); + + task.setLabel('env', 'prod'); + expect(task.getLabel('env')).toEqual('prod'); + expect(task.hasLabel('env')).toBeTrue(); + expect(task.hasLabel('env', 'prod')).toBeTrue(); + expect(task.hasLabel('env', 'dev')).toBeFalse(); + expect(task.hasLabel('missing')).toBeFalse(); + + const removed = task.removeLabel('env'); + expect(removed).toBeTrue(); + expect(task.getLabel('env')).toBeUndefined(); + + const removedAgain = task.removeLabel('env'); + expect(removedAgain).toBeFalse(); +}); + +tap.test('getMetadata() should include labels', async () => { + const task = new taskbuffer.Task({ + name: 'metadata-labels-task', + taskFunction: async () => 'ok', + labels: { region: 'eu' }, + }); + + const meta = task.getMetadata(); + expect(meta.labels).toEqual({ region: 'eu' }); + + // Returned labels should be a copy + meta.labels!['region'] = 'us'; + expect(task.labels['region']).toEqual('eu'); +}); + +tap.test('TaskManager.getTasksByLabel should filter correctly', async () => { + const manager = new taskbuffer.TaskManager(); + const t1 = new taskbuffer.Task({ + name: 'label-filter-1', + taskFunction: async () => 'ok', + labels: { userId: 'alice' }, + }); + const t2 = new taskbuffer.Task({ + name: 'label-filter-2', + taskFunction: async () => 'ok', + labels: { userId: 'bob' }, + }); + const t3 = new taskbuffer.Task({ + name: 'label-filter-3', + taskFunction: async () => 'ok', + labels: { userId: 'alice' }, + }); + + manager.addTask(t1); + manager.addTask(t2); + manager.addTask(t3); + + const aliceTasks = manager.getTasksByLabel('userId', 'alice'); + expect(aliceTasks.length).toEqual(2); + expect(aliceTasks.map((t) => t.name).sort()).toEqual(['label-filter-1', 'label-filter-3']); + + const bobMeta = manager.getTasksMetadataByLabel('userId', 'bob'); + expect(bobMeta.length).toEqual(1); + expect(bobMeta[0].name).toEqual('label-filter-2'); + + await manager.stop(); +}); + +// ─── Events ─── + +tap.test('should emit started + completed on successful trigger', async () => { + const events: ITaskEvent[] = []; + const task = new taskbuffer.Task({ + name: 'event-success-task', + taskFunction: async () => 'ok', + }); + + task.eventSubject.subscribe((e) => events.push(e)); + await task.trigger(); + + expect(events.length).toEqual(2); + expect(events[0].type).toEqual('started'); + expect(events[1].type).toEqual('completed'); + expect(events[0].task.name).toEqual('event-success-task'); + expect(typeof events[0].timestamp).toEqual('number'); +}); + +tap.test('should emit step events on notifyStep', async () => { + const steps = [ + { name: 'build', description: 'Build artifacts', percentage: 50 }, + { name: 'deploy', description: 'Deploy to prod', percentage: 50 }, + ] as const; + + const events: ITaskEvent[] = []; + const task = new taskbuffer.Task({ + name: 'step-event-task', + steps, + taskFunction: async () => { + task.notifyStep('build'); + task.notifyStep('deploy'); + return 'done'; + }, + }); + + task.eventSubject.subscribe((e) => events.push(e)); + await task.trigger(); + + const stepEvents = events.filter((e) => e.type === 'step'); + expect(stepEvents.length).toEqual(2); + expect(stepEvents[0].stepName).toEqual('build'); + expect(stepEvents[1].stepName).toEqual('deploy'); +}); + +tap.test('should emit started + failed on error', async () => { + const events: ITaskEvent[] = []; + const task = new taskbuffer.Task({ + name: 'event-fail-task', + taskFunction: async () => { + throw new Error('boom'); + }, + }); + + task.eventSubject.subscribe((e) => events.push(e)); + + try { + await task.trigger(); + } catch { + // expected + } + + expect(events.length).toEqual(2); + expect(events[0].type).toEqual('started'); + expect(events[1].type).toEqual('failed'); + expect(events[1].error).toEqual('boom'); +}); + +tap.test('should emit failed via done.then path when catchErrors is true', async () => { + const events: ITaskEvent[] = []; + const task = new taskbuffer.Task({ + name: 'event-catch-fail-task', + catchErrors: true, + taskFunction: async () => { + throw new Error('swallowed'); + }, + }); + + task.eventSubject.subscribe((e) => events.push(e)); + await task.trigger(); + + const types = events.map((e) => e.type); + expect(types).toContain('started'); + expect(types).toContain('failed'); +}); + +tap.test('TaskManager.taskSubject should aggregate events from added tasks', async () => { + const manager = new taskbuffer.TaskManager(); + const events: ITaskEvent[] = []; + + const t1 = new taskbuffer.Task({ + name: 'agg-task-1', + taskFunction: async () => 'a', + }); + const t2 = new taskbuffer.Task({ + name: 'agg-task-2', + taskFunction: async () => 'b', + }); + + manager.addTask(t1); + manager.addTask(t2); + + manager.taskSubject.subscribe((e) => events.push(e)); + + await t1.trigger(); + await t2.trigger(); + + const names = [...new Set(events.map((e) => e.task.name))]; + expect(names.sort()).toEqual(['agg-task-1', 'agg-task-2']); + expect(events.filter((e) => e.type === 'started').length).toEqual(2); + expect(events.filter((e) => e.type === 'completed').length).toEqual(2); + + await manager.stop(); +}); + +tap.test('events should stop after removeTask', async () => { + const manager = new taskbuffer.TaskManager(); + const events: ITaskEvent[] = []; + + const task = new taskbuffer.Task({ + name: 'removable-event-task', + taskFunction: async () => 'ok', + }); + + manager.addTask(task); + manager.taskSubject.subscribe((e) => events.push(e)); + + await task.trigger(); + const countBefore = events.length; + expect(countBefore).toBeGreaterThan(0); + + manager.removeTask(task); + + // Trigger again — events should NOT appear on manager subject + await task.trigger(); + expect(events.length).toEqual(countBefore); + + await manager.stop(); +}); + +tap.test('event metadata snapshots should include correct labels', async () => { + const events: ITaskEvent[] = []; + const task = new taskbuffer.Task({ + name: 'labelled-event-task', + taskFunction: async () => 'ok', + labels: { team: 'platform' }, + }); + + task.eventSubject.subscribe((e) => events.push(e)); + await task.trigger(); + + for (const e of events) { + expect(e.task.labels).toEqual({ team: 'platform' }); + } +}); + +export default tap.start(); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 9d0be6b..943ad35 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/taskbuffer', - version: '4.0.0', + version: '4.1.0', description: 'A flexible task management library supporting TypeScript, allowing for task buffering, scheduling, and execution with dependency management.' } diff --git a/ts/index.ts b/ts/index.ts index 7fcaa1a..2778a43 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -12,7 +12,7 @@ export { TaskStep } from './taskbuffer.classes.taskstep.js'; export type { ITaskStep } from './taskbuffer.classes.taskstep.js'; // Metadata interfaces -export type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo } from './taskbuffer.interfaces.js'; +export type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent, TTaskEventType } from './taskbuffer.interfaces.js'; import * as distributedCoordination from './taskbuffer.classes.distributedcoordinator.js'; export { distributedCoordination }; diff --git a/ts/taskbuffer.classes.task.ts b/ts/taskbuffer.classes.task.ts index 0a6f5af..9da97c2 100644 --- a/ts/taskbuffer.classes.task.ts +++ b/ts/taskbuffer.classes.task.ts @@ -2,7 +2,7 @@ import * as plugins from './taskbuffer.plugins.js'; import { BufferRunner } from './taskbuffer.classes.bufferrunner.js'; import { CycleCounter } from './taskbuffer.classes.cyclecounter.js'; import { TaskStep, type ITaskStep } from './taskbuffer.classes.taskstep.js'; -import type { ITaskMetadata } from './taskbuffer.interfaces.js'; +import type { ITaskMetadata, ITaskEvent, TTaskEventType } from './taskbuffer.interfaces.js'; import { logger } from './taskbuffer.logging.js'; @@ -91,6 +91,7 @@ export class Task { @@ -98,6 +99,7 @@ export class Task { taskToRun.running = false; + taskToRun.emitEvent('failed', { error: err instanceof Error ? err.message : String(err) }); // Resolve finished so blocking dependants don't hang taskToRun.resolveFinished(); @@ -218,6 +221,8 @@ export class Task = {}; + public readonly eventSubject = new plugins.smartrx.rxjs.Subject(); public get idle() { return !this.running; @@ -227,6 +232,38 @@ export class Task): void { + this.eventSubject.next({ + type, + task: this.getMetadata(), + timestamp: Date.now(), + ...extra, + }); + } + public taskSetup: ITaskSetupFunction; public setupValue: T; @@ -247,6 +284,7 @@ export class Task; steps?: TSteps; catchErrors?: boolean; + labels?: Record; }) { this.taskFunction = optionsArg.taskFunction; this.preTask = optionsArg.preTask; @@ -257,6 +295,7 @@ export class Task>(); + public readonly taskSubject = new plugins.smartrx.rxjs.Subject(); + private taskSubscriptions = new Map, plugins.smartrx.rxjs.Subscription>(); private cronJobManager = new plugins.smarttime.CronManager(); public options: ITaskManagerConstructorOptions = { distributedCoordinator: null, @@ -38,6 +40,19 @@ export class TaskManager { throw new Error('Task must have a name to be added to taskManager'); } this.taskMap.add(task); + const subscription = task.eventSubject.subscribe((event) => { + this.taskSubject.next(event); + }); + this.taskSubscriptions.set(task, subscription); + } + + public removeTask(task: Task): void { + this.taskMap.remove(task); + const subscription = this.taskSubscriptions.get(task); + if (subscription) { + subscription.unsubscribe(); + this.taskSubscriptions.delete(task); + } } public addAndScheduleTask(task: Task, cronString: string) { @@ -150,6 +165,10 @@ export class TaskManager { if (this.options.distributedCoordinator) { await this.options.distributedCoordinator.stop(); } + for (const [, subscription] of this.taskSubscriptions) { + subscription.unsubscribe(); + } + this.taskSubscriptions.clear(); } // Get metadata for a specific task @@ -198,6 +217,14 @@ export class TaskManager { return scheduledRuns; } + public getTasksByLabel(key: string, value: string): Task[] { + return this.taskMap.getArray().filter(task => task.labels[key] === value); + } + + public getTasksMetadataByLabel(key: string, value: string): ITaskMetadata[] { + return this.getTasksByLabel(key, value).map(task => task.getMetadata()); + } + // Add, execute, and remove a task while collecting metadata public async addExecuteRemoveTask>( task: Task, @@ -236,7 +263,7 @@ export class TaskManager { }; // Remove task from manager - this.taskMap.remove(task); + this.removeTask(task); // Deschedule if it was scheduled if (options?.schedule && task.name) { @@ -260,7 +287,7 @@ export class TaskManager { }; // Remove task from manager even on error - this.taskMap.remove(task); + this.removeTask(task); // Deschedule if it was scheduled if (options?.schedule && task.name) { diff --git a/ts/taskbuffer.interfaces.ts b/ts/taskbuffer.interfaces.ts index f8f1542..26ab5d1 100644 --- a/ts/taskbuffer.interfaces.ts +++ b/ts/taskbuffer.interfaces.ts @@ -17,6 +17,7 @@ export interface ITaskMetadata { timeout?: number; lastError?: string; errorCount?: number; + labels?: Record; } export interface ITaskExecutionReport { @@ -38,4 +39,14 @@ export interface IScheduledTaskInfo { lastRun?: Date; steps?: ITaskStep[]; metadata?: ITaskMetadata; +} + +export type TTaskEventType = 'started' | 'step' | 'completed' | 'failed'; + +export interface ITaskEvent { + type: TTaskEventType; + task: ITaskMetadata; + timestamp: number; + stepName?: string; // present when type === 'step' + error?: string; // present when type === 'failed' } \ No newline at end of file diff --git a/ts_web/00_commitinfo_data.ts b/ts_web/00_commitinfo_data.ts index 9d0be6b..943ad35 100644 --- a/ts_web/00_commitinfo_data.ts +++ b/ts_web/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/taskbuffer', - version: '4.0.0', + version: '4.1.0', description: 'A flexible task management library supporting TypeScript, allowing for task buffering, scheduling, and execution with dependency management.' }