Compare commits

..

10 Commits

18 changed files with 1634 additions and 351 deletions

View File

@@ -1,5 +1,45 @@
# Changelog
## 2026-02-15 - 6.0.1 - fix(taskbuffer)
no changes to commit
- Git diff shows no changes
- package.json current version is 6.0.0; no version bump required
## 2026-02-15 - 6.0.0 - BREAKING CHANGE(constraints)
make TaskConstraintGroup constraint matcher input-aware and add shouldExecute pre-execution hook
- Rename ITaskConstraintGroupOptions.constraintKeyForTask -> constraintKeyForExecution(task, input?) and update TaskConstraintGroup.getConstraintKey signature
- Add optional shouldExecute(task, input?) hook; TaskManager checks shouldExecute before immediate runs, after acquiring slots, and when draining the constraint queue (queued tasks are skipped when shouldExecute returns false)
- Export ITaskExecution type and store constraintKeys on queued entries (IConstrainedTaskEntry.constraintKeys)
- Documentation and tests updated to demonstrate input-aware constraint keys and shouldExecute pruning
## 2026-02-15 - 5.0.1 - fix(tests)
add and tighten constraint-related tests covering return values, error propagation, concurrency, cooldown timing, and constraint removal
- Tightened cooldown timing assertion from >=100ms to >=250ms to reflect 300ms cooldown with 50ms tolerance.
- Added tests for queued task return values, error propagation when catchErrors is false, and error swallowing behavior when catchErrors is true.
- Added concurrency and cooldown interaction tests to ensure maxConcurrent is respected and batch timing is correct.
- Added test verifying removing a constraint group unblocks queued tasks and drain behavior completes correctly.
## 2026-02-15 - 5.0.0 - BREAKING CHANGE(taskbuffer)
Introduce constraint-based concurrency with TaskConstraintGroup and TaskManager integration; remove legacy TaskRunner and several Task APIs (breaking); add typed Task.data and update exports and tests.
- Add TaskConstraintGroup class with per-key maxConcurrent, cooldownMs, and helper methods (canRun, acquireSlot, releaseSlot, getCooldownRemaining, getRunningCount, reset).
- Task generic signature extended to Task<T, TSteps, TData> and a new typed data property (data) with default {}.
- TaskManager now supports addConstraintGroup/removeConstraintGroup, triggerTaskConstrained, queues blocked tasks, drains queue with cooldown timers, and routes triggerTask/triggerTaskByName through the constraint system.
- Removed TaskRunner, plus Task APIs: blockingTasks, execDelay, finished promise and associated behavior have been removed (breaking changes).
- Exports and interfaces updated: TaskConstraintGroup and ITaskConstraintGroupOptions added; TaskRunner removed from public API.
- Updated README and added comprehensive tests for constraint behavior; adjusted other tests to remove TaskRunner usage and reflect new APIs.
## 2026-02-15 - 4.2.1 - fix(deps)
bump @push.rocks/smartlog and @types/node; update dependency list version and license link in docs
- package.json: update @push.rocks/smartlog from ^3.1.10 to ^3.1.11
- package.json: update @types/node from ^25.1.0 to ^25.2.3
- readme.hints.md: update 'Dependencies (as of v4.1.1)' to 'Dependencies (as of v4.2.0)' and reflect bumped dependency versions
- readme.md: change license link text to '[LICENSE](./license.md)'
## 2026-01-29 - 4.2.0 - feat(ts_web)
support TC39 'accessor' decorators for web components; bump dependencies and devDependencies; rename browser tests to .chromium.ts; move LICENSE to license.md and update readme

View File

