Compare commits

...

10 Commits

23 changed files with 3614 additions and 3080 deletions

View File

@@ -1,5 +1,48 @@
# Changelog
## 2026-02-15 - 6.0.0 - BREAKING CHANGE(constraints)
make TaskConstraintGroup constraint matcher input-aware and add shouldExecute pre-execution hook
- Rename ITaskConstraintGroupOptions.constraintKeyForTask -> constraintKeyForExecution(task, input?) and update TaskConstraintGroup.getConstraintKey signature
- Add optional shouldExecute(task, input?) hook; TaskManager checks shouldExecute before immediate runs, after acquiring slots, and when draining the constraint queue (queued tasks are skipped when shouldExecute returns false)
- Export ITaskExecution type and store constraintKeys on queued entries (IConstrainedTaskEntry.constraintKeys)
- Documentation and tests updated to demonstrate input-aware constraint keys and shouldExecute pruning
## 2026-02-15 - 5.0.1 - fix(tests)
add and tighten constraint-related tests covering return values, error propagation, concurrency, cooldown timing, and constraint removal
- Tightened cooldown timing assertion from >=100ms to >=250ms to reflect 300ms cooldown with 50ms tolerance.
- Added tests for queued task return values, error propagation when catchErrors is false, and error swallowing behavior when catchErrors is true.
- Added concurrency and cooldown interaction tests to ensure maxConcurrent is respected and batch timing is correct.
- Added test verifying removing a constraint group unblocks queued tasks and drain behavior completes correctly.
## 2026-02-15 - 5.0.0 - BREAKING CHANGE(taskbuffer)
Introduce constraint-based concurrency with TaskConstraintGroup and TaskManager integration; remove legacy TaskRunner and several Task APIs (breaking); add typed Task.data and update exports and tests.
- Add TaskConstraintGroup class with per-key maxConcurrent, cooldownMs, and helper methods (canRun, acquireSlot, releaseSlot, getCooldownRemaining, getRunningCount, reset).
- Task generic signature extended to Task<T, TSteps, TData> and a new typed data property (data) with default {}.
- TaskManager now supports addConstraintGroup/removeConstraintGroup, triggerTaskConstrained, queues blocked tasks, drains queue with cooldown timers, and routes triggerTask/triggerTaskByName through the constraint system.
- Removed TaskRunner, plus Task APIs: blockingTasks, execDelay, finished promise and associated behavior have been removed (breaking changes).
- Exports and interfaces updated: TaskConstraintGroup and ITaskConstraintGroupOptions added; TaskRunner removed from public API.
- Updated README and added comprehensive tests for constraint behavior; adjusted other tests to remove TaskRunner usage and reflect new APIs.
## 2026-02-15 - 4.2.1 - fix(deps)
bump @push.rocks/smartlog and @types/node; update dependency list version and license link in docs
- package.json: update @push.rocks/smartlog from ^3.1.10 to ^3.1.11
- package.json: update @types/node from ^25.1.0 to ^25.2.3
- readme.hints.md: update 'Dependencies (as of v4.1.1)' to 'Dependencies (as of v4.2.0)' and reflect bumped dependency versions
- readme.md: change license link text to '[LICENSE](./license.md)'
## 2026-01-29 - 4.2.0 - feat(ts_web)
support TC39 'accessor' decorators for web components; bump dependencies and devDependencies; rename browser tests to .chromium.ts; move LICENSE to license.md and update readme
- Convert web component class fields to use the TC39 'accessor' keyword in ts_web/taskbuffer-dashboard.ts to be compatible with @design.estate/dees-element v2.1.6
- Bump @design.estate/dees-element to ^2.1.6 and update devDependencies (@git.zone/tsbuild, @git.zone/tsbundle, @git.zone/tsrun, @git.zone/tstest, @types/node) to newer versions
- Replace test/test.10.webcomponent.browser.ts with test/test.10.webcomponent.chromium.ts and update testing guidance in readme.hints.md to prefer .chromium.ts
- Move LICENSE file content to license.md and update readme.md to reference the new license file
- Small test cleanups: remove obsolete tslint:disable comments
## 2026-01-26 - 4.1.1 - fix(ts_web)
fix web dashboard typings and update generated commit info

View File

View File

