From d3e8ff1a11987051ff710e2e896127f0c3b6aab9 Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Sun, 15 Feb 2026 12:20:01 +0000 Subject: [PATCH] BREAKING CHANGE(taskbuffer): Introduce constraint-based concurrency with TaskConstraintGroup and TaskManager integration; remove legacy TaskRunner and several Task APIs (breaking); add typed Task.data and update exports and tests. --- changelog.md | 10 + readme.hints.md | 36 +- readme.md | 281 ++++++++++--- test/test.11.errorhandling.ts | 33 +- test/test.13.constraints.ts | 391 +++++++++++++++++++ test/test.taskrunner.ts | 34 -- ts/00_commitinfo_data.ts | 2 +- ts/index.ts | 4 +- ts/taskbuffer.classes.bufferrunner.ts | 2 +- ts/taskbuffer.classes.cyclecounter.ts | 2 +- ts/taskbuffer.classes.task.ts | 72 +--- ts/taskbuffer.classes.taskconstraintgroup.ts | 80 ++++ ts/taskbuffer.classes.taskmanager.ts | 188 +++++++-- ts/taskbuffer.classes.taskrunner.ts | 69 ---- ts/taskbuffer.interfaces.ts | 14 + ts_web/00_commitinfo_data.ts | 2 +- 16 files changed, 924 insertions(+), 296 deletions(-) create mode 100644 test/test.13.constraints.ts delete mode 100644 test/test.taskrunner.ts create mode 100644 ts/taskbuffer.classes.taskconstraintgroup.ts delete mode 100644 ts/taskbuffer.classes.taskrunner.ts diff --git a/changelog.md b/changelog.md index fd5a099..5655f09 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,15 @@ # Changelog +## 2026-02-15 - 5.0.0 - BREAKING CHANGE(taskbuffer) +Introduce constraint-based concurrency with TaskConstraintGroup and TaskManager integration; remove legacy TaskRunner and several Task APIs (breaking); add typed Task.data and update exports and tests. + +- Add TaskConstraintGroup class with per-key maxConcurrent, cooldownMs, and helper methods (canRun, acquireSlot, releaseSlot, getCooldownRemaining, getRunningCount, reset). +- Task generic signature extended to Task and a new typed data property (data) with default {}. +- TaskManager now supports addConstraintGroup/removeConstraintGroup, triggerTaskConstrained, queues blocked tasks, drains queue with cooldown timers, and routes triggerTask/triggerTaskByName through the constraint system. +- Removed TaskRunner, plus Task APIs: blockingTasks, execDelay, finished promise and associated behavior have been removed (breaking changes). +- Exports and interfaces updated: TaskConstraintGroup and ITaskConstraintGroupOptions added; TaskRunner removed from public API. +- Updated README and added comprehensive tests for constraint behavior; adjusted other tests to remove TaskRunner usage and reflect new APIs. + ## 2026-02-15 - 4.2.1 - fix(deps) bump @push.rocks/smartlog and @types/node; update dependency list version and license link in docs diff --git a/readme.hints.md b/readme.hints.md index da4f9b9..acbbd93 100644 --- a/readme.hints.md +++ b/readme.hints.md @@ -1,20 +1,42 @@ # Taskbuffer Hints +## Task Constraint System (v5.0.0+) — Breaking Changes +- **`TaskRunner` removed** — replaced by `TaskManager` + `TaskConstraintGroup` +- **`blockingTasks` removed** from `Task` — use `TaskConstraintGroup` with `maxConcurrent: 1` +- **`execDelay` removed** from `Task` — use `TaskConstraintGroup` with `cooldownMs` +- **`finished` promise removed** from `Task` — no longer needed +- **`Task` generic signature**: `Task` (3rd param added for typed data) + +### Task.data +- `Task` constructor accepts optional `data?: TData` (defaults to `{}`) +- Typed data bag accessible as `task.data` + +### TaskConstraintGroup +- `new TaskConstraintGroup({ name, constraintKeyForTask, maxConcurrent?, cooldownMs? })` +- `constraintKeyForTask(task)` returns a string key (constraint applies) or `null` (skip) +- `maxConcurrent` (default: `Infinity`) — max concurrent tasks per key +- `cooldownMs` (default: `0`) — minimum ms gap between completions per key +- Methods: `canRun(key)`, `acquireSlot(key)`, `releaseSlot(key)`, `getCooldownRemaining(key)`, `getRunningCount(key)`, `reset()` + +### TaskManager Constraint Integration +- `manager.addConstraintGroup(group)` / `manager.removeConstraintGroup(name)` +- `triggerTaskByName()`, `triggerTask()`, `addExecuteRemoveTask()`, cron callbacks all route through `triggerTaskConstrained()` +- `triggerTaskConstrained(task, input?)` — evaluates constraints, queues if blocked, drains after completion +- Cooldown-blocked entries auto-drain via timer + +### Exported from index.ts +- `TaskConstraintGroup` class +- `ITaskConstraintGroupOptions` type + ## Error Handling (v3.6.0+) - `Task` now has `catchErrors` constructor option (default: `false`) - Default behavior: `trigger()` rejects when taskFunction throws (breaking change from pre-3.6) - Set `catchErrors: true` to swallow errors (old behavior) - returns `undefined` on error - Error state tracked via `lastError?: Error`, `errorCount: number`, `clearError()` - `getMetadata()` status uses all four values: `'idle'` | `'running'` | `'completed'` | `'failed'` -- All peripheral classes (Taskchain, Taskparallel, TaskRunner, BufferRunner, TaskDebounced, TaskManager) have proper error propagation/handling +- All peripheral classes (Taskchain, Taskparallel, BufferRunner, TaskDebounced, TaskManager) have proper error propagation/handling - `console.log` calls replaced with `logger.log()` throughout -## Breaking API Rename (TaskRunner) -- `maxParrallelJobs` → `maxParallelJobs` -- `qeuedTasks` → `queuedTasks` -- JSDoc typos fixed: "qeue" → "queue", "wether" → "whether", "loose" → "lose" -- The `setMaxParallelJobs()` parameter also renamed from `maxParrallelJobsArg` to `maxParallelJobsArg` - ## Error Context Improvements - **TaskChain**: Errors now wrap the original with context: chain name, failing task name, and task index. Original error preserved via `.cause` - **BufferRunner**: When `catchErrors: false`, buffered task errors now reject the trigger promise (via `CycleCounter.informOfCycleError`) instead of silently resolving with `undefined` diff --git a/readme.md b/readme.md index 09df812..a864e1e 100644 --- a/readme.md +++ b/readme.md @@ -1,6 +1,6 @@ # @push.rocks/taskbuffer 🚀 -> **Modern TypeScript task orchestration with smart buffering, scheduling, labels, and real-time event streaming** +> **Modern TypeScript task orchestration with constraint-based concurrency, smart buffering, scheduling, labels, and real-time event streaming** [![npm version](https://img.shields.io/npm/v/@push.rocks/taskbuffer.svg)](https://www.npmjs.com/package/@push.rocks/taskbuffer) [![TypeScript](https://img.shields.io/badge/TypeScript-5.x-blue.svg)](https://www.typescriptlang.org/) @@ -13,6 +13,7 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community ## 🌟 Features - **🎯 Type-Safe Task Management** — Full TypeScript support with generics and type inference +- **🔒 Constraint-Based Concurrency** — Per-key mutual exclusion, group concurrency limits, and cooldown enforcement via `TaskConstraintGroup` - **📊 Real-Time Progress Tracking** — Step-based progress with percentage weights - **⚡ Smart Buffering** — Intelligent request debouncing and batching - **⏰ Cron Scheduling** — Schedule tasks with cron expressions @@ -49,6 +50,24 @@ const result = await greetTask.trigger('World'); console.log(result); // "Hello, World!" ``` +### Task with Typed Data 📦 + +Every task can carry a typed data bag — perfect for constraint matching, routing, and metadata: + +```typescript +const task = new Task({ + name: 'update-dns', + data: { domain: 'example.com', priority: 1 }, + taskFunction: async () => { + // task.data is fully typed here + console.log(`Updating DNS for ${task.data.domain}`); + }, +}); + +task.data.domain; // string — fully typed +task.data.priority; // number — fully typed +``` + ### Task with Steps & Progress 📊 ```typescript @@ -84,6 +103,132 @@ console.log(deployTask.getStepsMetadata()); // Step details with status > **Note:** `notifyStep()` is fully type-safe — TypeScript only accepts step names you declared in the `steps` array when you use `as const`. +## 🔒 Task Constraints — Concurrency, Mutual Exclusion & Cooldowns + +`TaskConstraintGroup` is the unified mechanism for controlling how tasks run relative to each other. It replaces older patterns like task runners, blocking tasks, and execution delays with a single, composable, key-based constraint system. + +### Per-Key Mutual Exclusion + +Ensure only one task runs at a time for a given key (e.g. per domain, per tenant, per resource): + +```typescript +import { Task, TaskManager, TaskConstraintGroup } from '@push.rocks/taskbuffer'; + +const manager = new TaskManager(); + +// Only one DNS update per domain at a time +const domainMutex = new TaskConstraintGroup<{ domain: string }>({ + name: 'domain-mutex', + maxConcurrent: 1, + constraintKeyForTask: (task) => task.data.domain, +}); + +manager.addConstraintGroup(domainMutex); + +const task1 = new Task({ + name: 'update-a.com', + data: { domain: 'a.com' }, + taskFunction: async () => { /* update DNS for a.com */ }, +}); + +const task2 = new Task({ + name: 'update-a.com-2', + data: { domain: 'a.com' }, + taskFunction: async () => { /* another update for a.com */ }, +}); + +manager.addTask(task1); +manager.addTask(task2); + +// task2 waits until task1 finishes (same domain key) +await Promise.all([ + manager.triggerTask(task1), + manager.triggerTask(task2), +]); +``` + +### Group Concurrency Limits + +Cap how many tasks can run concurrently across a group: + +```typescript +// Max 3 DNS updaters running globally at once +const dnsLimit = new TaskConstraintGroup<{ group: string }>({ + name: 'dns-concurrency', + maxConcurrent: 3, + constraintKeyForTask: (task) => + task.data.group === 'dns' ? 'dns' : null, // null = skip constraint +}); + +manager.addConstraintGroup(dnsLimit); +``` + +### Cooldowns (Rate Limiting) + +Enforce a minimum time gap between consecutive executions for the same key: + +```typescript +// No more than one API call per domain every 11 seconds +const rateLimiter = new TaskConstraintGroup<{ domain: string }>({ + name: 'api-rate-limit', + maxConcurrent: 1, + cooldownMs: 11000, + constraintKeyForTask: (task) => task.data.domain, +}); + +manager.addConstraintGroup(rateLimiter); +``` + +### Global Concurrency Cap + +Limit total concurrent tasks system-wide: + +```typescript +const globalCap = new TaskConstraintGroup({ + name: 'global-cap', + maxConcurrent: 10, + constraintKeyForTask: () => 'all', // same key = shared limit +}); + +manager.addConstraintGroup(globalCap); +``` + +### Composing Multiple Constraints + +Multiple constraint groups stack — a task only runs when **all** applicable constraints allow it: + +```typescript +manager.addConstraintGroup(globalCap); // max 10 globally +manager.addConstraintGroup(domainMutex); // max 1 per domain +manager.addConstraintGroup(rateLimiter); // 11s cooldown per domain + +// A task must satisfy ALL three constraints before it starts +await manager.triggerTask(dnsTask); +``` + +### Selective Constraints + +Return `null` from `constraintKeyForTask` to exempt a task from a constraint group: + +```typescript +const constraint = new TaskConstraintGroup<{ priority: string }>({ + name: 'low-priority-limit', + maxConcurrent: 2, + constraintKeyForTask: (task) => + task.data.priority === 'low' ? 'low-priority' : null, // high priority tasks skip this constraint +}); +``` + +### How It Works + +When you trigger a task through `TaskManager` (via `triggerTask`, `triggerTaskByName`, `addExecuteRemoveTask`, or cron), the manager: + +1. Evaluates all registered constraint groups against the task +2. If no constraints apply (all matchers return `null`) → runs immediately +3. If all applicable constraints have capacity → acquires slots and runs +4. If any constraint blocks → enqueues the task; when a running task completes, the queue is drained +5. Cooldown-blocked tasks auto-retry after the shortest remaining cooldown expires + ## 🎯 Core Concepts ### Task Buffering — Intelligent Request Management @@ -95,7 +240,6 @@ const apiTask = new Task({ name: 'APIRequest', buffered: true, bufferMax: 5, // Maximum 5 concurrent executions - execDelay: 100, // Minimum 100ms between executions taskFunction: async (endpoint) => { return await fetch(endpoint).then((r) => r.json()); }, @@ -156,9 +300,9 @@ console.log(`Saved ${savedCount} items`); Taskchain also supports dynamic mutation: ```typescript -pipeline.addTask(newTask); // Append to chain -pipeline.removeTask(oldTask); // Remove by reference (returns boolean) -pipeline.shiftTask(); // Remove & return first task +pipeline.addTask(newTask); // Append to chain +pipeline.removeTask(oldTask); // Remove by reference (returns boolean) +pipeline.shiftTask(); // Remove & return first task ``` Error context is rich — a chain failure includes the chain name, failing task name, task index, and preserves the original error as `.cause`. @@ -220,27 +364,6 @@ await initTask.trigger(); // No-op console.log(initTask.hasTriggered); // true ``` -### TaskRunner — Managed Queue with Concurrency Control - -Process a queue of tasks with a configurable parallelism limit: - -```typescript -import { TaskRunner } from '@push.rocks/taskbuffer'; - -const runner = new TaskRunner(); -runner.setMaxParallelJobs(3); // Run up to 3 tasks concurrently - -await runner.start(); - -runner.addTask(taskA); -runner.addTask(taskB); -runner.addTask(taskC); -runner.addTask(taskD); // Queued until a slot opens - -// When done: -await runner.stop(); -``` - ## 🏷️ Labels — Multi-Tenant Task Filtering Attach arbitrary key-value labels to any task for filtering, grouping, or multi-tenant isolation: @@ -411,6 +534,10 @@ manager.addTask(deployTask); manager.addAndScheduleTask(backupTask, '0 2 * * *'); // Daily at 2 AM manager.addAndScheduleTask(healthCheck, '*/5 * * * *'); // Every 5 minutes +// Register constraint groups +manager.addConstraintGroup(globalCap); +manager.addConstraintGroup(perDomainMutex); + // Query metadata const meta = manager.getTaskMetadata('Deploy'); console.log(meta); @@ -433,7 +560,7 @@ const allMeta = manager.getAllTasksMetadata(); const scheduled = manager.getScheduledTasks(); const nextRuns = manager.getNextScheduledRuns(5); -// Trigger by name +// Trigger by name (routes through constraints) await manager.triggerTaskByName('Deploy'); // One-shot: add, execute, collect report, remove @@ -462,6 +589,12 @@ manager.removeTask(task); // Removes from map and unsubscribes event forwarding manager.descheduleTaskByName('Deploy'); // Remove cron schedule only ``` +### Remove Constraint Groups + +```typescript +manager.removeConstraintGroup('domain-mutex'); // By name +``` + ## 🎨 Web Component Dashboard Visualize your tasks in real-time with the included Lit-based web component: @@ -570,32 +703,6 @@ await task.trigger('SELECT * FROM users'); // Setup runs here await task.trigger('SELECT * FROM orders'); // Setup skipped, pool reused ``` -### Blocking Tasks - -Make one task wait for another to finish before executing: - -```typescript -const initTask = new Task({ - name: 'Init', - taskFunction: async () => { - await initializeSystem(); - }, -}); - -const workerTask = new Task({ - name: 'Worker', - taskFunction: async () => { - await doWork(); - }, -}); - -workerTask.blockingTasks.push(initTask); - -// Triggering worker will automatically wait for init to complete -initTask.trigger(); -workerTask.trigger(); // Waits until initTask.finished resolves -``` - ### Database Migration Pipeline ```typescript @@ -616,15 +723,24 @@ try { ### Multi-Tenant SaaS Monitoring -Combine labels + events for a real-time multi-tenant dashboard: +Combine labels + events + constraints for a real-time multi-tenant system: ```typescript const manager = new TaskManager(); +// Per-tenant concurrency limit +const tenantLimit = new TaskConstraintGroup<{ tenantId: string }>({ + name: 'tenant-concurrency', + maxConcurrent: 2, + constraintKeyForTask: (task) => task.data.tenantId, +}); +manager.addConstraintGroup(tenantLimit); + // Create tenant-scoped tasks function createTenantTask(tenantId: string, taskName: string, fn: () => Promise) { - const task = new Task({ + const task = new Task({ name: `${tenantId}:${taskName}`, + data: { tenantId }, labels: { tenantId }, taskFunction: fn, }); @@ -653,15 +769,31 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme'); | Class | Description | | --- | --- | -| `Task` | Core task unit with optional step tracking, labels, and event streaming | -| `TaskManager` | Centralized orchestrator with scheduling, label queries, and aggregated events | +| `Task` | Core task unit with typed data, optional step tracking, labels, and event streaming | +| `TaskManager` | Centralized orchestrator with constraint groups, scheduling, label queries, and aggregated events | +| `TaskConstraintGroup` | Concurrency, mutual exclusion, and cooldown constraints with key-based grouping | | `Taskchain` | Sequential task executor with data flow between tasks | | `Taskparallel` | Concurrent task executor via `Promise.all()` | | `TaskOnce` | Single-execution guard | | `TaskDebounced` | Debounced task using rxjs | -| `TaskRunner` | Sequential queue with configurable parallelism | | `TaskStep` | Step tracking unit (internal, exposed via metadata) | +### Task Constructor Options + +| Option | Type | Default | Description | +| --- | --- | --- | --- | +| `taskFunction` | `ITaskFunction` | *required* | The async function to execute | +| `name` | `string` | — | Task identifier (required for TaskManager) | +| `data` | `TData` | `{}` | Typed data bag for constraint matching and routing | +| `steps` | `ReadonlyArray<{name, description, percentage}>` | — | Step definitions for progress tracking | +| `buffered` | `boolean` | — | Enable request buffering | +| `bufferMax` | `number` | — | Max buffered calls | +| `preTask` | `Task \| () => Task` | — | Task to run before | +| `afterTask` | `Task \| () => Task` | — | Task to run after | +| `taskSetup` | `() => Promise` | — | One-time setup function | +| `catchErrors` | `boolean` | `false` | Swallow errors instead of rejecting | +| `labels` | `Record` | `{}` | Initial labels | + ### Task Methods | Method | Returns | Description | @@ -682,6 +814,7 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme'); | Property | Type | Description | | --- | --- | --- | | `name` | `string` | Task identifier | +| `data` | `TData` | Typed data bag | | `running` | `boolean` | Whether the task is currently executing | | `idle` | `boolean` | Inverse of `running` | | `labels` | `Record` | Attached labels | @@ -690,7 +823,27 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme'); | `errorCount` | `number` | Total error count across all runs | | `runCount` | `number` | Total execution count | | `lastRun` | `Date \| undefined` | Timestamp of last execution | -| `blockingTasks` | `Task[]` | Tasks that must finish before this one starts | + +### TaskConstraintGroup Constructor Options + +| Option | Type | Default | Description | +| --- | --- | --- | --- | +| `name` | `string` | *required* | Constraint group identifier | +| `constraintKeyForTask` | `(task) => string \| null` | *required* | Returns key for grouping, or `null` to skip | +| `maxConcurrent` | `number` | `Infinity` | Max concurrent tasks per key | +| `cooldownMs` | `number` | `0` | Minimum ms between completions per key | + +### TaskConstraintGroup Methods + +| Method | Returns | Description | +| --- | --- | --- | +| `getConstraintKey(task)` | `string \| null` | Get the constraint key for a task | +| `canRun(key)` | `boolean` | Check if a slot is available | +| `acquireSlot(key)` | `void` | Claim a running slot | +| `releaseSlot(key)` | `void` | Release a slot and record completion time | +| `getCooldownRemaining(key)` | `number` | Milliseconds until cooldown expires | +| `getRunningCount(key)` | `number` | Current running count for key | +| `reset()` | `void` | Clear all state | ### TaskManager Methods @@ -699,7 +852,11 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme'); | `addTask(task)` | `void` | Register a task (wires event forwarding) | | `removeTask(task)` | `void` | Remove task and unsubscribe events | | `getTaskByName(name)` | `Task \| undefined` | Look up by name | -| `triggerTaskByName(name)` | `Promise` | Trigger by name | +| `triggerTaskByName(name)` | `Promise` | Trigger by name (routes through constraints) | +| `triggerTask(task)` | `Promise` | Trigger directly (routes through constraints) | +| `triggerTaskConstrained(task, input?)` | `Promise` | Core constraint evaluation entry point | +| `addConstraintGroup(group)` | `void` | Register a constraint group | +| `removeConstraintGroup(name)` | `void` | Remove a constraint group by name | | `addAndScheduleTask(task, cron)` | `void` | Register + schedule | | `scheduleTaskByName(name, cron)` | `void` | Schedule existing task | | `descheduleTaskByName(name)` | `void` | Remove schedule | @@ -719,6 +876,7 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme'); | --- | --- | --- | | `taskSubject` | `Subject` | Aggregated events from all added tasks | | `taskMap` | `ObjectMap` | Internal task registry | +| `constraintGroups` | `TaskConstraintGroup[]` | Registered constraint groups | ### Exported Types @@ -731,13 +889,14 @@ import type { TTaskEventType, ITaskStep, ITaskFunction, + ITaskConstraintGroupOptions, StepNames, } from '@push.rocks/taskbuffer'; ``` ## License and Legal Information -This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./license.md) file. +This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file. **Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file. diff --git a/test/test.11.errorhandling.ts b/test/test.11.errorhandling.ts index a812537..d1d7931 100644 --- a/test/test.11.errorhandling.ts +++ b/test/test.11.errorhandling.ts @@ -137,38 +137,7 @@ tap.test('should reject Taskparallel when a child task throws', async () => { expect(didReject).toBeTrue(); }); -// Test 7: TaskRunner continues processing after a task error -tap.test('should continue TaskRunner queue after a task error', async () => { - const runner = new taskbuffer.TaskRunner(); - const executionOrder: string[] = []; - - const badTask = new taskbuffer.Task({ - name: 'runner-bad-task', - taskFunction: async () => { - executionOrder.push('bad'); - throw new Error('runner task failure'); - }, - }); - const goodTask = new taskbuffer.Task({ - name: 'runner-good-task', - taskFunction: async () => { - executionOrder.push('good'); - }, - }); - - await runner.start(); - runner.addTask(badTask); - runner.addTask(goodTask); - - // Wait for both tasks to be processed - await smartdelay.delayFor(500); - await runner.stop(); - - expect(executionOrder).toContain('bad'); - expect(executionOrder).toContain('good'); -}); - -// Test 8: BufferRunner handles errors without hanging +// Test 7: BufferRunner handles errors without hanging tap.test('should handle BufferRunner errors without hanging', async () => { let callCount = 0; const bufferedTask = new taskbuffer.Task({ diff --git a/test/test.13.constraints.ts b/test/test.13.constraints.ts new file mode 100644 index 0000000..a22232c --- /dev/null +++ b/test/test.13.constraints.ts @@ -0,0 +1,391 @@ +import { expect, tap } from '@git.zone/tstest/tapbundle'; +import * as taskbuffer from '../ts/index.js'; +import * as smartdelay from '@push.rocks/smartdelay'; + +// Test 1: Task data property — typed data accessible +tap.test('should have typed data property on task', async () => { + const task = new taskbuffer.Task({ + name: 'data-task', + data: { domain: 'example.com', priority: 1 }, + taskFunction: async () => {}, + }); + + expect(task.data.domain).toEqual('example.com'); + expect(task.data.priority).toEqual(1); +}); + +// Test 2: Task data defaults to empty object +tap.test('should default data to empty object when not provided', async () => { + const task = new taskbuffer.Task({ + name: 'no-data-task', + taskFunction: async () => {}, + }); + + expect(task.data).toBeTruthy(); + expect(typeof task.data).toEqual('object'); +}); + +// Test 3: No-constraint passthrough — behavior unchanged +tap.test('should run tasks directly when no constraints are configured', async () => { + const manager = new taskbuffer.TaskManager(); + let executed = false; + + const task = new taskbuffer.Task({ + name: 'passthrough-task', + taskFunction: async () => { + executed = true; + return 'done'; + }, + }); + + manager.addTask(task); + const result = await manager.triggerTaskByName('passthrough-task'); + expect(executed).toBeTrue(); + expect(result).toEqual('done'); + await manager.stop(); +}); + +// Test 4: Group concurrency — 3 tasks, max 2 concurrent, 3rd queues +tap.test('should enforce group concurrency limit', async () => { + const manager = new taskbuffer.TaskManager(); + let running = 0; + let maxRunning = 0; + + const constraint = new taskbuffer.TaskConstraintGroup<{ group: string }>({ + name: 'concurrency-test', + maxConcurrent: 2, + constraintKeyForTask: (task) => + task.data.group === 'workers' ? 'workers' : null, + }); + manager.addConstraintGroup(constraint); + + const makeTask = (id: number) => + new taskbuffer.Task({ + name: `worker-${id}`, + data: { group: 'workers' }, + taskFunction: async () => { + running++; + maxRunning = Math.max(maxRunning, running); + await smartdelay.delayFor(200); + running--; + }, + }); + + const t1 = makeTask(1); + const t2 = makeTask(2); + const t3 = makeTask(3); + + manager.addTask(t1); + manager.addTask(t2); + manager.addTask(t3); + + await Promise.all([ + manager.triggerTaskConstrained(t1), + manager.triggerTaskConstrained(t2), + manager.triggerTaskConstrained(t3), + ]); + + expect(maxRunning).toBeLessThanOrEqual(2); + await manager.stop(); +}); + +// Test 5: Key-based mutual exclusion — same key sequential, different keys parallel +tap.test('should enforce key-based mutual exclusion', async () => { + const manager = new taskbuffer.TaskManager(); + const log: string[] = []; + + const constraint = new taskbuffer.TaskConstraintGroup<{ domain: string }>({ + name: 'domain-mutex', + maxConcurrent: 1, + constraintKeyForTask: (task) => task.data.domain, + }); + manager.addConstraintGroup(constraint); + + const makeTask = (name: string, domain: string, delayMs: number) => + new taskbuffer.Task({ + name, + data: { domain }, + taskFunction: async () => { + log.push(`${name}-start`); + await smartdelay.delayFor(delayMs); + log.push(`${name}-end`); + }, + }); + + const taskA1 = makeTask('a1', 'a.com', 100); + const taskA2 = makeTask('a2', 'a.com', 100); + const taskB1 = makeTask('b1', 'b.com', 100); + + manager.addTask(taskA1); + manager.addTask(taskA2); + manager.addTask(taskB1); + + await Promise.all([ + manager.triggerTaskConstrained(taskA1), + manager.triggerTaskConstrained(taskA2), + manager.triggerTaskConstrained(taskB1), + ]); + + // a1 and a2 should be sequential (same key) + const a1EndIdx = log.indexOf('a1-end'); + const a2StartIdx = log.indexOf('a2-start'); + expect(a2StartIdx).toBeGreaterThanOrEqual(a1EndIdx); + + // b1 should start concurrently with a1 (different key) + const a1StartIdx = log.indexOf('a1-start'); + const b1StartIdx = log.indexOf('b1-start'); + // Both should start before a1 ends + expect(b1StartIdx).toBeLessThan(a1EndIdx); + + await manager.stop(); +}); + +// Test 6: Cooldown enforcement +tap.test('should enforce cooldown between task executions', async () => { + const manager = new taskbuffer.TaskManager(); + const timestamps: number[] = []; + + const constraint = new taskbuffer.TaskConstraintGroup<{ key: string }>({ + name: 'cooldown-test', + maxConcurrent: 1, + cooldownMs: 300, + constraintKeyForTask: (task) => task.data.key, + }); + manager.addConstraintGroup(constraint); + + const makeTask = (name: string) => + new taskbuffer.Task({ + name, + data: { key: 'shared' }, + taskFunction: async () => { + timestamps.push(Date.now()); + }, + }); + + const t1 = makeTask('cool-1'); + const t2 = makeTask('cool-2'); + const t3 = makeTask('cool-3'); + + manager.addTask(t1); + manager.addTask(t2); + manager.addTask(t3); + + await Promise.all([ + manager.triggerTaskConstrained(t1), + manager.triggerTaskConstrained(t2), + manager.triggerTaskConstrained(t3), + ]); + + // Each execution should be at least ~300ms apart (with 200ms tolerance) + for (let i = 1; i < timestamps.length; i++) { + const gap = timestamps[i] - timestamps[i - 1]; + expect(gap).toBeGreaterThanOrEqual(100); // 300ms cooldown minus tolerance + } + + await manager.stop(); +}); + +// Test 7: Multiple constraint groups on one task +tap.test('should apply multiple constraint groups to one task', async () => { + const manager = new taskbuffer.TaskManager(); + let running = 0; + let maxRunning = 0; + + const globalConstraint = new taskbuffer.TaskConstraintGroup({ + name: 'global', + maxConcurrent: 3, + constraintKeyForTask: () => 'all', + }); + + const groupConstraint = new taskbuffer.TaskConstraintGroup<{ group: string }>({ + name: 'group', + maxConcurrent: 1, + constraintKeyForTask: (task) => task.data.group, + }); + + manager.addConstraintGroup(globalConstraint); + manager.addConstraintGroup(groupConstraint); + + const makeTask = (name: string, group: string) => + new taskbuffer.Task({ + name, + data: { group }, + taskFunction: async () => { + running++; + maxRunning = Math.max(maxRunning, running); + await smartdelay.delayFor(100); + running--; + }, + }); + + // Same group - should be serialized by group constraint + const t1 = makeTask('multi-1', 'A'); + const t2 = makeTask('multi-2', 'A'); + + manager.addTask(t1); + manager.addTask(t2); + + await Promise.all([ + manager.triggerTaskConstrained(t1), + manager.triggerTaskConstrained(t2), + ]); + + // With group maxConcurrent: 1, only 1 should run at a time + expect(maxRunning).toBeLessThanOrEqual(1); + await manager.stop(); +}); + +// Test 8: Matcher returns null — task runs unconstrained +tap.test('should run task unconstrained when matcher returns null', async () => { + const manager = new taskbuffer.TaskManager(); + + const constraint = new taskbuffer.TaskConstraintGroup<{ skip: boolean }>({ + name: 'selective', + maxConcurrent: 1, + constraintKeyForTask: (task) => (task.data.skip ? null : 'constrained'), + }); + manager.addConstraintGroup(constraint); + + let unconstrained = false; + const task = new taskbuffer.Task({ + name: 'skip-task', + data: { skip: true }, + taskFunction: async () => { + unconstrained = true; + }, + }); + + manager.addTask(task); + await manager.triggerTaskConstrained(task); + expect(unconstrained).toBeTrue(); + await manager.stop(); +}); + +// Test 9: Error handling — failed task releases slot, queue drains +tap.test('should release slot and drain queue when task fails', async () => { + const manager = new taskbuffer.TaskManager(); + const log: string[] = []; + + const constraint = new taskbuffer.TaskConstraintGroup<{ key: string }>({ + name: 'error-drain', + maxConcurrent: 1, + constraintKeyForTask: (task) => task.data.key, + }); + manager.addConstraintGroup(constraint); + + const failTask = new taskbuffer.Task({ + name: 'fail-task', + data: { key: 'shared' }, + catchErrors: true, + taskFunction: async () => { + log.push('fail'); + throw new Error('intentional'); + }, + }); + + const successTask = new taskbuffer.Task({ + name: 'success-task', + data: { key: 'shared' }, + taskFunction: async () => { + log.push('success'); + }, + }); + + manager.addTask(failTask); + manager.addTask(successTask); + + await Promise.all([ + manager.triggerTaskConstrained(failTask), + manager.triggerTaskConstrained(successTask), + ]); + + expect(log).toContain('fail'); + expect(log).toContain('success'); + await manager.stop(); +}); + +// Test 10: TaskManager integration — addConstraintGroup + triggerTaskByName +tap.test('should route triggerTaskByName through constraints', async () => { + const manager = new taskbuffer.TaskManager(); + let running = 0; + let maxRunning = 0; + + const constraint = new taskbuffer.TaskConstraintGroup({ + name: 'manager-integration', + maxConcurrent: 1, + constraintKeyForTask: () => 'all', + }); + manager.addConstraintGroup(constraint); + + const t1 = new taskbuffer.Task({ + name: 'managed-1', + taskFunction: async () => { + running++; + maxRunning = Math.max(maxRunning, running); + await smartdelay.delayFor(100); + running--; + }, + }); + + const t2 = new taskbuffer.Task({ + name: 'managed-2', + taskFunction: async () => { + running++; + maxRunning = Math.max(maxRunning, running); + await smartdelay.delayFor(100); + running--; + }, + }); + + manager.addTask(t1); + manager.addTask(t2); + + await Promise.all([ + manager.triggerTaskByName('managed-1'), + manager.triggerTaskByName('managed-2'), + ]); + + expect(maxRunning).toBeLessThanOrEqual(1); + await manager.stop(); +}); + +// Test 11: removeConstraintGroup removes by name +tap.test('should remove a constraint group by name', async () => { + const manager = new taskbuffer.TaskManager(); + + const constraint = new taskbuffer.TaskConstraintGroup({ + name: 'removable', + maxConcurrent: 1, + constraintKeyForTask: () => 'all', + }); + + manager.addConstraintGroup(constraint); + expect(manager.constraintGroups.length).toEqual(1); + + manager.removeConstraintGroup('removable'); + expect(manager.constraintGroups.length).toEqual(0); + await manager.stop(); +}); + +// Test 12: TaskConstraintGroup reset clears state +tap.test('should reset constraint group state', async () => { + const constraint = new taskbuffer.TaskConstraintGroup({ + name: 'resettable', + maxConcurrent: 2, + cooldownMs: 1000, + constraintKeyForTask: () => 'key', + }); + + // Simulate usage + constraint.acquireSlot('key'); + expect(constraint.getRunningCount('key')).toEqual(1); + + constraint.releaseSlot('key'); + expect(constraint.getCooldownRemaining('key')).toBeGreaterThan(0); + + constraint.reset(); + expect(constraint.getRunningCount('key')).toEqual(0); + expect(constraint.getCooldownRemaining('key')).toEqual(0); +}); + +export default tap.start(); diff --git a/test/test.taskrunner.ts b/test/test.taskrunner.ts deleted file mode 100644 index d1e8553..0000000 --- a/test/test.taskrunner.ts +++ /dev/null @@ -1,34 +0,0 @@ -import { tap, expect } from '@git.zone/tstest/tapbundle'; - -import * as taskbuffer from '../ts/index.js'; - -let testTaskRunner: taskbuffer.TaskRunner; - -tap.test('should create a valid taskrunner', async () => { - testTaskRunner = new taskbuffer.TaskRunner(); - await testTaskRunner.start(); -}); - -tap.test('should execute task when its scheduled', async (tools) => { - const done = tools.defer(); - testTaskRunner.addTask( - new taskbuffer.Task({ - taskFunction: async () => { - console.log('hi'); - }, - }), - ); - - testTaskRunner.addTask( - new taskbuffer.Task({ - taskFunction: async () => { - console.log('there'); - done.resolve(); - }, - }), - ); - - await done.promise; -}); - -tap.start(); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 945d300..56ca4f4 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/taskbuffer', - version: '4.2.1', + version: '5.0.0', description: 'A flexible task management library supporting TypeScript, allowing for task buffering, scheduling, and execution with dependency management.' } diff --git a/ts/index.ts b/ts/index.ts index 2778a43..26060ea 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -4,15 +4,15 @@ export { Taskchain } from './taskbuffer.classes.taskchain.js'; export { Taskparallel } from './taskbuffer.classes.taskparallel.js'; export { TaskManager } from './taskbuffer.classes.taskmanager.js'; export { TaskOnce } from './taskbuffer.classes.taskonce.js'; -export { TaskRunner } from './taskbuffer.classes.taskrunner.js'; export { TaskDebounced } from './taskbuffer.classes.taskdebounced.js'; +export { TaskConstraintGroup } from './taskbuffer.classes.taskconstraintgroup.js'; // Task step system export { TaskStep } from './taskbuffer.classes.taskstep.js'; export type { ITaskStep } from './taskbuffer.classes.taskstep.js'; // Metadata interfaces -export type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent, TTaskEventType } from './taskbuffer.interfaces.js'; +export type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent, TTaskEventType, ITaskConstraintGroupOptions } from './taskbuffer.interfaces.js'; import * as distributedCoordination from './taskbuffer.classes.distributedcoordinator.js'; export { distributedCoordination }; diff --git a/ts/taskbuffer.classes.bufferrunner.ts b/ts/taskbuffer.classes.bufferrunner.ts index 58e0c3c..6b18c8c 100644 --- a/ts/taskbuffer.classes.bufferrunner.ts +++ b/ts/taskbuffer.classes.bufferrunner.ts @@ -6,7 +6,7 @@ export class BufferRunner { // initialize by default public bufferCounter: number = 0; - constructor(taskArg: Task) { + constructor(taskArg: Task) { this.task = taskArg; } diff --git a/ts/taskbuffer.classes.cyclecounter.ts b/ts/taskbuffer.classes.cyclecounter.ts index e8e0ecd..7116d35 100644 --- a/ts/taskbuffer.classes.cyclecounter.ts +++ b/ts/taskbuffer.classes.cyclecounter.ts @@ -9,7 +9,7 @@ export interface ICycleObject { export class CycleCounter { public task: Task; public cycleObjectArray: ICycleObject[] = []; - constructor(taskArg: Task) { + constructor(taskArg: Task) { this.task = taskArg; } public getPromiseForCycle(cycleCountArg: number) { diff --git a/ts/taskbuffer.classes.task.ts b/ts/taskbuffer.classes.task.ts index 9da97c2..6217349 100644 --- a/ts/taskbuffer.classes.task.ts +++ b/ts/taskbuffer.classes.task.ts @@ -19,18 +19,18 @@ export type TPreOrAfterTaskFunction = () => Task; // Type helper to extract step names from array export type StepNames = T extends ReadonlyArray<{ name: infer N }> ? N : never; -export class Task = []> { +export class Task = [], TData extends Record = Record> { public static extractTask = []>( - preOrAfterTaskArg: Task | TPreOrAfterTaskFunction, - ): Task { + preOrAfterTaskArg: Task | TPreOrAfterTaskFunction, + ): Task { switch (true) { case !preOrAfterTaskArg: return null; case preOrAfterTaskArg instanceof Task: - return preOrAfterTaskArg as Task; + return preOrAfterTaskArg as Task; case typeof preOrAfterTaskArg === 'function': const taskFunction = preOrAfterTaskArg as TPreOrAfterTaskFunction; - return taskFunction() as unknown as Task; + return taskFunction() as unknown as Task; default: return null; } @@ -42,7 +42,7 @@ export class Task): boolean => { + public static isTask = (taskArg: Task): boolean => { if (taskArg instanceof Task && typeof taskArg.taskFunction === 'function') { return true; } else { @@ -51,8 +51,8 @@ export class Task = []>( - taskArg: Task | TPreOrAfterTaskFunction, - touchedTasksArray: Task[], + taskArg: Task | TPreOrAfterTaskFunction, + touchedTasksArray: Task[], ): boolean { const taskToCheck = Task.extractTask(taskArg); let result = false; @@ -65,25 +65,16 @@ export class Task = []>( - taskArg: Task | TPreOrAfterTaskFunction, - optionsArg: { x?: any; touchedTasksArray?: Task[] }, + taskArg: Task | TPreOrAfterTaskFunction, + optionsArg: { x?: any; touchedTasksArray?: Task[] }, ) => { const taskToRun = Task.extractTask(taskArg); const done = plugins.smartpromise.defer(); - // Wait for all blocking tasks to finish - for (const task of taskToRun.blockingTasks) { - await task.finished; - } - if (!taskToRun.setupValue && taskToRun.taskSetup) { taskToRun.setupValue = await taskToRun.taskSetup(); } - if (taskToRun.execDelay) { - await plugins.smartdelay.delayFor(taskToRun.execDelay); - } - taskToRun.running = true; taskToRun.runCount++; taskToRun.lastRun = new Date(); @@ -100,26 +91,10 @@ export class Task { - taskToRun.resolveFinished = resolve; - }); }) .catch((err) => { taskToRun.running = false; taskToRun.emitEvent('failed', { error: err instanceof Error ? err.message : String(err) }); - - // Resolve finished so blocking dependants don't hang - taskToRun.resolveFinished(); - - // Create a new finished promise for the next run - taskToRun.finished = new Promise((resolve) => { - taskToRun.resolveFinished = resolve; - }); }); const options = { @@ -127,7 +102,7 @@ export class Task[] = options.touchedTasksArray; + const touchedTasksArray: Task[] = options.touchedTasksArray; touchedTasksArray.push(taskToRun); @@ -198,18 +173,12 @@ export class Task | TPreOrAfterTaskFunction; - public afterTask: Task | TPreOrAfterTaskFunction; + public data: TData; - // Add a list to store the blocking tasks - public blockingTasks: Task[] = []; - - // Add a promise that will resolve when the task has finished - private finished: Promise; - private resolveFinished: () => void; + public preTask: Task | TPreOrAfterTaskFunction; + public afterTask: Task | TPreOrAfterTaskFunction; public running: boolean = false; public bufferRunner = new BufferRunner(this); @@ -275,12 +244,12 @@ export class Task; - preTask?: Task | TPreOrAfterTaskFunction; - afterTask?: Task | TPreOrAfterTaskFunction; + preTask?: Task | TPreOrAfterTaskFunction; + afterTask?: Task | TPreOrAfterTaskFunction; buffered?: boolean; bufferMax?: number; - execDelay?: number; name?: string; + data?: TData; taskSetup?: ITaskSetupFunction; steps?: TSteps; catchErrors?: boolean; @@ -291,8 +260,8 @@ export class Task { - this.resolveFinished = resolve; - }); } public trigger(x?: any): Promise { diff --git a/ts/taskbuffer.classes.taskconstraintgroup.ts b/ts/taskbuffer.classes.taskconstraintgroup.ts new file mode 100644 index 0000000..67bfc8e --- /dev/null +++ b/ts/taskbuffer.classes.taskconstraintgroup.ts @@ -0,0 +1,80 @@ +import type { Task } from './taskbuffer.classes.task.js'; +import type { ITaskConstraintGroupOptions } from './taskbuffer.interfaces.js'; + +export class TaskConstraintGroup = Record> { + public name: string; + public maxConcurrent: number; + public cooldownMs: number; + private constraintKeyForTask: (task: Task) => string | null | undefined; + + private runningCounts = new Map(); + private lastCompletionTimes = new Map(); + + constructor(options: ITaskConstraintGroupOptions) { + this.name = options.name; + this.constraintKeyForTask = options.constraintKeyForTask; + this.maxConcurrent = options.maxConcurrent ?? Infinity; + this.cooldownMs = options.cooldownMs ?? 0; + } + + public getConstraintKey(task: Task): string | null { + const key = this.constraintKeyForTask(task); + return key ?? null; + } + + public canRun(subGroupKey: string): boolean { + const running = this.runningCounts.get(subGroupKey) ?? 0; + if (running >= this.maxConcurrent) { + return false; + } + + if (this.cooldownMs > 0) { + const lastCompletion = this.lastCompletionTimes.get(subGroupKey); + if (lastCompletion !== undefined) { + const elapsed = Date.now() - lastCompletion; + if (elapsed < this.cooldownMs) { + return false; + } + } + } + + return true; + } + + public acquireSlot(subGroupKey: string): void { + const current = this.runningCounts.get(subGroupKey) ?? 0; + this.runningCounts.set(subGroupKey, current + 1); + } + + public releaseSlot(subGroupKey: string): void { + const current = this.runningCounts.get(subGroupKey) ?? 0; + const next = Math.max(0, current - 1); + if (next === 0) { + this.runningCounts.delete(subGroupKey); + } else { + this.runningCounts.set(subGroupKey, next); + } + this.lastCompletionTimes.set(subGroupKey, Date.now()); + } + + public getCooldownRemaining(subGroupKey: string): number { + if (this.cooldownMs <= 0) { + return 0; + } + const lastCompletion = this.lastCompletionTimes.get(subGroupKey); + if (lastCompletion === undefined) { + return 0; + } + const elapsed = Date.now() - lastCompletion; + return Math.max(0, this.cooldownMs - elapsed); + } + + public getRunningCount(subGroupKey: string): number { + return this.runningCounts.get(subGroupKey) ?? 0; + } + + public reset(): void { + this.runningCounts.clear(); + this.lastCompletionTimes.clear(); + } +} diff --git a/ts/taskbuffer.classes.taskmanager.ts b/ts/taskbuffer.classes.taskmanager.ts index 92d29bf..8f0dd16 100644 --- a/ts/taskbuffer.classes.taskmanager.ts +++ b/ts/taskbuffer.classes.taskmanager.ts @@ -1,10 +1,11 @@ import * as plugins from './taskbuffer.plugins.js'; import { Task } from './taskbuffer.classes.task.js'; +import { TaskConstraintGroup } from './taskbuffer.classes.taskconstraintgroup.js'; import { AbstractDistributedCoordinator, type IDistributedTaskRequestResult, } from './taskbuffer.classes.distributedcoordinator.js'; -import type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent } from './taskbuffer.interfaces.js'; +import type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent, IConstrainedTaskEntry } from './taskbuffer.interfaces.js'; import { logger } from './taskbuffer.logging.js'; export interface ICronJob { @@ -19,23 +20,28 @@ export interface ITaskManagerConstructorOptions { export class TaskManager { public randomId = plugins.smartunique.shortId(); - public taskMap = new plugins.lik.ObjectMap>(); + public taskMap = new plugins.lik.ObjectMap>(); public readonly taskSubject = new plugins.smartrx.rxjs.Subject(); - private taskSubscriptions = new Map, plugins.smartrx.rxjs.Subscription>(); + private taskSubscriptions = new Map, plugins.smartrx.rxjs.Subscription>(); private cronJobManager = new plugins.smarttime.CronManager(); public options: ITaskManagerConstructorOptions = { distributedCoordinator: null, }; + // Constraint system + public constraintGroups: TaskConstraintGroup[] = []; + private constraintQueue: IConstrainedTaskEntry[] = []; + private drainTimer: ReturnType | null = null; + constructor(options: ITaskManagerConstructorOptions = {}) { this.options = Object.assign(this.options, options); } - public getTaskByName(taskName: string): Task { + public getTaskByName(taskName: string): Task { return this.taskMap.findSync((task) => task.name === taskName); } - public addTask(task: Task): void { + public addTask(task: Task): void { if (!task.name) { throw new Error('Task must have a name to be added to taskManager'); } @@ -46,7 +52,7 @@ export class TaskManager { this.taskSubscriptions.set(task, subscription); } - public removeTask(task: Task): void { + public removeTask(task: Task): void { this.taskMap.remove(task); const subscription = this.taskSubscriptions.get(task); if (subscription) { @@ -55,21 +61,134 @@ export class TaskManager { } } - public addAndScheduleTask(task: Task, cronString: string) { + public addAndScheduleTask(task: Task, cronString: string) { this.addTask(task); this.scheduleTaskByName(task.name, cronString); } + // Constraint group management + public addConstraintGroup(group: TaskConstraintGroup): void { + this.constraintGroups.push(group); + } + + public removeConstraintGroup(name: string): void { + this.constraintGroups = this.constraintGroups.filter((g) => g.name !== name); + } + + // Core constraint evaluation + public async triggerTaskConstrained(task: Task, input?: any): Promise { + // Gather applicable constraints + const applicableGroups: Array<{ group: TaskConstraintGroup; key: string }> = []; + for (const group of this.constraintGroups) { + const key = group.getConstraintKey(task); + if (key !== null) { + applicableGroups.push({ group, key }); + } + } + + // No constraints apply → trigger directly + if (applicableGroups.length === 0) { + return task.trigger(input); + } + + // Check if all constraints allow running + const allCanRun = applicableGroups.every(({ group, key }) => group.canRun(key)); + if (allCanRun) { + return this.executeWithConstraintTracking(task, input, applicableGroups); + } + + // Blocked → enqueue with deferred promise + const deferred = plugins.smartpromise.defer(); + this.constraintQueue.push({ task, input, deferred }); + return deferred.promise; + } + + private async executeWithConstraintTracking( + task: Task, + input: any, + groups: Array<{ group: TaskConstraintGroup; key: string }>, + ): Promise { + // Acquire slots + for (const { group, key } of groups) { + group.acquireSlot(key); + } + + try { + return await task.trigger(input); + } finally { + // Release slots + for (const { group, key } of groups) { + group.releaseSlot(key); + } + this.drainConstraintQueue(); + } + } + + private drainConstraintQueue(): void { + let shortestCooldown = Infinity; + const stillQueued: IConstrainedTaskEntry[] = []; + + for (const entry of this.constraintQueue) { + const applicableGroups: Array<{ group: TaskConstraintGroup; key: string }> = []; + for (const group of this.constraintGroups) { + const key = group.getConstraintKey(entry.task); + if (key !== null) { + applicableGroups.push({ group, key }); + } + } + + // No constraints apply anymore (group removed?) → run directly + if (applicableGroups.length === 0) { + entry.task.trigger(entry.input).then( + (result) => entry.deferred.resolve(result), + (err) => entry.deferred.reject(err), + ); + continue; + } + + const allCanRun = applicableGroups.every(({ group, key }) => group.canRun(key)); + if (allCanRun) { + this.executeWithConstraintTracking(entry.task, entry.input, applicableGroups).then( + (result) => entry.deferred.resolve(result), + (err) => entry.deferred.reject(err), + ); + } else { + stillQueued.push(entry); + // Track shortest cooldown for timer scheduling + for (const { group, key } of applicableGroups) { + const remaining = group.getCooldownRemaining(key); + if (remaining > 0 && remaining < shortestCooldown) { + shortestCooldown = remaining; + } + } + } + } + + this.constraintQueue = stillQueued; + + // Schedule next drain if there are cooldown-blocked entries + if (this.drainTimer) { + clearTimeout(this.drainTimer); + this.drainTimer = null; + } + if (stillQueued.length > 0 && shortestCooldown < Infinity) { + this.drainTimer = setTimeout(() => { + this.drainTimer = null; + this.drainConstraintQueue(); + }, shortestCooldown + 1); + } + } + public async triggerTaskByName(taskName: string): Promise { const taskToTrigger = this.getTaskByName(taskName); if (!taskToTrigger) { throw new Error(`No task with the name ${taskName} found.`); } - return taskToTrigger.trigger(); + return this.triggerTaskConstrained(taskToTrigger); } - public async triggerTask(task: Task) { - return task.trigger(); + public async triggerTask(task: Task) { + return this.triggerTaskConstrained(task); } public scheduleTaskByName(taskName: string, cronString: string) { @@ -80,7 +199,7 @@ export class TaskManager { this.handleTaskScheduling(taskToSchedule, cronString); } - private handleTaskScheduling(task: Task, cronString: string) { + private handleTaskScheduling(task: Task, cronString: string) { const cronJob = this.cronJobManager.addCronjob( cronString, async (triggerTime: number) => { @@ -98,7 +217,7 @@ export class TaskManager { } } try { - await task.trigger(); + await this.triggerTaskConstrained(task); } catch (err) { logger.log('error', `TaskManager: scheduled task "${task.name || 'unnamed'}" failed: ${err instanceof Error ? err.message : String(err)}`); } @@ -107,7 +226,7 @@ export class TaskManager { task.cronJob = cronJob; } - private logTaskState(task: Task) { + private logTaskState(task: Task) { logger.log('info', `Taskbuffer schedule triggered task >>${task.name}<<`); const bufferState = task.buffered ? `buffered with max ${task.bufferMax} buffered calls` @@ -116,7 +235,7 @@ export class TaskManager { } private async performDistributedConsultation( - task: Task, + task: Task, triggerTime: number, ): Promise { logger.log('info', 'Found a distributed coordinator, performing consultation.'); @@ -144,7 +263,7 @@ export class TaskManager { } } - public async descheduleTask(task: Task) { + public async descheduleTask(task: Task) { await this.descheduleTaskByName(task.name); } @@ -169,6 +288,10 @@ export class TaskManager { subscription.unsubscribe(); } this.taskSubscriptions.clear(); + if (this.drainTimer) { + clearTimeout(this.drainTimer); + this.drainTimer = null; + } } // Get metadata for a specific task @@ -186,7 +309,7 @@ export class TaskManager { // Get scheduled tasks with their schedules and next run times public getScheduledTasks(): IScheduledTaskInfo[] { const scheduledTasks: IScheduledTaskInfo[] = []; - + for (const task of this.taskMap.getArray()) { if (task.cronJob) { scheduledTasks.push({ @@ -199,7 +322,7 @@ export class TaskManager { }); } } - + return scheduledTasks; } @@ -213,11 +336,11 @@ export class TaskManager { })) .sort((a, b) => a.nextRun.getTime() - b.nextRun.getTime()) .slice(0, limit); - + return scheduledRuns; } - public getTasksByLabel(key: string, value: string): Task[] { + public getTasksByLabel(key: string, value: string): Task[] { return this.taskMap.getArray().filter(task => task.labels[key] === value); } @@ -227,7 +350,7 @@ export class TaskManager { // Add, execute, and remove a task while collecting metadata public async addExecuteRemoveTask>( - task: Task, + task: Task, options?: { schedule?: string; trackProgress?: boolean; @@ -235,19 +358,18 @@ export class TaskManager { ): Promise { // Add task to manager this.addTask(task); - + // Optionally schedule it if (options?.schedule) { this.scheduleTaskByName(task.name!, options.schedule); } - + const startTime = Date.now(); - const progressUpdates: Array<{ stepName: string; timestamp: number }> = []; - + try { - // Execute the task - const result = await task.trigger(); - + // Execute the task through constraints + const result = await this.triggerTaskConstrained(task); + // Collect execution report const report: ITaskExecutionReport = { taskName: task.name || 'unnamed', @@ -261,15 +383,15 @@ export class TaskManager { progress: task.getProgress(), result, }; - + // Remove task from manager this.removeTask(task); - + // Deschedule if it was scheduled if (options?.schedule && task.name) { this.descheduleTaskByName(task.name); } - + return report; } catch (error) { // Create error report @@ -285,15 +407,15 @@ export class TaskManager { progress: task.getProgress(), error: error as Error, }; - + // Remove task from manager even on error this.removeTask(task); - + // Deschedule if it was scheduled if (options?.schedule && task.name) { this.descheduleTaskByName(task.name); } - + throw errorReport; } } diff --git a/ts/taskbuffer.classes.taskrunner.ts b/ts/taskbuffer.classes.taskrunner.ts deleted file mode 100644 index 0250664..0000000 --- a/ts/taskbuffer.classes.taskrunner.ts +++ /dev/null @@ -1,69 +0,0 @@ -import * as plugins from './taskbuffer.plugins.js'; - -import { Task } from './taskbuffer.classes.task.js'; -import { logger } from './taskbuffer.logging.js'; - -export class TaskRunner { - public maxParallelJobs: number = 1; - public status: 'stopped' | 'running' = 'stopped'; - public runningTasks: plugins.lik.ObjectMap = - new plugins.lik.ObjectMap(); - public queuedTasks: Task[] = []; - - constructor() { - this.runningTasks.eventSubject.subscribe(async (eventArg) => { - this.checkExecution(); - }); - } - - /** - * adds a task to the queue - */ - public addTask(taskArg: Task) { - this.queuedTasks.push(taskArg); - this.checkExecution(); - } - - /** - * set amount of parallel tasks - * be careful, you might lose dependability of tasks - */ - public setMaxParallelJobs(maxParallelJobsArg: number) { - this.maxParallelJobs = maxParallelJobsArg; - } - - /** - * starts the task queue - */ - public async start() { - this.status = 'running'; - } - - /** - * checks whether execution is on point - */ - public async checkExecution() { - if ( - this.runningTasks.getArray().length < this.maxParallelJobs && - this.status === 'running' && - this.queuedTasks.length > 0 - ) { - const nextJob = this.queuedTasks.shift(); - this.runningTasks.add(nextJob); - try { - await nextJob.trigger(); - } catch (err) { - logger.log('error', `TaskRunner: task "${nextJob.name || 'unnamed'}" failed: ${err instanceof Error ? err.message : String(err)}`); - } - this.runningTasks.remove(nextJob); - this.checkExecution(); - } - } - - /** - * stops the task queue - */ - public async stop() { - this.status = 'stopped'; - } -} diff --git a/ts/taskbuffer.interfaces.ts b/ts/taskbuffer.interfaces.ts index 26ab5d1..4c1215f 100644 --- a/ts/taskbuffer.interfaces.ts +++ b/ts/taskbuffer.interfaces.ts @@ -1,4 +1,18 @@ import type { ITaskStep } from './taskbuffer.classes.taskstep.js'; +import type { Task } from './taskbuffer.classes.task.js'; + +export interface ITaskConstraintGroupOptions = Record> { + name: string; + constraintKeyForTask: (task: Task) => string | null | undefined; + maxConcurrent?: number; // default: Infinity + cooldownMs?: number; // default: 0 +} + +export interface IConstrainedTaskEntry { + task: Task; + input: any; + deferred: import('@push.rocks/smartpromise').Deferred; +} export interface ITaskMetadata { name: string; diff --git a/ts_web/00_commitinfo_data.ts b/ts_web/00_commitinfo_data.ts index 945d300..56ca4f4 100644 --- a/ts_web/00_commitinfo_data.ts +++ b/ts_web/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/taskbuffer', - version: '4.2.1', + version: '5.0.0', description: 'A flexible task management library supporting TypeScript, allowing for task buffering, scheduling, and execution with dependency management.' }