@@ -1,6 +1,6 @@
{
"name": "@push.rocks/taskbuffer",
"version": "4.2.0",
"version": "6.0.1",
"private": false,
"description": "A flexible task management library supporting TypeScript, allowing for task buffering, scheduling, and execution with dependency management.",
"main": "dist_ts/index.js",
@@ -37,7 +37,7 @@
"@design.estate/dees-element": "^2.1.6",
"@push.rocks/lik": "^6.2.2",
"@push.rocks/smartdelay": "^3.0.5",
"@push.rocks/smartlog": "^3.1.10",
"@push.rocks/smartlog": "^3.1.11",
"@push.rocks/smartpromise": "^4.2.3",
"@push.rocks/smartrx": "^3.0.10",
"@push.rocks/smarttime": "^4.1.1",
@@ -48,7 +48,7 @@
"@git.zone/tsbundle": "^2.8.3",
"@git.zone/tsrun": "^2.0.1",
"@git.zone/tstest": "^3.1.8",
"@types/node": "^25.1.0"
"@types/node": "^25.2.3"
},
"files": [
"ts/**/*",

91
pnpm-lock.yaml generated
View File

@@ -18,8 +18,8 @@ importers:
specifier: ^3.0.5
version: 3.0.5
'@push.rocks/smartlog':
specifier: ^3.1.10
version: 3.1.10
specifier: ^3.1.11
version: 3.1.11
'@push.rocks/smartpromise':
specifier: ^4.2.3
version: 4.2.3
@@ -46,8 +46,8 @@ importers:
specifier: ^3.1.8
version: 3.1.8(socks@2.8.7)(typescript@5.9.3)
'@types/node':
specifier: ^25.1.0
version: 25.1.0
specifier: ^25.2.3
version: 25.2.3
packages:
@@ -767,8 +767,8 @@ packages:
'@push.rocks/smartlog-interfaces@3.0.2':
resolution: {integrity: sha512-8hGRTJehbsFSJxLhCQkA018mZtXVPxPTblbg9VaE/EqISRzUw+eosJ2EJV7M4Qu0eiTJZjnWnNLn8CkD77ziWw==}
'@push.rocks/smartlog@3.1.10':
resolution: {integrity: sha512-5pf5JyzOE2WTCUislNIW4EHePo1a7hiXB+jbil38+N5hW71AEwcPFe6oGxbp5w9ALlz66hV2+E+25R0SsxN+fQ==}
'@push.rocks/smartlog@3.1.11':
resolution: {integrity: sha512-zyLH8pQD2UD7l76wJBESEWXU1FSTBLOuRI0/DN139EYyMkwMq1+pdQKptTkJhhVL/OIj56oMg9SpJb4bJB7uKg==}
'@push.rocks/smartmanifest@2.0.2':
resolution: {integrity: sha512-QGc5C9vunjfUbYsPGz5bynV/mVmPHkrQDkWp8ZO8VJtK1GZe+njgbrNyxn2SUHR0IhSAbSXl1j4JvBqYf5eTVg==}
@@ -1344,9 +1344,6 @@ packages:
'@tsclass/tsclass@4.4.4':
resolution: {integrity: sha512-YZOAF+u+r4u5rCev2uUd1KBTBdfyFdtDmcv4wuN+864lMccbdfRICR3SlJwCfYS1lbeV3QNLYGD30wjRXgvCJA==}
'@tsclass/tsclass@9.2.0':
resolution: {integrity: sha512-A6ULEkQfYgOnCKQVQRt26O7PRzFo4PE2EoD25RAtnuFuVrNwGynYC20Vee2c8KAOyI7nQ/LaREki9KAX4AHOHQ==}
'@tsclass/tsclass@9.3.0':
resolution: {integrity: sha512-KD3oTUN3RGu67tgjNHgWWZGsdYipr1RUDxQ9MMKSgIJ6oNZ4q5m2rg0ibrgyHWkAjTPlHVa6kHP3uVOY+8bnHw==}
@@ -1431,8 +1428,8 @@ packages:
'@types/node@22.19.7':
resolution: {integrity: sha512-MciR4AKGHWl7xwxkBa6xUGxQJ4VBOmPTF7sL+iGzuahOFaO0jHCsuEfS80pan1ef4gWId1oWOweIhrDEYLuaOw==}
'@types/node@25.1.0':
resolution: {integrity: sha512-t7frlewr6+cbx+9Ohpl0NOTKXZNV9xHRmNOvql47BFJKcEG1CxtxlPEEe+gR9uhVWM4DwhnvTF110mIL4yP9RA==}
'@types/node@25.2.3':
resolution: {integrity: sha512-m0jEgYlYz+mDJZ2+F4v8D1AyQb+QzsNqRuI7xg1VQX/KlKS0qT9r1Mo16yo5F/MtifXFgaofIFsdFMox2SxIbQ==}
'@types/ping@0.4.4':
resolution: {integrity: sha512-ifvo6w2f5eJYlXm+HiVx67iJe8WZp87sfa683nlqED5Vnt9Z93onkokNoWqOG21EaE8fMxyKPobE+mkPEyxsdw==}
@@ -3573,7 +3570,7 @@ snapshots:
'@push.rocks/smartfeed': 1.4.0
'@push.rocks/smartfile': 11.2.7
'@push.rocks/smartjson': 5.2.0
'@push.rocks/smartlog': 3.1.10
'@push.rocks/smartlog': 3.1.11
'@push.rocks/smartlog-destination-devtools': 1.0.12
'@push.rocks/smartlog-interfaces': 3.0.2
'@push.rocks/smartmanifest': 2.0.2
@@ -4325,7 +4322,7 @@ snapshots:
'@push.rocks/smartdelay': 3.0.5
'@push.rocks/smartfile': 13.1.2
'@push.rocks/smartfs': 1.3.1
'@push.rocks/smartlog': 3.1.10
'@push.rocks/smartlog': 3.1.11
'@push.rocks/smartpath': 6.0.0
'@push.rocks/smartpromise': 4.2.3
typescript: 5.9.3
@@ -4346,7 +4343,7 @@ snapshots:
'@push.rocks/smartdelay': 3.0.5
'@push.rocks/smartfs': 1.3.1
'@push.rocks/smartinteract': 2.0.16
'@push.rocks/smartlog': 3.1.10
'@push.rocks/smartlog': 3.1.11
'@push.rocks/smartlog-destination-local': 9.0.2
'@push.rocks/smartpath': 6.0.0
'@push.rocks/smartpromise': 4.2.3
@@ -4372,7 +4369,7 @@ snapshots:
'@push.rocks/smartdelay': 3.0.5
'@push.rocks/smartfile': 13.1.2
'@push.rocks/smartfs': 1.3.1
'@push.rocks/smartlog': 3.1.10
'@push.rocks/smartlog': 3.1.11
'@push.rocks/smartnpm': 2.0.6
'@push.rocks/smartpath': 6.0.0
'@push.rocks/smartrequest': 5.0.1
@@ -4407,7 +4404,7 @@ snapshots:
'@push.rocks/smartexpect': 2.5.0
'@push.rocks/smartfile': 11.2.7
'@push.rocks/smartjson': 5.2.0
'@push.rocks/smartlog': 3.1.10
'@push.rocks/smartlog': 3.1.11
'@push.rocks/smartmongo': 2.0.14(socks@2.8.7)
'@push.rocks/smartnetwork': 4.4.0
'@push.rocks/smartpath': 6.0.0
@@ -4808,7 +4805,7 @@ snapshots:
'@push.rocks/qenv': 6.1.3
'@push.rocks/smartfile': 11.2.7
'@push.rocks/smartjson': 5.2.0
'@push.rocks/smartlog': 3.1.10
'@push.rocks/smartlog': 3.1.11
'@push.rocks/smartpath': 6.0.0
'@push.rocks/smartpromise': 4.2.3
'@push.rocks/smartrx': 3.0.10
@@ -4825,7 +4822,7 @@ snapshots:
'@api.global/typedrequest': 3.2.5
'@configvault.io/interfaces': 1.0.17
'@push.rocks/smartfile': 11.2.7
'@push.rocks/smartlog': 3.1.10
'@push.rocks/smartlog': 3.1.11
'@push.rocks/smartpath': 6.0.0
'@push.rocks/smartarchive@4.2.4':
@@ -4915,7 +4912,7 @@ snapshots:
'@push.rocks/smartcli@4.0.20':
dependencies:
'@push.rocks/lik': 6.2.2
'@push.rocks/smartlog': 3.1.10
'@push.rocks/smartlog': 3.1.11
'@push.rocks/smartobject': 1.0.12
'@push.rocks/smartpromise': 4.2.3
'@push.rocks/smartrx': 3.0.10
@@ -4940,7 +4937,7 @@ snapshots:
dependencies:
'@push.rocks/lik': 6.2.2
'@push.rocks/smartdelay': 3.0.5
'@push.rocks/smartlog': 3.1.10
'@push.rocks/smartlog': 3.1.11
'@push.rocks/smartmongo': 2.0.14(socks@2.8.7)
'@push.rocks/smartpromise': 4.2.3
'@push.rocks/smartrx': 3.0.10
@@ -5114,18 +5111,18 @@ snapshots:
'@api.global/typedrequest-interfaces': 2.0.2
'@tsclass/tsclass': 4.4.4
'@push.rocks/smartlog@3.1.10':
'@push.rocks/smartlog@3.1.11':
dependencies:
'@api.global/typedrequest-interfaces': 3.0.19
'@push.rocks/consolecolor': 2.0.3
'@push.rocks/isounique': 1.0.5
'@push.rocks/smartclickhouse': 2.0.17
'@push.rocks/smartfile': 11.2.7
'@push.rocks/smarthash': 3.2.3
'@push.rocks/smarthash': 3.2.6
'@push.rocks/smartpromise': 4.2.3
'@push.rocks/smarttime': 4.1.1
'@push.rocks/webrequest': 3.0.37
'@tsclass/tsclass': 9.2.0
'@push.rocks/webrequest': 4.0.1
'@tsclass/tsclass': 9.3.0
'@push.rocks/smartmanifest@2.0.2': {}
@@ -5362,7 +5359,7 @@ snapshots:
'@push.rocks/smartdelay': 3.0.5
'@push.rocks/smartenv': 5.0.13
'@push.rocks/smartjson': 5.2.0
'@push.rocks/smartlog': 3.1.10
'@push.rocks/smartlog': 3.1.11
'@push.rocks/smartpromise': 4.2.3
'@push.rocks/smartrx': 3.0.10
'@push.rocks/smarttime': 4.1.1
@@ -5459,7 +5456,7 @@ snapshots:
'@design.estate/dees-element': 2.1.6
'@push.rocks/lik': 6.2.2
'@push.rocks/smartdelay': 3.0.5
'@push.rocks/smartlog': 3.1.10
'@push.rocks/smartlog': 3.1.11
'@push.rocks/smartpromise': 4.2.3
'@push.rocks/smartrx': 3.0.10
'@push.rocks/smarttime': 4.1.1
@@ -6015,10 +6012,6 @@ snapshots:
dependencies:
type-fest: 4.41.0
'@tsclass/tsclass@9.2.0':
dependencies:
type-fest: 4.41.0
'@tsclass/tsclass@9.3.0':
dependencies:
type-fest: 4.41.0
@@ -6030,27 +6023,27 @@ snapshots:
'@types/bn.js@5.2.0':
dependencies:
'@types/node': 25.1.0
'@types/node': 25.2.3
'@types/body-parser@1.19.6':
dependencies:
'@types/connect': 3.4.38
'@types/node': 25.1.0
'@types/node': 25.2.3
'@types/buffer-json@2.0.3': {}
'@types/clean-css@4.2.11':
dependencies:
'@types/node': 25.1.0
'@types/node': 25.2.3
source-map: 0.6.1
'@types/connect@3.4.38':
dependencies:
'@types/node': 25.1.0
'@types/node': 25.2.3
'@types/cors@2.8.19':
dependencies:
'@types/node': 25.1.0
'@types/node': 25.2.3
'@types/debug@4.1.12':
dependencies:
@@ -6058,7 +6051,7 @@ snapshots:
'@types/dns-packet@5.6.5':
dependencies:
'@types/node': 25.1.0
'@types/node': 25.2.3
'@types/elliptic@6.4.18':
dependencies:
@@ -6066,7 +6059,7 @@ snapshots:
'@types/express-serve-static-core@5.1.1':
dependencies:
'@types/node': 25.1.0
'@types/node': 25.2.3
'@types/qs': 6.14.0
'@types/range-parser': 1.2.7
'@types/send': 1.2.1
@@ -6080,7 +6073,7 @@ snapshots:
'@types/fs-extra@11.0.4':
dependencies:
'@types/jsonfile': 6.1.4
'@types/node': 25.1.0
'@types/node': 25.2.3
'@types/hast@3.0.4':
dependencies:
@@ -6102,7 +6095,7 @@ snapshots:
'@types/jsonfile@6.1.4':
dependencies:
'@types/node': 25.1.0
'@types/node': 25.2.3
'@types/mdast@4.0.4':
dependencies:
@@ -6116,17 +6109,17 @@ snapshots:
'@types/mute-stream@0.0.4':
dependencies:
'@types/node': 25.1.0
'@types/node': 25.2.3
'@types/node-forge@1.3.14':
dependencies:
'@types/node': 25.1.0
'@types/node': 25.2.3
'@types/node@22.19.7':
dependencies:
undici-types: 6.21.0
'@types/node@25.1.0':
'@types/node@25.2.3':
dependencies:
undici-types: 7.16.0
@@ -6144,22 +6137,22 @@ snapshots:
'@types/send@1.2.1':
dependencies:
'@types/node': 25.1.0
'@types/node': 25.2.3
'@types/serve-static@2.2.0':
dependencies:
'@types/http-errors': 2.0.5
'@types/node': 25.1.0
'@types/node': 25.2.3
'@types/symbol-tree@3.2.5': {}
'@types/tar-stream@3.1.4':
dependencies:
'@types/node': 25.1.0
'@types/node': 25.2.3
'@types/through2@2.0.41':
dependencies:
'@types/node': 25.1.0
'@types/node': 25.2.3
'@types/trusted-types@2.0.7': {}
@@ -6185,11 +6178,11 @@ snapshots:
'@types/ws@8.18.1':
dependencies:
'@types/node': 25.1.0
'@types/node': 25.2.3
'@types/yauzl@2.10.3':
dependencies:
'@types/node': 25.1.0
'@types/node': 25.2.3
optional: true
'@ungap/structured-clone@1.3.0': {}
@@ -6602,7 +6595,7 @@ snapshots:
engine.io@6.6.4:
dependencies:
'@types/cors': 2.8.19
'@types/node': 25.1.0
'@types/node': 25.2.3
accepts: 1.3.8
base64id: 2.0.0
cookie: 0.7.2

View File

@@ -1,20 +1,44 @@
# Taskbuffer Hints
## Task Constraint System (v5.0.0+) — Breaking Changes
- **`TaskRunner` removed** — replaced by `TaskManager` + `TaskConstraintGroup`
- **`blockingTasks` removed** from `Task` — use `TaskConstraintGroup` with `maxConcurrent: 1`
- **`execDelay` removed** from `Task` — use `TaskConstraintGroup` with `cooldownMs`
- **`finished` promise removed** from `Task` — no longer needed
- **`Task` generic signature**: `Task<T, TSteps, TData>` (3rd param added for typed data)
### Task.data
- `Task` constructor accepts optional `data?: TData` (defaults to `{}`)
- Typed data bag accessible as `task.data`
### TaskConstraintGroup
- `new TaskConstraintGroup<TData>({ name, constraintKeyForExecution, maxConcurrent?, cooldownMs?, shouldExecute? })`
- `constraintKeyForExecution(task, input?)` returns a string key (constraint applies) or `null` (skip). Receives both task and runtime input.
- `shouldExecute(task, input?)` — optional pre-execution check. Returns `false` to skip (deferred resolves `undefined`). Can be async.
- `maxConcurrent` (default: `Infinity`) — max concurrent tasks per key
- `cooldownMs` (default: `0`) — minimum ms gap between completions per key
- Methods: `getConstraintKey(task, input?)`, `checkShouldExecute(task, input?)`, `canRun(key)`, `acquireSlot(key)`, `releaseSlot(key)`, `getCooldownRemaining(key)`, `getRunningCount(key)`, `reset()`
- `ITaskExecution<TData>` type exported from index — `{ task, input }` tuple
### TaskManager Constraint Integration
- `manager.addConstraintGroup(group)` / `manager.removeConstraintGroup(name)`
- `triggerTaskByName()`, `triggerTask()`, `addExecuteRemoveTask()`, cron callbacks all route through `triggerTaskConstrained()`
- `triggerTaskConstrained(task, input?)` — evaluates constraints, queues if blocked, drains after completion
- Cooldown-blocked entries auto-drain via timer
### Exported from index.ts
- `TaskConstraintGroup` class
- `ITaskConstraintGroupOptions` type
## Error Handling (v3.6.0+)
- `Task` now has `catchErrors` constructor option (default: `false`)
- Default behavior: `trigger()` rejects when taskFunction throws (breaking change from pre-3.6)
- Set `catchErrors: true` to swallow errors (old behavior) - returns `undefined` on error
- Error state tracked via `lastError?: Error`, `errorCount: number`, `clearError()`
- `getMetadata()` status uses all four values: `'idle'` | `'running'` | `'completed'` | `'failed'`
- All peripheral classes (Taskchain, Taskparallel, TaskRunner, BufferRunner, TaskDebounced, TaskManager) have proper error propagation/handling
- All peripheral classes (Taskchain, Taskparallel, BufferRunner, TaskDebounced, TaskManager) have proper error propagation/handling
- `console.log` calls replaced with `logger.log()` throughout
## Breaking API Rename (TaskRunner)
- `maxParrallelJobs``maxParallelJobs`
- `qeuedTasks``queuedTasks`
- JSDoc typos fixed: "qeue" → "queue", "wether" → "whether", "loose" → "lose"
- The `setMaxParallelJobs()` parameter also renamed from `maxParrallelJobsArg` to `maxParallelJobsArg`
## Error Context Improvements
- **TaskChain**: Errors now wrap the original with context: chain name, failing task name, and task index. Original error preserved via `.cause`
- **BufferRunner**: When `catchErrors: false`, buffered task errors now reject the trigger promise (via `CycleCounter.informOfCycleError`) instead of silently resolving with `undefined`
@@ -53,11 +77,11 @@
accessor count = 0;
```
## Dependencies (as of v4.1.1)
## Dependencies (as of v4.2.0)
- `@design.estate/dees-element` ^2.1.6 - TC39 decorators with `accessor` keyword
- `@push.rocks/lik` ^6.2.2 - Data structures
- `@push.rocks/smartdelay` ^3.0.5 - Delay utilities
- `@push.rocks/smartlog` ^3.1.10 - Logging
- `@push.rocks/smartlog` ^3.1.11 - Logging
- `@push.rocks/smartpromise` ^4.2.3 - Promise utilities
- `@push.rocks/smartrx` ^3.0.10 - RxJS wrapper
- `@push.rocks/smarttime` ^4.1.1 - Time/cron utilities
@@ -66,4 +90,4 @@
- `@git.zone/tsbundle` ^2.8.3 - Bundler (for browser tests)
- `@git.zone/tsrun` ^2.0.1 - TypeScript runner
- `@git.zone/tstest` ^3.1.8 - Test runner (supports `.chromium.ts` files)
- `@types/node` ^25.1.0 - Node.js type definitions
- `@types/node` ^25.2.3 - Node.js type definitions

377
readme.md
View File

@@ -1,6 +1,6 @@
# @push.rocks/taskbuffer 🚀
> **Modern TypeScript task orchestration with smart buffering, scheduling, labels, and real-time event streaming**
> **Modern TypeScript task orchestration with constraint-based concurrency, smart buffering, scheduling, labels, and real-time event streaming**
[![npm version](https://img.shields.io/npm/v/@push.rocks/taskbuffer.svg)](https://www.npmjs.com/package/@push.rocks/taskbuffer)
[![TypeScript](https://img.shields.io/badge/TypeScript-5.x-blue.svg)](https://www.typescriptlang.org/)
@@ -13,6 +13,7 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community
## 🌟 Features
- **🎯 Type-Safe Task Management** — Full TypeScript support with generics and type inference
- **🔒 Constraint-Based Concurrency** — Per-key mutual exclusion, group concurrency limits, and cooldown enforcement via `TaskConstraintGroup`
- **📊 Real-Time Progress Tracking** — Step-based progress with percentage weights
- **⚡ Smart Buffering** — Intelligent request debouncing and batching
- **⏰ Cron Scheduling** — Schedule tasks with cron expressions
@@ -49,6 +50,24 @@ const result = await greetTask.trigger('World');
console.log(result); // "Hello, World!"
```
### Task with Typed Data 📦
Every task can carry a typed data bag — perfect for constraint matching, routing, and metadata:
```typescript
const task = new Task<undefined, [], { domain: string; priority: number }>({
name: 'update-dns',
data: { domain: 'example.com', priority: 1 },
taskFunction: async () => {
// task.data is fully typed here
console.log(`Updating DNS for ${task.data.domain}`);
},
});
task.data.domain; // string — fully typed
task.data.priority; // number — fully typed
```
### Task with Steps & Progress 📊
```typescript
@@ -84,6 +103,225 @@ console.log(deployTask.getStepsMetadata()); // Step details with status
> **Note:** `notifyStep()` is fully type-safe — TypeScript only accepts step names you declared in the `steps` array when you use `as const`.
## 🔒 Task Constraints — Concurrency, Mutual Exclusion & Cooldowns
`TaskConstraintGroup` is the unified mechanism for controlling how tasks run relative to each other. It replaces older patterns like task runners, blocking tasks, and execution delays with a single, composable, key-based constraint system.
### Per-Key Mutual Exclusion
Ensure only one task runs at a time for a given key (e.g. per domain, per tenant, per resource):
```typescript
import { Task, TaskManager, TaskConstraintGroup } from '@push.rocks/taskbuffer';
const manager = new TaskManager();
// Only one DNS update per domain at a time
const domainMutex = new TaskConstraintGroup<{ domain: string }>({
name: 'domain-mutex',
maxConcurrent: 1,
constraintKeyForExecution: (task, input?) => task.data.domain,
});
manager.addConstraintGroup(domainMutex);
const task1 = new Task<undefined, [], { domain: string }>({
name: 'update-a.com',
data: { domain: 'a.com' },
taskFunction: async () => { /* update DNS for a.com */ },
});
const task2 = new Task<undefined, [], { domain: string }>({
name: 'update-a.com-2',
data: { domain: 'a.com' },
taskFunction: async () => { /* another update for a.com */ },
});
manager.addTask(task1);
manager.addTask(task2);
// task2 waits until task1 finishes (same domain key)
await Promise.all([
manager.triggerTask(task1),
manager.triggerTask(task2),
]);
```
### Group Concurrency Limits
Cap how many tasks can run concurrently across a group:
```typescript
// Max 3 DNS updaters running globally at once
const dnsLimit = new TaskConstraintGroup<{ group: string }>({
name: 'dns-concurrency',
maxConcurrent: 3,
constraintKeyForExecution: (task) =>
task.data.group === 'dns' ? 'dns' : null, // null = skip constraint
});
manager.addConstraintGroup(dnsLimit);
```
### Cooldowns (Rate Limiting)
Enforce a minimum time gap between consecutive executions for the same key:
```typescript
// No more than one API call per domain every 11 seconds
const rateLimiter = new TaskConstraintGroup<{ domain: string }>({
name: 'api-rate-limit',
maxConcurrent: 1,
cooldownMs: 11000,
constraintKeyForExecution: (task) => task.data.domain,
});
manager.addConstraintGroup(rateLimiter);
```
### Global Concurrency Cap
Limit total concurrent tasks system-wide:
```typescript
const globalCap = new TaskConstraintGroup({
name: 'global-cap',
maxConcurrent: 10,
constraintKeyForExecution: () => 'all', // same key = shared limit
});
manager.addConstraintGroup(globalCap);
```
### Composing Multiple Constraints
Multiple constraint groups stack — a task only runs when **all** applicable constraints allow it:
```typescript
manager.addConstraintGroup(globalCap); // max 10 globally
manager.addConstraintGroup(domainMutex); // max 1 per domain
manager.addConstraintGroup(rateLimiter); // 11s cooldown per domain
// A task must satisfy ALL three constraints before it starts
await manager.triggerTask(dnsTask);
```
### Selective Constraints
Return `null` from `constraintKeyForExecution` to exempt a task from a constraint group:
```typescript
const constraint = new TaskConstraintGroup<{ priority: string }>({
name: 'low-priority-limit',
maxConcurrent: 2,
constraintKeyForExecution: (task) =>
task.data.priority === 'low' ? 'low-priority' : null, // high priority tasks skip this constraint
});
```
### Input-Aware Constraints 🎯
The `constraintKeyForExecution` function receives both the **task** and the **runtime input** passed to `trigger(input)`. This means the same task triggered with different inputs can be constrained independently:
```typescript
const extractTLD = (domain: string) => {
const parts = domain.split('.');
return parts.slice(-2).join('.');
};
// Same TLD → serialized. Different TLDs → parallel.
const tldMutex = new TaskConstraintGroup({
name: 'tld-mutex',
maxConcurrent: 1,
constraintKeyForExecution: (task, input?: string) => {
if (!input) return null;
return extractTLD(input); // "example.com", "other.org", etc.
},
});
manager.addConstraintGroup(tldMutex);
// These two serialize (same TLD "example.com")
const p1 = manager.triggerTaskConstrained(getCert, 'app.example.com');
const p2 = manager.triggerTaskConstrained(getCert, 'api.example.com');
// This runs in parallel (different TLD "other.org")
const p3 = manager.triggerTaskConstrained(getCert, 'my.other.org');
```
You can also combine `task.data` and `input` for composite keys:
```typescript
const providerDomain = new TaskConstraintGroup<{ provider: string }>({
name: 'provider-domain',
maxConcurrent: 1,
constraintKeyForExecution: (task, input?: string) => {
return `${task.data.provider}:${input || 'default'}`;
},
});
```
### Pre-Execution Check with `shouldExecute` ✅
The `shouldExecute` callback runs right before a queued task executes. If it returns `false`, the task is skipped and its promise resolves with `undefined`. This is perfect for scenarios where a prior execution's outcome makes subsequent queued tasks unnecessary:
```typescript
const certCache = new Map<string, string>();
const certConstraint = new TaskConstraintGroup({
name: 'cert-mutex',
maxConcurrent: 1,
constraintKeyForExecution: (task, input?: string) => {
if (!input) return null;
return extractTLD(input);
},
shouldExecute: (task, input?: string) => {
if (!input) return true;
// Skip if a wildcard cert already covers this TLD
return certCache.get(extractTLD(input)) !== 'wildcard';
},
});
const getCert = new Task({
name: 'get-certificate',
taskFunction: async (domain: string) => {
const cert = await acme.getCert(domain);
if (cert.isWildcard) certCache.set(extractTLD(domain), 'wildcard');
return cert;
},
});
manager.addConstraintGroup(certConstraint);
manager.addTask(getCert);
const r1 = manager.triggerTaskConstrained(getCert, 'app.example.com'); // runs, gets wildcard
const r2 = manager.triggerTaskConstrained(getCert, 'api.example.com'); // queued → skipped!
const r3 = manager.triggerTaskConstrained(getCert, 'my.other.org'); // parallel (different TLD)
const [cert1, cert2, cert3] = await Promise.all([r1, r2, r3]);
// cert2 === undefined (skipped because wildcard already covers example.com)
```
**`shouldExecute` semantics:**
- Runs right before execution (after slot acquisition, before `trigger()`)
- Also checked on immediate (non-queued) triggers
- Returns `false` → skip execution, deferred resolves with `undefined`
- Can be async (return `Promise<boolean>`)
- Has closure access to external state modified by prior executions
- If multiple constraint groups have `shouldExecute`, **all** must return `true`
### How It Works
When you trigger a task through `TaskManager` (via `triggerTask`, `triggerTaskByName`, `addExecuteRemoveTask`, or cron), the manager:
1. Evaluates all registered constraint groups against the task and input
2. If no constraints apply (all matchers return `null`) → checks `shouldExecute` → runs or skips
3. If all applicable constraints have capacity → acquires slots → checks `shouldExecute` → runs or skips
4. If any constraint blocks → enqueues the task; when a running task completes, the queue is drained
5. Cooldown-blocked tasks auto-retry after the shortest remaining cooldown expires
6. Queued tasks re-check `shouldExecute` when their turn comes — stale work is automatically pruned
## 🎯 Core Concepts
### Task Buffering — Intelligent Request Management
@@ -95,7 +333,6 @@ const apiTask = new Task({
name: 'APIRequest',
buffered: true,
bufferMax: 5, // Maximum 5 concurrent executions
execDelay: 100, // Minimum 100ms between executions
taskFunction: async (endpoint) => {
return await fetch(endpoint).then((r) => r.json());
},
@@ -156,9 +393,9 @@ console.log(`Saved ${savedCount} items`);
Taskchain also supports dynamic mutation:
```typescript
pipeline.addTask(newTask); // Append to chain
pipeline.removeTask(oldTask); // Remove by reference (returns boolean)
pipeline.shiftTask(); // Remove & return first task
pipeline.addTask(newTask); // Append to chain
pipeline.removeTask(oldTask); // Remove by reference (returns boolean)
pipeline.shiftTask(); // Remove & return first task
```
Error context is rich — a chain failure includes the chain name, failing task name, task index, and preserves the original error as `.cause`.
@@ -220,27 +457,6 @@ await initTask.trigger(); // No-op
console.log(initTask.hasTriggered); // true
```
### TaskRunner — Managed Queue with Concurrency Control
Process a queue of tasks with a configurable parallelism limit:
```typescript
import { TaskRunner } from '@push.rocks/taskbuffer';
const runner = new TaskRunner();
runner.setMaxParallelJobs(3); // Run up to 3 tasks concurrently
await runner.start();
runner.addTask(taskA);
runner.addTask(taskB);
runner.addTask(taskC);
runner.addTask(taskD); // Queued until a slot opens
// When done:
await runner.stop();
```
## 🏷️ Labels — Multi-Tenant Task Filtering
Attach arbitrary key-value labels to any task for filtering, grouping, or multi-tenant isolation:
@@ -411,6 +627,10 @@ manager.addTask(deployTask);
manager.addAndScheduleTask(backupTask, '0 2 * * *'); // Daily at 2 AM
manager.addAndScheduleTask(healthCheck, '*/5 * * * *'); // Every 5 minutes
// Register constraint groups
manager.addConstraintGroup(globalCap);
manager.addConstraintGroup(perDomainMutex);
// Query metadata
const meta = manager.getTaskMetadata('Deploy');
console.log(meta);
@@ -433,7 +653,7 @@ const allMeta = manager.getAllTasksMetadata();
const scheduled = manager.getScheduledTasks();
const nextRuns = manager.getNextScheduledRuns(5);
// Trigger by name
// Trigger by name (routes through constraints)
await manager.triggerTaskByName('Deploy');
// One-shot: add, execute, collect report, remove
@@ -462,6 +682,12 @@ manager.removeTask(task); // Removes from map and unsubscribes event forwarding
manager.descheduleTaskByName('Deploy'); // Remove cron schedule only
```
### Remove Constraint Groups
```typescript
manager.removeConstraintGroup('domain-mutex'); // By name
```
## 🎨 Web Component Dashboard
Visualize your tasks in real-time with the included Lit-based web component:
@@ -570,32 +796,6 @@ await task.trigger('SELECT * FROM users'); // Setup runs here
await task.trigger('SELECT * FROM orders'); // Setup skipped, pool reused
```
### Blocking Tasks
Make one task wait for another to finish before executing:
```typescript
const initTask = new Task({
name: 'Init',
taskFunction: async () => {
await initializeSystem();
},
});
const workerTask = new Task({
name: 'Worker',
taskFunction: async () => {
await doWork();
},
});
workerTask.blockingTasks.push(initTask);
// Triggering worker will automatically wait for init to complete
initTask.trigger();
workerTask.trigger(); // Waits until initTask.finished resolves
```
### Database Migration Pipeline
```typescript
@@ -616,15 +816,24 @@ try {
### Multi-Tenant SaaS Monitoring
Combine labels + events for a real-time multi-tenant dashboard:
Combine labels + events + constraints for a real-time multi-tenant system:
```typescript
const manager = new TaskManager();
// Per-tenant concurrency limit
const tenantLimit = new TaskConstraintGroup<{ tenantId: string }>({
name: 'tenant-concurrency',
maxConcurrent: 2,
constraintKeyForExecution: (task, input?) => task.data.tenantId,
});
manager.addConstraintGroup(tenantLimit);
// Create tenant-scoped tasks
function createTenantTask(tenantId: string, taskName: string, fn: () => Promise<any>) {
const task = new Task({
const task = new Task<undefined, [], { tenantId: string }>({
name: `${tenantId}:${taskName}`,
data: { tenantId },
labels: { tenantId },
taskFunction: fn,
});
@@ -653,15 +862,31 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme');
| Class | Description |
| --- | --- |
| `Task<T, TSteps>` | Core task unit with optional step tracking, labels, and event streaming |
| `TaskManager` | Centralized orchestrator with scheduling, label queries, and aggregated events |
| `Task<T, TSteps, TData>` | Core task unit with typed data, optional step tracking, labels, and event streaming |
| `TaskManager` | Centralized orchestrator with constraint groups, scheduling, label queries, and aggregated events |
| `TaskConstraintGroup<TData>` | Concurrency, mutual exclusion, and cooldown constraints with key-based grouping |
| `Taskchain` | Sequential task executor with data flow between tasks |
| `Taskparallel` | Concurrent task executor via `Promise.all()` |
| `TaskOnce` | Single-execution guard |
| `TaskDebounced` | Debounced task using rxjs |
| `TaskRunner` | Sequential queue with configurable parallelism |
| `TaskStep` | Step tracking unit (internal, exposed via metadata) |
### Task Constructor Options
| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `taskFunction` | `ITaskFunction<T>` | *required* | The async function to execute |
| `name` | `string` | — | Task identifier (required for TaskManager) |
| `data` | `TData` | `{}` | Typed data bag for constraint matching and routing |
| `steps` | `ReadonlyArray<{name, description, percentage}>` | — | Step definitions for progress tracking |
| `buffered` | `boolean` | — | Enable request buffering |
| `bufferMax` | `number` | — | Max buffered calls |
| `preTask` | `Task \| () => Task` | — | Task to run before |
| `afterTask` | `Task \| () => Task` | — | Task to run after |
| `taskSetup` | `() => Promise<T>` | — | One-time setup function |
| `catchErrors` | `boolean` | `false` | Swallow errors instead of rejecting |
| `labels` | `Record<string, string>` | `{}` | Initial labels |
### Task Methods
| Method | Returns | Description |
@@ -682,6 +907,7 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme');
| Property | Type | Description |
| --- | --- | --- |
| `name` | `string` | Task identifier |
| `data` | `TData` | Typed data bag |
| `running` | `boolean` | Whether the task is currently executing |
| `idle` | `boolean` | Inverse of `running` |
| `labels` | `Record<string, string>` | Attached labels |
@@ -690,7 +916,29 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme');
| `errorCount` | `number` | Total error count across all runs |
| `runCount` | `number` | Total execution count |
| `lastRun` | `Date \| undefined` | Timestamp of last execution |
| `blockingTasks` | `Task[]` | Tasks that must finish before this one starts |
### TaskConstraintGroup Constructor Options
| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `name` | `string` | *required* | Constraint group identifier |
| `constraintKeyForExecution` | `(task, input?) => string \| null` | *required* | Returns key for grouping, or `null` to skip. Receives both the task and runtime input. |
| `maxConcurrent` | `number` | `Infinity` | Max concurrent tasks per key |
| `cooldownMs` | `number` | `0` | Minimum ms between completions per key |
| `shouldExecute` | `(task, input?) => boolean \| Promise<boolean>` | — | Pre-execution check. Return `false` to skip; deferred resolves `undefined`. |
### TaskConstraintGroup Methods
| Method | Returns | Description |
| --- | --- | --- |
| `getConstraintKey(task, input?)` | `string \| null` | Get the constraint key for a task + input |
| `checkShouldExecute(task, input?)` | `Promise<boolean>` | Run the `shouldExecute` callback (defaults to `true`) |
| `canRun(key)` | `boolean` | Check if a slot is available |
| `acquireSlot(key)` | `void` | Claim a running slot |
| `releaseSlot(key)` | `void` | Release a slot and record completion time |
| `getCooldownRemaining(key)` | `number` | Milliseconds until cooldown expires |
| `getRunningCount(key)` | `number` | Current running count for key |
| `reset()` | `void` | Clear all state |
### TaskManager Methods
@@ -699,7 +947,11 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme');
| `addTask(task)` | `void` | Register a task (wires event forwarding) |
| `removeTask(task)` | `void` | Remove task and unsubscribe events |
| `getTaskByName(name)` | `Task \| undefined` | Look up by name |
| `triggerTaskByName(name)` | `Promise<any>` | Trigger by name |
| `triggerTaskByName(name)` | `Promise<any>` | Trigger by name (routes through constraints) |
| `triggerTask(task)` | `Promise<any>` | Trigger directly (routes through constraints) |
| `triggerTaskConstrained(task, input?)` | `Promise<any>` | Core constraint evaluation entry point |
| `addConstraintGroup(group)` | `void` | Register a constraint group |
| `removeConstraintGroup(name)` | `void` | Remove a constraint group by name |
| `addAndScheduleTask(task, cron)` | `void` | Register + schedule |
| `scheduleTaskByName(name, cron)` | `void` | Schedule existing task |
| `descheduleTaskByName(name)` | `void` | Remove schedule |
@@ -719,6 +971,7 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme');
| --- | --- | --- |
| `taskSubject` | `Subject<ITaskEvent>` | Aggregated events from all added tasks |
| `taskMap` | `ObjectMap<Task>` | Internal task registry |
| `constraintGroups` | `TaskConstraintGroup[]` | Registered constraint groups |
### Exported Types
@@ -726,18 +979,20 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme');
import type {
ITaskMetadata,
ITaskExecutionReport,
ITaskExecution,
IScheduledTaskInfo,
ITaskEvent,
TTaskEventType,
ITaskStep,
ITaskFunction,
ITaskConstraintGroupOptions,
StepNames,
} from '@push.rocks/taskbuffer';
```
## License and Legal Information
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [license.md](./license.md) file.
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file.
**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.

View File

@@ -137,38 +137,7 @@ tap.test('should reject Taskparallel when a child task throws', async () => {
expect(didReject).toBeTrue();
});
// Test 7: TaskRunner continues processing after a task error
tap.test('should continue TaskRunner queue after a task error', async () => {
const runner = new taskbuffer.TaskRunner();
const executionOrder: string[] = [];
const badTask = new taskbuffer.Task({
name: 'runner-bad-task',
taskFunction: async () => {
executionOrder.push('bad');
throw new Error('runner task failure');
},
});
const goodTask = new taskbuffer.Task({
name: 'runner-good-task',
taskFunction: async () => {
executionOrder.push('good');
},
});
await runner.start();
runner.addTask(badTask);
runner.addTask(goodTask);
// Wait for both tasks to be processed
await smartdelay.delayFor(500);
await runner.stop();
expect(executionOrder).toContain('bad');
expect(executionOrder).toContain('good');
});
// Test 8: BufferRunner handles errors without hanging
// Test 7: BufferRunner handles errors without hanging
tap.test('should handle BufferRunner errors without hanging', async () => {
let callCount = 0;
const bufferedTask = new taskbuffer.Task({

873
test/test.13.constraints.ts Normal file
View File

@@ -0,0 +1,873 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as taskbuffer from '../ts/index.js';
import * as smartdelay from '@push.rocks/smartdelay';
// Test 1: Task data property — typed data accessible
tap.test('should have typed data property on task', async () => {
const task = new taskbuffer.Task<undefined, [], { domain: string; priority: number }>({
name: 'data-task',
data: { domain: 'example.com', priority: 1 },
taskFunction: async () => {},
});
expect(task.data.domain).toEqual('example.com');
expect(task.data.priority).toEqual(1);
});
// Test 2: Task data defaults to empty object
tap.test('should default data to empty object when not provided', async () => {
const task = new taskbuffer.Task({
name: 'no-data-task',
taskFunction: async () => {},
});
expect(task.data).toBeTruthy();
expect(typeof task.data).toEqual('object');
});
// Test 3: No-constraint passthrough — behavior unchanged
tap.test('should run tasks directly when no constraints are configured', async () => {
const manager = new taskbuffer.TaskManager();
let executed = false;
const task = new taskbuffer.Task({
name: 'passthrough-task',
taskFunction: async () => {
executed = true;
return 'done';
},
});
manager.addTask(task);
const result = await manager.triggerTaskByName('passthrough-task');
expect(executed).toBeTrue();
expect(result).toEqual('done');
await manager.stop();
});
// Test 4: Group concurrency — 3 tasks, max 2 concurrent, 3rd queues
tap.test('should enforce group concurrency limit', async () => {
const manager = new taskbuffer.TaskManager();
let running = 0;
let maxRunning = 0;
const constraint = new taskbuffer.TaskConstraintGroup<{ group: string }>({
name: 'concurrency-test',
maxConcurrent: 2,
constraintKeyForExecution: (task) =>
task.data.group === 'workers' ? 'workers' : null,
});
manager.addConstraintGroup(constraint);
const makeTask = (id: number) =>
new taskbuffer.Task<undefined, [], { group: string }>({
name: `worker-${id}`,
data: { group: 'workers' },
taskFunction: async () => {
running++;
maxRunning = Math.max(maxRunning, running);
await smartdelay.delayFor(200);
running--;
},
});
const t1 = makeTask(1);
const t2 = makeTask(2);
const t3 = makeTask(3);
manager.addTask(t1);
manager.addTask(t2);
manager.addTask(t3);
await Promise.all([
manager.triggerTaskConstrained(t1),
manager.triggerTaskConstrained(t2),
manager.triggerTaskConstrained(t3),
]);
expect(maxRunning).toBeLessThanOrEqual(2);
await manager.stop();
});
// Test 5: Key-based mutual exclusion — same key sequential, different keys parallel
tap.test('should enforce key-based mutual exclusion', async () => {
const manager = new taskbuffer.TaskManager();
const log: string[] = [];
const constraint = new taskbuffer.TaskConstraintGroup<{ domain: string }>({
name: 'domain-mutex',
maxConcurrent: 1,
constraintKeyForExecution: (task) => task.data.domain,
});
manager.addConstraintGroup(constraint);
const makeTask = (name: string, domain: string, delayMs: number) =>
new taskbuffer.Task<undefined, [], { domain: string }>({
name,
data: { domain },
taskFunction: async () => {
log.push(`${name}-start`);
await smartdelay.delayFor(delayMs);
log.push(`${name}-end`);
},
});
const taskA1 = makeTask('a1', 'a.com', 100);
const taskA2 = makeTask('a2', 'a.com', 100);
const taskB1 = makeTask('b1', 'b.com', 100);
manager.addTask(taskA1);
manager.addTask(taskA2);
manager.addTask(taskB1);
await Promise.all([
manager.triggerTaskConstrained(taskA1),
manager.triggerTaskConstrained(taskA2),
manager.triggerTaskConstrained(taskB1),
]);
// a1 and a2 should be sequential (same key)
const a1EndIdx = log.indexOf('a1-end');
const a2StartIdx = log.indexOf('a2-start');
expect(a2StartIdx).toBeGreaterThanOrEqual(a1EndIdx);
// b1 should start concurrently with a1 (different key)
const a1StartIdx = log.indexOf('a1-start');
const b1StartIdx = log.indexOf('b1-start');
// Both should start before a1 ends
expect(b1StartIdx).toBeLessThan(a1EndIdx);
await manager.stop();
});
// Test 6: Cooldown enforcement
tap.test('should enforce cooldown between task executions', async () => {
const manager = new taskbuffer.TaskManager();
const timestamps: number[] = [];
const constraint = new taskbuffer.TaskConstraintGroup<{ key: string }>({
name: 'cooldown-test',
maxConcurrent: 1,
cooldownMs: 300,
constraintKeyForExecution: (task) => task.data.key,
});
manager.addConstraintGroup(constraint);
const makeTask = (name: string) =>
new taskbuffer.Task<undefined, [], { key: string }>({
name,
data: { key: 'shared' },
taskFunction: async () => {
timestamps.push(Date.now());
},
});
const t1 = makeTask('cool-1');
const t2 = makeTask('cool-2');
const t3 = makeTask('cool-3');
manager.addTask(t1);
manager.addTask(t2);
manager.addTask(t3);
await Promise.all([
manager.triggerTaskConstrained(t1),
manager.triggerTaskConstrained(t2),
manager.triggerTaskConstrained(t3),
]);
// Each execution should be at least ~300ms apart (with 200ms tolerance)
for (let i = 1; i < timestamps.length; i++) {
const gap = timestamps[i] - timestamps[i - 1];
expect(gap).toBeGreaterThanOrEqual(250); // 300ms cooldown minus 50ms tolerance
}
await manager.stop();
});
// Test 7: Multiple constraint groups on one task
tap.test('should apply multiple constraint groups to one task', async () => {
const manager = new taskbuffer.TaskManager();
let running = 0;
let maxRunning = 0;
const globalConstraint = new taskbuffer.TaskConstraintGroup({
name: 'global',
maxConcurrent: 3,
constraintKeyForExecution: () => 'all',
});
const groupConstraint = new taskbuffer.TaskConstraintGroup<{ group: string }>({
name: 'group',
maxConcurrent: 1,
constraintKeyForExecution: (task) => task.data.group,
});
manager.addConstraintGroup(globalConstraint);
manager.addConstraintGroup(groupConstraint);
const makeTask = (name: string, group: string) =>
new taskbuffer.Task<undefined, [], { group: string }>({
name,
data: { group },
taskFunction: async () => {
running++;
maxRunning = Math.max(maxRunning, running);
await smartdelay.delayFor(100);
running--;
},
});
// Same group - should be serialized by group constraint
const t1 = makeTask('multi-1', 'A');
const t2 = makeTask('multi-2', 'A');
manager.addTask(t1);
manager.addTask(t2);
await Promise.all([
manager.triggerTaskConstrained(t1),
manager.triggerTaskConstrained(t2),
]);
// With group maxConcurrent: 1, only 1 should run at a time
expect(maxRunning).toBeLessThanOrEqual(1);
await manager.stop();
});
// Test 8: Matcher returns null — task runs unconstrained
tap.test('should run task unconstrained when matcher returns null', async () => {
const manager = new taskbuffer.TaskManager();
const constraint = new taskbuffer.TaskConstraintGroup<{ skip: boolean }>({
name: 'selective',
maxConcurrent: 1,
constraintKeyForExecution: (task) => (task.data.skip ? null : 'constrained'),
});
manager.addConstraintGroup(constraint);
let unconstrained = false;
const task = new taskbuffer.Task<undefined, [], { skip: boolean }>({
name: 'skip-task',
data: { skip: true },
taskFunction: async () => {
unconstrained = true;
},
});
manager.addTask(task);
await manager.triggerTaskConstrained(task);
expect(unconstrained).toBeTrue();
await manager.stop();
});
// Test 9: Error handling — failed task releases slot, queue drains
tap.test('should release slot and drain queue when task fails', async () => {
const manager = new taskbuffer.TaskManager();
const log: string[] = [];
const constraint = new taskbuffer.TaskConstraintGroup<{ key: string }>({
name: 'error-drain',
maxConcurrent: 1,
constraintKeyForExecution: (task) => task.data.key,
});
manager.addConstraintGroup(constraint);
const failTask = new taskbuffer.Task<undefined, [], { key: string }>({
name: 'fail-task',
data: { key: 'shared' },
catchErrors: true,
taskFunction: async () => {
log.push('fail');
throw new Error('intentional');
},
});
const successTask = new taskbuffer.Task<undefined, [], { key: string }>({
name: 'success-task',
data: { key: 'shared' },
taskFunction: async () => {
log.push('success');
},
});
manager.addTask(failTask);
manager.addTask(successTask);
await Promise.all([
manager.triggerTaskConstrained(failTask),
manager.triggerTaskConstrained(successTask),
]);
expect(log).toContain('fail');
expect(log).toContain('success');
await manager.stop();
});
// Test 10: TaskManager integration — addConstraintGroup + triggerTaskByName
tap.test('should route triggerTaskByName through constraints', async () => {
const manager = new taskbuffer.TaskManager();
let running = 0;
let maxRunning = 0;
const constraint = new taskbuffer.TaskConstraintGroup({
name: 'manager-integration',
maxConcurrent: 1,
constraintKeyForExecution: () => 'all',
});
manager.addConstraintGroup(constraint);
const t1 = new taskbuffer.Task({
name: 'managed-1',
taskFunction: async () => {
running++;
maxRunning = Math.max(maxRunning, running);
await smartdelay.delayFor(100);
running--;
},
});
const t2 = new taskbuffer.Task({
name: 'managed-2',
taskFunction: async () => {
running++;
maxRunning = Math.max(maxRunning, running);
await smartdelay.delayFor(100);
running--;
},
});
manager.addTask(t1);
manager.addTask(t2);
await Promise.all([
manager.triggerTaskByName('managed-1'),
manager.triggerTaskByName('managed-2'),
]);
expect(maxRunning).toBeLessThanOrEqual(1);
await manager.stop();
});
// Test 11: removeConstraintGroup removes by name
tap.test('should remove a constraint group by name', async () => {
const manager = new taskbuffer.TaskManager();
const constraint = new taskbuffer.TaskConstraintGroup({
name: 'removable',
maxConcurrent: 1,
constraintKeyForExecution: () => 'all',
});
manager.addConstraintGroup(constraint);
expect(manager.constraintGroups.length).toEqual(1);
manager.removeConstraintGroup('removable');
expect(manager.constraintGroups.length).toEqual(0);
await manager.stop();
});
// Test 12: TaskConstraintGroup reset clears state
tap.test('should reset constraint group state', async () => {
const constraint = new taskbuffer.TaskConstraintGroup({
name: 'resettable',
maxConcurrent: 2,
cooldownMs: 1000,
constraintKeyForExecution: () => 'key',
});
// Simulate usage
constraint.acquireSlot('key');
expect(constraint.getRunningCount('key')).toEqual(1);
constraint.releaseSlot('key');
expect(constraint.getCooldownRemaining('key')).toBeGreaterThan(0);
constraint.reset();
expect(constraint.getRunningCount('key')).toEqual(0);
expect(constraint.getCooldownRemaining('key')).toEqual(0);
});
// Test 13: Queued task returns correct result
tap.test('should return correct result from queued tasks', async () => {
const manager = new taskbuffer.TaskManager();
const constraint = new taskbuffer.TaskConstraintGroup({
name: 'return-value-test',
maxConcurrent: 1,
constraintKeyForExecution: () => 'shared',
});
manager.addConstraintGroup(constraint);
const t1 = new taskbuffer.Task({
name: 'ret-1',
taskFunction: async () => {
await smartdelay.delayFor(100);
return 'result-A';
},
});
const t2 = new taskbuffer.Task({
name: 'ret-2',
taskFunction: async () => {
return 'result-B';
},
});
manager.addTask(t1);
manager.addTask(t2);
const [r1, r2] = await Promise.all([
manager.triggerTaskConstrained(t1),
manager.triggerTaskConstrained(t2),
]);
expect(r1).toEqual('result-A');
expect(r2).toEqual('result-B');
await manager.stop();
});
// Test 14: Error propagation for queued tasks (catchErrors: false)
tap.test('should propagate errors from queued tasks (catchErrors: false)', async () => {
const manager = new taskbuffer.TaskManager();
const constraint = new taskbuffer.TaskConstraintGroup({
name: 'error-propagation',
maxConcurrent: 1,
constraintKeyForExecution: () => 'shared',
});
manager.addConstraintGroup(constraint);
const t1 = new taskbuffer.Task({
name: 'err-first',
taskFunction: async () => {
await smartdelay.delayFor(100);
return 'ok';
},
});
const t2 = new taskbuffer.Task({
name: 'err-second',
catchErrors: false,
taskFunction: async () => {
throw new Error('queued-error');
},
});
manager.addTask(t1);
manager.addTask(t2);
const r1Promise = manager.triggerTaskConstrained(t1);
const r2Promise = manager.triggerTaskConstrained(t2);
const r1 = await r1Promise;
expect(r1).toEqual('ok');
let caughtError: Error | null = null;
try {
await r2Promise;
} catch (err) {
caughtError = err as Error;
}
expect(caughtError).toBeTruthy();
expect(caughtError!.message).toEqual('queued-error');
await manager.stop();
});
// Test 15: triggerTask() routes through constraints
tap.test('should route triggerTask() through constraints', async () => {
const manager = new taskbuffer.TaskManager();
let running = 0;
let maxRunning = 0;
const constraint = new taskbuffer.TaskConstraintGroup({
name: 'trigger-task-test',
maxConcurrent: 1,
constraintKeyForExecution: () => 'all',
});
manager.addConstraintGroup(constraint);
const makeTask = (id: number) =>
new taskbuffer.Task({
name: `tt-${id}`,
taskFunction: async () => {
running++;
maxRunning = Math.max(maxRunning, running);
await smartdelay.delayFor(100);
running--;
},
});
const t1 = makeTask(1);
const t2 = makeTask(2);
manager.addTask(t1);
manager.addTask(t2);
await Promise.all([
manager.triggerTask(t1),
manager.triggerTask(t2),
]);
expect(maxRunning).toBeLessThanOrEqual(1);
await manager.stop();
});
// Test 16: addExecuteRemoveTask() routes through constraints
tap.test('should route addExecuteRemoveTask() through constraints', async () => {
const manager = new taskbuffer.TaskManager();
let running = 0;
let maxRunning = 0;
const constraint = new taskbuffer.TaskConstraintGroup({
name: 'add-execute-remove-test',
maxConcurrent: 1,
constraintKeyForExecution: () => 'all',
});
manager.addConstraintGroup(constraint);
const makeTask = (id: number) =>
new taskbuffer.Task({
name: `aer-${id}`,
taskFunction: async () => {
running++;
maxRunning = Math.max(maxRunning, running);
await smartdelay.delayFor(100);
running--;
return `done-${id}`;
},
});
const t1 = makeTask(1);
const t2 = makeTask(2);
const [report1, report2] = await Promise.all([
manager.addExecuteRemoveTask(t1),
manager.addExecuteRemoveTask(t2),
]);
expect(maxRunning).toBeLessThanOrEqual(1);
expect(report1.result).toEqual('done-1');
expect(report2.result).toEqual('done-2');
await manager.stop();
});
// Test 17: FIFO ordering of queued tasks
tap.test('should execute queued tasks in FIFO order', async () => {
const manager = new taskbuffer.TaskManager();
const executionOrder: string[] = [];
const constraint = new taskbuffer.TaskConstraintGroup({
name: 'fifo-test',
maxConcurrent: 1,
constraintKeyForExecution: () => 'shared',
});
manager.addConstraintGroup(constraint);
const makeTask = (id: string) =>
new taskbuffer.Task({
name: `fifo-${id}`,
taskFunction: async () => {
executionOrder.push(id);
await smartdelay.delayFor(50);
},
});
const tA = makeTask('A');
const tB = makeTask('B');
const tC = makeTask('C');
manager.addTask(tA);
manager.addTask(tB);
manager.addTask(tC);
await Promise.all([
manager.triggerTaskConstrained(tA),
manager.triggerTaskConstrained(tB),
manager.triggerTaskConstrained(tC),
]);
expect(executionOrder).toEqual(['A', 'B', 'C']);
await manager.stop();
});
// Test 18: Combined concurrency + cooldown
tap.test('should enforce both concurrency and cooldown together', async () => {
const manager = new taskbuffer.TaskManager();
let running = 0;
let maxRunning = 0;
const timestamps: number[] = [];
const constraint = new taskbuffer.TaskConstraintGroup({
name: 'combined-test',
maxConcurrent: 2,
cooldownMs: 200,
constraintKeyForExecution: () => 'shared',
});
manager.addConstraintGroup(constraint);
const makeTask = (id: number) =>
new taskbuffer.Task({
name: `combo-${id}`,
taskFunction: async () => {
running++;
maxRunning = Math.max(maxRunning, running);
timestamps.push(Date.now());
await smartdelay.delayFor(100);
running--;
},
});
const tasks = [makeTask(1), makeTask(2), makeTask(3), makeTask(4)];
for (const t of tasks) {
manager.addTask(t);
}
await Promise.all(tasks.map((t) => manager.triggerTaskConstrained(t)));
// Concurrency never exceeded 2
expect(maxRunning).toBeLessThanOrEqual(2);
// First 2 tasks start nearly together, 3rd task starts after first batch completes + cooldown
// First batch completes ~100ms after start, then 200ms cooldown
const gap = timestamps[2] - timestamps[0];
expect(gap).toBeGreaterThanOrEqual(250); // 100ms task + 200ms cooldown - 50ms tolerance
await manager.stop();
});
// Test 19: Constraint removal unblocks queued tasks
tap.test('should unblock queued tasks when constraint group is removed', async () => {
const manager = new taskbuffer.TaskManager();
const log: string[] = [];
const constraint = new taskbuffer.TaskConstraintGroup({
name: 'removable-constraint',
maxConcurrent: 1,
constraintKeyForExecution: () => 'shared',
});
manager.addConstraintGroup(constraint);
const t1 = new taskbuffer.Task({
name: 'block-1',
taskFunction: async () => {
log.push('t1-start');
// Remove constraint while t1 is running so t2 runs unconstrained after drain
manager.removeConstraintGroup('removable-constraint');
await smartdelay.delayFor(100);
log.push('t1-end');
},
});
const t2 = new taskbuffer.Task({
name: 'block-2',
taskFunction: async () => {
log.push('t2-start');
log.push('t2-end');
},
});
manager.addTask(t1);
manager.addTask(t2);
await Promise.all([
manager.triggerTaskConstrained(t1),
manager.triggerTaskConstrained(t2),
]);
// Both tasks completed (drain didn't deadlock after constraint removal)
expect(log).toContain('t1-start');
expect(log).toContain('t1-end');
expect(log).toContain('t2-start');
expect(log).toContain('t2-end');
// t2 started after t1 completed (drain fires after t1 finishes)
const t1EndIdx = log.indexOf('t1-end');
const t2StartIdx = log.indexOf('t2-start');
expect(t2StartIdx).toBeGreaterThanOrEqual(t1EndIdx);
await manager.stop();
});
// Test 20: Intra-task concurrency by input — same task, different inputs, key extracts TLD
tap.test('should serialize same-TLD inputs and parallelize different-TLD inputs', async () => {
const manager = new taskbuffer.TaskManager();
const log: string[] = [];
const extractTLD = (domain: string) => {
const parts = domain.split('.');
return parts.slice(-2).join('.');
};
const constraint = new taskbuffer.TaskConstraintGroup({
name: 'tld-mutex',
maxConcurrent: 1,
constraintKeyForExecution: (_task, input?: string) => {
if (!input) return null;
return extractTLD(input);
},
});
manager.addConstraintGroup(constraint);
const getCert = new taskbuffer.Task({
name: 'get-cert',
taskFunction: async (domain: string) => {
log.push(`${domain}-start`);
await smartdelay.delayFor(100);
log.push(`${domain}-end`);
},
});
manager.addTask(getCert);
await Promise.all([
manager.triggerTaskConstrained(getCert, 'a.example.com'),
manager.triggerTaskConstrained(getCert, 'b.example.com'),
manager.triggerTaskConstrained(getCert, 'c.other.org'),
]);
// a.example.com and b.example.com share TLD "example.com" → serialized
const aEndIdx = log.indexOf('a.example.com-end');
const bStartIdx = log.indexOf('b.example.com-start');
expect(bStartIdx).toBeGreaterThanOrEqual(aEndIdx);
// c.other.org has different TLD → runs in parallel with a.example.com
const aStartIdx = log.indexOf('a.example.com-start');
const cStartIdx = log.indexOf('c.other.org-start');
expect(cStartIdx).toBeLessThan(aEndIdx);
await manager.stop();
});
// Test 21: shouldExecute skips queued task based on external state
tap.test('should skip queued task when shouldExecute returns false', async () => {
const manager = new taskbuffer.TaskManager();
const execLog: string[] = [];
const certCache = new Map<string, string>();
const extractTLD = (domain: string) => {
const parts = domain.split('.');
return parts.slice(-2).join('.');
};
const constraint = new taskbuffer.TaskConstraintGroup({
name: 'cert-mutex',
maxConcurrent: 1,
constraintKeyForExecution: (_task, input?: string) => {
if (!input) return null;
return extractTLD(input);
},
shouldExecute: (_task, input?: string) => {
if (!input) return true;
return certCache.get(extractTLD(input)) !== 'wildcard';
},
});
manager.addConstraintGroup(constraint);
const getCert = new taskbuffer.Task({
name: 'get-cert-skip',
taskFunction: async (domain: string) => {
execLog.push(domain);
// First execution sets wildcard in cache
certCache.set(extractTLD(domain), 'wildcard');
await smartdelay.delayFor(100);
return `cert-for-${domain}`;
},
});
manager.addTask(getCert);
const [r1, r2] = await Promise.all([
manager.triggerTaskConstrained(getCert, 'app.example.com'),
manager.triggerTaskConstrained(getCert, 'api.example.com'),
]);
// First ran, second was skipped
expect(execLog).toEqual(['app.example.com']);
expect(r1).toEqual('cert-for-app.example.com');
expect(r2).toEqual(undefined);
await manager.stop();
});
// Test 22: shouldExecute on immediate (non-queued) trigger
tap.test('should skip immediate trigger when shouldExecute returns false', async () => {
const manager = new taskbuffer.TaskManager();
let executed = false;
const constraint = new taskbuffer.TaskConstraintGroup({
name: 'always-skip',
maxConcurrent: 10,
constraintKeyForExecution: () => 'all',
shouldExecute: () => false,
});
manager.addConstraintGroup(constraint);
const task = new taskbuffer.Task({
name: 'skip-immediate',
taskFunction: async () => {
executed = true;
return 'should-not-see';
},
});
manager.addTask(task);
const result = await manager.triggerTaskConstrained(task);
expect(executed).toBeFalse();
expect(result).toEqual(undefined);
await manager.stop();
});
// Test 23: Mixed task.data + input constraint key
tap.test('should use both task.data and input in constraint key', async () => {
const manager = new taskbuffer.TaskManager();
let running = 0;
let maxRunning = 0;
const constraint = new taskbuffer.TaskConstraintGroup<{ provider: string }>({
name: 'provider-domain',
maxConcurrent: 1,
constraintKeyForExecution: (task, input?: string) => {
return `${task.data.provider}:${input || 'default'}`;
},
});
manager.addConstraintGroup(constraint);
const makeTask = (name: string, provider: string) =>
new taskbuffer.Task<undefined, [], { provider: string }>({
name,
data: { provider },
taskFunction: async () => {
running++;
maxRunning = Math.max(maxRunning, running);
await smartdelay.delayFor(100);
running--;
},
});
// Same provider + same domain input → should serialize
const t1 = makeTask('mixed-1', 'acme');
const t2 = makeTask('mixed-2', 'acme');
// Different provider + same domain → parallel
const t3 = makeTask('mixed-3', 'cloudflare');
manager.addTask(t1);
manager.addTask(t2);
manager.addTask(t3);
await Promise.all([
manager.triggerTaskConstrained(t1, 'example.com'),
manager.triggerTaskConstrained(t2, 'example.com'),
manager.triggerTaskConstrained(t3, 'example.com'),
]);
// t1 and t2 share key "acme:example.com" → serialized (max 1 at a time)
// t3 has key "cloudflare:example.com" → parallel with t1
// So maxRunning should be exactly 2 (t1 + t3, or t3 + t2)
expect(maxRunning).toBeLessThanOrEqual(2);
expect(maxRunning).toBeGreaterThanOrEqual(2);
await manager.stop();
});
export default tap.start();

View File

@@ -1,34 +0,0 @@
import { tap, expect } from '@git.zone/tstest/tapbundle';
import * as taskbuffer from '../ts/index.js';
let testTaskRunner: taskbuffer.TaskRunner;
tap.test('should create a valid taskrunner', async () => {
testTaskRunner = new taskbuffer.TaskRunner();
await testTaskRunner.start();
});
tap.test('should execute task when its scheduled', async (tools) => {
const done = tools.defer();
testTaskRunner.addTask(
new taskbuffer.Task({
taskFunction: async () => {
console.log('hi');
},
}),
);
testTaskRunner.addTask(
new taskbuffer.Task({
taskFunction: async () => {
console.log('there');
done.resolve();
},
}),
);
await done.promise;
});
tap.start();

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/taskbuffer',
version: '4.2.0',
version: '6.0.1',
description: 'A flexible task management library supporting TypeScript, allowing for task buffering, scheduling, and execution with dependency management.'
}

View File

@@ -4,15 +4,15 @@ export { Taskchain } from './taskbuffer.classes.taskchain.js';
export { Taskparallel } from './taskbuffer.classes.taskparallel.js';
export { TaskManager } from './taskbuffer.classes.taskmanager.js';
export { TaskOnce } from './taskbuffer.classes.taskonce.js';
export { TaskRunner } from './taskbuffer.classes.taskrunner.js';
export { TaskDebounced } from './taskbuffer.classes.taskdebounced.js';
export { TaskConstraintGroup } from './taskbuffer.classes.taskconstraintgroup.js';
// Task step system
export { TaskStep } from './taskbuffer.classes.taskstep.js';
export type { ITaskStep } from './taskbuffer.classes.taskstep.js';
// Metadata interfaces
export type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent, TTaskEventType } from './taskbuffer.interfaces.js';
export type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent, TTaskEventType, ITaskConstraintGroupOptions, ITaskExecution } from './taskbuffer.interfaces.js';
import * as distributedCoordination from './taskbuffer.classes.distributedcoordinator.js';
export { distributedCoordination };

View File

@@ -6,7 +6,7 @@ export class BufferRunner {
// initialize by default
public bufferCounter: number = 0;
constructor(taskArg: Task<any>) {
constructor(taskArg: Task<any, any, any>) {
this.task = taskArg;
}

View File

@@ -9,7 +9,7 @@ export interface ICycleObject {
export class CycleCounter {
public task: Task;
public cycleObjectArray: ICycleObject[] = [];
constructor(taskArg: Task<any>) {
constructor(taskArg: Task<any, any, any>) {
this.task = taskArg;
}
public getPromiseForCycle(cycleCountArg: number) {

View File

@@ -19,18 +19,18 @@ export type TPreOrAfterTaskFunction = () => Task<any>;
// Type helper to extract step names from array
export type StepNames<T> = T extends ReadonlyArray<{ name: infer N }> ? N : never;
export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }> = []> {
export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }> = [], TData extends Record<string, unknown> = Record<string, unknown>> {
public static extractTask<T = undefined, TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }> = []>(
preOrAfterTaskArg: Task<T, TSteps> | TPreOrAfterTaskFunction,
): Task<T, TSteps> {
preOrAfterTaskArg: Task<T, TSteps, any> | TPreOrAfterTaskFunction,
): Task<T, TSteps, any> {
switch (true) {
case !preOrAfterTaskArg:
return null;
case preOrAfterTaskArg instanceof Task:
return preOrAfterTaskArg as Task<T, TSteps>;
return preOrAfterTaskArg as Task<T, TSteps, any>;
case typeof preOrAfterTaskArg === 'function':
const taskFunction = preOrAfterTaskArg as TPreOrAfterTaskFunction;
return taskFunction() as unknown as Task<T, TSteps>;
return taskFunction() as unknown as Task<T, TSteps, any>;
default:
return null;
}
@@ -42,7 +42,7 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
return done.promise;
};
public static isTask = (taskArg: Task<any>): boolean => {
public static isTask = (taskArg: Task<any, any, any>): boolean => {
if (taskArg instanceof Task && typeof taskArg.taskFunction === 'function') {
return true;
} else {
@@ -51,8 +51,8 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
};
public static isTaskTouched<T = undefined, TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }> = []>(
taskArg: Task<T, TSteps> | TPreOrAfterTaskFunction,
touchedTasksArray: Task<T, TSteps>[],
taskArg: Task<T, TSteps, any> | TPreOrAfterTaskFunction,
touchedTasksArray: Task<T, TSteps, any>[],
): boolean {
const taskToCheck = Task.extractTask(taskArg);
let result = false;
@@ -65,25 +65,16 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
}
public static runTask = async <T, TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }> = []>(
taskArg: Task<T, TSteps> | TPreOrAfterTaskFunction,
optionsArg: { x?: any; touchedTasksArray?: Task<T, TSteps>[] },
taskArg: Task<T, TSteps, any> | TPreOrAfterTaskFunction,
optionsArg: { x?: any; touchedTasksArray?: Task<T, TSteps, any>[] },
) => {
const taskToRun = Task.extractTask(taskArg);
const done = plugins.smartpromise.defer();
// Wait for all blocking tasks to finish
for (const task of taskToRun.blockingTasks) {
await task.finished;
}
if (!taskToRun.setupValue && taskToRun.taskSetup) {
taskToRun.setupValue = await taskToRun.taskSetup();
}
if (taskToRun.execDelay) {
await plugins.smartdelay.delayFor(taskToRun.execDelay);
}
taskToRun.running = true;
taskToRun.runCount++;
taskToRun.lastRun = new Date();
@@ -100,26 +91,10 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
// Complete all steps when task finishes
taskToRun.completeAllSteps();
taskToRun.emitEvent(taskToRun.lastError ? 'failed' : 'completed');
// When the task has finished running, resolve the finished promise
taskToRun.resolveFinished();
// Create a new finished promise for the next run
taskToRun.finished = new Promise((resolve) => {
taskToRun.resolveFinished = resolve;
});
})
.catch((err) => {
taskToRun.running = false;
taskToRun.emitEvent('failed', { error: err instanceof Error ? err.message : String(err) });
// Resolve finished so blocking dependants don't hang
taskToRun.resolveFinished();
// Create a new finished promise for the next run
taskToRun.finished = new Promise((resolve) => {
taskToRun.resolveFinished = resolve;
});
});
const options = {
@@ -127,7 +102,7 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
...optionsArg,
};
const x = options.x;
const touchedTasksArray: Task<T, TSteps>[] = options.touchedTasksArray;
const touchedTasksArray: Task<T, TSteps, any>[] = options.touchedTasksArray;
touchedTasksArray.push(taskToRun);
@@ -198,18 +173,12 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
public cronJob: plugins.smarttime.CronJob;
public bufferMax: number;
public execDelay: number;
public timeout: number;
public preTask: Task<T, any> | TPreOrAfterTaskFunction;
public afterTask: Task<T, any> | TPreOrAfterTaskFunction;
public data: TData;
// Add a list to store the blocking tasks
public blockingTasks: Task[] = [];
// Add a promise that will resolve when the task has finished
private finished: Promise<void>;
private resolveFinished: () => void;
public preTask: Task<T, any, any> | TPreOrAfterTaskFunction;
public afterTask: Task<T, any, any> | TPreOrAfterTaskFunction;
public running: boolean = false;
public bufferRunner = new BufferRunner(this);
@@ -275,12 +244,12 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
constructor(optionsArg: {
taskFunction: ITaskFunction<T>;
preTask?: Task<T, any> | TPreOrAfterTaskFunction;
afterTask?: Task<T, any> | TPreOrAfterTaskFunction;
preTask?: Task<T, any, any> | TPreOrAfterTaskFunction;
afterTask?: Task<T, any, any> | TPreOrAfterTaskFunction;
buffered?: boolean;
bufferMax?: number;
execDelay?: number;
name?: string;
data?: TData;
taskSetup?: ITaskSetupFunction<T>;
steps?: TSteps;
catchErrors?: boolean;
@@ -291,8 +260,8 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
this.afterTask = optionsArg.afterTask;
this.buffered = optionsArg.buffered;
this.bufferMax = optionsArg.bufferMax;
this.execDelay = optionsArg.execDelay;
this.name = optionsArg.name;
this.data = optionsArg.data ?? ({} as TData);
this.taskSetup = optionsArg.taskSetup;
this.catchErrors = optionsArg.catchErrors ?? false;
this.labels = optionsArg.labels ? { ...optionsArg.labels } : {};
@@ -309,11 +278,6 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
this.steps.set(stepConfig.name, step);
}
}
// Create the finished promise
this.finished = new Promise((resolve) => {
this.resolveFinished = resolve;
});
}
public trigger(x?: any): Promise<any> {

View File

@@ -0,0 +1,89 @@
import type { Task } from './taskbuffer.classes.task.js';
import type { ITaskConstraintGroupOptions } from './taskbuffer.interfaces.js';
export class TaskConstraintGroup<TData extends Record<string, unknown> = Record<string, unknown>> {
public name: string;
public maxConcurrent: number;
public cooldownMs: number;
private constraintKeyForExecution: (task: Task<any, any, TData>, input?: any) => string | null | undefined;
private shouldExecuteFn?: (task: Task<any, any, TData>, input?: any) => boolean | Promise<boolean>;
private runningCounts = new Map<string, number>();
private lastCompletionTimes = new Map<string, number>();
constructor(options: ITaskConstraintGroupOptions<TData>) {
this.name = options.name;
this.constraintKeyForExecution = options.constraintKeyForExecution;
this.maxConcurrent = options.maxConcurrent ?? Infinity;
this.cooldownMs = options.cooldownMs ?? 0;
this.shouldExecuteFn = options.shouldExecute;
}
public getConstraintKey(task: Task<any, any, TData>, input?: any): string | null {
const key = this.constraintKeyForExecution(task, input);
return key ?? null;
}
public async checkShouldExecute(task: Task<any, any, TData>, input?: any): Promise<boolean> {
if (!this.shouldExecuteFn) {
return true;
}
return this.shouldExecuteFn(task, input);
}
public canRun(subGroupKey: string): boolean {
const running = this.runningCounts.get(subGroupKey) ?? 0;
if (running >= this.maxConcurrent) {
return false;
}
if (this.cooldownMs > 0) {
const lastCompletion = this.lastCompletionTimes.get(subGroupKey);
if (lastCompletion !== undefined) {
const elapsed = Date.now() - lastCompletion;
if (elapsed < this.cooldownMs) {
return false;
}
}
}
return true;
}
public acquireSlot(subGroupKey: string): void {
const current = this.runningCounts.get(subGroupKey) ?? 0;
this.runningCounts.set(subGroupKey, current + 1);
}
public releaseSlot(subGroupKey: string): void {
const current = this.runningCounts.get(subGroupKey) ?? 0;
const next = Math.max(0, current - 1);
if (next === 0) {
this.runningCounts.delete(subGroupKey);
} else {
this.runningCounts.set(subGroupKey, next);
}
this.lastCompletionTimes.set(subGroupKey, Date.now());
}
public getCooldownRemaining(subGroupKey: string): number {
if (this.cooldownMs <= 0) {
return 0;
}
const lastCompletion = this.lastCompletionTimes.get(subGroupKey);
if (lastCompletion === undefined) {
return 0;
}
const elapsed = Date.now() - lastCompletion;
return Math.max(0, this.cooldownMs - elapsed);
}
public getRunningCount(subGroupKey: string): number {
return this.runningCounts.get(subGroupKey) ?? 0;
}
public reset(): void {
this.runningCounts.clear();
this.lastCompletionTimes.clear();
}
}

View File

@@ -1,10 +1,11 @@
import * as plugins from './taskbuffer.plugins.js';
import { Task } from './taskbuffer.classes.task.js';
import { TaskConstraintGroup } from './taskbuffer.classes.taskconstraintgroup.js';
import {
AbstractDistributedCoordinator,
type IDistributedTaskRequestResult,
} from './taskbuffer.classes.distributedcoordinator.js';
import type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent } from './taskbuffer.interfaces.js';
import type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent, IConstrainedTaskEntry } from './taskbuffer.interfaces.js';
import { logger } from './taskbuffer.logging.js';
export interface ICronJob {
@@ -19,23 +20,28 @@ export interface ITaskManagerConstructorOptions {
export class TaskManager {
public randomId = plugins.smartunique.shortId();
public taskMap = new plugins.lik.ObjectMap<Task<any, any>>();
public taskMap = new plugins.lik.ObjectMap<Task<any, any, any>>();
public readonly taskSubject = new plugins.smartrx.rxjs.Subject<ITaskEvent>();
private taskSubscriptions = new Map<Task<any, any>, plugins.smartrx.rxjs.Subscription>();
private taskSubscriptions = new Map<Task<any, any, any>, plugins.smartrx.rxjs.Subscription>();
private cronJobManager = new plugins.smarttime.CronManager();
public options: ITaskManagerConstructorOptions = {
distributedCoordinator: null,
};
// Constraint system
public constraintGroups: TaskConstraintGroup<any>[] = [];
private constraintQueue: IConstrainedTaskEntry[] = [];
private drainTimer: ReturnType<typeof setTimeout> | null = null;
constructor(options: ITaskManagerConstructorOptions = {}) {
this.options = Object.assign(this.options, options);
}
public getTaskByName(taskName: string): Task<any, any> {
public getTaskByName(taskName: string): Task<any, any, any> {
return this.taskMap.findSync((task) => task.name === taskName);
}
public addTask(task: Task<any, any>): void {
public addTask(task: Task<any, any, any>): void {
if (!task.name) {
throw new Error('Task must have a name to be added to taskManager');
}
@@ -46,7 +52,7 @@ export class TaskManager {
this.taskSubscriptions.set(task, subscription);
}
public removeTask(task: Task<any, any>): void {
public removeTask(task: Task<any, any, any>): void {
this.taskMap.remove(task);
const subscription = this.taskSubscriptions.get(task);
if (subscription) {
@@ -55,21 +61,170 @@ export class TaskManager {
}
}
public addAndScheduleTask(task: Task<any, any>, cronString: string) {
public addAndScheduleTask(task: Task<any, any, any>, cronString: string) {
this.addTask(task);
this.scheduleTaskByName(task.name, cronString);
}
// Constraint group management
public addConstraintGroup(group: TaskConstraintGroup<any>): void {
this.constraintGroups.push(group);
}
public removeConstraintGroup(name: string): void {
this.constraintGroups = this.constraintGroups.filter((g) => g.name !== name);
}
// Core constraint evaluation
public async triggerTaskConstrained(task: Task<any, any, any>, input?: any): Promise<any> {
// Gather applicable constraints
const applicableGroups: Array<{ group: TaskConstraintGroup<any>; key: string }> = [];
for (const group of this.constraintGroups) {
const key = group.getConstraintKey(task, input);
if (key !== null) {
applicableGroups.push({ group, key });
}
}
// No constraints apply → check shouldExecute then trigger directly
if (applicableGroups.length === 0) {
const shouldRun = await this.checkAllShouldExecute(task, input);
if (!shouldRun) {
return undefined;
}
return task.trigger(input);
}
// Check if all constraints allow running
const allCanRun = applicableGroups.every(({ group, key }) => group.canRun(key));
if (allCanRun) {
return this.executeWithConstraintTracking(task, input, applicableGroups);
}
// Blocked → enqueue with deferred promise and cached constraint keys
const deferred = plugins.smartpromise.defer<any>();
const constraintKeys = new Map<string, string>();
for (const { group, key } of applicableGroups) {
constraintKeys.set(group.name, key);
}
this.constraintQueue.push({ task, input, deferred, constraintKeys });
return deferred.promise;
}
private async checkAllShouldExecute(task: Task<any, any, any>, input?: any): Promise<boolean> {
for (const group of this.constraintGroups) {
const shouldRun = await group.checkShouldExecute(task, input);
if (!shouldRun) {
return false;
}
}
return true;
}
private async executeWithConstraintTracking(
task: Task<any, any, any>,
input: any,
groups: Array<{ group: TaskConstraintGroup<any>; key: string }>,
): Promise<any> {
// Acquire slots synchronously to prevent race conditions
for (const { group, key } of groups) {
group.acquireSlot(key);
}
// Check shouldExecute after acquiring slots
const shouldRun = await this.checkAllShouldExecute(task, input);
if (!shouldRun) {
// Release slots and drain queue
for (const { group, key } of groups) {
group.releaseSlot(key);
}
this.drainConstraintQueue();
return undefined;
}
try {
return await task.trigger(input);
} finally {
// Release slots
for (const { group, key } of groups) {
group.releaseSlot(key);
}
this.drainConstraintQueue();
}
}
private drainConstraintQueue(): void {
let shortestCooldown = Infinity;
const stillQueued: IConstrainedTaskEntry[] = [];
for (const entry of this.constraintQueue) {
const applicableGroups: Array<{ group: TaskConstraintGroup<any>; key: string }> = [];
for (const group of this.constraintGroups) {
const key = group.getConstraintKey(entry.task, entry.input);
if (key !== null) {
applicableGroups.push({ group, key });
}
}
// No constraints apply anymore (group removed?) → check shouldExecute then run
if (applicableGroups.length === 0) {
this.checkAllShouldExecute(entry.task, entry.input).then((shouldRun) => {
if (!shouldRun) {
entry.deferred.resolve(undefined);
return;
}
entry.task.trigger(entry.input).then(
(result) => entry.deferred.resolve(result),
(err) => entry.deferred.reject(err),
);
});
continue;
}
const allCanRun = applicableGroups.every(({ group, key }) => group.canRun(key));
if (allCanRun) {
// executeWithConstraintTracking handles shouldExecute check internally
this.executeWithConstraintTracking(entry.task, entry.input, applicableGroups).then(
(result) => entry.deferred.resolve(result),
(err) => entry.deferred.reject(err),
);
} else {
stillQueued.push(entry);
// Track shortest cooldown for timer scheduling
for (const { group, key } of applicableGroups) {
const remaining = group.getCooldownRemaining(key);
if (remaining > 0 && remaining < shortestCooldown) {
shortestCooldown = remaining;
}
}
}
}
this.constraintQueue = stillQueued;
// Schedule next drain if there are cooldown-blocked entries
if (this.drainTimer) {
clearTimeout(this.drainTimer);
this.drainTimer = null;
}
if (stillQueued.length > 0 && shortestCooldown < Infinity) {
this.drainTimer = setTimeout(() => {
this.drainTimer = null;
this.drainConstraintQueue();
}, shortestCooldown + 1);
}
}
public async triggerTaskByName(taskName: string): Promise<any> {
const taskToTrigger = this.getTaskByName(taskName);
if (!taskToTrigger) {
throw new Error(`No task with the name ${taskName} found.`);
}
return taskToTrigger.trigger();
return this.triggerTaskConstrained(taskToTrigger);
}
public async triggerTask(task: Task<any, any>) {
return task.trigger();
public async triggerTask(task: Task<any, any, any>) {
return this.triggerTaskConstrained(task);
}
public scheduleTaskByName(taskName: string, cronString: string) {
@@ -80,7 +235,7 @@ export class TaskManager {
this.handleTaskScheduling(taskToSchedule, cronString);
}
private handleTaskScheduling(task: Task<any, any>, cronString: string) {
private handleTaskScheduling(task: Task<any, any, any>, cronString: string) {
const cronJob = this.cronJobManager.addCronjob(
cronString,
async (triggerTime: number) => {
@@ -98,7 +253,7 @@ export class TaskManager {
}
}
try {
await task.trigger();
await this.triggerTaskConstrained(task);
} catch (err) {
logger.log('error', `TaskManager: scheduled task "${task.name || 'unnamed'}" failed: ${err instanceof Error ? err.message : String(err)}`);
}
@@ -107,7 +262,7 @@ export class TaskManager {
task.cronJob = cronJob;
}
private logTaskState(task: Task<any, any>) {
private logTaskState(task: Task<any, any, any>) {
logger.log('info', `Taskbuffer schedule triggered task >>${task.name}<<`);
const bufferState = task.buffered
? `buffered with max ${task.bufferMax} buffered calls`
@@ -116,7 +271,7 @@ export class TaskManager {
}
private async performDistributedConsultation(
task: Task<any, any>,
task: Task<any, any, any>,
triggerTime: number,
): Promise<IDistributedTaskRequestResult> {
logger.log('info', 'Found a distributed coordinator, performing consultation.');
@@ -144,7 +299,7 @@ export class TaskManager {
}
}
public async descheduleTask(task: Task<any, any>) {
public async descheduleTask(task: Task<any, any, any>) {
await this.descheduleTaskByName(task.name);
}
@@ -169,6 +324,10 @@ export class TaskManager {
subscription.unsubscribe();
}
this.taskSubscriptions.clear();
if (this.drainTimer) {
clearTimeout(this.drainTimer);
this.drainTimer = null;
}
}
// Get metadata for a specific task
@@ -186,7 +345,7 @@ export class TaskManager {
// Get scheduled tasks with their schedules and next run times
public getScheduledTasks(): IScheduledTaskInfo[] {
const scheduledTasks: IScheduledTaskInfo[] = [];
for (const task of this.taskMap.getArray()) {
if (task.cronJob) {
scheduledTasks.push({
@@ -199,7 +358,7 @@ export class TaskManager {
});
}
}
return scheduledTasks;
}
@@ -213,11 +372,11 @@ export class TaskManager {
}))
.sort((a, b) => a.nextRun.getTime() - b.nextRun.getTime())
.slice(0, limit);
return scheduledRuns;
}
public getTasksByLabel(key: string, value: string): Task<any, any>[] {
public getTasksByLabel(key: string, value: string): Task<any, any, any>[] {
return this.taskMap.getArray().filter(task => task.labels[key] === value);
}
@@ -227,7 +386,7 @@ export class TaskManager {
// Add, execute, and remove a task while collecting metadata
public async addExecuteRemoveTask<T, TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }>>(
task: Task<T, TSteps>,
task: Task<T, TSteps, any>,
options?: {
schedule?: string;
trackProgress?: boolean;
@@ -235,19 +394,18 @@ export class TaskManager {
): Promise<ITaskExecutionReport> {
// Add task to manager
this.addTask(task);
// Optionally schedule it
if (options?.schedule) {
this.scheduleTaskByName(task.name!, options.schedule);
}
const startTime = Date.now();
const progressUpdates: Array<{ stepName: string; timestamp: number }> = [];
try {
// Execute the task
const result = await task.trigger();
// Execute the task through constraints
const result = await this.triggerTaskConstrained(task);
// Collect execution report
const report: ITaskExecutionReport = {
taskName: task.name || 'unnamed',
@@ -261,15 +419,15 @@ export class TaskManager {
progress: task.getProgress(),
result,
};
// Remove task from manager
this.removeTask(task);
// Deschedule if it was scheduled
if (options?.schedule && task.name) {
this.descheduleTaskByName(task.name);
}
return report;
} catch (error) {
// Create error report
@@ -285,15 +443,15 @@ export class TaskManager {
progress: task.getProgress(),
error: error as Error,
};
// Remove task from manager even on error
this.removeTask(task);
// Deschedule if it was scheduled
if (options?.schedule && task.name) {
this.descheduleTaskByName(task.name);
}
throw errorReport;
}
}

View File

@@ -1,69 +0,0 @@
import * as plugins from './taskbuffer.plugins.js';
import { Task } from './taskbuffer.classes.task.js';
import { logger } from './taskbuffer.logging.js';
export class TaskRunner {
public maxParallelJobs: number = 1;
public status: 'stopped' | 'running' = 'stopped';
public runningTasks: plugins.lik.ObjectMap<Task> =
new plugins.lik.ObjectMap<Task>();
public queuedTasks: Task[] = [];
constructor() {
this.runningTasks.eventSubject.subscribe(async (eventArg) => {
this.checkExecution();
});
}
/**
* adds a task to the queue
*/
public addTask(taskArg: Task) {
this.queuedTasks.push(taskArg);
this.checkExecution();
}
/**
* set amount of parallel tasks
* be careful, you might lose dependability of tasks
*/
public setMaxParallelJobs(maxParallelJobsArg: number) {
this.maxParallelJobs = maxParallelJobsArg;
}
/**
* starts the task queue
*/
public async start() {
this.status = 'running';
}
/**
* checks whether execution is on point
*/
public async checkExecution() {
if (
this.runningTasks.getArray().length < this.maxParallelJobs &&
this.status === 'running' &&
this.queuedTasks.length > 0
) {
const nextJob = this.queuedTasks.shift();
this.runningTasks.add(nextJob);
try {
await nextJob.trigger();
} catch (err) {
logger.log('error', `TaskRunner: task "${nextJob.name || 'unnamed'}" failed: ${err instanceof Error ? err.message : String(err)}`);
}
this.runningTasks.remove(nextJob);
this.checkExecution();
}
}
/**
* stops the task queue
*/
public async stop() {
this.status = 'stopped';
}
}

View File

@@ -1,4 +1,25 @@
import type { ITaskStep } from './taskbuffer.classes.taskstep.js';
import type { Task } from './taskbuffer.classes.task.js';
export interface ITaskConstraintGroupOptions<TData extends Record<string, unknown> = Record<string, unknown>> {
name: string;
constraintKeyForExecution: (task: Task<any, any, TData>, input?: any) => string | null | undefined;
maxConcurrent?: number; // default: Infinity
cooldownMs?: number; // default: 0
shouldExecute?: (task: Task<any, any, TData>, input?: any) => boolean | Promise<boolean>;
}
export interface ITaskExecution<TData extends Record<string, unknown> = Record<string, unknown>> {
task: Task<any, any, TData>;
input: any;
}
export interface IConstrainedTaskEntry {
task: Task<any, any, any>;
input: any;
deferred: import('@push.rocks/smartpromise').Deferred<any>;
constraintKeys: Map<string, string>; // groupName -> key
}
export interface ITaskMetadata {
name: string;

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/taskbuffer',
version: '4.2.0',
version: '6.0.1',
description: 'A flexible task management library supporting TypeScript, allowing for task buffering, scheduling, and execution with dependency management.'
}