@@ -1,6 +1,6 @@
{
"name": "@push.rocks/taskbuffer",
"version": "4.1.1",
"version": "6.0.0",
"private": false,
"description": "A flexible task management library supporting TypeScript, allowing for task buffering, scheduling, and execution with dependency management.",
"main": "dist_ts/index.js",
@@ -27,28 +27,28 @@
"debounced tasks",
"distributed coordination"
],
"author": "Lossless GmbH",
"author": "Task Venture Capital GmbH <hello@task.vc>",
"license": "MIT",
"bugs": {
"url": "https://code.foss.global/push.rocks/taskbuffer/issues"
},
"homepage": "https://code.foss.global/push.rocks/taskbuffer#readme",
"dependencies": {
"@design.estate/dees-element": "^2.1.3",
"@design.estate/dees-element": "^2.1.6",
"@push.rocks/lik": "^6.2.2",
"@push.rocks/smartdelay": "^3.0.5",
"@push.rocks/smartlog": "^3.1.10",
"@push.rocks/smartlog": "^3.1.11",
"@push.rocks/smartpromise": "^4.2.3",
"@push.rocks/smartrx": "^3.0.10",
"@push.rocks/smarttime": "^4.1.1",
"@push.rocks/smartunique": "^3.0.9"
},
"devDependencies": {
"@git.zone/tsbuild": "^3.1.2",
"@git.zone/tsbundle": "^2.6.3",
"@git.zone/tsrun": "^2.0.0",
"@git.zone/tstest": "^3.1.3",
"@types/node": "^24.10.1"
"@git.zone/tsbuild": "^4.1.2",
"@git.zone/tsbundle": "^2.8.3",
"@git.zone/tsrun": "^2.0.1",
"@git.zone/tstest": "^3.1.8",
"@types/node": "^25.2.3"
},
"files": [
"ts/**/*",

4750
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,20 +1,44 @@
# Taskbuffer Hints
## Task Constraint System (v5.0.0+) — Breaking Changes
- **`TaskRunner` removed** — replaced by `TaskManager` + `TaskConstraintGroup`
- **`blockingTasks` removed** from `Task` — use `TaskConstraintGroup` with `maxConcurrent: 1`
- **`execDelay` removed** from `Task` — use `TaskConstraintGroup` with `cooldownMs`
- **`finished` promise removed** from `Task` — no longer needed
- **`Task` generic signature**: `Task<T, TSteps, TData>` (3rd param added for typed data)
### Task.data
- `Task` constructor accepts optional `data?: TData` (defaults to `{}`)
- Typed data bag accessible as `task.data`
### TaskConstraintGroup
- `new TaskConstraintGroup<TData>({ name, constraintKeyForExecution, maxConcurrent?, cooldownMs?, shouldExecute? })`
- `constraintKeyForExecution(task, input?)` returns a string key (constraint applies) or `null` (skip). Receives both task and runtime input.
- `shouldExecute(task, input?)` — optional pre-execution check. Returns `false` to skip (deferred resolves `undefined`). Can be async.
- `maxConcurrent` (default: `Infinity`) — max concurrent tasks per key
- `cooldownMs` (default: `0`) — minimum ms gap between completions per key
- Methods: `getConstraintKey(task, input?)`, `checkShouldExecute(task, input?)`, `canRun(key)`, `acquireSlot(key)`, `releaseSlot(key)`, `getCooldownRemaining(key)`, `getRunningCount(key)`, `reset()`
- `ITaskExecution<TData>` type exported from index — `{ task, input }` tuple
### TaskManager Constraint Integration
- `manager.addConstraintGroup(group)` / `manager.removeConstraintGroup(name)`
- `triggerTaskByName()`, `triggerTask()`, `addExecuteRemoveTask()`, cron callbacks all route through `triggerTaskConstrained()`
- `triggerTaskConstrained(task, input?)` — evaluates constraints, queues if blocked, drains after completion
- Cooldown-blocked entries auto-drain via timer
### Exported from index.ts
- `TaskConstraintGroup` class
- `ITaskConstraintGroupOptions` type
## Error Handling (v3.6.0+)
- `Task` now has `catchErrors` constructor option (default: `false`)
- Default behavior: `trigger()` rejects when taskFunction throws (breaking change from pre-3.6)
- Set `catchErrors: true` to swallow errors (old behavior) - returns `undefined` on error
- Error state tracked via `lastError?: Error`, `errorCount: number`, `clearError()`
- `getMetadata()` status uses all four values: `'idle'` | `'running'` | `'completed'` | `'failed'`
- All peripheral classes (Taskchain, Taskparallel, TaskRunner, BufferRunner, TaskDebounced, TaskManager) have proper error propagation/handling
- All peripheral classes (Taskchain, Taskparallel, BufferRunner, TaskDebounced, TaskManager) have proper error propagation/handling
- `console.log` calls replaced with `logger.log()` throughout
## Breaking API Rename (TaskRunner)
- `maxParrallelJobs``maxParallelJobs`
- `qeuedTasks``queuedTasks`
- JSDoc typos fixed: "qeue" → "queue", "wether" → "whether", "loose" → "lose"
- The `setMaxParallelJobs()` parameter also renamed from `maxParrallelJobsArg` to `maxParallelJobsArg`
## Error Context Improvements
- **TaskChain**: Errors now wrap the original with context: chain name, failing task name, and task index. Original error preserved via `.cause`
- **BufferRunner**: When `catchErrors: false`, buffered task errors now reject the trigger promise (via `CycleCounter.informOfCycleError`) instead of silently resolving with `undefined`
@@ -36,7 +60,34 @@
## Project Structure
- Source in `ts/`, web components in `ts_web/`
- Tests in `test/` - naming: `*.node.ts`, `*.browser.ts`, `*.both.ts`
- Tests in `test/` - naming: `*.node.ts`, `*.chromium.ts` (preferred), `*.both.ts`
- Note: `*.browser.ts` is deprecated, use `*.chromium.ts` for browser tests
- Logger: `ts/taskbuffer.logging.ts` exports `logger` (ConsoleLog from smartlog)
- Build: `pnpm build` (tsbuild tsfolders)
- Test: `pnpm test` or `tstest test/test.XX.name.ts --verbose`
## Web Components (ts_web/)
- Uses `@design.estate/dees-element` with TC39 decorators
- Decorators require the `accessor` keyword:
```typescript
@property({ type: String })
accessor myProp = 'default';
@state()
accessor count = 0;
```
## Dependencies (as of v4.2.0)
- `@design.estate/dees-element` ^2.1.6 - TC39 decorators with `accessor` keyword
- `@push.rocks/lik` ^6.2.2 - Data structures
- `@push.rocks/smartdelay` ^3.0.5 - Delay utilities
- `@push.rocks/smartlog` ^3.1.11 - Logging
- `@push.rocks/smartpromise` ^4.2.3 - Promise utilities
- `@push.rocks/smartrx` ^3.0.10 - RxJS wrapper
- `@push.rocks/smarttime` ^4.1.1 - Time/cron utilities
- `@push.rocks/smartunique` ^3.0.9 - Unique ID generation
- `@git.zone/tsbuild` ^4.1.2 - Build tool
- `@git.zone/tsbundle` ^2.8.3 - Bundler (for browser tests)
- `@git.zone/tsrun` ^2.0.1 - TypeScript runner
- `@git.zone/tstest` ^3.1.8 - Test runner (supports `.chromium.ts` files)
- `@types/node` ^25.2.3 - Node.js type definitions

375
readme.md
View File

@@ -1,6 +1,6 @@
# @push.rocks/taskbuffer 🚀
> **Modern TypeScript task orchestration with smart buffering, scheduling, labels, and real-time event streaming**
> **Modern TypeScript task orchestration with constraint-based concurrency, smart buffering, scheduling, labels, and real-time event streaming**
[![npm version](https://img.shields.io/npm/v/@push.rocks/taskbuffer.svg)](https://www.npmjs.com/package/@push.rocks/taskbuffer)
[![TypeScript](https://img.shields.io/badge/TypeScript-5.x-blue.svg)](https://www.typescriptlang.org/)
@@ -13,6 +13,7 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community
## 🌟 Features
- **🎯 Type-Safe Task Management** — Full TypeScript support with generics and type inference
- **🔒 Constraint-Based Concurrency** — Per-key mutual exclusion, group concurrency limits, and cooldown enforcement via `TaskConstraintGroup`
- **📊 Real-Time Progress Tracking** — Step-based progress with percentage weights
- **⚡ Smart Buffering** — Intelligent request debouncing and batching
- **⏰ Cron Scheduling** — Schedule tasks with cron expressions
@@ -49,6 +50,24 @@ const result = await greetTask.trigger('World');
console.log(result); // "Hello, World!"
```
### Task with Typed Data 📦
Every task can carry a typed data bag — perfect for constraint matching, routing, and metadata:
```typescript
const task = new Task<undefined, [], { domain: string; priority: number }>({
name: 'update-dns',
data: { domain: 'example.com', priority: 1 },
taskFunction: async () => {
// task.data is fully typed here
console.log(`Updating DNS for ${task.data.domain}`);
},
});
task.data.domain; // string — fully typed
task.data.priority; // number — fully typed
```
### Task with Steps & Progress 📊
```typescript
@@ -84,6 +103,225 @@ console.log(deployTask.getStepsMetadata()); // Step details with status
> **Note:** `notifyStep()` is fully type-safe — TypeScript only accepts step names you declared in the `steps` array when you use `as const`.
## 🔒 Task Constraints — Concurrency, Mutual Exclusion & Cooldowns
`TaskConstraintGroup` is the unified mechanism for controlling how tasks run relative to each other. It replaces older patterns like task runners, blocking tasks, and execution delays with a single, composable, key-based constraint system.
### Per-Key Mutual Exclusion
Ensure only one task runs at a time for a given key (e.g. per domain, per tenant, per resource):
```typescript
import { Task, TaskManager, TaskConstraintGroup } from '@push.rocks/taskbuffer';
const manager = new TaskManager();
// Only one DNS update per domain at a time
const domainMutex = new TaskConstraintGroup<{ domain: string }>({
name: 'domain-mutex',
maxConcurrent: 1,
constraintKeyForExecution: (task, input?) => task.data.domain,
});
manager.addConstraintGroup(domainMutex);
const task1 = new Task<undefined, [], { domain: string }>({
name: 'update-a.com',
data: { domain: 'a.com' },
taskFunction: async () => { /* update DNS for a.com */ },
});
const task2 = new Task<undefined, [], { domain: string }>({
name: 'update-a.com-2',
data: { domain: 'a.com' },
taskFunction: async () => { /* another update for a.com */ },
});
manager.addTask(task1);
manager.addTask(task2);
// task2 waits until task1 finishes (same domain key)
await Promise.all([
manager.triggerTask(task1),
manager.triggerTask(task2),
]);
```
### Group Concurrency Limits
Cap how many tasks can run concurrently across a group:
```typescript
// Max 3 DNS updaters running globally at once
const dnsLimit = new TaskConstraintGroup<{ group: string }>({
name: 'dns-concurrency',
maxConcurrent: 3,
constraintKeyForExecution: (task) =>
task.data.group === 'dns' ? 'dns' : null, // null = skip constraint
});
manager.addConstraintGroup(dnsLimit);
```
### Cooldowns (Rate Limiting)
Enforce a minimum time gap between consecutive executions for the same key:
```typescript
// No more than one API call per domain every 11 seconds
const rateLimiter = new TaskConstraintGroup<{ domain: string }>({
name: 'api-rate-limit',
maxConcurrent: 1,
cooldownMs: 11000,
constraintKeyForExecution: (task) => task.data.domain,
});
manager.addConstraintGroup(rateLimiter);
```
### Global Concurrency Cap
Limit total concurrent tasks system-wide:
```typescript
const globalCap = new TaskConstraintGroup({
name: 'global-cap',
maxConcurrent: 10,
constraintKeyForExecution: () => 'all', // same key = shared limit
});
manager.addConstraintGroup(globalCap);
```
### Composing Multiple Constraints
Multiple constraint groups stack — a task only runs when **all** applicable constraints allow it:
```typescript
manager.addConstraintGroup(globalCap); // max 10 globally
manager.addConstraintGroup(domainMutex); // max 1 per domain
manager.addConstraintGroup(rateLimiter); // 11s cooldown per domain
// A task must satisfy ALL three constraints before it starts
await manager.triggerTask(dnsTask);
```
### Selective Constraints
Return `null` from `constraintKeyForExecution` to exempt a task from a constraint group:
```typescript
const constraint = new TaskConstraintGroup<{ priority: string }>({
name: 'low-priority-limit',
maxConcurrent: 2,
constraintKeyForExecution: (task) =>
task.data.priority === 'low' ? 'low-priority' : null, // high priority tasks skip this constraint
});
```
### Input-Aware Constraints 🎯
The `constraintKeyForExecution` function receives both the **task** and the **runtime input** passed to `trigger(input)`. This means the same task triggered with different inputs can be constrained independently:
```typescript
const extractTLD = (domain: string) => {
const parts = domain.split('.');
return parts.slice(-2).join('.');
};
// Same TLD → serialized. Different TLDs → parallel.
const tldMutex = new TaskConstraintGroup({
name: 'tld-mutex',
maxConcurrent: 1,
constraintKeyForExecution: (task, input?: string) => {
if (!input) return null;
return extractTLD(input); // "example.com", "other.org", etc.
},
});
manager.addConstraintGroup(tldMutex);
// These two serialize (same TLD "example.com")
const p1 = manager.triggerTaskConstrained(getCert, 'app.example.com');
const p2 = manager.triggerTaskConstrained(getCert, 'api.example.com');
// This runs in parallel (different TLD "other.org")
const p3 = manager.triggerTaskConstrained(getCert, 'my.other.org');
```
You can also combine `task.data` and `input` for composite keys:
```typescript
const providerDomain = new TaskConstraintGroup<{ provider: string }>({
name: 'provider-domain',
maxConcurrent: 1,
constraintKeyForExecution: (task, input?: string) => {
return `${task.data.provider}:${input || 'default'}`;
},
});
```
### Pre-Execution Check with `shouldExecute` ✅
The `shouldExecute` callback runs right before a queued task executes. If it returns `false`, the task is skipped and its promise resolves with `undefined`. This is perfect for scenarios where a prior execution's outcome makes subsequent queued tasks unnecessary:
```typescript
const certCache = new Map<string, string>();
const certConstraint = new TaskConstraintGroup({
name: 'cert-mutex',
maxConcurrent: 1,
constraintKeyForExecution: (task, input?: string) => {
if (!input) return null;
return extractTLD(input);
},
shouldExecute: (task, input?: string) => {
if (!input) return true;
// Skip if a wildcard cert already covers this TLD
return certCache.get(extractTLD(input)) !== 'wildcard';
},
});
const getCert = new Task({
name: 'get-certificate',
taskFunction: async (domain: string) => {
const cert = await acme.getCert(domain);
if (cert.isWildcard) certCache.set(extractTLD(domain), 'wildcard');
return cert;
},
});
manager.addConstraintGroup(certConstraint);
manager.addTask(getCert);
const r1 = manager.triggerTaskConstrained(getCert, 'app.example.com'); // runs, gets wildcard
const r2 = manager.triggerTaskConstrained(getCert, 'api.example.com'); // queued → skipped!
const r3 = manager.triggerTaskConstrained(getCert, 'my.other.org'); // parallel (different TLD)
const [cert1, cert2, cert3] = await Promise.all([r1, r2, r3]);
// cert2 === undefined (skipped because wildcard already covers example.com)
```
**`shouldExecute` semantics:**
- Runs right before execution (after slot acquisition, before `trigger()`)
- Also checked on immediate (non-queued) triggers
- Returns `false` → skip execution, deferred resolves with `undefined`
- Can be async (return `Promise<boolean>`)
- Has closure access to external state modified by prior executions
- If multiple constraint groups have `shouldExecute`, **all** must return `true`
### How It Works
When you trigger a task through `TaskManager` (via `triggerTask`, `triggerTaskByName`, `addExecuteRemoveTask`, or cron), the manager:
1. Evaluates all registered constraint groups against the task and input
2. If no constraints apply (all matchers return `null`) → checks `shouldExecute` → runs or skips
3. If all applicable constraints have capacity → acquires slots → checks `shouldExecute` → runs or skips
4. If any constraint blocks → enqueues the task; when a running task completes, the queue is drained
5. Cooldown-blocked tasks auto-retry after the shortest remaining cooldown expires
6. Queued tasks re-check `shouldExecute` when their turn comes — stale work is automatically pruned
## 🎯 Core Concepts
### Task Buffering — Intelligent Request Management
@@ -95,7 +333,6 @@ const apiTask = new Task({
name: 'APIRequest',
buffered: true,
bufferMax: 5, // Maximum 5 concurrent executions
execDelay: 100, // Minimum 100ms between executions
taskFunction: async (endpoint) => {
return await fetch(endpoint).then((r) => r.json());
},
@@ -156,9 +393,9 @@ console.log(`Saved ${savedCount} items`);
Taskchain also supports dynamic mutation:
```typescript
pipeline.addTask(newTask); // Append to chain
pipeline.removeTask(oldTask); // Remove by reference (returns boolean)
pipeline.shiftTask(); // Remove & return first task
pipeline.addTask(newTask); // Append to chain
pipeline.removeTask(oldTask); // Remove by reference (returns boolean)
pipeline.shiftTask(); // Remove & return first task
```
Error context is rich — a chain failure includes the chain name, failing task name, task index, and preserves the original error as `.cause`.
@@ -220,27 +457,6 @@ await initTask.trigger(); // No-op
console.log(initTask.hasTriggered); // true
```
### TaskRunner — Managed Queue with Concurrency Control
Process a queue of tasks with a configurable parallelism limit:
```typescript
import { TaskRunner } from '@push.rocks/taskbuffer';
const runner = new TaskRunner();
runner.setMaxParallelJobs(3); // Run up to 3 tasks concurrently
await runner.start();
runner.addTask(taskA);
runner.addTask(taskB);
runner.addTask(taskC);
runner.addTask(taskD); // Queued until a slot opens
// When done:
await runner.stop();
```
## 🏷️ Labels — Multi-Tenant Task Filtering
Attach arbitrary key-value labels to any task for filtering, grouping, or multi-tenant isolation:
@@ -411,6 +627,10 @@ manager.addTask(deployTask);
manager.addAndScheduleTask(backupTask, '0 2 * * *'); // Daily at 2 AM
manager.addAndScheduleTask(healthCheck, '*/5 * * * *'); // Every 5 minutes
// Register constraint groups
manager.addConstraintGroup(globalCap);
manager.addConstraintGroup(perDomainMutex);
// Query metadata
const meta = manager.getTaskMetadata('Deploy');
console.log(meta);
@@ -433,7 +653,7 @@ const allMeta = manager.getAllTasksMetadata();
const scheduled = manager.getScheduledTasks();
const nextRuns = manager.getNextScheduledRuns(5);
// Trigger by name
// Trigger by name (routes through constraints)
await manager.triggerTaskByName('Deploy');
// One-shot: add, execute, collect report, remove
@@ -462,6 +682,12 @@ manager.removeTask(task); // Removes from map and unsubscribes event forwarding
manager.descheduleTaskByName('Deploy'); // Remove cron schedule only
```
### Remove Constraint Groups
```typescript
manager.removeConstraintGroup('domain-mutex'); // By name
```
## 🎨 Web Component Dashboard
Visualize your tasks in real-time with the included Lit-based web component:
@@ -570,32 +796,6 @@ await task.trigger('SELECT * FROM users'); // Setup runs here
await task.trigger('SELECT * FROM orders'); // Setup skipped, pool reused
```
### Blocking Tasks
Make one task wait for another to finish before executing:
```typescript
const initTask = new Task({
name: 'Init',
taskFunction: async () => {
await initializeSystem();
},
});
const workerTask = new Task({
name: 'Worker',
taskFunction: async () => {
await doWork();
},
});
workerTask.blockingTasks.push(initTask);
// Triggering worker will automatically wait for init to complete
initTask.trigger();
workerTask.trigger(); // Waits until initTask.finished resolves
```
### Database Migration Pipeline
```typescript
@@ -616,15 +816,24 @@ try {
### Multi-Tenant SaaS Monitoring
Combine labels + events for a real-time multi-tenant dashboard:
Combine labels + events + constraints for a real-time multi-tenant system:
```typescript
const manager = new TaskManager();
// Per-tenant concurrency limit
const tenantLimit = new TaskConstraintGroup<{ tenantId: string }>({
name: 'tenant-concurrency',
maxConcurrent: 2,
constraintKeyForExecution: (task, input?) => task.data.tenantId,
});
manager.addConstraintGroup(tenantLimit);
// Create tenant-scoped tasks
function createTenantTask(tenantId: string, taskName: string, fn: () => Promise<any>) {
const task = new Task({
const task = new Task<undefined, [], { tenantId: string }>({
name: `${tenantId}:${taskName}`,
data: { tenantId },
labels: { tenantId },
taskFunction: fn,
});
@@ -653,15 +862,31 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme');
| Class | Description |
| --- | --- |
| `Task<T, TSteps>` | Core task unit with optional step tracking, labels, and event streaming |
| `TaskManager` | Centralized orchestrator with scheduling, label queries, and aggregated events |
| `Task<T, TSteps, TData>` | Core task unit with typed data, optional step tracking, labels, and event streaming |
| `TaskManager` | Centralized orchestrator with constraint groups, scheduling, label queries, and aggregated events |
| `TaskConstraintGroup<TData>` | Concurrency, mutual exclusion, and cooldown constraints with key-based grouping |
| `Taskchain` | Sequential task executor with data flow between tasks |
| `Taskparallel` | Concurrent task executor via `Promise.all()` |
| `TaskOnce` | Single-execution guard |
| `TaskDebounced` | Debounced task using rxjs |
| `TaskRunner` | Sequential queue with configurable parallelism |
| `TaskStep` | Step tracking unit (internal, exposed via metadata) |
### Task Constructor Options
| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `taskFunction` | `ITaskFunction<T>` | *required* | The async function to execute |
| `name` | `string` | — | Task identifier (required for TaskManager) |
| `data` | `TData` | `{}` | Typed data bag for constraint matching and routing |
| `steps` | `ReadonlyArray<{name, description, percentage}>` | — | Step definitions for progress tracking |
| `buffered` | `boolean` | — | Enable request buffering |
| `bufferMax` | `number` | — | Max buffered calls |
| `preTask` | `Task \| () => Task` | — | Task to run before |
| `afterTask` | `Task \| () => Task` | — | Task to run after |
| `taskSetup` | `() => Promise<T>` | — | One-time setup function |
| `catchErrors` | `boolean` | `false` | Swallow errors instead of rejecting |
| `labels` | `Record<string, string>` | `{}` | Initial labels |
### Task Methods
| Method | Returns | Description |
@@ -682,6 +907,7 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme');
| Property | Type | Description |
| --- | --- | --- |
| `name` | `string` | Task identifier |
| `data` | `TData` | Typed data bag |
| `running` | `boolean` | Whether the task is currently executing |
| `idle` | `boolean` | Inverse of `running` |
| `labels` | `Record<string, string>` | Attached labels |
@@ -690,7 +916,29 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme');
| `errorCount` | `number` | Total error count across all runs |
| `runCount` | `number` | Total execution count |
| `lastRun` | `Date \| undefined` | Timestamp of last execution |
| `blockingTasks` | `Task[]` | Tasks that must finish before this one starts |
### TaskConstraintGroup Constructor Options
| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `name` | `string` | *required* | Constraint group identifier |
| `constraintKeyForExecution` | `(task, input?) => string \| null` | *required* | Returns key for grouping, or `null` to skip. Receives both the task and runtime input. |
| `maxConcurrent` | `number` | `Infinity` | Max concurrent tasks per key |
| `cooldownMs` | `number` | `0` | Minimum ms between completions per key |
| `shouldExecute` | `(task, input?) => boolean \| Promise<boolean>` | — | Pre-execution check. Return `false` to skip; deferred resolves `undefined`. |
### TaskConstraintGroup Methods
| Method | Returns | Description |
| --- | --- | --- |
| `getConstraintKey(task, input?)` | `string \| null` | Get the constraint key for a task + input |
| `checkShouldExecute(task, input?)` | `Promise<boolean>` | Run the `shouldExecute` callback (defaults to `true`) |
| `canRun(key)` | `boolean` | Check if a slot is available |
| `acquireSlot(key)` | `void` | Claim a running slot |
| `releaseSlot(key)` | `void` | Release a slot and record completion time |
| `getCooldownRemaining(key)` | `number` | Milliseconds until cooldown expires |
| `getRunningCount(key)` | `number` | Current running count for key |
| `reset()` | `void` | Clear all state |
### TaskManager Methods
@@ -699,7 +947,11 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme');
| `addTask(task)` | `void` | Register a task (wires event forwarding) |
| `removeTask(task)` | `void` | Remove task and unsubscribe events |
| `getTaskByName(name)` | `Task \| undefined` | Look up by name |
| `triggerTaskByName(name)` | `Promise<any>` | Trigger by name |
| `triggerTaskByName(name)` | `Promise<any>` | Trigger by name (routes through constraints) |
| `triggerTask(task)` | `Promise<any>` | Trigger directly (routes through constraints) |
| `triggerTaskConstrained(task, input?)` | `Promise<any>` | Core constraint evaluation entry point |
| `addConstraintGroup(group)` | `void` | Register a constraint group |
| `removeConstraintGroup(name)` | `void` | Remove a constraint group by name |
| `addAndScheduleTask(task, cron)` | `void` | Register + schedule |
| `scheduleTaskByName(name, cron)` | `void` | Schedule existing task |
| `descheduleTaskByName(name)` | `void` | Remove schedule |
@@ -719,6 +971,7 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme');
| --- | --- | --- |
| `taskSubject` | `Subject<ITaskEvent>` | Aggregated events from all added tasks |
| `taskMap` | `ObjectMap<Task>` | Internal task registry |
| `constraintGroups` | `TaskConstraintGroup[]` | Registered constraint groups |
### Exported Types
@@ -726,11 +979,13 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme');
import type {
ITaskMetadata,
ITaskExecutionReport,
ITaskExecution,
IScheduledTaskInfo,
ITaskEvent,
TTaskEventType,
ITaskStep,
ITaskFunction,
ITaskConstraintGroupOptions,
StepNames,
} from '@push.rocks/taskbuffer';
```

View File

@@ -137,38 +137,7 @@ tap.test('should reject Taskparallel when a child task throws', async () => {
expect(didReject).toBeTrue();
});
// Test 7: TaskRunner continues processing after a task error
tap.test('should continue TaskRunner queue after a task error', async () => {
const runner = new taskbuffer.TaskRunner();
const executionOrder: string[] = [];
const badTask = new taskbuffer.Task({
name: 'runner-bad-task',
taskFunction: async () => {
executionOrder.push('bad');
throw new Error('runner task failure');
},
});
const goodTask = new taskbuffer.Task({
name: 'runner-good-task',
taskFunction: async () => {
executionOrder.push('good');
},
});
await runner.start();
runner.addTask(badTask);
runner.addTask(goodTask);
// Wait for both tasks to be processed
await smartdelay.delayFor(500);
await runner.stop();
expect(executionOrder).toContain('bad');
expect(executionOrder).toContain('good');
});
// Test 8: BufferRunner handles errors without hanging
// Test 7: BufferRunner handles errors without hanging
tap.test('should handle BufferRunner errors without hanging', async () => {
let callCount = 0;
const bufferedTask = new taskbuffer.Task({

873
test/test.13.constraints.ts Normal file
View File

@@ -0,0 +1,873 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as taskbuffer from '../ts/index.js';
import * as smartdelay from '@push.rocks/smartdelay';
// Test 1: Task data property — typed data accessible
tap.test('should have typed data property on task', async () => {
const task = new taskbuffer.Task<undefined, [], { domain: string; priority: number }>({
name: 'data-task',
data: { domain: 'example.com', priority: 1 },
taskFunction: async () => {},
});
expect(task.data.domain).toEqual('example.com');
expect(task.data.priority).toEqual(1);
});
// Test 2: Task data defaults to empty object
tap.test('should default data to empty object when not provided', async () => {
const task = new taskbuffer.Task({
name: 'no-data-task',
taskFunction: async () => {},
});
expect(task.data).toBeTruthy();
expect(typeof task.data).toEqual('object');
});
// Test 3: No-constraint passthrough — behavior unchanged
tap.test('should run tasks directly when no constraints are configured', async () => {
const manager = new taskbuffer.TaskManager();
let executed = false;
const task = new taskbuffer.Task({
name: 'passthrough-task',
taskFunction: async () => {
executed = true;
return 'done';
},
});
manager.addTask(task);
const result = await manager.triggerTaskByName('passthrough-task');
expect(executed).toBeTrue();
expect(result).toEqual('done');
await manager.stop();
});
// Test 4: Group concurrency — 3 tasks, max 2 concurrent, 3rd queues
tap.test('should enforce group concurrency limit', async () => {
const manager = new taskbuffer.TaskManager();
let running = 0;
let maxRunning = 0;
const constraint = new taskbuffer.TaskConstraintGroup<{ group: string }>({
name: 'concurrency-test',
maxConcurrent: 2,
constraintKeyForExecution: (task) =>
task.data.group === 'workers' ? 'workers' : null,
});
manager.addConstraintGroup(constraint);
const makeTask = (id: number) =>
new taskbuffer.Task<undefined, [], { group: string }>({
name: `worker-${id}`,
data: { group: 'workers' },
taskFunction: async () => {
running++;
maxRunning = Math.max(maxRunning, running);
await smartdelay.delayFor(200);
running--;
},
});
const t1 = makeTask(1);
const t2 = makeTask(2);
const t3 = makeTask(3);
manager.addTask(t1);
manager.addTask(t2);
manager.addTask(t3);
await Promise.all([
manager.triggerTaskConstrained(t1),
manager.triggerTaskConstrained(t2),
manager.triggerTaskConstrained(t3),
]);
expect(maxRunning).toBeLessThanOrEqual(2);
await manager.stop();
});
// Test 5: Key-based mutual exclusion — same key sequential, different keys parallel
tap.test('should enforce key-based mutual exclusion', async () => {
const manager = new taskbuffer.TaskManager();
const log: string[] = [];
const constraint = new taskbuffer.TaskConstraintGroup<{ domain: string }>({
name: 'domain-mutex',
maxConcurrent: 1,
constraintKeyForExecution: (task) => task.data.domain,
});
manager.addConstraintGroup(constraint);
const makeTask = (name: string, domain: string, delayMs: number) =>
new taskbuffer.Task<undefined, [], { domain: string }>({
name,
data: { domain },
taskFunction: async () => {
log.push(`${name}-start`);
await smartdelay.delayFor(delayMs);
log.push(`${name}-end`);
},
});
const taskA1 = makeTask('a1', 'a.com', 100);
const taskA2 = makeTask('a2', 'a.com', 100);
const taskB1 = makeTask('b1', 'b.com', 100);
manager.addTask(taskA1);
manager.addTask(taskA2);
manager.addTask(taskB1);
await Promise.all([
manager.triggerTaskConstrained(taskA1),
manager.triggerTaskConstrained(taskA2),
manager.triggerTaskConstrained(taskB1),
]);
// a1 and a2 should be sequential (same key)
const a1EndIdx = log.indexOf('a1-end');
const a2StartIdx = log.indexOf('a2-start');
expect(a2StartIdx).toBeGreaterThanOrEqual(a1EndIdx);
// b1 should start concurrently with a1 (different key)
const a1StartIdx = log.indexOf('a1-start');
const b1StartIdx = log.indexOf('b1-start');
// Both should start before a1 ends
expect(b1StartIdx).toBeLessThan(a1EndIdx);
await manager.stop();
});
// Test 6: Cooldown enforcement
tap.test('should enforce cooldown between task executions', async () => {
const manager = new taskbuffer.TaskManager();
const timestamps: number[] = [];
const constraint = new taskbuffer.TaskConstraintGroup<{ key: string }>({
name: 'cooldown-test',
maxConcurrent: 1,
cooldownMs: 300,
constraintKeyForExecution: (task) => task.data.key,
});
manager.addConstraintGroup(constraint);
const makeTask = (name: string) =>
new taskbuffer.Task<undefined, [], { key: string }>({
name,
data: { key: 'shared' },
taskFunction: async () => {
timestamps.push(Date.now());
},
});
const t1 = makeTask('cool-1');
const t2 = makeTask('cool-2');
const t3 = makeTask('cool-3');
manager.addTask(t1);
manager.addTask(t2);
manager.addTask(t3);
await Promise.all([
manager.triggerTaskConstrained(t1),
manager.triggerTaskConstrained(t2),
manager.triggerTaskConstrained(t3),
]);
// Each execution should be at least ~300ms apart (with 200ms tolerance)
for (let i = 1; i < timestamps.length; i++) {
const gap = timestamps[i] - timestamps[i - 1];
expect(gap).toBeGreaterThanOrEqual(250); // 300ms cooldown minus 50ms tolerance
}
await manager.stop();
});
// Test 7: Multiple constraint groups on one task
tap.test('should apply multiple constraint groups to one task', async () => {
const manager = new taskbuffer.TaskManager();
let running = 0;
let maxRunning = 0;
const globalConstraint = new taskbuffer.TaskConstraintGroup({
name: 'global',
maxConcurrent: 3,
constraintKeyForExecution: () => 'all',
});
const groupConstraint = new taskbuffer.TaskConstraintGroup<{ group: string }>({
name: 'group',
maxConcurrent: 1,
constraintKeyForExecution: (task) => task.data.group,
});
manager.addConstraintGroup(globalConstraint);
manager.addConstraintGroup(groupConstraint);
const makeTask = (name: string, group: string) =>
new taskbuffer.Task<undefined, [], { group: string }>({
name,
data: { group },
taskFunction: async () => {
running++;
maxRunning = Math.max(maxRunning, running);
await smartdelay.delayFor(100);
running--;
},
});
// Same group - should be serialized by group constraint
const t1 = makeTask('multi-1', 'A');
const t2 = makeTask('multi-2', 'A');
manager.addTask(t1);
manager.addTask(t2);
await Promise.all([
manager.triggerTaskConstrained(t1),
manager.triggerTaskConstrained(t2),
]);
// With group maxConcurrent: 1, only 1 should run at a time
expect(maxRunning).toBeLessThanOrEqual(1);
await manager.stop();
});
// Test 8: Matcher returns null — task runs unconstrained
tap.test('should run task unconstrained when matcher returns null', async () => {
const manager = new taskbuffer.TaskManager();
const constraint = new taskbuffer.TaskConstraintGroup<{ skip: boolean }>({
name: 'selective',
maxConcurrent: 1,
constraintKeyForExecution: (task) => (task.data.skip ? null : 'constrained'),
});
manager.addConstraintGroup(constraint);
let unconstrained = false;
const task = new taskbuffer.Task<undefined, [], { skip: boolean }>({
name: 'skip-task',
data: { skip: true },
taskFunction: async () => {
unconstrained = true;
},
});
manager.addTask(task);
await manager.triggerTaskConstrained(task);
expect(unconstrained).toBeTrue();
await manager.stop();
});
// Test 9: Error handling — failed task releases slot, queue drains
tap.test('should release slot and drain queue when task fails', async () => {
const manager = new taskbuffer.TaskManager();
const log: string[] = [];
const constraint = new taskbuffer.TaskConstraintGroup<{ key: string }>({
name: 'error-drain',
maxConcurrent: 1,
constraintKeyForExecution: (task) => task.data.key,
});
manager.addConstraintGroup(constraint);
const failTask = new taskbuffer.Task<undefined, [], { key: string }>({
name: 'fail-task',
data: { key: 'shared' },
catchErrors: true,
taskFunction: async () => {
log.push('fail');
throw new Error('intentional');
},
});
const successTask = new taskbuffer.Task<undefined, [], { key: string }>({
name: 'success-task',
data: { key: 'shared' },
taskFunction: async () => {
log.push('success');
},
});
manager.addTask(failTask);
manager.addTask(successTask);
await Promise.all([
manager.triggerTaskConstrained(failTask),
manager.triggerTaskConstrained(successTask),
]);
expect(log).toContain('fail');
expect(log).toContain('success');
await manager.stop();
});
// Test 10: TaskManager integration — addConstraintGroup + triggerTaskByName
tap.test('should route triggerTaskByName through constraints', async () => {
const manager = new taskbuffer.TaskManager();
let running = 0;
let maxRunning = 0;
const constraint = new taskbuffer.TaskConstraintGroup({
name: 'manager-integration',
maxConcurrent: 1,
constraintKeyForExecution: () => 'all',
});
manager.addConstraintGroup(constraint);
const t1 = new taskbuffer.Task({
name: 'managed-1',
taskFunction: async () => {
running++;
maxRunning = Math.max(maxRunning, running);
await smartdelay.delayFor(100);
running--;
},
});
const t2 = new taskbuffer.Task({
name: 'managed-2',
taskFunction: async () => {
running++;
maxRunning = Math.max(maxRunning, running);
await smartdelay.delayFor(100);
running--;
},
});
manager.addTask(t1);
manager.addTask(t2);
await Promise.all([
manager.triggerTaskByName('managed-1'),
manager.triggerTaskByName('managed-2'),
]);
expect(maxRunning).toBeLessThanOrEqual(1);
await manager.stop();
});
// Test 11: removeConstraintGroup removes by name
tap.test('should remove a constraint group by name', async () => {
const manager = new taskbuffer.TaskManager();
const constraint = new taskbuffer.TaskConstraintGroup({
name: 'removable',
maxConcurrent: 1,
constraintKeyForExecution: () => 'all',
});
manager.addConstraintGroup(constraint);
expect(manager.constraintGroups.length).toEqual(1);
manager.removeConstraintGroup('removable');
expect(manager.constraintGroups.length).toEqual(0);
await manager.stop();
});
// Test 12: TaskConstraintGroup reset clears state
tap.test('should reset constraint group state', async () => {
const constraint = new taskbuffer.TaskConstraintGroup({
name: 'resettable',
maxConcurrent: 2,
cooldownMs: 1000,
constraintKeyForExecution: () => 'key',
});
// Simulate usage
constraint.acquireSlot('key');
expect(constraint.getRunningCount('key')).toEqual(1);
constraint.releaseSlot('key');
expect(constraint.getCooldownRemaining('key')).toBeGreaterThan(0);
constraint.reset();
expect(constraint.getRunningCount('key')).toEqual(0);
expect(constraint.getCooldownRemaining('key')).toEqual(0);
});
// Test 13: Queued task returns correct result
tap.test('should return correct result from queued tasks', async () => {
const manager = new taskbuffer.TaskManager();
const constraint = new taskbuffer.TaskConstraintGroup({
name: 'return-value-test',
maxConcurrent: 1,
constraintKeyForExecution: () => 'shared',
});
manager.addConstraintGroup(constraint);
const t1 = new taskbuffer.Task({
name: 'ret-1',
taskFunction: async () => {
await smartdelay.delayFor(100);
return 'result-A';
},
});
const t2 = new taskbuffer.Task({
name: 'ret-2',
taskFunction: async () => {
return 'result-B';
},
});
manager.addTask(t1);
manager.addTask(t2);
const [r1, r2] = await Promise.all([
manager.triggerTaskConstrained(t1),
manager.triggerTaskConstrained(t2),
]);
expect(r1).toEqual('result-A');
expect(r2).toEqual('result-B');
await manager.stop();
});
// Test 14: Error propagation for queued tasks (catchErrors: false)
tap.test('should propagate errors from queued tasks (catchErrors: false)', async () => {
const manager = new taskbuffer.TaskManager();
const constraint = new taskbuffer.TaskConstraintGroup({
name: 'error-propagation',
maxConcurrent: 1,
constraintKeyForExecution: () => 'shared',
});
manager.addConstraintGroup(constraint);
const t1 = new taskbuffer.Task({
name: 'err-first',
taskFunction: async () => {
await smartdelay.delayFor(100);
return 'ok';
},
});
const t2 = new taskbuffer.Task({
name: 'err-second',
catchErrors: false,
taskFunction: async () => {
throw new Error('queued-error');
},
});
manager.addTask(t1);
manager.addTask(t2);
const r1Promise = manager.triggerTaskConstrained(t1);
const r2Promise = manager.triggerTaskConstrained(t2);
const r1 = await r1Promise;
expect(r1).toEqual('ok');
let caughtError: Error | null = null;
try {
await r2Promise;
} catch (err) {
caughtError = err as Error;
}
expect(caughtError).toBeTruthy();
expect(caughtError!.message).toEqual('queued-error');
await manager.stop();
});
// Test 15: triggerTask() routes through constraints
tap.test('should route triggerTask() through constraints', async () => {
const manager = new taskbuffer.TaskManager();
let running = 0;
let maxRunning = 0;
const constraint = new taskbuffer.TaskConstraintGroup({
name: 'trigger-task-test',
maxConcurrent: 1,
constraintKeyForExecution: () => 'all',
});
manager.addConstraintGroup(constraint);
const makeTask = (id: number) =>
new taskbuffer.Task({
name: `tt-${id}`,
taskFunction: async () => {
running++;
maxRunning = Math.max(maxRunning, running);
await smartdelay.delayFor(100);
running--;
},
});
const t1 = makeTask(1);
const t2 = makeTask(2);
manager.addTask(t1);
manager.addTask(t2);
await Promise.all([
manager.triggerTask(t1),
manager.triggerTask(t2),
]);
expect(maxRunning).toBeLessThanOrEqual(1);
await manager.stop();
});
// Test 16: addExecuteRemoveTask() routes through constraints
tap.test('should route addExecuteRemoveTask() through constraints', async () => {
const manager = new taskbuffer.TaskManager();
let running = 0;
let maxRunning = 0;
const constraint = new taskbuffer.TaskConstraintGroup({
name: 'add-execute-remove-test',
maxConcurrent: 1,
constraintKeyForExecution: () => 'all',
});
manager.addConstraintGroup(constraint);
const makeTask = (id: number) =>
new taskbuffer.Task({
name: `aer-${id}`,
taskFunction: async () => {
running++;
maxRunning = Math.max(maxRunning, running);
await smartdelay.delayFor(100);
running--;
return `done-${id}`;
},
});
const t1 = makeTask(1);
const t2 = makeTask(2);
const [report1, report2] = await Promise.all([
manager.addExecuteRemoveTask(t1),
manager.addExecuteRemoveTask(t2),
]);
expect(maxRunning).toBeLessThanOrEqual(1);
expect(report1.result).toEqual('done-1');
expect(report2.result).toEqual('done-2');
await manager.stop();
});
// Test 17: FIFO ordering of queued tasks
tap.test('should execute queued tasks in FIFO order', async () => {
const manager = new taskbuffer.TaskManager();
const executionOrder: string[] = [];
const constraint = new taskbuffer.TaskConstraintGroup({
name: 'fifo-test',
maxConcurrent: 1,
constraintKeyForExecution: () => 'shared',
});
manager.addConstraintGroup(constraint);
const makeTask = (id: string) =>
new taskbuffer.Task({
name: `fifo-${id}`,
taskFunction: async () => {
executionOrder.push(id);
await smartdelay.delayFor(50);
},
});
const tA = makeTask('A');
const tB = makeTask('B');
const tC = makeTask('C');
manager.addTask(tA);
manager.addTask(tB);
manager.addTask(tC);
await Promise.all([
manager.triggerTaskConstrained(tA),
manager.triggerTaskConstrained(tB),
manager.triggerTaskConstrained(tC),
]);
expect(executionOrder).toEqual(['A', 'B', 'C']);
await manager.stop();
});
// Test 18: Combined concurrency + cooldown
tap.test('should enforce both concurrency and cooldown together', async () => {
const manager = new taskbuffer.TaskManager();
let running = 0;
let maxRunning = 0;
const timestamps: number[] = [];
const constraint = new taskbuffer.TaskConstraintGroup({
name: 'combined-test',
maxConcurrent: 2,
cooldownMs: 200,
constraintKeyForExecution: () => 'shared',
});
manager.addConstraintGroup(constraint);
const makeTask = (id: number) =>
new taskbuffer.Task({
name: `combo-${id}`,
taskFunction: async () => {
running++;
maxRunning = Math.max(maxRunning, running);
timestamps.push(Date.now());
await smartdelay.delayFor(100);
running--;
},
});
const tasks = [makeTask(1), makeTask(2), makeTask(3), makeTask(4)];
for (const t of tasks) {
manager.addTask(t);
}
await Promise.all(tasks.map((t) => manager.triggerTaskConstrained(t)));
// Concurrency never exceeded 2
expect(maxRunning).toBeLessThanOrEqual(2);
// First 2 tasks start nearly together, 3rd task starts after first batch completes + cooldown
// First batch completes ~100ms after start, then 200ms cooldown
const gap = timestamps[2] - timestamps[0];
expect(gap).toBeGreaterThanOrEqual(250); // 100ms task + 200ms cooldown - 50ms tolerance
await manager.stop();
});
// Test 19: Constraint removal unblocks queued tasks
tap.test('should unblock queued tasks when constraint group is removed', async () => {
const manager = new taskbuffer.TaskManager();
const log: string[] = [];
const constraint = new taskbuffer.TaskConstraintGroup({
name: 'removable-constraint',
maxConcurrent: 1,
constraintKeyForExecution: () => 'shared',
});
manager.addConstraintGroup(constraint);
const t1 = new taskbuffer.Task({
name: 'block-1',
taskFunction: async () => {
log.push('t1-start');
// Remove constraint while t1 is running so t2 runs unconstrained after drain
manager.removeConstraintGroup('removable-constraint');
await smartdelay.delayFor(100);
log.push('t1-end');
},
});
const t2 = new taskbuffer.Task({
name: 'block-2',
taskFunction: async () => {
log.push('t2-start');
log.push('t2-end');
},
});
manager.addTask(t1);
manager.addTask(t2);
await Promise.all([
manager.triggerTaskConstrained(t1),
manager.triggerTaskConstrained(t2),
]);
// Both tasks completed (drain didn't deadlock after constraint removal)
expect(log).toContain('t1-start');
expect(log).toContain('t1-end');
expect(log).toContain('t2-start');
expect(log).toContain('t2-end');
// t2 started after t1 completed (drain fires after t1 finishes)
const t1EndIdx = log.indexOf('t1-end');
const t2StartIdx = log.indexOf('t2-start');
expect(t2StartIdx).toBeGreaterThanOrEqual(t1EndIdx);
await manager.stop();
});
// Test 20: Intra-task concurrency by input — same task, different inputs, key extracts TLD
tap.test('should serialize same-TLD inputs and parallelize different-TLD inputs', async () => {
const manager = new taskbuffer.TaskManager();
const log: string[] = [];
const extractTLD = (domain: string) => {
const parts = domain.split('.');
return parts.slice(-2).join('.');
};
const constraint = new taskbuffer.TaskConstraintGroup({
name: 'tld-mutex',
maxConcurrent: 1,
constraintKeyForExecution: (_task, input?: string) => {
if (!input) return null;
return extractTLD(input);
},
});
manager.addConstraintGroup(constraint);
const getCert = new taskbuffer.Task({
name: 'get-cert',
taskFunction: async (domain: string) => {
log.push(`${domain}-start`);
await smartdelay.delayFor(100);
log.push(`${domain}-end`);
},
});
manager.addTask(getCert);
await Promise.all([
manager.triggerTaskConstrained(getCert, 'a.example.com'),
manager.triggerTaskConstrained(getCert, 'b.example.com'),
manager.triggerTaskConstrained(getCert, 'c.other.org'),
]);
// a.example.com and b.example.com share TLD "example.com" → serialized
const aEndIdx = log.indexOf('a.example.com-end');
const bStartIdx = log.indexOf('b.example.com-start');
expect(bStartIdx).toBeGreaterThanOrEqual(aEndIdx);
// c.other.org has different TLD → runs in parallel with a.example.com
const aStartIdx = log.indexOf('a.example.com-start');
const cStartIdx = log.indexOf('c.other.org-start');
expect(cStartIdx).toBeLessThan(aEndIdx);
await manager.stop();
});
// Test 21: shouldExecute skips queued task based on external state
tap.test('should skip queued task when shouldExecute returns false', async () => {
const manager = new taskbuffer.TaskManager();
const execLog: string[] = [];
const certCache = new Map<string, string>();
const extractTLD = (domain: string) => {
const parts = domain.split('.');
return parts.slice(-2).join('.');
};
const constraint = new taskbuffer.TaskConstraintGroup({
name: 'cert-mutex',
maxConcurrent: 1,
constraintKeyForExecution: (_task, input?: string) => {
if (!input) return null;
return extractTLD(input);
},
shouldExecute: (_task, input?: string) => {
if (!input) return true;
return certCache.get(extractTLD(input)) !== 'wildcard';
},
});
manager.addConstraintGroup(constraint);
const getCert = new taskbuffer.Task({
name: 'get-cert-skip',
taskFunction: async (domain: string) => {
execLog.push(domain);
// First execution sets wildcard in cache
certCache.set(extractTLD(domain), 'wildcard');
await smartdelay.delayFor(100);
return `cert-for-${domain}`;
},
});
manager.addTask(getCert);
const [r1, r2] = await Promise.all([
manager.triggerTaskConstrained(getCert, 'app.example.com'),
manager.triggerTaskConstrained(getCert, 'api.example.com'),
]);
// First ran, second was skipped
expect(execLog).toEqual(['app.example.com']);
expect(r1).toEqual('cert-for-app.example.com');
expect(r2).toEqual(undefined);
await manager.stop();
});
// Test 22: shouldExecute on immediate (non-queued) trigger
tap.test('should skip immediate trigger when shouldExecute returns false', async () => {
const manager = new taskbuffer.TaskManager();
let executed = false;
const constraint = new taskbuffer.TaskConstraintGroup({
name: 'always-skip',
maxConcurrent: 10,
constraintKeyForExecution: () => 'all',
shouldExecute: () => false,
});
manager.addConstraintGroup(constraint);
const task = new taskbuffer.Task({
name: 'skip-immediate',
taskFunction: async () => {
executed = true;
return 'should-not-see';
},
});
manager.addTask(task);
const result = await manager.triggerTaskConstrained(task);
expect(executed).toBeFalse();
expect(result).toEqual(undefined);
await manager.stop();
});
// Test 23: Mixed task.data + input constraint key
tap.test('should use both task.data and input in constraint key', async () => {
const manager = new taskbuffer.TaskManager();
let running = 0;
let maxRunning = 0;
const constraint = new taskbuffer.TaskConstraintGroup<{ provider: string }>({
name: 'provider-domain',
maxConcurrent: 1,
constraintKeyForExecution: (task, input?: string) => {
return `${task.data.provider}:${input || 'default'}`;
},
});
manager.addConstraintGroup(constraint);
const makeTask = (name: string, provider: string) =>
new taskbuffer.Task<undefined, [], { provider: string }>({
name,
data: { provider },
taskFunction: async () => {
running++;
maxRunning = Math.max(maxRunning, running);
await smartdelay.delayFor(100);
running--;
},
});
// Same provider + same domain input → should serialize
const t1 = makeTask('mixed-1', 'acme');
const t2 = makeTask('mixed-2', 'acme');
// Different provider + same domain → parallel
const t3 = makeTask('mixed-3', 'cloudflare');
manager.addTask(t1);
manager.addTask(t2);
manager.addTask(t3);
await Promise.all([
manager.triggerTaskConstrained(t1, 'example.com'),
manager.triggerTaskConstrained(t2, 'example.com'),
manager.triggerTaskConstrained(t3, 'example.com'),
]);
// t1 and t2 share key "acme:example.com" → serialized (max 1 at a time)
// t3 has key "cloudflare:example.com" → parallel with t1
// So maxRunning should be exactly 2 (t1 + t3, or t3 + t2)
expect(maxRunning).toBeLessThanOrEqual(2);
expect(maxRunning).toBeGreaterThanOrEqual(2);
await manager.stop();
});
export default tap.start();

View File

@@ -36,12 +36,9 @@ tap.test('expect run tasks in sequence', async () => {
});
const testPromise = testTaskchain.trigger();
await smartdelay.delayFor(2100);
// tslint:disable-next-line:no-unused-expression
expect(task1Executed).toBeTrue();
// tslint:disable-next-line:no-unused-expression
expect(task2Executed).toBeFalse();
await smartdelay.delayFor(2100);
// tslint:disable-next-line:no-unused-expression
expect(task2Executed).toBeTrue();
await testPromise;
});

View File

@@ -33,7 +33,6 @@ tap.test('should run the task as expected', async () => {
);
myTaskManager.start();
await myTaskManager.triggerTaskByName('myTask');
// tslint:disable-next-line:no-unused-expression
expect(referenceBoolean).toBeTrue();
});

View File

@@ -1,34 +0,0 @@
import { tap, expect } from '@git.zone/tstest/tapbundle';
import * as taskbuffer from '../ts/index.js';
let testTaskRunner: taskbuffer.TaskRunner;
tap.test('should create a valid taskrunner', async () => {
testTaskRunner = new taskbuffer.TaskRunner();
await testTaskRunner.start();
});
tap.test('should execute task when its scheduled', async (tools) => {
const done = tools.defer();
testTaskRunner.addTask(
new taskbuffer.Task({
taskFunction: async () => {
console.log('hi');
},
}),
);
testTaskRunner.addTask(
new taskbuffer.Task({
taskFunction: async () => {
console.log('there');
done.resolve();
},
}),
);
await done.promise;
});
tap.start();

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/taskbuffer',
version: '4.1.1',
version: '6.0.0',
description: 'A flexible task management library supporting TypeScript, allowing for task buffering, scheduling, and execution with dependency management.'
}

View File

@@ -4,15 +4,15 @@ export { Taskchain } from './taskbuffer.classes.taskchain.js';
export { Taskparallel } from './taskbuffer.classes.taskparallel.js';
export { TaskManager } from './taskbuffer.classes.taskmanager.js';
export { TaskOnce } from './taskbuffer.classes.taskonce.js';
export { TaskRunner } from './taskbuffer.classes.taskrunner.js';
export { TaskDebounced } from './taskbuffer.classes.taskdebounced.js';
export { TaskConstraintGroup } from './taskbuffer.classes.taskconstraintgroup.js';
// Task step system
export { TaskStep } from './taskbuffer.classes.taskstep.js';
export type { ITaskStep } from './taskbuffer.classes.taskstep.js';
// Metadata interfaces
export type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent, TTaskEventType } from './taskbuffer.interfaces.js';
export type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent, TTaskEventType, ITaskConstraintGroupOptions, ITaskExecution } from './taskbuffer.interfaces.js';
import * as distributedCoordination from './taskbuffer.classes.distributedcoordinator.js';
export { distributedCoordination };

View File

@@ -6,7 +6,7 @@ export class BufferRunner {
// initialize by default
public bufferCounter: number = 0;
constructor(taskArg: Task<any>) {
constructor(taskArg: Task<any, any, any>) {
this.task = taskArg;
}

View File

@@ -9,7 +9,7 @@ export interface ICycleObject {
export class CycleCounter {
public task: Task;
public cycleObjectArray: ICycleObject[] = [];
constructor(taskArg: Task<any>) {
constructor(taskArg: Task<any, any, any>) {
this.task = taskArg;
}
public getPromiseForCycle(cycleCountArg: number) {

View File

@@ -19,18 +19,18 @@ export type TPreOrAfterTaskFunction = () => Task<any>;
// Type helper to extract step names from array
export type StepNames<T> = T extends ReadonlyArray<{ name: infer N }> ? N : never;
export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }> = []> {
export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }> = [], TData extends Record<string, unknown> = Record<string, unknown>> {
public static extractTask<T = undefined, TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }> = []>(
preOrAfterTaskArg: Task<T, TSteps> | TPreOrAfterTaskFunction,
): Task<T, TSteps> {
preOrAfterTaskArg: Task<T, TSteps, any> | TPreOrAfterTaskFunction,
): Task<T, TSteps, any> {
switch (true) {
case !preOrAfterTaskArg:
return null;
case preOrAfterTaskArg instanceof Task:
return preOrAfterTaskArg as Task<T, TSteps>;
return preOrAfterTaskArg as Task<T, TSteps, any>;
case typeof preOrAfterTaskArg === 'function':
const taskFunction = preOrAfterTaskArg as TPreOrAfterTaskFunction;
return taskFunction() as unknown as Task<T, TSteps>;
return taskFunction() as unknown as Task<T, TSteps, any>;
default:
return null;
}
@@ -42,7 +42,7 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
return done.promise;
};
public static isTask = (taskArg: Task<any>): boolean => {
public static isTask = (taskArg: Task<any, any, any>): boolean => {
if (taskArg instanceof Task && typeof taskArg.taskFunction === 'function') {
return true;
} else {
@@ -51,8 +51,8 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
};
public static isTaskTouched<T = undefined, TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }> = []>(
taskArg: Task<T, TSteps> | TPreOrAfterTaskFunction,
touchedTasksArray: Task<T, TSteps>[],
taskArg: Task<T, TSteps, any> | TPreOrAfterTaskFunction,
touchedTasksArray: Task<T, TSteps, any>[],
): boolean {
const taskToCheck = Task.extractTask(taskArg);
let result = false;
@@ -65,25 +65,16 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
}
public static runTask = async <T, TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }> = []>(
taskArg: Task<T, TSteps> | TPreOrAfterTaskFunction,
optionsArg: { x?: any; touchedTasksArray?: Task<T, TSteps>[] },
taskArg: Task<T, TSteps, any> | TPreOrAfterTaskFunction,
optionsArg: { x?: any; touchedTasksArray?: Task<T, TSteps, any>[] },
) => {
const taskToRun = Task.extractTask(taskArg);
const done = plugins.smartpromise.defer();
// Wait for all blocking tasks to finish
for (const task of taskToRun.blockingTasks) {
await task.finished;
}
if (!taskToRun.setupValue && taskToRun.taskSetup) {
taskToRun.setupValue = await taskToRun.taskSetup();
}
if (taskToRun.execDelay) {
await plugins.smartdelay.delayFor(taskToRun.execDelay);
}
taskToRun.running = true;
taskToRun.runCount++;
taskToRun.lastRun = new Date();
@@ -100,26 +91,10 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
// Complete all steps when task finishes
taskToRun.completeAllSteps();
taskToRun.emitEvent(taskToRun.lastError ? 'failed' : 'completed');
// When the task has finished running, resolve the finished promise
taskToRun.resolveFinished();
// Create a new finished promise for the next run
taskToRun.finished = new Promise((resolve) => {
taskToRun.resolveFinished = resolve;
});
})
.catch((err) => {
taskToRun.running = false;
taskToRun.emitEvent('failed', { error: err instanceof Error ? err.message : String(err) });
// Resolve finished so blocking dependants don't hang
taskToRun.resolveFinished();
// Create a new finished promise for the next run
taskToRun.finished = new Promise((resolve) => {
taskToRun.resolveFinished = resolve;
});
});
const options = {
@@ -127,7 +102,7 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
...optionsArg,
};
const x = options.x;
const touchedTasksArray: Task<T, TSteps>[] = options.touchedTasksArray;
const touchedTasksArray: Task<T, TSteps, any>[] = options.touchedTasksArray;
touchedTasksArray.push(taskToRun);
@@ -198,18 +173,12 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
public cronJob: plugins.smarttime.CronJob;
public bufferMax: number;
public execDelay: number;
public timeout: number;
public preTask: Task<T, any> | TPreOrAfterTaskFunction;
public afterTask: Task<T, any> | TPreOrAfterTaskFunction;
public data: TData;
// Add a list to store the blocking tasks
public blockingTasks: Task[] = [];
// Add a promise that will resolve when the task has finished
private finished: Promise<void>;
private resolveFinished: () => void;
public preTask: Task<T, any, any> | TPreOrAfterTaskFunction;
public afterTask: Task<T, any, any> | TPreOrAfterTaskFunction;
public running: boolean = false;
public bufferRunner = new BufferRunner(this);
@@ -275,12 +244,12 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
constructor(optionsArg: {
taskFunction: ITaskFunction<T>;
preTask?: Task<T, any> | TPreOrAfterTaskFunction;
afterTask?: Task<T, any> | TPreOrAfterTaskFunction;
preTask?: Task<T, any, any> | TPreOrAfterTaskFunction;
afterTask?: Task<T, any, any> | TPreOrAfterTaskFunction;
buffered?: boolean;
bufferMax?: number;
execDelay?: number;
name?: string;
data?: TData;
taskSetup?: ITaskSetupFunction<T>;
steps?: TSteps;
catchErrors?: boolean;
@@ -291,8 +260,8 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
this.afterTask = optionsArg.afterTask;
this.buffered = optionsArg.buffered;
this.bufferMax = optionsArg.bufferMax;
this.execDelay = optionsArg.execDelay;
this.name = optionsArg.name;
this.data = optionsArg.data ?? ({} as TData);
this.taskSetup = optionsArg.taskSetup;
this.catchErrors = optionsArg.catchErrors ?? false;
this.labels = optionsArg.labels ? { ...optionsArg.labels } : {};
@@ -309,11 +278,6 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
this.steps.set(stepConfig.name, step);
}
}
// Create the finished promise
this.finished = new Promise((resolve) => {
this.resolveFinished = resolve;
});
}
public trigger(x?: any): Promise<any> {

View File

@@ -0,0 +1,89 @@
import type { Task } from './taskbuffer.classes.task.js';
import type { ITaskConstraintGroupOptions } from './taskbuffer.interfaces.js';
export class TaskConstraintGroup<TData extends Record<string, unknown> = Record<string, unknown>> {
public name: string;
public maxConcurrent: number;
public cooldownMs: number;
private constraintKeyForExecution: (task: Task<any, any, TData>, input?: any) => string | null | undefined;
private shouldExecuteFn?: (task: Task<any, any, TData>, input?: any) => boolean | Promise<boolean>;
private runningCounts = new Map<string, number>();
private lastCompletionTimes = new Map<string, number>();
constructor(options: ITaskConstraintGroupOptions<TData>) {
this.name = options.name;
this.constraintKeyForExecution = options.constraintKeyForExecution;
this.maxConcurrent = options.maxConcurrent ?? Infinity;
this.cooldownMs = options.cooldownMs ?? 0;
this.shouldExecuteFn = options.shouldExecute;
}
public getConstraintKey(task: Task<any, any, TData>, input?: any): string | null {
const key = this.constraintKeyForExecution(task, input);
return key ?? null;
}
public async checkShouldExecute(task: Task<any, any, TData>, input?: any): Promise<boolean> {
if (!this.shouldExecuteFn) {
return true;
}
return this.shouldExecuteFn(task, input);
}
public canRun(subGroupKey: string): boolean {
const running = this.runningCounts.get(subGroupKey) ?? 0;
if (running >= this.maxConcurrent) {
return false;
}
if (this.cooldownMs > 0) {
const lastCompletion = this.lastCompletionTimes.get(subGroupKey);
if (lastCompletion !== undefined) {
const elapsed = Date.now() - lastCompletion;
if (elapsed < this.cooldownMs) {
return false;
}
}
}
return true;
}
public acquireSlot(subGroupKey: string): void {
const current = this.runningCounts.get(subGroupKey) ?? 0;
this.runningCounts.set(subGroupKey, current + 1);
}
public releaseSlot(subGroupKey: string): void {
const current = this.runningCounts.get(subGroupKey) ?? 0;
const next = Math.max(0, current - 1);
if (next === 0) {
this.runningCounts.delete(subGroupKey);
} else {
this.runningCounts.set(subGroupKey, next);
}
this.lastCompletionTimes.set(subGroupKey, Date.now());
}
public getCooldownRemaining(subGroupKey: string): number {
if (this.cooldownMs <= 0) {
return 0;
}
const lastCompletion = this.lastCompletionTimes.get(subGroupKey);
if (lastCompletion === undefined) {
return 0;
}
const elapsed = Date.now() - lastCompletion;
return Math.max(0, this.cooldownMs - elapsed);
}
public getRunningCount(subGroupKey: string): number {
return this.runningCounts.get(subGroupKey) ?? 0;
}
public reset(): void {
this.runningCounts.clear();
this.lastCompletionTimes.clear();
}
}

View File

@@ -1,10 +1,11 @@
import * as plugins from './taskbuffer.plugins.js';
import { Task } from './taskbuffer.classes.task.js';
import { TaskConstraintGroup } from './taskbuffer.classes.taskconstraintgroup.js';
import {
AbstractDistributedCoordinator,
type IDistributedTaskRequestResult,
} from './taskbuffer.classes.distributedcoordinator.js';
import type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent } from './taskbuffer.interfaces.js';
import type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent, IConstrainedTaskEntry } from './taskbuffer.interfaces.js';
import { logger } from './taskbuffer.logging.js';
export interface ICronJob {
@@ -19,23 +20,28 @@ export interface ITaskManagerConstructorOptions {
export class TaskManager {
public randomId = plugins.smartunique.shortId();
public taskMap = new plugins.lik.ObjectMap<Task<any, any>>();
public taskMap = new plugins.lik.ObjectMap<Task<any, any, any>>();
public readonly taskSubject = new plugins.smartrx.rxjs.Subject<ITaskEvent>();
private taskSubscriptions = new Map<Task<any, any>, plugins.smartrx.rxjs.Subscription>();
private taskSubscriptions = new Map<Task<any, any, any>, plugins.smartrx.rxjs.Subscription>();
private cronJobManager = new plugins.smarttime.CronManager();
public options: ITaskManagerConstructorOptions = {
distributedCoordinator: null,
};
// Constraint system
public constraintGroups: TaskConstraintGroup<any>[] = [];
private constraintQueue: IConstrainedTaskEntry[] = [];
private drainTimer: ReturnType<typeof setTimeout> | null = null;
constructor(options: ITaskManagerConstructorOptions = {}) {
this.options = Object.assign(this.options, options);
}
public getTaskByName(taskName: string): Task<any, any> {
public getTaskByName(taskName: string): Task<any, any, any> {
return this.taskMap.findSync((task) => task.name === taskName);
}
public addTask(task: Task<any, any>): void {
public addTask(task: Task<any, any, any>): void {
if (!task.name) {
throw new Error('Task must have a name to be added to taskManager');
}
@@ -46,7 +52,7 @@ export class TaskManager {
this.taskSubscriptions.set(task, subscription);
}
public removeTask(task: Task<any, any>): void {
public removeTask(task: Task<any, any, any>): void {
this.taskMap.remove(task);
const subscription = this.taskSubscriptions.get(task);
if (subscription) {
@@ -55,21 +61,170 @@ export class TaskManager {
}
}
public addAndScheduleTask(task: Task<any, any>, cronString: string) {
public addAndScheduleTask(task: Task<any, any, any>, cronString: string) {
this.addTask(task);
this.scheduleTaskByName(task.name, cronString);
}
// Constraint group management
public addConstraintGroup(group: TaskConstraintGroup<any>): void {
this.constraintGroups.push(group);
}
public removeConstraintGroup(name: string): void {
this.constraintGroups = this.constraintGroups.filter((g) => g.name !== name);
}
// Core constraint evaluation
public async triggerTaskConstrained(task: Task<any, any, any>, input?: any): Promise<any> {
// Gather applicable constraints
const applicableGroups: Array<{ group: TaskConstraintGroup<any>; key: string }> = [];
for (const group of this.constraintGroups) {
const key = group.getConstraintKey(task, input);
if (key !== null) {
applicableGroups.push({ group, key });
}
}
// No constraints apply → check shouldExecute then trigger directly
if (applicableGroups.length === 0) {
const shouldRun = await this.checkAllShouldExecute(task, input);
if (!shouldRun) {
return undefined;
}
return task.trigger(input);
}
// Check if all constraints allow running
const allCanRun = applicableGroups.every(({ group, key }) => group.canRun(key));
if (allCanRun) {
return this.executeWithConstraintTracking(task, input, applicableGroups);
}
// Blocked → enqueue with deferred promise and cached constraint keys
const deferred = plugins.smartpromise.defer<any>();
const constraintKeys = new Map<string, string>();
for (const { group, key } of applicableGroups) {
constraintKeys.set(group.name, key);
}
this.constraintQueue.push({ task, input, deferred, constraintKeys });
return deferred.promise;
}
private async checkAllShouldExecute(task: Task<any, any, any>, input?: any): Promise<boolean> {
for (const group of this.constraintGroups) {
const shouldRun = await group.checkShouldExecute(task, input);
if (!shouldRun) {
return false;
}
}
return true;
}
private async executeWithConstraintTracking(
task: Task<any, any, any>,
input: any,
groups: Array<{ group: TaskConstraintGroup<any>; key: string }>,
): Promise<any> {
// Acquire slots synchronously to prevent race conditions
for (const { group, key } of groups) {
group.acquireSlot(key);
}
// Check shouldExecute after acquiring slots
const shouldRun = await this.checkAllShouldExecute(task, input);
if (!shouldRun) {
// Release slots and drain queue
for (const { group, key } of groups) {
group.releaseSlot(key);
}
this.drainConstraintQueue();
return undefined;
}
try {
return await task.trigger(input);
} finally {
// Release slots
for (const { group, key } of groups) {
group.releaseSlot(key);
}
this.drainConstraintQueue();
}
}
private drainConstraintQueue(): void {
let shortestCooldown = Infinity;
const stillQueued: IConstrainedTaskEntry[] = [];
for (const entry of this.constraintQueue) {
const applicableGroups: Array<{ group: TaskConstraintGroup<any>; key: string }> = [];
for (const group of this.constraintGroups) {
const key = group.getConstraintKey(entry.task, entry.input);
if (key !== null) {
applicableGroups.push({ group, key });
}
}
// No constraints apply anymore (group removed?) → check shouldExecute then run
if (applicableGroups.length === 0) {
this.checkAllShouldExecute(entry.task, entry.input).then((shouldRun) => {
if (!shouldRun) {
entry.deferred.resolve(undefined);
return;
}
entry.task.trigger(entry.input).then(
(result) => entry.deferred.resolve(result),
(err) => entry.deferred.reject(err),
);
});
continue;
}
const allCanRun = applicableGroups.every(({ group, key }) => group.canRun(key));
if (allCanRun) {
// executeWithConstraintTracking handles shouldExecute check internally
this.executeWithConstraintTracking(entry.task, entry.input, applicableGroups).then(
(result) => entry.deferred.resolve(result),
(err) => entry.deferred.reject(err),
);
} else {
stillQueued.push(entry);
// Track shortest cooldown for timer scheduling
for (const { group, key } of applicableGroups) {
const remaining = group.getCooldownRemaining(key);
if (remaining > 0 && remaining < shortestCooldown) {
shortestCooldown = remaining;
}
}
}
}
this.constraintQueue = stillQueued;
// Schedule next drain if there are cooldown-blocked entries
if (this.drainTimer) {
clearTimeout(this.drainTimer);
this.drainTimer = null;
}
if (stillQueued.length > 0 && shortestCooldown < Infinity) {
this.drainTimer = setTimeout(() => {
this.drainTimer = null;
this.drainConstraintQueue();
}, shortestCooldown + 1);
}
}
public async triggerTaskByName(taskName: string): Promise<any> {
const taskToTrigger = this.getTaskByName(taskName);
if (!taskToTrigger) {
throw new Error(`No task with the name ${taskName} found.`);
}
return taskToTrigger.trigger();
return this.triggerTaskConstrained(taskToTrigger);
}
public async triggerTask(task: Task<any, any>) {
return task.trigger();
public async triggerTask(task: Task<any, any, any>) {
return this.triggerTaskConstrained(task);
}
public scheduleTaskByName(taskName: string, cronString: string) {
@@ -80,7 +235,7 @@ export class TaskManager {
this.handleTaskScheduling(taskToSchedule, cronString);
}
private handleTaskScheduling(task: Task<any, any>, cronString: string) {
private handleTaskScheduling(task: Task<any, any, any>, cronString: string) {
const cronJob = this.cronJobManager.addCronjob(
cronString,
async (triggerTime: number) => {
@@ -98,7 +253,7 @@ export class TaskManager {
}
}
try {
await task.trigger();
await this.triggerTaskConstrained(task);
} catch (err) {
logger.log('error', `TaskManager: scheduled task "${task.name || 'unnamed'}" failed: ${err instanceof Error ? err.message : String(err)}`);
}
@@ -107,7 +262,7 @@ export class TaskManager {
task.cronJob = cronJob;
}
private logTaskState(task: Task<any, any>) {
private logTaskState(task: Task<any, any, any>) {
logger.log('info', `Taskbuffer schedule triggered task >>${task.name}<<`);
const bufferState = task.buffered
? `buffered with max ${task.bufferMax} buffered calls`
@@ -116,7 +271,7 @@ export class TaskManager {
}
private async performDistributedConsultation(
task: Task<any, any>,
task: Task<any, any, any>,
triggerTime: number,
): Promise<IDistributedTaskRequestResult> {
logger.log('info', 'Found a distributed coordinator, performing consultation.');
@@ -144,7 +299,7 @@ export class TaskManager {
}
}
public async descheduleTask(task: Task<any, any>) {
public async descheduleTask(task: Task<any, any, any>) {
await this.descheduleTaskByName(task.name);
}
@@ -169,6 +324,10 @@ export class TaskManager {
subscription.unsubscribe();
}
this.taskSubscriptions.clear();
if (this.drainTimer) {
clearTimeout(this.drainTimer);
this.drainTimer = null;
}
}
// Get metadata for a specific task
@@ -186,7 +345,7 @@ export class TaskManager {
// Get scheduled tasks with their schedules and next run times
public getScheduledTasks(): IScheduledTaskInfo[] {
const scheduledTasks: IScheduledTaskInfo[] = [];
for (const task of this.taskMap.getArray()) {
if (task.cronJob) {
scheduledTasks.push({
@@ -199,7 +358,7 @@ export class TaskManager {
});
}
}
return scheduledTasks;
}
@@ -213,11 +372,11 @@ export class TaskManager {
}))
.sort((a, b) => a.nextRun.getTime() - b.nextRun.getTime())
.slice(0, limit);
return scheduledRuns;
}
public getTasksByLabel(key: string, value: string): Task<any, any>[] {
public getTasksByLabel(key: string, value: string): Task<any, any, any>[] {
return this.taskMap.getArray().filter(task => task.labels[key] === value);
}
@@ -227,7 +386,7 @@ export class TaskManager {
// Add, execute, and remove a task while collecting metadata
public async addExecuteRemoveTask<T, TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }>>(
task: Task<T, TSteps>,
task: Task<T, TSteps, any>,
options?: {
schedule?: string;
trackProgress?: boolean;
@@ -235,19 +394,18 @@ export class TaskManager {
): Promise<ITaskExecutionReport> {
// Add task to manager
this.addTask(task);
// Optionally schedule it
if (options?.schedule) {
this.scheduleTaskByName(task.name!, options.schedule);
}
const startTime = Date.now();
const progressUpdates: Array<{ stepName: string; timestamp: number }> = [];
try {
// Execute the task
const result = await task.trigger();
// Execute the task through constraints
const result = await this.triggerTaskConstrained(task);
// Collect execution report
const report: ITaskExecutionReport = {
taskName: task.name || 'unnamed',
@@ -261,15 +419,15 @@ export class TaskManager {
progress: task.getProgress(),
result,
};
// Remove task from manager
this.removeTask(task);
// Deschedule if it was scheduled
if (options?.schedule && task.name) {
this.descheduleTaskByName(task.name);
}
return report;
} catch (error) {
// Create error report
@@ -285,15 +443,15 @@ export class TaskManager {
progress: task.getProgress(),
error: error as Error,
};
// Remove task from manager even on error
this.removeTask(task);
// Deschedule if it was scheduled
if (options?.schedule && task.name) {
this.descheduleTaskByName(task.name);
}
throw errorReport;
}
}

View File

@@ -1,69 +0,0 @@
import * as plugins from './taskbuffer.plugins.js';
import { Task } from './taskbuffer.classes.task.js';
import { logger } from './taskbuffer.logging.js';
export class TaskRunner {
public maxParallelJobs: number = 1;
public status: 'stopped' | 'running' = 'stopped';
public runningTasks: plugins.lik.ObjectMap<Task> =
new plugins.lik.ObjectMap<Task>();
public queuedTasks: Task[] = [];
constructor() {
this.runningTasks.eventSubject.subscribe(async (eventArg) => {
this.checkExecution();
});
}
/**
* adds a task to the queue
*/
public addTask(taskArg: Task) {
this.queuedTasks.push(taskArg);
this.checkExecution();
}
/**
* set amount of parallel tasks
* be careful, you might lose dependability of tasks
*/
public setMaxParallelJobs(maxParallelJobsArg: number) {
this.maxParallelJobs = maxParallelJobsArg;
}
/**
* starts the task queue
*/
public async start() {
this.status = 'running';
}
/**
* checks whether execution is on point
*/
public async checkExecution() {
if (
this.runningTasks.getArray().length < this.maxParallelJobs &&
this.status === 'running' &&
this.queuedTasks.length > 0
) {
const nextJob = this.queuedTasks.shift();
this.runningTasks.add(nextJob);
try {
await nextJob.trigger();
} catch (err) {
logger.log('error', `TaskRunner: task "${nextJob.name || 'unnamed'}" failed: ${err instanceof Error ? err.message : String(err)}`);
}
this.runningTasks.remove(nextJob);
this.checkExecution();
}
}
/**
* stops the task queue
*/
public async stop() {
this.status = 'stopped';
}
}

View File

@@ -1,4 +1,25 @@
import type { ITaskStep } from './taskbuffer.classes.taskstep.js';
import type { Task } from './taskbuffer.classes.task.js';
export interface ITaskConstraintGroupOptions<TData extends Record<string, unknown> = Record<string, unknown>> {
name: string;
constraintKeyForExecution: (task: Task<any, any, TData>, input?: any) => string | null | undefined;
maxConcurrent?: number; // default: Infinity
cooldownMs?: number; // default: 0
shouldExecute?: (task: Task<any, any, TData>, input?: any) => boolean | Promise<boolean>;
}
export interface ITaskExecution<TData extends Record<string, unknown> = Record<string, unknown>> {
task: Task<any, any, TData>;
input: any;
}
export interface IConstrainedTaskEntry {
task: Task<any, any, any>;
input: any;
deferred: import('@push.rocks/smartpromise').Deferred<any>;
constraintKeys: Map<string, string>; // groupName -> key
}
export interface ITaskMetadata {
name: string;

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/taskbuffer',
version: '4.1.1',
version: '6.0.0',
description: 'A flexible task management library supporting TypeScript, allowing for task buffering, scheduling, and execution with dependency management.'
}

View File

@@ -8,20 +8,20 @@ import type { TaskManager, ITaskMetadata, IScheduledTaskInfo } from '../ts/index
export class TaskbufferDashboard extends DeesElement {
// Properties
@property({ type: Object })
public taskManager: TaskManager | null = null;
accessor taskManager: TaskManager | null = null;
@property({ type: Number })
public refreshInterval: number = 1000; // milliseconds
accessor refreshInterval: number = 1000; // milliseconds
// Internal state
@state()
private tasks: ITaskMetadata[] = [];
accessor tasks: ITaskMetadata[] = [];
@state()
private scheduledTasks: IScheduledTaskInfo[] = [];
accessor scheduledTasks: IScheduledTaskInfo[] = [];
@state()
private isRunning: boolean = false;
accessor isRunning: boolean = false;
private refreshTimer: any;