Compare commits
18 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 6e43e2ea68 | |||
| 2a345f6514 | |||
| b536dc8ba2 | |||
| 6ca6cf6bc0 | |||
| ed3bd99406 | |||
| 3ab90d9895 | |||
| aee7236e5f | |||
| c89da9e2b0 | |||
| fae13bb944 | |||
| 0811b04dfd | |||
| 33d1c334c4 | |||
| b2c0553e30 | |||
| 450b62fe5d | |||
| d3e8ff1a11 | |||
| 9d78933a46 | |||
| 28d9ad1746 | |||
| 28312972e0 | |||
| a0abcdda90 |
75
changelog.md
75
changelog.md
@@ -1,5 +1,80 @@
|
||||
# Changelog
|
||||
|
||||
## 2026-02-15 - 6.1.2 - fix(deps)
|
||||
bump @push.rocks/smarttime to ^4.2.3
|
||||
|
||||
- Updated @push.rocks/smarttime from ^4.1.1 to ^4.2.3
|
||||
- Non-breaking dependency version bump; increment patch version
|
||||
|
||||
## 2026-02-15 - 6.1.1 - fix(tests)
|
||||
improve buffered task tests: add chain, concurrency and queue behavior tests
|
||||
|
||||
- Replace tools.delayFor with @push.rocks/smartdelay for more deterministic timing in tests
|
||||
- Add tests for afterTask chaining, bufferMax concurrency, queued-run limits, and re-trigger behavior
|
||||
- Rename tasks to descriptive names and fix afterTask chaining order to avoid circular references
|
||||
- Change test runner invocation to export default tap.start() instead of calling tap.start() directly
|
||||
|
||||
## 2026-02-15 - 6.1.0 - feat(taskbuffer)
|
||||
add sliding-window rate limiting and result-sharing to TaskConstraintGroup and integrate with TaskManager
|
||||
|
||||
- Added IRateLimitConfig and TResultSharingMode types and exported them from the public index
|
||||
- TaskConstraintGroup: added rateLimit and resultSharingMode options, internal completion timestamp tracking, and last-result storage
|
||||
- TaskConstraintGroup: new helpers - pruneCompletionTimestamps, getRateLimitDelay, getNextAvailableDelay, recordResult, getLastResult, hasResultSharing
|
||||
- TaskConstraintGroup: rate-limit logic enforces maxPerWindow (counts running + completions) and composes with cooldown/maxConcurrent
|
||||
- TaskManager: records successful task results to constraint groups and resolves queued entries immediately when a shared result exists
|
||||
- TaskManager: queue drain now considers unified next-available delay (cooldown + rate limit) when scheduling retries
|
||||
- Documentation updated: README and hints with usage examples for sliding-window rate limiting and result sharing
|
||||
- Comprehensive tests added for rate limiting, concurrency interaction, and result-sharing behavior
|
||||
|
||||
## 2026-02-15 - 6.0.1 - fix(taskbuffer)
|
||||
no changes to commit
|
||||
|
||||
- Git diff shows no changes
|
||||
- package.json current version is 6.0.0; no version bump required
|
||||
|
||||
## 2026-02-15 - 6.0.0 - BREAKING CHANGE(constraints)
|
||||
make TaskConstraintGroup constraint matcher input-aware and add shouldExecute pre-execution hook
|
||||
|
||||
- Rename ITaskConstraintGroupOptions.constraintKeyForTask -> constraintKeyForExecution(task, input?) and update TaskConstraintGroup.getConstraintKey signature
|
||||
- Add optional shouldExecute(task, input?) hook; TaskManager checks shouldExecute before immediate runs, after acquiring slots, and when draining the constraint queue (queued tasks are skipped when shouldExecute returns false)
|
||||
- Export ITaskExecution type and store constraintKeys on queued entries (IConstrainedTaskEntry.constraintKeys)
|
||||
- Documentation and tests updated to demonstrate input-aware constraint keys and shouldExecute pruning
|
||||
|
||||
## 2026-02-15 - 5.0.1 - fix(tests)
|
||||
add and tighten constraint-related tests covering return values, error propagation, concurrency, cooldown timing, and constraint removal
|
||||
|
||||
- Tightened cooldown timing assertion from >=100ms to >=250ms to reflect 300ms cooldown with 50ms tolerance.
|
||||
- Added tests for queued task return values, error propagation when catchErrors is false, and error swallowing behavior when catchErrors is true.
|
||||
- Added concurrency and cooldown interaction tests to ensure maxConcurrent is respected and batch timing is correct.
|
||||
- Added test verifying removing a constraint group unblocks queued tasks and drain behavior completes correctly.
|
||||
|
||||
## 2026-02-15 - 5.0.0 - BREAKING CHANGE(taskbuffer)
|
||||
Introduce constraint-based concurrency with TaskConstraintGroup and TaskManager integration; remove legacy TaskRunner and several Task APIs (breaking); add typed Task.data and update exports and tests.
|
||||
|
||||
- Add TaskConstraintGroup class with per-key maxConcurrent, cooldownMs, and helper methods (canRun, acquireSlot, releaseSlot, getCooldownRemaining, getRunningCount, reset).
|
||||
- Task generic signature extended to Task<T, TSteps, TData> and a new typed data property (data) with default {}.
|
||||
- TaskManager now supports addConstraintGroup/removeConstraintGroup, triggerTaskConstrained, queues blocked tasks, drains queue with cooldown timers, and routes triggerTask/triggerTaskByName through the constraint system.
|
||||
- Removed TaskRunner, plus Task APIs: blockingTasks, execDelay, finished promise and associated behavior have been removed (breaking changes).
|
||||
- Exports and interfaces updated: TaskConstraintGroup and ITaskConstraintGroupOptions added; TaskRunner removed from public API.
|
||||
- Updated README and added comprehensive tests for constraint behavior; adjusted other tests to remove TaskRunner usage and reflect new APIs.
|
||||
|
||||
## 2026-02-15 - 4.2.1 - fix(deps)
|
||||
bump @push.rocks/smartlog and @types/node; update dependency list version and license link in docs
|
||||
|
||||
- package.json: update @push.rocks/smartlog from ^3.1.10 to ^3.1.11
|
||||
- package.json: update @types/node from ^25.1.0 to ^25.2.3
|
||||
- readme.hints.md: update 'Dependencies (as of v4.1.1)' to 'Dependencies (as of v4.2.0)' and reflect bumped dependency versions
|
||||
- readme.md: change license link text to '[LICENSE](./license.md)'
|
||||
|
||||
## 2026-01-29 - 4.2.0 - feat(ts_web)
|
||||
support TC39 'accessor' decorators for web components; bump dependencies and devDependencies; rename browser tests to .chromium.ts; move LICENSE to license.md and update readme
|
||||
|
||||
- Convert web component class fields to use the TC39 'accessor' keyword in ts_web/taskbuffer-dashboard.ts to be compatible with @design.estate/dees-element v2.1.6
|
||||
- Bump @design.estate/dees-element to ^2.1.6 and update devDependencies (@git.zone/tsbuild, @git.zone/tsbundle, @git.zone/tsrun, @git.zone/tstest, @types/node) to newer versions
|
||||
- Replace test/test.10.webcomponent.browser.ts with test/test.10.webcomponent.chromium.ts and update testing guidance in readme.hints.md to prefer .chromium.ts
|
||||
- Move LICENSE file content to license.md and update readme.md to reference the new license file
|
||||
- Small test cleanups: remove obsolete tslint:disable comments
|
||||
|
||||
## 2026-01-26 - 4.1.1 - fix(ts_web)
|
||||
fix web dashboard typings and update generated commit info
|
||||
|
||||
|
||||
20
package.json
20
package.json
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@push.rocks/taskbuffer",
|
||||
"version": "4.1.1",
|
||||
"version": "6.1.2",
|
||||
"private": false,
|
||||
"description": "A flexible task management library supporting TypeScript, allowing for task buffering, scheduling, and execution with dependency management.",
|
||||
"main": "dist_ts/index.js",
|
||||
@@ -27,28 +27,28 @@
|
||||
"debounced tasks",
|
||||
"distributed coordination"
|
||||
],
|
||||
"author": "Lossless GmbH",
|
||||
"author": "Task Venture Capital GmbH <hello@task.vc>",
|
||||
"license": "MIT",
|
||||
"bugs": {
|
||||
"url": "https://code.foss.global/push.rocks/taskbuffer/issues"
|
||||
},
|
||||
"homepage": "https://code.foss.global/push.rocks/taskbuffer#readme",
|
||||
"dependencies": {
|
||||
"@design.estate/dees-element": "^2.1.3",
|
||||
"@design.estate/dees-element": "^2.1.6",
|
||||
"@push.rocks/lik": "^6.2.2",
|
||||
"@push.rocks/smartdelay": "^3.0.5",
|
||||
"@push.rocks/smartlog": "^3.1.10",
|
||||
"@push.rocks/smartlog": "^3.1.11",
|
||||
"@push.rocks/smartpromise": "^4.2.3",
|
||||
"@push.rocks/smartrx": "^3.0.10",
|
||||
"@push.rocks/smarttime": "^4.1.1",
|
||||
"@push.rocks/smarttime": "^4.2.3",
|
||||
"@push.rocks/smartunique": "^3.0.9"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@git.zone/tsbuild": "^3.1.2",
|
||||
"@git.zone/tsbundle": "^2.6.3",
|
||||
"@git.zone/tsrun": "^2.0.0",
|
||||
"@git.zone/tstest": "^3.1.3",
|
||||
"@types/node": "^24.10.1"
|
||||
"@git.zone/tsbuild": "^4.1.2",
|
||||
"@git.zone/tsbundle": "^2.8.3",
|
||||
"@git.zone/tsrun": "^2.0.1",
|
||||
"@git.zone/tstest": "^3.1.8",
|
||||
"@types/node": "^25.2.3"
|
||||
},
|
||||
"files": [
|
||||
"ts/**/*",
|
||||
|
||||
4802
pnpm-lock.yaml
generated
4802
pnpm-lock.yaml
generated
File diff suppressed because it is too large
Load Diff
@@ -1,20 +1,61 @@
|
||||
# Taskbuffer Hints
|
||||
|
||||
## Task Constraint System (v5.0.0+) — Breaking Changes
|
||||
- **`TaskRunner` removed** — replaced by `TaskManager` + `TaskConstraintGroup`
|
||||
- **`blockingTasks` removed** from `Task` — use `TaskConstraintGroup` with `maxConcurrent: 1`
|
||||
- **`execDelay` removed** from `Task` — use `TaskConstraintGroup` with `cooldownMs`
|
||||
- **`finished` promise removed** from `Task` — no longer needed
|
||||
- **`Task` generic signature**: `Task<T, TSteps, TData>` (3rd param added for typed data)
|
||||
|
||||
### Task.data
|
||||
- `Task` constructor accepts optional `data?: TData` (defaults to `{}`)
|
||||
- Typed data bag accessible as `task.data`
|
||||
|
||||
### TaskConstraintGroup
|
||||
- `new TaskConstraintGroup<TData>({ name, constraintKeyForExecution, maxConcurrent?, cooldownMs?, shouldExecute?, rateLimit?, resultSharingMode? })`
|
||||
- `constraintKeyForExecution(task, input?)` returns a string key (constraint applies) or `null` (skip). Receives both task and runtime input.
|
||||
- `shouldExecute(task, input?)` — optional pre-execution check. Returns `false` to skip (deferred resolves `undefined`). Can be async.
|
||||
- `maxConcurrent` (default: `Infinity`) — max concurrent tasks per key
|
||||
- `cooldownMs` (default: `0`) — minimum ms gap between completions per key
|
||||
- `rateLimit` (optional) — `{ maxPerWindow: number, windowMs: number }` sliding window rate limiter. Counts both running + completed tasks in window.
|
||||
- `resultSharingMode` (default: `'none'`) — `'none'` | `'share-latest'`. When `'share-latest'`, queued tasks for the same key resolve with the first task's result without executing.
|
||||
- Methods: `getConstraintKey(task, input?)`, `checkShouldExecute(task, input?)`, `canRun(key)`, `acquireSlot(key)`, `releaseSlot(key)`, `getCooldownRemaining(key)`, `getRateLimitDelay(key)`, `getNextAvailableDelay(key)`, `getRunningCount(key)`, `recordResult(key, result)`, `getLastResult(key)`, `hasResultSharing()`, `reset()`
|
||||
- `ITaskExecution<TData>` type exported from index — `{ task, input }` tuple
|
||||
|
||||
### Rate Limiting (v6.1.0+)
|
||||
- Sliding window rate limiter: `rateLimit: { maxPerWindow: N, windowMs: ms }`
|
||||
- Counts running + completed tasks against the window cap
|
||||
- Per-key independence: saturating key A doesn't block key B
|
||||
- Composable with `maxConcurrent` and `cooldownMs`
|
||||
- `getNextAvailableDelay(key)` returns `Math.max(cooldownRemaining, rateLimitDelay)` — unified "how long until I can run" answer
|
||||
- Drain timer auto-schedules based on shortest delay across all constraints
|
||||
|
||||
### Result Sharing (v6.1.0+)
|
||||
- `resultSharingMode: 'share-latest'` — queued tasks for the same key get the first task's result without executing
|
||||
- Only successful results are shared (errors from `catchErrors: true` or thrown errors are NOT shared)
|
||||
- `shouldExecute` is NOT called for shared results (the task's purpose was already fulfilled)
|
||||
- `lastResults` persists until `reset()` — for time-bounded sharing, use `shouldExecute` to control staleness
|
||||
- Composable with rate limiting: rate-limited waiters get shared result without waiting for the window
|
||||
|
||||
### TaskManager Constraint Integration
|
||||
- `manager.addConstraintGroup(group)` / `manager.removeConstraintGroup(name)`
|
||||
- `triggerTaskByName()`, `triggerTask()`, `addExecuteRemoveTask()`, cron callbacks all route through `triggerTaskConstrained()`
|
||||
- `triggerTaskConstrained(task, input?)` — evaluates constraints, queues if blocked, drains after completion
|
||||
- Cooldown-blocked entries auto-drain via timer
|
||||
|
||||
### Exported from index.ts
|
||||
- `TaskConstraintGroup` class
|
||||
- `ITaskConstraintGroupOptions`, `IRateLimitConfig`, `TResultSharingMode` types
|
||||
|
||||
## Error Handling (v3.6.0+)
|
||||
- `Task` now has `catchErrors` constructor option (default: `false`)
|
||||
- Default behavior: `trigger()` rejects when taskFunction throws (breaking change from pre-3.6)
|
||||
- Set `catchErrors: true` to swallow errors (old behavior) - returns `undefined` on error
|
||||
- Error state tracked via `lastError?: Error`, `errorCount: number`, `clearError()`
|
||||
- `getMetadata()` status uses all four values: `'idle'` | `'running'` | `'completed'` | `'failed'`
|
||||
- All peripheral classes (Taskchain, Taskparallel, TaskRunner, BufferRunner, TaskDebounced, TaskManager) have proper error propagation/handling
|
||||
- All peripheral classes (Taskchain, Taskparallel, BufferRunner, TaskDebounced, TaskManager) have proper error propagation/handling
|
||||
- `console.log` calls replaced with `logger.log()` throughout
|
||||
|
||||
## Breaking API Rename (TaskRunner)
|
||||
- `maxParrallelJobs` → `maxParallelJobs`
|
||||
- `qeuedTasks` → `queuedTasks`
|
||||
- JSDoc typos fixed: "qeue" → "queue", "wether" → "whether", "loose" → "lose"
|
||||
- The `setMaxParallelJobs()` parameter also renamed from `maxParrallelJobsArg` to `maxParallelJobsArg`
|
||||
|
||||
## Error Context Improvements
|
||||
- **TaskChain**: Errors now wrap the original with context: chain name, failing task name, and task index. Original error preserved via `.cause`
|
||||
- **BufferRunner**: When `catchErrors: false`, buffered task errors now reject the trigger promise (via `CycleCounter.informOfCycleError`) instead of silently resolving with `undefined`
|
||||
@@ -36,7 +77,34 @@
|
||||
|
||||
## Project Structure
|
||||
- Source in `ts/`, web components in `ts_web/`
|
||||
- Tests in `test/` - naming: `*.node.ts`, `*.browser.ts`, `*.both.ts`
|
||||
- Tests in `test/` - naming: `*.node.ts`, `*.chromium.ts` (preferred), `*.both.ts`
|
||||
- Note: `*.browser.ts` is deprecated, use `*.chromium.ts` for browser tests
|
||||
- Logger: `ts/taskbuffer.logging.ts` exports `logger` (ConsoleLog from smartlog)
|
||||
- Build: `pnpm build` (tsbuild tsfolders)
|
||||
- Test: `pnpm test` or `tstest test/test.XX.name.ts --verbose`
|
||||
|
||||
## Web Components (ts_web/)
|
||||
- Uses `@design.estate/dees-element` with TC39 decorators
|
||||
- Decorators require the `accessor` keyword:
|
||||
```typescript
|
||||
@property({ type: String })
|
||||
accessor myProp = 'default';
|
||||
|
||||
@state()
|
||||
accessor count = 0;
|
||||
```
|
||||
|
||||
## Dependencies (as of v4.2.0)
|
||||
- `@design.estate/dees-element` ^2.1.6 - TC39 decorators with `accessor` keyword
|
||||
- `@push.rocks/lik` ^6.2.2 - Data structures
|
||||
- `@push.rocks/smartdelay` ^3.0.5 - Delay utilities
|
||||
- `@push.rocks/smartlog` ^3.1.11 - Logging
|
||||
- `@push.rocks/smartpromise` ^4.2.3 - Promise utilities
|
||||
- `@push.rocks/smartrx` ^3.0.10 - RxJS wrapper
|
||||
- `@push.rocks/smarttime` ^4.1.1 - Time/cron utilities
|
||||
- `@push.rocks/smartunique` ^3.0.9 - Unique ID generation
|
||||
- `@git.zone/tsbuild` ^4.1.2 - Build tool
|
||||
- `@git.zone/tsbundle` ^2.8.3 - Bundler (for browser tests)
|
||||
- `@git.zone/tsrun` ^2.0.1 - TypeScript runner
|
||||
- `@git.zone/tstest` ^3.1.8 - Test runner (supports `.chromium.ts` files)
|
||||
- `@types/node` ^25.2.3 - Node.js type definitions
|
||||
|
||||
484
readme.md
484
readme.md
@@ -1,6 +1,6 @@
|
||||
# @push.rocks/taskbuffer 🚀
|
||||
|
||||
> **Modern TypeScript task orchestration with smart buffering, scheduling, labels, and real-time event streaming**
|
||||
> **Modern TypeScript task orchestration with constraint-based concurrency, smart buffering, scheduling, labels, and real-time event streaming**
|
||||
|
||||
[](https://www.npmjs.com/package/@push.rocks/taskbuffer)
|
||||
[](https://www.typescriptlang.org/)
|
||||
@@ -13,6 +13,7 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community
|
||||
## 🌟 Features
|
||||
|
||||
- **🎯 Type-Safe Task Management** — Full TypeScript support with generics and type inference
|
||||
- **🔒 Constraint-Based Concurrency** — Per-key mutual exclusion, group concurrency limits, cooldown enforcement, sliding-window rate limiting, and result sharing via `TaskConstraintGroup`
|
||||
- **📊 Real-Time Progress Tracking** — Step-based progress with percentage weights
|
||||
- **⚡ Smart Buffering** — Intelligent request debouncing and batching
|
||||
- **⏰ Cron Scheduling** — Schedule tasks with cron expressions
|
||||
@@ -49,6 +50,24 @@ const result = await greetTask.trigger('World');
|
||||
console.log(result); // "Hello, World!"
|
||||
```
|
||||
|
||||
### Task with Typed Data 📦
|
||||
|
||||
Every task can carry a typed data bag — perfect for constraint matching, routing, and metadata:
|
||||
|
||||
```typescript
|
||||
const task = new Task<undefined, [], { domain: string; priority: number }>({
|
||||
name: 'update-dns',
|
||||
data: { domain: 'example.com', priority: 1 },
|
||||
taskFunction: async () => {
|
||||
// task.data is fully typed here
|
||||
console.log(`Updating DNS for ${task.data.domain}`);
|
||||
},
|
||||
});
|
||||
|
||||
task.data.domain; // string — fully typed
|
||||
task.data.priority; // number — fully typed
|
||||
```
|
||||
|
||||
### Task with Steps & Progress 📊
|
||||
|
||||
```typescript
|
||||
@@ -84,6 +103,325 @@ console.log(deployTask.getStepsMetadata()); // Step details with status
|
||||
|
||||
> **Note:** `notifyStep()` is fully type-safe — TypeScript only accepts step names you declared in the `steps` array when you use `as const`.
|
||||
|
||||
## 🔒 Task Constraints — Concurrency, Mutual Exclusion & Cooldowns
|
||||
|
||||
`TaskConstraintGroup` is the unified mechanism for controlling how tasks run relative to each other. It replaces older patterns like task runners, blocking tasks, and execution delays with a single, composable, key-based constraint system.
|
||||
|
||||
### Per-Key Mutual Exclusion
|
||||
|
||||
Ensure only one task runs at a time for a given key (e.g. per domain, per tenant, per resource):
|
||||
|
||||
```typescript
|
||||
import { Task, TaskManager, TaskConstraintGroup } from '@push.rocks/taskbuffer';
|
||||
|
||||
const manager = new TaskManager();
|
||||
|
||||
// Only one DNS update per domain at a time
|
||||
const domainMutex = new TaskConstraintGroup<{ domain: string }>({
|
||||
name: 'domain-mutex',
|
||||
maxConcurrent: 1,
|
||||
constraintKeyForExecution: (task, input?) => task.data.domain,
|
||||
});
|
||||
|
||||
manager.addConstraintGroup(domainMutex);
|
||||
|
||||
const task1 = new Task<undefined, [], { domain: string }>({
|
||||
name: 'update-a.com',
|
||||
data: { domain: 'a.com' },
|
||||
taskFunction: async () => { /* update DNS for a.com */ },
|
||||
});
|
||||
|
||||
const task2 = new Task<undefined, [], { domain: string }>({
|
||||
name: 'update-a.com-2',
|
||||
data: { domain: 'a.com' },
|
||||
taskFunction: async () => { /* another update for a.com */ },
|
||||
});
|
||||
|
||||
manager.addTask(task1);
|
||||
manager.addTask(task2);
|
||||
|
||||
// task2 waits until task1 finishes (same domain key)
|
||||
await Promise.all([
|
||||
manager.triggerTask(task1),
|
||||
manager.triggerTask(task2),
|
||||
]);
|
||||
```
|
||||
|
||||
### Group Concurrency Limits
|
||||
|
||||
Cap how many tasks can run concurrently across a group:
|
||||
|
||||
```typescript
|
||||
// Max 3 DNS updaters running globally at once
|
||||
const dnsLimit = new TaskConstraintGroup<{ group: string }>({
|
||||
name: 'dns-concurrency',
|
||||
maxConcurrent: 3,
|
||||
constraintKeyForExecution: (task) =>
|
||||
task.data.group === 'dns' ? 'dns' : null, // null = skip constraint
|
||||
});
|
||||
|
||||
manager.addConstraintGroup(dnsLimit);
|
||||
```
|
||||
|
||||
### Cooldowns (Rate Limiting)
|
||||
|
||||
Enforce a minimum time gap between consecutive executions for the same key:
|
||||
|
||||
```typescript
|
||||
// No more than one API call per domain every 11 seconds
|
||||
const rateLimiter = new TaskConstraintGroup<{ domain: string }>({
|
||||
name: 'api-rate-limit',
|
||||
maxConcurrent: 1,
|
||||
cooldownMs: 11000,
|
||||
constraintKeyForExecution: (task) => task.data.domain,
|
||||
});
|
||||
|
||||
manager.addConstraintGroup(rateLimiter);
|
||||
```
|
||||
|
||||
### Global Concurrency Cap
|
||||
|
||||
Limit total concurrent tasks system-wide:
|
||||
|
||||
```typescript
|
||||
const globalCap = new TaskConstraintGroup({
|
||||
name: 'global-cap',
|
||||
maxConcurrent: 10,
|
||||
constraintKeyForExecution: () => 'all', // same key = shared limit
|
||||
});
|
||||
|
||||
manager.addConstraintGroup(globalCap);
|
||||
```
|
||||
|
||||
### Composing Multiple Constraints
|
||||
|
||||
Multiple constraint groups stack — a task only runs when **all** applicable constraints allow it:
|
||||
|
||||
```typescript
|
||||
manager.addConstraintGroup(globalCap); // max 10 globally
|
||||
manager.addConstraintGroup(domainMutex); // max 1 per domain
|
||||
manager.addConstraintGroup(rateLimiter); // 11s cooldown per domain
|
||||
|
||||
// A task must satisfy ALL three constraints before it starts
|
||||
await manager.triggerTask(dnsTask);
|
||||
```
|
||||
|
||||
### Selective Constraints
|
||||
|
||||
Return `null` from `constraintKeyForExecution` to exempt a task from a constraint group:
|
||||
|
||||
```typescript
|
||||
const constraint = new TaskConstraintGroup<{ priority: string }>({
|
||||
name: 'low-priority-limit',
|
||||
maxConcurrent: 2,
|
||||
constraintKeyForExecution: (task) =>
|
||||
task.data.priority === 'low' ? 'low-priority' : null, // high priority tasks skip this constraint
|
||||
});
|
||||
```
|
||||
|
||||
### Input-Aware Constraints 🎯
|
||||
|
||||
The `constraintKeyForExecution` function receives both the **task** and the **runtime input** passed to `trigger(input)`. This means the same task triggered with different inputs can be constrained independently:
|
||||
|
||||
```typescript
|
||||
const extractTLD = (domain: string) => {
|
||||
const parts = domain.split('.');
|
||||
return parts.slice(-2).join('.');
|
||||
};
|
||||
|
||||
// Same TLD → serialized. Different TLDs → parallel.
|
||||
const tldMutex = new TaskConstraintGroup({
|
||||
name: 'tld-mutex',
|
||||
maxConcurrent: 1,
|
||||
constraintKeyForExecution: (task, input?: string) => {
|
||||
if (!input) return null;
|
||||
return extractTLD(input); // "example.com", "other.org", etc.
|
||||
},
|
||||
});
|
||||
|
||||
manager.addConstraintGroup(tldMutex);
|
||||
|
||||
// These two serialize (same TLD "example.com")
|
||||
const p1 = manager.triggerTaskConstrained(getCert, 'app.example.com');
|
||||
const p2 = manager.triggerTaskConstrained(getCert, 'api.example.com');
|
||||
|
||||
// This runs in parallel (different TLD "other.org")
|
||||
const p3 = manager.triggerTaskConstrained(getCert, 'my.other.org');
|
||||
```
|
||||
|
||||
You can also combine `task.data` and `input` for composite keys:
|
||||
|
||||
```typescript
|
||||
const providerDomain = new TaskConstraintGroup<{ provider: string }>({
|
||||
name: 'provider-domain',
|
||||
maxConcurrent: 1,
|
||||
constraintKeyForExecution: (task, input?: string) => {
|
||||
return `${task.data.provider}:${input || 'default'}`;
|
||||
},
|
||||
});
|
||||
```
|
||||
|
||||
### Pre-Execution Check with `shouldExecute` ✅
|
||||
|
||||
The `shouldExecute` callback runs right before a queued task executes. If it returns `false`, the task is skipped and its promise resolves with `undefined`. This is perfect for scenarios where a prior execution's outcome makes subsequent queued tasks unnecessary:
|
||||
|
||||
```typescript
|
||||
const certCache = new Map<string, string>();
|
||||
|
||||
const certConstraint = new TaskConstraintGroup({
|
||||
name: 'cert-mutex',
|
||||
maxConcurrent: 1,
|
||||
constraintKeyForExecution: (task, input?: string) => {
|
||||
if (!input) return null;
|
||||
return extractTLD(input);
|
||||
},
|
||||
shouldExecute: (task, input?: string) => {
|
||||
if (!input) return true;
|
||||
// Skip if a wildcard cert already covers this TLD
|
||||
return certCache.get(extractTLD(input)) !== 'wildcard';
|
||||
},
|
||||
});
|
||||
|
||||
const getCert = new Task({
|
||||
name: 'get-certificate',
|
||||
taskFunction: async (domain: string) => {
|
||||
const cert = await acme.getCert(domain);
|
||||
if (cert.isWildcard) certCache.set(extractTLD(domain), 'wildcard');
|
||||
return cert;
|
||||
},
|
||||
});
|
||||
|
||||
manager.addConstraintGroup(certConstraint);
|
||||
manager.addTask(getCert);
|
||||
|
||||
const r1 = manager.triggerTaskConstrained(getCert, 'app.example.com'); // runs, gets wildcard
|
||||
const r2 = manager.triggerTaskConstrained(getCert, 'api.example.com'); // queued → skipped!
|
||||
const r3 = manager.triggerTaskConstrained(getCert, 'my.other.org'); // parallel (different TLD)
|
||||
|
||||
const [cert1, cert2, cert3] = await Promise.all([r1, r2, r3]);
|
||||
// cert2 === undefined (skipped because wildcard already covers example.com)
|
||||
```
|
||||
|
||||
**`shouldExecute` semantics:**
|
||||
|
||||
- Runs right before execution (after slot acquisition, before `trigger()`)
|
||||
- Also checked on immediate (non-queued) triggers
|
||||
- Returns `false` → skip execution, deferred resolves with `undefined`
|
||||
- Can be async (return `Promise<boolean>`)
|
||||
- Has closure access to external state modified by prior executions
|
||||
- If multiple constraint groups have `shouldExecute`, **all** must return `true`
|
||||
|
||||
### Sliding Window Rate Limiting
|
||||
|
||||
Enforce "N completions per time window" with burst capability. Unlike `cooldownMs` (which forces even spacing between executions), `rateLimit` allows bursts up to the cap, then blocks until the window slides:
|
||||
|
||||
```typescript
|
||||
// Let's Encrypt style: 300 new orders per 3 hours
|
||||
const acmeRateLimit = new TaskConstraintGroup({
|
||||
name: 'acme-rate',
|
||||
constraintKeyForExecution: () => 'acme-account',
|
||||
rateLimit: {
|
||||
maxPerWindow: 300,
|
||||
windowMs: 3 * 60 * 60 * 1000, // 3 hours
|
||||
},
|
||||
});
|
||||
|
||||
manager.addConstraintGroup(acmeRateLimit);
|
||||
|
||||
// All 300 can burst immediately. The 301st waits until the oldest
|
||||
// completion falls out of the 3-hour window.
|
||||
for (const domain of domains) {
|
||||
manager.triggerTaskConstrained(certTask, { domain });
|
||||
}
|
||||
```
|
||||
|
||||
Compose multiple rate limits for layered protection:
|
||||
|
||||
```typescript
|
||||
// Per-domain weekly cap AND global order rate
|
||||
const perDomainWeekly = new TaskConstraintGroup({
|
||||
name: 'per-domain-weekly',
|
||||
constraintKeyForExecution: (task, input) => input.registeredDomain,
|
||||
rateLimit: { maxPerWindow: 50, windowMs: 7 * 24 * 60 * 60 * 1000 },
|
||||
});
|
||||
|
||||
const globalOrderRate = new TaskConstraintGroup({
|
||||
name: 'global-order-rate',
|
||||
constraintKeyForExecution: () => 'global',
|
||||
rateLimit: { maxPerWindow: 300, windowMs: 3 * 60 * 60 * 1000 },
|
||||
});
|
||||
|
||||
manager.addConstraintGroup(perDomainWeekly);
|
||||
manager.addConstraintGroup(globalOrderRate);
|
||||
```
|
||||
|
||||
Combine with `maxConcurrent` and `cooldownMs` for fine-grained control:
|
||||
|
||||
```typescript
|
||||
const throttled = new TaskConstraintGroup({
|
||||
name: 'acme-throttle',
|
||||
constraintKeyForExecution: () => 'acme',
|
||||
maxConcurrent: 5, // max 5 concurrent requests
|
||||
cooldownMs: 1000, // 1s gap after each completion
|
||||
rateLimit: {
|
||||
maxPerWindow: 300,
|
||||
windowMs: 3 * 60 * 60 * 1000,
|
||||
},
|
||||
});
|
||||
```
|
||||
|
||||
### Result Sharing — Deduplication for Concurrent Requests
|
||||
|
||||
When multiple callers request the same resource concurrently, `resultSharingMode: 'share-latest'` ensures only one execution occurs. All queued waiters receive the same result:
|
||||
|
||||
```typescript
|
||||
const certMutex = new TaskConstraintGroup({
|
||||
name: 'cert-per-tld',
|
||||
constraintKeyForExecution: (task, input) => extractTld(input.domain),
|
||||
maxConcurrent: 1,
|
||||
resultSharingMode: 'share-latest',
|
||||
});
|
||||
|
||||
manager.addConstraintGroup(certMutex);
|
||||
|
||||
const certTask = new Task({
|
||||
name: 'obtain-cert',
|
||||
taskFunction: async (input) => {
|
||||
return await acmeClient.obtainWildcard(input.domain);
|
||||
},
|
||||
});
|
||||
manager.addTask(certTask);
|
||||
|
||||
// Three requests for *.example.com arrive simultaneously
|
||||
const [cert1, cert2, cert3] = await Promise.all([
|
||||
manager.triggerTaskConstrained(certTask, { domain: 'api.example.com' }),
|
||||
manager.triggerTaskConstrained(certTask, { domain: 'www.example.com' }),
|
||||
manager.triggerTaskConstrained(certTask, { domain: 'mail.example.com' }),
|
||||
]);
|
||||
|
||||
// Only ONE ACME request was made.
|
||||
// cert1 === cert2 === cert3 — all callers got the same cert object.
|
||||
```
|
||||
|
||||
**Result sharing semantics:**
|
||||
|
||||
- `shouldExecute` is NOT called for shared results (the task's purpose was already fulfilled)
|
||||
- Error results are NOT shared — queued tasks execute independently after a failure
|
||||
- `lastResults` persists until `reset()` — for time-bounded sharing, use `shouldExecute` to control staleness
|
||||
- Composable with rate limiting: rate-limited waiters get shared results without waiting for the window
|
||||
|
||||
### How It Works
|
||||
|
||||
When you trigger a task through `TaskManager` (via `triggerTask`, `triggerTaskByName`, `addExecuteRemoveTask`, or cron), the manager:
|
||||
|
||||
1. Evaluates all registered constraint groups against the task and input
|
||||
2. If no constraints apply (all matchers return `null`) → checks `shouldExecute` → runs or skips
|
||||
3. If all applicable constraints have capacity → acquires slots → checks `shouldExecute` → runs or skips
|
||||
4. If any constraint blocks → enqueues the task; when a running task completes, the queue is drained
|
||||
5. Cooldown/rate-limit-blocked tasks auto-retry after the shortest remaining delay expires
|
||||
6. Queued tasks check for shared results first (if any group has `resultSharingMode: 'share-latest'`)
|
||||
7. Queued tasks re-check `shouldExecute` when their turn comes — stale work is automatically pruned
|
||||
|
||||
## 🎯 Core Concepts
|
||||
|
||||
### Task Buffering — Intelligent Request Management
|
||||
@@ -95,7 +433,6 @@ const apiTask = new Task({
|
||||
name: 'APIRequest',
|
||||
buffered: true,
|
||||
bufferMax: 5, // Maximum 5 concurrent executions
|
||||
execDelay: 100, // Minimum 100ms between executions
|
||||
taskFunction: async (endpoint) => {
|
||||
return await fetch(endpoint).then((r) => r.json());
|
||||
},
|
||||
@@ -156,9 +493,9 @@ console.log(`Saved ${savedCount} items`);
|
||||
Taskchain also supports dynamic mutation:
|
||||
|
||||
```typescript
|
||||
pipeline.addTask(newTask); // Append to chain
|
||||
pipeline.removeTask(oldTask); // Remove by reference (returns boolean)
|
||||
pipeline.shiftTask(); // Remove & return first task
|
||||
pipeline.addTask(newTask); // Append to chain
|
||||
pipeline.removeTask(oldTask); // Remove by reference (returns boolean)
|
||||
pipeline.shiftTask(); // Remove & return first task
|
||||
```
|
||||
|
||||
Error context is rich — a chain failure includes the chain name, failing task name, task index, and preserves the original error as `.cause`.
|
||||
@@ -220,27 +557,6 @@ await initTask.trigger(); // No-op
|
||||
console.log(initTask.hasTriggered); // true
|
||||
```
|
||||
|
||||
### TaskRunner — Managed Queue with Concurrency Control
|
||||
|
||||
Process a queue of tasks with a configurable parallelism limit:
|
||||
|
||||
```typescript
|
||||
import { TaskRunner } from '@push.rocks/taskbuffer';
|
||||
|
||||
const runner = new TaskRunner();
|
||||
runner.setMaxParallelJobs(3); // Run up to 3 tasks concurrently
|
||||
|
||||
await runner.start();
|
||||
|
||||
runner.addTask(taskA);
|
||||
runner.addTask(taskB);
|
||||
runner.addTask(taskC);
|
||||
runner.addTask(taskD); // Queued until a slot opens
|
||||
|
||||
// When done:
|
||||
await runner.stop();
|
||||
```
|
||||
|
||||
## 🏷️ Labels — Multi-Tenant Task Filtering
|
||||
|
||||
Attach arbitrary key-value labels to any task for filtering, grouping, or multi-tenant isolation:
|
||||
@@ -411,6 +727,10 @@ manager.addTask(deployTask);
|
||||
manager.addAndScheduleTask(backupTask, '0 2 * * *'); // Daily at 2 AM
|
||||
manager.addAndScheduleTask(healthCheck, '*/5 * * * *'); // Every 5 minutes
|
||||
|
||||
// Register constraint groups
|
||||
manager.addConstraintGroup(globalCap);
|
||||
manager.addConstraintGroup(perDomainMutex);
|
||||
|
||||
// Query metadata
|
||||
const meta = manager.getTaskMetadata('Deploy');
|
||||
console.log(meta);
|
||||
@@ -433,7 +753,7 @@ const allMeta = manager.getAllTasksMetadata();
|
||||
const scheduled = manager.getScheduledTasks();
|
||||
const nextRuns = manager.getNextScheduledRuns(5);
|
||||
|
||||
// Trigger by name
|
||||
// Trigger by name (routes through constraints)
|
||||
await manager.triggerTaskByName('Deploy');
|
||||
|
||||
// One-shot: add, execute, collect report, remove
|
||||
@@ -462,6 +782,12 @@ manager.removeTask(task); // Removes from map and unsubscribes event forwarding
|
||||
manager.descheduleTaskByName('Deploy'); // Remove cron schedule only
|
||||
```
|
||||
|
||||
### Remove Constraint Groups
|
||||
|
||||
```typescript
|
||||
manager.removeConstraintGroup('domain-mutex'); // By name
|
||||
```
|
||||
|
||||
## 🎨 Web Component Dashboard
|
||||
|
||||
Visualize your tasks in real-time with the included Lit-based web component:
|
||||
@@ -570,32 +896,6 @@ await task.trigger('SELECT * FROM users'); // Setup runs here
|
||||
await task.trigger('SELECT * FROM orders'); // Setup skipped, pool reused
|
||||
```
|
||||
|
||||
### Blocking Tasks
|
||||
|
||||
Make one task wait for another to finish before executing:
|
||||
|
||||
```typescript
|
||||
const initTask = new Task({
|
||||
name: 'Init',
|
||||
taskFunction: async () => {
|
||||
await initializeSystem();
|
||||
},
|
||||
});
|
||||
|
||||
const workerTask = new Task({
|
||||
name: 'Worker',
|
||||
taskFunction: async () => {
|
||||
await doWork();
|
||||
},
|
||||
});
|
||||
|
||||
workerTask.blockingTasks.push(initTask);
|
||||
|
||||
// Triggering worker will automatically wait for init to complete
|
||||
initTask.trigger();
|
||||
workerTask.trigger(); // Waits until initTask.finished resolves
|
||||
```
|
||||
|
||||
### Database Migration Pipeline
|
||||
|
||||
```typescript
|
||||
@@ -616,15 +916,24 @@ try {
|
||||
|
||||
### Multi-Tenant SaaS Monitoring
|
||||
|
||||
Combine labels + events for a real-time multi-tenant dashboard:
|
||||
Combine labels + events + constraints for a real-time multi-tenant system:
|
||||
|
||||
```typescript
|
||||
const manager = new TaskManager();
|
||||
|
||||
// Per-tenant concurrency limit
|
||||
const tenantLimit = new TaskConstraintGroup<{ tenantId: string }>({
|
||||
name: 'tenant-concurrency',
|
||||
maxConcurrent: 2,
|
||||
constraintKeyForExecution: (task, input?) => task.data.tenantId,
|
||||
});
|
||||
manager.addConstraintGroup(tenantLimit);
|
||||
|
||||
// Create tenant-scoped tasks
|
||||
function createTenantTask(tenantId: string, taskName: string, fn: () => Promise<any>) {
|
||||
const task = new Task({
|
||||
const task = new Task<undefined, [], { tenantId: string }>({
|
||||
name: `${tenantId}:${taskName}`,
|
||||
data: { tenantId },
|
||||
labels: { tenantId },
|
||||
taskFunction: fn,
|
||||
});
|
||||
@@ -653,15 +962,31 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme');
|
||||
|
||||
| Class | Description |
|
||||
| --- | --- |
|
||||
| `Task<T, TSteps>` | Core task unit with optional step tracking, labels, and event streaming |
|
||||
| `TaskManager` | Centralized orchestrator with scheduling, label queries, and aggregated events |
|
||||
| `Task<T, TSteps, TData>` | Core task unit with typed data, optional step tracking, labels, and event streaming |
|
||||
| `TaskManager` | Centralized orchestrator with constraint groups, scheduling, label queries, and aggregated events |
|
||||
| `TaskConstraintGroup<TData>` | Concurrency, mutual exclusion, and cooldown constraints with key-based grouping |
|
||||
| `Taskchain` | Sequential task executor with data flow between tasks |
|
||||
| `Taskparallel` | Concurrent task executor via `Promise.all()` |
|
||||
| `TaskOnce` | Single-execution guard |
|
||||
| `TaskDebounced` | Debounced task using rxjs |
|
||||
| `TaskRunner` | Sequential queue with configurable parallelism |
|
||||
| `TaskStep` | Step tracking unit (internal, exposed via metadata) |
|
||||
|
||||
### Task Constructor Options
|
||||
|
||||
| Option | Type | Default | Description |
|
||||
| --- | --- | --- | --- |
|
||||
| `taskFunction` | `ITaskFunction<T>` | *required* | The async function to execute |
|
||||
| `name` | `string` | — | Task identifier (required for TaskManager) |
|
||||
| `data` | `TData` | `{}` | Typed data bag for constraint matching and routing |
|
||||
| `steps` | `ReadonlyArray<{name, description, percentage}>` | — | Step definitions for progress tracking |
|
||||
| `buffered` | `boolean` | — | Enable request buffering |
|
||||
| `bufferMax` | `number` | — | Max buffered calls |
|
||||
| `preTask` | `Task \| () => Task` | — | Task to run before |
|
||||
| `afterTask` | `Task \| () => Task` | — | Task to run after |
|
||||
| `taskSetup` | `() => Promise<T>` | — | One-time setup function |
|
||||
| `catchErrors` | `boolean` | `false` | Swallow errors instead of rejecting |
|
||||
| `labels` | `Record<string, string>` | `{}` | Initial labels |
|
||||
|
||||
### Task Methods
|
||||
|
||||
| Method | Returns | Description |
|
||||
@@ -682,6 +1007,7 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme');
|
||||
| Property | Type | Description |
|
||||
| --- | --- | --- |
|
||||
| `name` | `string` | Task identifier |
|
||||
| `data` | `TData` | Typed data bag |
|
||||
| `running` | `boolean` | Whether the task is currently executing |
|
||||
| `idle` | `boolean` | Inverse of `running` |
|
||||
| `labels` | `Record<string, string>` | Attached labels |
|
||||
@@ -690,7 +1016,36 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme');
|
||||
| `errorCount` | `number` | Total error count across all runs |
|
||||
| `runCount` | `number` | Total execution count |
|
||||
| `lastRun` | `Date \| undefined` | Timestamp of last execution |
|
||||
| `blockingTasks` | `Task[]` | Tasks that must finish before this one starts |
|
||||
|
||||
### TaskConstraintGroup Constructor Options
|
||||
|
||||
| Option | Type | Default | Description |
|
||||
| --- | --- | --- | --- |
|
||||
| `name` | `string` | *required* | Constraint group identifier |
|
||||
| `constraintKeyForExecution` | `(task, input?) => string \| null` | *required* | Returns key for grouping, or `null` to skip. Receives both the task and runtime input. |
|
||||
| `maxConcurrent` | `number` | `Infinity` | Max concurrent tasks per key |
|
||||
| `cooldownMs` | `number` | `0` | Minimum ms between completions per key |
|
||||
| `shouldExecute` | `(task, input?) => boolean \| Promise<boolean>` | — | Pre-execution check. Return `false` to skip; deferred resolves `undefined`. |
|
||||
| `rateLimit` | `IRateLimitConfig` | — | Sliding window: `{ maxPerWindow, windowMs }`. Counts running + completed tasks. |
|
||||
| `resultSharingMode` | `TResultSharingMode` | `'none'` | `'none'` or `'share-latest'`. Queued tasks get first task's result without executing. |
|
||||
|
||||
### TaskConstraintGroup Methods
|
||||
|
||||
| Method | Returns | Description |
|
||||
| --- | --- | --- |
|
||||
| `getConstraintKey(task, input?)` | `string \| null` | Get the constraint key for a task + input |
|
||||
| `checkShouldExecute(task, input?)` | `Promise<boolean>` | Run the `shouldExecute` callback (defaults to `true`) |
|
||||
| `canRun(key)` | `boolean` | Check if a slot is available (considers concurrency, cooldown, and rate limit) |
|
||||
| `acquireSlot(key)` | `void` | Claim a running slot |
|
||||
| `releaseSlot(key)` | `void` | Release a slot and record completion time + rate-limit timestamp |
|
||||
| `getCooldownRemaining(key)` | `number` | Milliseconds until cooldown expires |
|
||||
| `getRateLimitDelay(key)` | `number` | Milliseconds until a rate-limit slot opens |
|
||||
| `getNextAvailableDelay(key)` | `number` | Max of cooldown + rate-limit delay — unified "when can I run" |
|
||||
| `getRunningCount(key)` | `number` | Current running count for key |
|
||||
| `recordResult(key, result)` | `void` | Store result for sharing (no-op if mode is `'none'`) |
|
||||
| `getLastResult(key)` | `{result, timestamp} \| undefined` | Get last shared result for key |
|
||||
| `hasResultSharing()` | `boolean` | Whether result sharing is enabled |
|
||||
| `reset()` | `void` | Clear all state (running counts, cooldowns, rate-limit timestamps, shared results) |
|
||||
|
||||
### TaskManager Methods
|
||||
|
||||
@@ -699,7 +1054,11 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme');
|
||||
| `addTask(task)` | `void` | Register a task (wires event forwarding) |
|
||||
| `removeTask(task)` | `void` | Remove task and unsubscribe events |
|
||||
| `getTaskByName(name)` | `Task \| undefined` | Look up by name |
|
||||
| `triggerTaskByName(name)` | `Promise<any>` | Trigger by name |
|
||||
| `triggerTaskByName(name)` | `Promise<any>` | Trigger by name (routes through constraints) |
|
||||
| `triggerTask(task)` | `Promise<any>` | Trigger directly (routes through constraints) |
|
||||
| `triggerTaskConstrained(task, input?)` | `Promise<any>` | Core constraint evaluation entry point |
|
||||
| `addConstraintGroup(group)` | `void` | Register a constraint group |
|
||||
| `removeConstraintGroup(name)` | `void` | Remove a constraint group by name |
|
||||
| `addAndScheduleTask(task, cron)` | `void` | Register + schedule |
|
||||
| `scheduleTaskByName(name, cron)` | `void` | Schedule existing task |
|
||||
| `descheduleTaskByName(name)` | `void` | Remove schedule |
|
||||
@@ -719,6 +1078,7 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme');
|
||||
| --- | --- | --- |
|
||||
| `taskSubject` | `Subject<ITaskEvent>` | Aggregated events from all added tasks |
|
||||
| `taskMap` | `ObjectMap<Task>` | Internal task registry |
|
||||
| `constraintGroups` | `TaskConstraintGroup[]` | Registered constraint groups |
|
||||
|
||||
### Exported Types
|
||||
|
||||
@@ -726,11 +1086,15 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme');
|
||||
import type {
|
||||
ITaskMetadata,
|
||||
ITaskExecutionReport,
|
||||
ITaskExecution,
|
||||
IScheduledTaskInfo,
|
||||
ITaskEvent,
|
||||
TTaskEventType,
|
||||
ITaskStep,
|
||||
ITaskFunction,
|
||||
ITaskConstraintGroupOptions,
|
||||
IRateLimitConfig,
|
||||
TResultSharingMode,
|
||||
StepNames,
|
||||
} from '@push.rocks/taskbuffer';
|
||||
```
|
||||
|
||||
@@ -137,38 +137,7 @@ tap.test('should reject Taskparallel when a child task throws', async () => {
|
||||
expect(didReject).toBeTrue();
|
||||
});
|
||||
|
||||
// Test 7: TaskRunner continues processing after a task error
|
||||
tap.test('should continue TaskRunner queue after a task error', async () => {
|
||||
const runner = new taskbuffer.TaskRunner();
|
||||
const executionOrder: string[] = [];
|
||||
|
||||
const badTask = new taskbuffer.Task({
|
||||
name: 'runner-bad-task',
|
||||
taskFunction: async () => {
|
||||
executionOrder.push('bad');
|
||||
throw new Error('runner task failure');
|
||||
},
|
||||
});
|
||||
const goodTask = new taskbuffer.Task({
|
||||
name: 'runner-good-task',
|
||||
taskFunction: async () => {
|
||||
executionOrder.push('good');
|
||||
},
|
||||
});
|
||||
|
||||
await runner.start();
|
||||
runner.addTask(badTask);
|
||||
runner.addTask(goodTask);
|
||||
|
||||
// Wait for both tasks to be processed
|
||||
await smartdelay.delayFor(500);
|
||||
await runner.stop();
|
||||
|
||||
expect(executionOrder).toContain('bad');
|
||||
expect(executionOrder).toContain('good');
|
||||
});
|
||||
|
||||
// Test 8: BufferRunner handles errors without hanging
|
||||
// Test 7: BufferRunner handles errors without hanging
|
||||
tap.test('should handle BufferRunner errors without hanging', async () => {
|
||||
let callCount = 0;
|
||||
const bufferedTask = new taskbuffer.Task({
|
||||
|
||||
1435
test/test.13.constraints.ts
Normal file
1435
test/test.13.constraints.ts
Normal file
File diff suppressed because it is too large
Load Diff
@@ -36,12 +36,9 @@ tap.test('expect run tasks in sequence', async () => {
|
||||
});
|
||||
const testPromise = testTaskchain.trigger();
|
||||
await smartdelay.delayFor(2100);
|
||||
// tslint:disable-next-line:no-unused-expression
|
||||
expect(task1Executed).toBeTrue();
|
||||
// tslint:disable-next-line:no-unused-expression
|
||||
expect(task2Executed).toBeFalse();
|
||||
await smartdelay.delayFor(2100);
|
||||
// tslint:disable-next-line:no-unused-expression
|
||||
expect(task2Executed).toBeTrue();
|
||||
await testPromise;
|
||||
});
|
||||
|
||||
@@ -33,7 +33,6 @@ tap.test('should run the task as expected', async () => {
|
||||
);
|
||||
myTaskManager.start();
|
||||
await myTaskManager.triggerTaskByName('myTask');
|
||||
// tslint:disable-next-line:no-unused-expression
|
||||
expect(referenceBoolean).toBeTrue();
|
||||
});
|
||||
|
||||
|
||||
@@ -1,52 +1,151 @@
|
||||
import { tap, expect } from '@git.zone/tstest/tapbundle';
|
||||
|
||||
import * as taskbuffer from '../ts/index.js';
|
||||
import * as smartdelay from '@push.rocks/smartdelay';
|
||||
|
||||
let counter1 = 0;
|
||||
let counter2 = 0;
|
||||
let counter3 = 0;
|
||||
// Test 1: Basic buffered execution with afterTask chain
|
||||
tap.test('should run buffered tasks with afterTask chain', async () => {
|
||||
let counter1 = 0;
|
||||
let counter2 = 0;
|
||||
let counter3 = 0;
|
||||
|
||||
tap.test('should run buffered', async (tools) => {
|
||||
const task = new taskbuffer.Task({
|
||||
name: 'a buffered task',
|
||||
taskFunction: async () => {
|
||||
counter1++;
|
||||
await tools.delayFor(2000);
|
||||
console.log(`task 1 ran ${counter1} times`);
|
||||
},
|
||||
buffered: true,
|
||||
bufferMax: 1,
|
||||
afterTask: () => {
|
||||
return task2;
|
||||
},
|
||||
});
|
||||
const task2 = new taskbuffer.Task({
|
||||
name: 'a buffered task',
|
||||
taskFunction: async () => {
|
||||
counter2++;
|
||||
await tools.delayFor(2000);
|
||||
console.log(`task2 ran ${counter2} times`);
|
||||
},
|
||||
buffered: true,
|
||||
bufferMax: 1,
|
||||
afterTask: () => {
|
||||
return task3;
|
||||
},
|
||||
});
|
||||
const task3 = new taskbuffer.Task({
|
||||
name: 'a buffered task',
|
||||
name: 'buffered-chain-3',
|
||||
taskFunction: async () => {
|
||||
counter3++;
|
||||
await tools.delayFor(2000);
|
||||
console.log(`task3 ran ${counter3} times`);
|
||||
await smartdelay.delayFor(50);
|
||||
},
|
||||
buffered: true,
|
||||
bufferMax: 1,
|
||||
});
|
||||
while (counter1 < 10) {
|
||||
await tools.delayFor(5000);
|
||||
task.trigger();
|
||||
|
||||
const task2 = new taskbuffer.Task({
|
||||
name: 'buffered-chain-2',
|
||||
taskFunction: async () => {
|
||||
counter2++;
|
||||
await smartdelay.delayFor(50);
|
||||
},
|
||||
buffered: true,
|
||||
bufferMax: 1,
|
||||
afterTask: () => task3,
|
||||
});
|
||||
|
||||
const task1 = new taskbuffer.Task({
|
||||
name: 'buffered-chain-1',
|
||||
taskFunction: async () => {
|
||||
counter1++;
|
||||
await smartdelay.delayFor(50);
|
||||
},
|
||||
buffered: true,
|
||||
bufferMax: 1,
|
||||
afterTask: () => task2,
|
||||
});
|
||||
|
||||
// Trigger 3 times with enough spacing for the chain to complete
|
||||
for (let i = 0; i < 3; i++) {
|
||||
task1.trigger();
|
||||
await smartdelay.delayFor(250); // enough for chain of 3 x 50ms tasks
|
||||
}
|
||||
|
||||
// Wait for final chain to finish
|
||||
await smartdelay.delayFor(500);
|
||||
|
||||
// Each task in the chain should have run at least once
|
||||
expect(counter1).toBeGreaterThanOrEqual(1);
|
||||
expect(counter2).toBeGreaterThanOrEqual(1);
|
||||
expect(counter3).toBeGreaterThanOrEqual(1);
|
||||
|
||||
// afterTask chain means task2 count should match task1 (each trigger chains)
|
||||
expect(counter2).toEqual(counter1);
|
||||
expect(counter3).toEqual(counter1);
|
||||
});
|
||||
|
||||
tap.start();
|
||||
// Test 2: bufferMax limits concurrent buffered executions
|
||||
tap.test('should respect bufferMax for concurrent buffered calls', async () => {
|
||||
let running = 0;
|
||||
let maxRunning = 0;
|
||||
let totalRuns = 0;
|
||||
|
||||
const task = new taskbuffer.Task({
|
||||
name: 'buffer-max-test',
|
||||
taskFunction: async () => {
|
||||
running++;
|
||||
maxRunning = Math.max(maxRunning, running);
|
||||
totalRuns++;
|
||||
await smartdelay.delayFor(100);
|
||||
running--;
|
||||
},
|
||||
buffered: true,
|
||||
bufferMax: 2,
|
||||
});
|
||||
|
||||
// Fire many triggers rapidly — only bufferMax should run concurrently
|
||||
for (let i = 0; i < 10; i++) {
|
||||
task.trigger();
|
||||
}
|
||||
|
||||
// Wait for all buffered executions to complete
|
||||
await smartdelay.delayFor(1000);
|
||||
|
||||
expect(maxRunning).toBeLessThanOrEqual(2);
|
||||
expect(totalRuns).toBeGreaterThanOrEqual(1);
|
||||
});
|
||||
|
||||
// Test 3: bufferMax limits how many runs are queued during execution
|
||||
tap.test('should limit queued runs to bufferMax during execution', async () => {
|
||||
let runCount = 0;
|
||||
|
||||
const task = new taskbuffer.Task({
|
||||
name: 'buffer-queue-test',
|
||||
taskFunction: async () => {
|
||||
runCount++;
|
||||
await smartdelay.delayFor(100);
|
||||
},
|
||||
buffered: true,
|
||||
bufferMax: 2,
|
||||
});
|
||||
|
||||
// Rapid-fire 5 triggers — bufferMax:2 means counter caps at 2
|
||||
// so only 2 runs will happen (the initial run + 1 buffered rerun)
|
||||
task.trigger();
|
||||
task.trigger();
|
||||
task.trigger();
|
||||
task.trigger();
|
||||
task.trigger();
|
||||
|
||||
await smartdelay.delayFor(500);
|
||||
|
||||
expect(runCount).toEqual(2);
|
||||
});
|
||||
|
||||
// Test 4: Triggers spaced after completion queue new runs
|
||||
tap.test('should re-trigger after previous buffered run completes', async () => {
|
||||
let runCount = 0;
|
||||
|
||||
const task = new taskbuffer.Task({
|
||||
name: 'retrigger-test',
|
||||
taskFunction: async () => {
|
||||
runCount++;
|
||||
await smartdelay.delayFor(50);
|
||||
},
|
||||
buffered: true,
|
||||
bufferMax: 1,
|
||||
});
|
||||
|
||||
// First trigger starts execution
|
||||
task.trigger();
|
||||
// Wait for it to complete
|
||||
await smartdelay.delayFor(100);
|
||||
|
||||
// Second trigger starts a new execution (task is now idle)
|
||||
task.trigger();
|
||||
await smartdelay.delayFor(100);
|
||||
|
||||
// Third trigger
|
||||
task.trigger();
|
||||
await smartdelay.delayFor(100);
|
||||
|
||||
expect(runCount).toEqual(3);
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
|
||||
@@ -1,34 +0,0 @@
|
||||
import { tap, expect } from '@git.zone/tstest/tapbundle';
|
||||
|
||||
import * as taskbuffer from '../ts/index.js';
|
||||
|
||||
let testTaskRunner: taskbuffer.TaskRunner;
|
||||
|
||||
tap.test('should create a valid taskrunner', async () => {
|
||||
testTaskRunner = new taskbuffer.TaskRunner();
|
||||
await testTaskRunner.start();
|
||||
});
|
||||
|
||||
tap.test('should execute task when its scheduled', async (tools) => {
|
||||
const done = tools.defer();
|
||||
testTaskRunner.addTask(
|
||||
new taskbuffer.Task({
|
||||
taskFunction: async () => {
|
||||
console.log('hi');
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
testTaskRunner.addTask(
|
||||
new taskbuffer.Task({
|
||||
taskFunction: async () => {
|
||||
console.log('there');
|
||||
done.resolve();
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
tap.start();
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/taskbuffer',
|
||||
version: '4.1.1',
|
||||
version: '6.1.2',
|
||||
description: 'A flexible task management library supporting TypeScript, allowing for task buffering, scheduling, and execution with dependency management.'
|
||||
}
|
||||
|
||||
@@ -4,15 +4,15 @@ export { Taskchain } from './taskbuffer.classes.taskchain.js';
|
||||
export { Taskparallel } from './taskbuffer.classes.taskparallel.js';
|
||||
export { TaskManager } from './taskbuffer.classes.taskmanager.js';
|
||||
export { TaskOnce } from './taskbuffer.classes.taskonce.js';
|
||||
export { TaskRunner } from './taskbuffer.classes.taskrunner.js';
|
||||
export { TaskDebounced } from './taskbuffer.classes.taskdebounced.js';
|
||||
export { TaskConstraintGroup } from './taskbuffer.classes.taskconstraintgroup.js';
|
||||
|
||||
// Task step system
|
||||
export { TaskStep } from './taskbuffer.classes.taskstep.js';
|
||||
export type { ITaskStep } from './taskbuffer.classes.taskstep.js';
|
||||
|
||||
// Metadata interfaces
|
||||
export type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent, TTaskEventType } from './taskbuffer.interfaces.js';
|
||||
export type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent, TTaskEventType, ITaskConstraintGroupOptions, ITaskExecution, IRateLimitConfig, TResultSharingMode } from './taskbuffer.interfaces.js';
|
||||
|
||||
import * as distributedCoordination from './taskbuffer.classes.distributedcoordinator.js';
|
||||
export { distributedCoordination };
|
||||
|
||||
@@ -6,7 +6,7 @@ export class BufferRunner {
|
||||
// initialize by default
|
||||
public bufferCounter: number = 0;
|
||||
|
||||
constructor(taskArg: Task<any>) {
|
||||
constructor(taskArg: Task<any, any, any>) {
|
||||
this.task = taskArg;
|
||||
}
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ export interface ICycleObject {
|
||||
export class CycleCounter {
|
||||
public task: Task;
|
||||
public cycleObjectArray: ICycleObject[] = [];
|
||||
constructor(taskArg: Task<any>) {
|
||||
constructor(taskArg: Task<any, any, any>) {
|
||||
this.task = taskArg;
|
||||
}
|
||||
public getPromiseForCycle(cycleCountArg: number) {
|
||||
|
||||
@@ -19,18 +19,18 @@ export type TPreOrAfterTaskFunction = () => Task<any>;
|
||||
// Type helper to extract step names from array
|
||||
export type StepNames<T> = T extends ReadonlyArray<{ name: infer N }> ? N : never;
|
||||
|
||||
export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }> = []> {
|
||||
export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }> = [], TData extends Record<string, unknown> = Record<string, unknown>> {
|
||||
public static extractTask<T = undefined, TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }> = []>(
|
||||
preOrAfterTaskArg: Task<T, TSteps> | TPreOrAfterTaskFunction,
|
||||
): Task<T, TSteps> {
|
||||
preOrAfterTaskArg: Task<T, TSteps, any> | TPreOrAfterTaskFunction,
|
||||
): Task<T, TSteps, any> {
|
||||
switch (true) {
|
||||
case !preOrAfterTaskArg:
|
||||
return null;
|
||||
case preOrAfterTaskArg instanceof Task:
|
||||
return preOrAfterTaskArg as Task<T, TSteps>;
|
||||
return preOrAfterTaskArg as Task<T, TSteps, any>;
|
||||
case typeof preOrAfterTaskArg === 'function':
|
||||
const taskFunction = preOrAfterTaskArg as TPreOrAfterTaskFunction;
|
||||
return taskFunction() as unknown as Task<T, TSteps>;
|
||||
return taskFunction() as unknown as Task<T, TSteps, any>;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
@@ -42,7 +42,7 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
|
||||
return done.promise;
|
||||
};
|
||||
|
||||
public static isTask = (taskArg: Task<any>): boolean => {
|
||||
public static isTask = (taskArg: Task<any, any, any>): boolean => {
|
||||
if (taskArg instanceof Task && typeof taskArg.taskFunction === 'function') {
|
||||
return true;
|
||||
} else {
|
||||
@@ -51,8 +51,8 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
|
||||
};
|
||||
|
||||
public static isTaskTouched<T = undefined, TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }> = []>(
|
||||
taskArg: Task<T, TSteps> | TPreOrAfterTaskFunction,
|
||||
touchedTasksArray: Task<T, TSteps>[],
|
||||
taskArg: Task<T, TSteps, any> | TPreOrAfterTaskFunction,
|
||||
touchedTasksArray: Task<T, TSteps, any>[],
|
||||
): boolean {
|
||||
const taskToCheck = Task.extractTask(taskArg);
|
||||
let result = false;
|
||||
@@ -65,25 +65,16 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
|
||||
}
|
||||
|
||||
public static runTask = async <T, TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }> = []>(
|
||||
taskArg: Task<T, TSteps> | TPreOrAfterTaskFunction,
|
||||
optionsArg: { x?: any; touchedTasksArray?: Task<T, TSteps>[] },
|
||||
taskArg: Task<T, TSteps, any> | TPreOrAfterTaskFunction,
|
||||
optionsArg: { x?: any; touchedTasksArray?: Task<T, TSteps, any>[] },
|
||||
) => {
|
||||
const taskToRun = Task.extractTask(taskArg);
|
||||
const done = plugins.smartpromise.defer();
|
||||
|
||||
// Wait for all blocking tasks to finish
|
||||
for (const task of taskToRun.blockingTasks) {
|
||||
await task.finished;
|
||||
}
|
||||
|
||||
if (!taskToRun.setupValue && taskToRun.taskSetup) {
|
||||
taskToRun.setupValue = await taskToRun.taskSetup();
|
||||
}
|
||||
|
||||
if (taskToRun.execDelay) {
|
||||
await plugins.smartdelay.delayFor(taskToRun.execDelay);
|
||||
}
|
||||
|
||||
taskToRun.running = true;
|
||||
taskToRun.runCount++;
|
||||
taskToRun.lastRun = new Date();
|
||||
@@ -100,26 +91,10 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
|
||||
// Complete all steps when task finishes
|
||||
taskToRun.completeAllSteps();
|
||||
taskToRun.emitEvent(taskToRun.lastError ? 'failed' : 'completed');
|
||||
|
||||
// When the task has finished running, resolve the finished promise
|
||||
taskToRun.resolveFinished();
|
||||
|
||||
// Create a new finished promise for the next run
|
||||
taskToRun.finished = new Promise((resolve) => {
|
||||
taskToRun.resolveFinished = resolve;
|
||||
});
|
||||
})
|
||||
.catch((err) => {
|
||||
taskToRun.running = false;
|
||||
taskToRun.emitEvent('failed', { error: err instanceof Error ? err.message : String(err) });
|
||||
|
||||
// Resolve finished so blocking dependants don't hang
|
||||
taskToRun.resolveFinished();
|
||||
|
||||
// Create a new finished promise for the next run
|
||||
taskToRun.finished = new Promise((resolve) => {
|
||||
taskToRun.resolveFinished = resolve;
|
||||
});
|
||||
});
|
||||
|
||||
const options = {
|
||||
@@ -127,7 +102,7 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
|
||||
...optionsArg,
|
||||
};
|
||||
const x = options.x;
|
||||
const touchedTasksArray: Task<T, TSteps>[] = options.touchedTasksArray;
|
||||
const touchedTasksArray: Task<T, TSteps, any>[] = options.touchedTasksArray;
|
||||
|
||||
touchedTasksArray.push(taskToRun);
|
||||
|
||||
@@ -198,18 +173,12 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
|
||||
public cronJob: plugins.smarttime.CronJob;
|
||||
|
||||
public bufferMax: number;
|
||||
public execDelay: number;
|
||||
public timeout: number;
|
||||
|
||||
public preTask: Task<T, any> | TPreOrAfterTaskFunction;
|
||||
public afterTask: Task<T, any> | TPreOrAfterTaskFunction;
|
||||
public data: TData;
|
||||
|
||||
// Add a list to store the blocking tasks
|
||||
public blockingTasks: Task[] = [];
|
||||
|
||||
// Add a promise that will resolve when the task has finished
|
||||
private finished: Promise<void>;
|
||||
private resolveFinished: () => void;
|
||||
public preTask: Task<T, any, any> | TPreOrAfterTaskFunction;
|
||||
public afterTask: Task<T, any, any> | TPreOrAfterTaskFunction;
|
||||
|
||||
public running: boolean = false;
|
||||
public bufferRunner = new BufferRunner(this);
|
||||
@@ -275,12 +244,12 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
|
||||
|
||||
constructor(optionsArg: {
|
||||
taskFunction: ITaskFunction<T>;
|
||||
preTask?: Task<T, any> | TPreOrAfterTaskFunction;
|
||||
afterTask?: Task<T, any> | TPreOrAfterTaskFunction;
|
||||
preTask?: Task<T, any, any> | TPreOrAfterTaskFunction;
|
||||
afterTask?: Task<T, any, any> | TPreOrAfterTaskFunction;
|
||||
buffered?: boolean;
|
||||
bufferMax?: number;
|
||||
execDelay?: number;
|
||||
name?: string;
|
||||
data?: TData;
|
||||
taskSetup?: ITaskSetupFunction<T>;
|
||||
steps?: TSteps;
|
||||
catchErrors?: boolean;
|
||||
@@ -291,8 +260,8 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
|
||||
this.afterTask = optionsArg.afterTask;
|
||||
this.buffered = optionsArg.buffered;
|
||||
this.bufferMax = optionsArg.bufferMax;
|
||||
this.execDelay = optionsArg.execDelay;
|
||||
this.name = optionsArg.name;
|
||||
this.data = optionsArg.data ?? ({} as TData);
|
||||
this.taskSetup = optionsArg.taskSetup;
|
||||
this.catchErrors = optionsArg.catchErrors ?? false;
|
||||
this.labels = optionsArg.labels ? { ...optionsArg.labels } : {};
|
||||
@@ -309,11 +278,6 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
|
||||
this.steps.set(stepConfig.name, step);
|
||||
}
|
||||
}
|
||||
|
||||
// Create the finished promise
|
||||
this.finished = new Promise((resolve) => {
|
||||
this.resolveFinished = resolve;
|
||||
});
|
||||
}
|
||||
|
||||
public trigger(x?: any): Promise<any> {
|
||||
|
||||
164
ts/taskbuffer.classes.taskconstraintgroup.ts
Normal file
164
ts/taskbuffer.classes.taskconstraintgroup.ts
Normal file
@@ -0,0 +1,164 @@
|
||||
import type { Task } from './taskbuffer.classes.task.js';
|
||||
import type { ITaskConstraintGroupOptions, IRateLimitConfig, TResultSharingMode } from './taskbuffer.interfaces.js';
|
||||
|
||||
export class TaskConstraintGroup<TData extends Record<string, unknown> = Record<string, unknown>> {
|
||||
public name: string;
|
||||
public maxConcurrent: number;
|
||||
public cooldownMs: number;
|
||||
public rateLimit: IRateLimitConfig | null;
|
||||
public resultSharingMode: TResultSharingMode;
|
||||
private constraintKeyForExecution: (task: Task<any, any, TData>, input?: any) => string | null | undefined;
|
||||
private shouldExecuteFn?: (task: Task<any, any, TData>, input?: any) => boolean | Promise<boolean>;
|
||||
|
||||
private runningCounts = new Map<string, number>();
|
||||
private lastCompletionTimes = new Map<string, number>();
|
||||
private completionTimestamps = new Map<string, number[]>();
|
||||
private lastResults = new Map<string, { result: any; timestamp: number }>();
|
||||
|
||||
constructor(options: ITaskConstraintGroupOptions<TData>) {
|
||||
this.name = options.name;
|
||||
this.constraintKeyForExecution = options.constraintKeyForExecution;
|
||||
this.maxConcurrent = options.maxConcurrent ?? Infinity;
|
||||
this.cooldownMs = options.cooldownMs ?? 0;
|
||||
this.shouldExecuteFn = options.shouldExecute;
|
||||
this.rateLimit = options.rateLimit ?? null;
|
||||
this.resultSharingMode = options.resultSharingMode ?? 'none';
|
||||
}
|
||||
|
||||
public getConstraintKey(task: Task<any, any, TData>, input?: any): string | null {
|
||||
const key = this.constraintKeyForExecution(task, input);
|
||||
return key ?? null;
|
||||
}
|
||||
|
||||
public async checkShouldExecute(task: Task<any, any, TData>, input?: any): Promise<boolean> {
|
||||
if (!this.shouldExecuteFn) {
|
||||
return true;
|
||||
}
|
||||
return this.shouldExecuteFn(task, input);
|
||||
}
|
||||
|
||||
public canRun(subGroupKey: string): boolean {
|
||||
const running = this.runningCounts.get(subGroupKey) ?? 0;
|
||||
if (running >= this.maxConcurrent) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (this.cooldownMs > 0) {
|
||||
const lastCompletion = this.lastCompletionTimes.get(subGroupKey);
|
||||
if (lastCompletion !== undefined) {
|
||||
const elapsed = Date.now() - lastCompletion;
|
||||
if (elapsed < this.cooldownMs) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (this.rateLimit) {
|
||||
this.pruneCompletionTimestamps(subGroupKey);
|
||||
const timestamps = this.completionTimestamps.get(subGroupKey);
|
||||
const completedInWindow = timestamps ? timestamps.length : 0;
|
||||
const running = this.runningCounts.get(subGroupKey) ?? 0;
|
||||
if (completedInWindow + running >= this.rateLimit.maxPerWindow) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
public acquireSlot(subGroupKey: string): void {
|
||||
const current = this.runningCounts.get(subGroupKey) ?? 0;
|
||||
this.runningCounts.set(subGroupKey, current + 1);
|
||||
}
|
||||
|
||||
public releaseSlot(subGroupKey: string): void {
|
||||
const current = this.runningCounts.get(subGroupKey) ?? 0;
|
||||
const next = Math.max(0, current - 1);
|
||||
if (next === 0) {
|
||||
this.runningCounts.delete(subGroupKey);
|
||||
} else {
|
||||
this.runningCounts.set(subGroupKey, next);
|
||||
}
|
||||
this.lastCompletionTimes.set(subGroupKey, Date.now());
|
||||
|
||||
if (this.rateLimit) {
|
||||
const timestamps = this.completionTimestamps.get(subGroupKey) ?? [];
|
||||
timestamps.push(Date.now());
|
||||
this.completionTimestamps.set(subGroupKey, timestamps);
|
||||
}
|
||||
}
|
||||
|
||||
public getCooldownRemaining(subGroupKey: string): number {
|
||||
if (this.cooldownMs <= 0) {
|
||||
return 0;
|
||||
}
|
||||
const lastCompletion = this.lastCompletionTimes.get(subGroupKey);
|
||||
if (lastCompletion === undefined) {
|
||||
return 0;
|
||||
}
|
||||
const elapsed = Date.now() - lastCompletion;
|
||||
return Math.max(0, this.cooldownMs - elapsed);
|
||||
}
|
||||
|
||||
public getRunningCount(subGroupKey: string): number {
|
||||
return this.runningCounts.get(subGroupKey) ?? 0;
|
||||
}
|
||||
|
||||
// Rate limit helpers
|
||||
private pruneCompletionTimestamps(subGroupKey: string): void {
|
||||
const timestamps = this.completionTimestamps.get(subGroupKey);
|
||||
if (!timestamps || !this.rateLimit) return;
|
||||
const cutoff = Date.now() - this.rateLimit.windowMs;
|
||||
let i = 0;
|
||||
while (i < timestamps.length && timestamps[i] <= cutoff) {
|
||||
i++;
|
||||
}
|
||||
if (i > 0) {
|
||||
timestamps.splice(0, i);
|
||||
}
|
||||
}
|
||||
|
||||
public getRateLimitDelay(subGroupKey: string): number {
|
||||
if (!this.rateLimit) return 0;
|
||||
this.pruneCompletionTimestamps(subGroupKey);
|
||||
const timestamps = this.completionTimestamps.get(subGroupKey);
|
||||
const completedInWindow = timestamps ? timestamps.length : 0;
|
||||
const running = this.runningCounts.get(subGroupKey) ?? 0;
|
||||
if (completedInWindow + running < this.rateLimit.maxPerWindow) {
|
||||
return 0;
|
||||
}
|
||||
// If only running tasks fill the window (no completions yet), we can't compute a delay
|
||||
if (!timestamps || timestamps.length === 0) {
|
||||
return 1; // minimal delay; drain will re-check after running tasks complete
|
||||
}
|
||||
// The oldest timestamp in the window determines when a slot opens
|
||||
const oldestInWindow = timestamps[0];
|
||||
const expiry = oldestInWindow + this.rateLimit.windowMs;
|
||||
return Math.max(0, expiry - Date.now());
|
||||
}
|
||||
|
||||
public getNextAvailableDelay(subGroupKey: string): number {
|
||||
return Math.max(this.getCooldownRemaining(subGroupKey), this.getRateLimitDelay(subGroupKey));
|
||||
}
|
||||
|
||||
// Result sharing helpers
|
||||
public recordResult(subGroupKey: string, result: any): void {
|
||||
if (this.resultSharingMode === 'none') return;
|
||||
this.lastResults.set(subGroupKey, { result, timestamp: Date.now() });
|
||||
}
|
||||
|
||||
public getLastResult(subGroupKey: string): { result: any; timestamp: number } | undefined {
|
||||
return this.lastResults.get(subGroupKey);
|
||||
}
|
||||
|
||||
public hasResultSharing(): boolean {
|
||||
return this.resultSharingMode !== 'none';
|
||||
}
|
||||
|
||||
public reset(): void {
|
||||
this.runningCounts.clear();
|
||||
this.lastCompletionTimes.clear();
|
||||
this.completionTimestamps.clear();
|
||||
this.lastResults.clear();
|
||||
}
|
||||
}
|
||||
@@ -1,10 +1,11 @@
|
||||
import * as plugins from './taskbuffer.plugins.js';
|
||||
import { Task } from './taskbuffer.classes.task.js';
|
||||
import { TaskConstraintGroup } from './taskbuffer.classes.taskconstraintgroup.js';
|
||||
import {
|
||||
AbstractDistributedCoordinator,
|
||||
type IDistributedTaskRequestResult,
|
||||
} from './taskbuffer.classes.distributedcoordinator.js';
|
||||
import type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent } from './taskbuffer.interfaces.js';
|
||||
import type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent, IConstrainedTaskEntry } from './taskbuffer.interfaces.js';
|
||||
import { logger } from './taskbuffer.logging.js';
|
||||
|
||||
export interface ICronJob {
|
||||
@@ -19,23 +20,28 @@ export interface ITaskManagerConstructorOptions {
|
||||
|
||||
export class TaskManager {
|
||||
public randomId = plugins.smartunique.shortId();
|
||||
public taskMap = new plugins.lik.ObjectMap<Task<any, any>>();
|
||||
public taskMap = new plugins.lik.ObjectMap<Task<any, any, any>>();
|
||||
public readonly taskSubject = new plugins.smartrx.rxjs.Subject<ITaskEvent>();
|
||||
private taskSubscriptions = new Map<Task<any, any>, plugins.smartrx.rxjs.Subscription>();
|
||||
private taskSubscriptions = new Map<Task<any, any, any>, plugins.smartrx.rxjs.Subscription>();
|
||||
private cronJobManager = new plugins.smarttime.CronManager();
|
||||
public options: ITaskManagerConstructorOptions = {
|
||||
distributedCoordinator: null,
|
||||
};
|
||||
|
||||
// Constraint system
|
||||
public constraintGroups: TaskConstraintGroup<any>[] = [];
|
||||
private constraintQueue: IConstrainedTaskEntry[] = [];
|
||||
private drainTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
|
||||
constructor(options: ITaskManagerConstructorOptions = {}) {
|
||||
this.options = Object.assign(this.options, options);
|
||||
}
|
||||
|
||||
public getTaskByName(taskName: string): Task<any, any> {
|
||||
public getTaskByName(taskName: string): Task<any, any, any> {
|
||||
return this.taskMap.findSync((task) => task.name === taskName);
|
||||
}
|
||||
|
||||
public addTask(task: Task<any, any>): void {
|
||||
public addTask(task: Task<any, any, any>): void {
|
||||
if (!task.name) {
|
||||
throw new Error('Task must have a name to be added to taskManager');
|
||||
}
|
||||
@@ -46,7 +52,7 @@ export class TaskManager {
|
||||
this.taskSubscriptions.set(task, subscription);
|
||||
}
|
||||
|
||||
public removeTask(task: Task<any, any>): void {
|
||||
public removeTask(task: Task<any, any, any>): void {
|
||||
this.taskMap.remove(task);
|
||||
const subscription = this.taskSubscriptions.get(task);
|
||||
if (subscription) {
|
||||
@@ -55,21 +61,189 @@ export class TaskManager {
|
||||
}
|
||||
}
|
||||
|
||||
public addAndScheduleTask(task: Task<any, any>, cronString: string) {
|
||||
public addAndScheduleTask(task: Task<any, any, any>, cronString: string) {
|
||||
this.addTask(task);
|
||||
this.scheduleTaskByName(task.name, cronString);
|
||||
}
|
||||
|
||||
// Constraint group management
|
||||
public addConstraintGroup(group: TaskConstraintGroup<any>): void {
|
||||
this.constraintGroups.push(group);
|
||||
}
|
||||
|
||||
public removeConstraintGroup(name: string): void {
|
||||
this.constraintGroups = this.constraintGroups.filter((g) => g.name !== name);
|
||||
}
|
||||
|
||||
// Core constraint evaluation
|
||||
public async triggerTaskConstrained(task: Task<any, any, any>, input?: any): Promise<any> {
|
||||
// Gather applicable constraints
|
||||
const applicableGroups: Array<{ group: TaskConstraintGroup<any>; key: string }> = [];
|
||||
for (const group of this.constraintGroups) {
|
||||
const key = group.getConstraintKey(task, input);
|
||||
if (key !== null) {
|
||||
applicableGroups.push({ group, key });
|
||||
}
|
||||
}
|
||||
|
||||
// No constraints apply → check shouldExecute then trigger directly
|
||||
if (applicableGroups.length === 0) {
|
||||
const shouldRun = await this.checkAllShouldExecute(task, input);
|
||||
if (!shouldRun) {
|
||||
return undefined;
|
||||
}
|
||||
return task.trigger(input);
|
||||
}
|
||||
|
||||
// Check if all constraints allow running
|
||||
const allCanRun = applicableGroups.every(({ group, key }) => group.canRun(key));
|
||||
if (allCanRun) {
|
||||
return this.executeWithConstraintTracking(task, input, applicableGroups);
|
||||
}
|
||||
|
||||
// Blocked → enqueue with deferred promise and cached constraint keys
|
||||
const deferred = plugins.smartpromise.defer<any>();
|
||||
const constraintKeys = new Map<string, string>();
|
||||
for (const { group, key } of applicableGroups) {
|
||||
constraintKeys.set(group.name, key);
|
||||
}
|
||||
this.constraintQueue.push({ task, input, deferred, constraintKeys });
|
||||
return deferred.promise;
|
||||
}
|
||||
|
||||
private async checkAllShouldExecute(task: Task<any, any, any>, input?: any): Promise<boolean> {
|
||||
for (const group of this.constraintGroups) {
|
||||
const shouldRun = await group.checkShouldExecute(task, input);
|
||||
if (!shouldRun) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private async executeWithConstraintTracking(
|
||||
task: Task<any, any, any>,
|
||||
input: any,
|
||||
groups: Array<{ group: TaskConstraintGroup<any>; key: string }>,
|
||||
): Promise<any> {
|
||||
// Acquire slots synchronously to prevent race conditions
|
||||
for (const { group, key } of groups) {
|
||||
group.acquireSlot(key);
|
||||
}
|
||||
|
||||
// Check shouldExecute after acquiring slots
|
||||
const shouldRun = await this.checkAllShouldExecute(task, input);
|
||||
if (!shouldRun) {
|
||||
// Release slots and drain queue
|
||||
for (const { group, key } of groups) {
|
||||
group.releaseSlot(key);
|
||||
}
|
||||
this.drainConstraintQueue();
|
||||
return undefined;
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await task.trigger(input);
|
||||
// Record result for groups with result sharing (only on true success, not caught errors)
|
||||
if (!task.lastError) {
|
||||
for (const { group, key } of groups) {
|
||||
group.recordResult(key, result);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
} finally {
|
||||
// Release slots
|
||||
for (const { group, key } of groups) {
|
||||
group.releaseSlot(key);
|
||||
}
|
||||
this.drainConstraintQueue();
|
||||
}
|
||||
}
|
||||
|
||||
private drainConstraintQueue(): void {
|
||||
let shortestCooldown = Infinity;
|
||||
const stillQueued: IConstrainedTaskEntry[] = [];
|
||||
|
||||
for (const entry of this.constraintQueue) {
|
||||
const applicableGroups: Array<{ group: TaskConstraintGroup<any>; key: string }> = [];
|
||||
for (const group of this.constraintGroups) {
|
||||
const key = group.getConstraintKey(entry.task, entry.input);
|
||||
if (key !== null) {
|
||||
applicableGroups.push({ group, key });
|
||||
}
|
||||
}
|
||||
|
||||
// No constraints apply anymore (group removed?) → check shouldExecute then run
|
||||
if (applicableGroups.length === 0) {
|
||||
this.checkAllShouldExecute(entry.task, entry.input).then((shouldRun) => {
|
||||
if (!shouldRun) {
|
||||
entry.deferred.resolve(undefined);
|
||||
return;
|
||||
}
|
||||
entry.task.trigger(entry.input).then(
|
||||
(result) => entry.deferred.resolve(result),
|
||||
(err) => entry.deferred.reject(err),
|
||||
);
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check result sharing — if any applicable group has a shared result, resolve immediately
|
||||
const sharingGroups = applicableGroups.filter(({ group }) => group.hasResultSharing());
|
||||
if (sharingGroups.length > 0) {
|
||||
const groupWithResult = sharingGroups.find(({ group, key }) =>
|
||||
group.getLastResult(key) !== undefined
|
||||
);
|
||||
if (groupWithResult) {
|
||||
entry.deferred.resolve(groupWithResult.group.getLastResult(groupWithResult.key)!.result);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
const allCanRun = applicableGroups.every(({ group, key }) => group.canRun(key));
|
||||
if (allCanRun) {
|
||||
// executeWithConstraintTracking handles shouldExecute check internally
|
||||
this.executeWithConstraintTracking(entry.task, entry.input, applicableGroups).then(
|
||||
(result) => entry.deferred.resolve(result),
|
||||
(err) => entry.deferred.reject(err),
|
||||
);
|
||||
} else {
|
||||
stillQueued.push(entry);
|
||||
// Track shortest delay for timer scheduling (cooldown + rate limit)
|
||||
for (const { group, key } of applicableGroups) {
|
||||
const remaining = group.getNextAvailableDelay(key);
|
||||
if (remaining > 0 && remaining < shortestCooldown) {
|
||||
shortestCooldown = remaining;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.constraintQueue = stillQueued;
|
||||
|
||||
// Schedule next drain if there are cooldown-blocked entries
|
||||
if (this.drainTimer) {
|
||||
clearTimeout(this.drainTimer);
|
||||
this.drainTimer = null;
|
||||
}
|
||||
if (stillQueued.length > 0 && shortestCooldown < Infinity) {
|
||||
this.drainTimer = setTimeout(() => {
|
||||
this.drainTimer = null;
|
||||
this.drainConstraintQueue();
|
||||
}, shortestCooldown + 1);
|
||||
}
|
||||
}
|
||||
|
||||
public async triggerTaskByName(taskName: string): Promise<any> {
|
||||
const taskToTrigger = this.getTaskByName(taskName);
|
||||
if (!taskToTrigger) {
|
||||
throw new Error(`No task with the name ${taskName} found.`);
|
||||
}
|
||||
return taskToTrigger.trigger();
|
||||
return this.triggerTaskConstrained(taskToTrigger);
|
||||
}
|
||||
|
||||
public async triggerTask(task: Task<any, any>) {
|
||||
return task.trigger();
|
||||
public async triggerTask(task: Task<any, any, any>) {
|
||||
return this.triggerTaskConstrained(task);
|
||||
}
|
||||
|
||||
public scheduleTaskByName(taskName: string, cronString: string) {
|
||||
@@ -80,7 +254,7 @@ export class TaskManager {
|
||||
this.handleTaskScheduling(taskToSchedule, cronString);
|
||||
}
|
||||
|
||||
private handleTaskScheduling(task: Task<any, any>, cronString: string) {
|
||||
private handleTaskScheduling(task: Task<any, any, any>, cronString: string) {
|
||||
const cronJob = this.cronJobManager.addCronjob(
|
||||
cronString,
|
||||
async (triggerTime: number) => {
|
||||
@@ -98,7 +272,7 @@ export class TaskManager {
|
||||
}
|
||||
}
|
||||
try {
|
||||
await task.trigger();
|
||||
await this.triggerTaskConstrained(task);
|
||||
} catch (err) {
|
||||
logger.log('error', `TaskManager: scheduled task "${task.name || 'unnamed'}" failed: ${err instanceof Error ? err.message : String(err)}`);
|
||||
}
|
||||
@@ -107,7 +281,7 @@ export class TaskManager {
|
||||
task.cronJob = cronJob;
|
||||
}
|
||||
|
||||
private logTaskState(task: Task<any, any>) {
|
||||
private logTaskState(task: Task<any, any, any>) {
|
||||
logger.log('info', `Taskbuffer schedule triggered task >>${task.name}<<`);
|
||||
const bufferState = task.buffered
|
||||
? `buffered with max ${task.bufferMax} buffered calls`
|
||||
@@ -116,7 +290,7 @@ export class TaskManager {
|
||||
}
|
||||
|
||||
private async performDistributedConsultation(
|
||||
task: Task<any, any>,
|
||||
task: Task<any, any, any>,
|
||||
triggerTime: number,
|
||||
): Promise<IDistributedTaskRequestResult> {
|
||||
logger.log('info', 'Found a distributed coordinator, performing consultation.');
|
||||
@@ -144,7 +318,7 @@ export class TaskManager {
|
||||
}
|
||||
}
|
||||
|
||||
public async descheduleTask(task: Task<any, any>) {
|
||||
public async descheduleTask(task: Task<any, any, any>) {
|
||||
await this.descheduleTaskByName(task.name);
|
||||
}
|
||||
|
||||
@@ -169,6 +343,10 @@ export class TaskManager {
|
||||
subscription.unsubscribe();
|
||||
}
|
||||
this.taskSubscriptions.clear();
|
||||
if (this.drainTimer) {
|
||||
clearTimeout(this.drainTimer);
|
||||
this.drainTimer = null;
|
||||
}
|
||||
}
|
||||
|
||||
// Get metadata for a specific task
|
||||
@@ -186,7 +364,7 @@ export class TaskManager {
|
||||
// Get scheduled tasks with their schedules and next run times
|
||||
public getScheduledTasks(): IScheduledTaskInfo[] {
|
||||
const scheduledTasks: IScheduledTaskInfo[] = [];
|
||||
|
||||
|
||||
for (const task of this.taskMap.getArray()) {
|
||||
if (task.cronJob) {
|
||||
scheduledTasks.push({
|
||||
@@ -199,7 +377,7 @@ export class TaskManager {
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return scheduledTasks;
|
||||
}
|
||||
|
||||
@@ -213,11 +391,11 @@ export class TaskManager {
|
||||
}))
|
||||
.sort((a, b) => a.nextRun.getTime() - b.nextRun.getTime())
|
||||
.slice(0, limit);
|
||||
|
||||
|
||||
return scheduledRuns;
|
||||
}
|
||||
|
||||
public getTasksByLabel(key: string, value: string): Task<any, any>[] {
|
||||
public getTasksByLabel(key: string, value: string): Task<any, any, any>[] {
|
||||
return this.taskMap.getArray().filter(task => task.labels[key] === value);
|
||||
}
|
||||
|
||||
@@ -227,7 +405,7 @@ export class TaskManager {
|
||||
|
||||
// Add, execute, and remove a task while collecting metadata
|
||||
public async addExecuteRemoveTask<T, TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }>>(
|
||||
task: Task<T, TSteps>,
|
||||
task: Task<T, TSteps, any>,
|
||||
options?: {
|
||||
schedule?: string;
|
||||
trackProgress?: boolean;
|
||||
@@ -235,19 +413,18 @@ export class TaskManager {
|
||||
): Promise<ITaskExecutionReport> {
|
||||
// Add task to manager
|
||||
this.addTask(task);
|
||||
|
||||
|
||||
// Optionally schedule it
|
||||
if (options?.schedule) {
|
||||
this.scheduleTaskByName(task.name!, options.schedule);
|
||||
}
|
||||
|
||||
|
||||
const startTime = Date.now();
|
||||
const progressUpdates: Array<{ stepName: string; timestamp: number }> = [];
|
||||
|
||||
|
||||
try {
|
||||
// Execute the task
|
||||
const result = await task.trigger();
|
||||
|
||||
// Execute the task through constraints
|
||||
const result = await this.triggerTaskConstrained(task);
|
||||
|
||||
// Collect execution report
|
||||
const report: ITaskExecutionReport = {
|
||||
taskName: task.name || 'unnamed',
|
||||
@@ -261,15 +438,15 @@ export class TaskManager {
|
||||
progress: task.getProgress(),
|
||||
result,
|
||||
};
|
||||
|
||||
|
||||
// Remove task from manager
|
||||
this.removeTask(task);
|
||||
|
||||
|
||||
// Deschedule if it was scheduled
|
||||
if (options?.schedule && task.name) {
|
||||
this.descheduleTaskByName(task.name);
|
||||
}
|
||||
|
||||
|
||||
return report;
|
||||
} catch (error) {
|
||||
// Create error report
|
||||
@@ -285,15 +462,15 @@ export class TaskManager {
|
||||
progress: task.getProgress(),
|
||||
error: error as Error,
|
||||
};
|
||||
|
||||
|
||||
// Remove task from manager even on error
|
||||
this.removeTask(task);
|
||||
|
||||
|
||||
// Deschedule if it was scheduled
|
||||
if (options?.schedule && task.name) {
|
||||
this.descheduleTaskByName(task.name);
|
||||
}
|
||||
|
||||
|
||||
throw errorReport;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,69 +0,0 @@
|
||||
import * as plugins from './taskbuffer.plugins.js';
|
||||
|
||||
import { Task } from './taskbuffer.classes.task.js';
|
||||
import { logger } from './taskbuffer.logging.js';
|
||||
|
||||
export class TaskRunner {
|
||||
public maxParallelJobs: number = 1;
|
||||
public status: 'stopped' | 'running' = 'stopped';
|
||||
public runningTasks: plugins.lik.ObjectMap<Task> =
|
||||
new plugins.lik.ObjectMap<Task>();
|
||||
public queuedTasks: Task[] = [];
|
||||
|
||||
constructor() {
|
||||
this.runningTasks.eventSubject.subscribe(async (eventArg) => {
|
||||
this.checkExecution();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* adds a task to the queue
|
||||
*/
|
||||
public addTask(taskArg: Task) {
|
||||
this.queuedTasks.push(taskArg);
|
||||
this.checkExecution();
|
||||
}
|
||||
|
||||
/**
|
||||
* set amount of parallel tasks
|
||||
* be careful, you might lose dependability of tasks
|
||||
*/
|
||||
public setMaxParallelJobs(maxParallelJobsArg: number) {
|
||||
this.maxParallelJobs = maxParallelJobsArg;
|
||||
}
|
||||
|
||||
/**
|
||||
* starts the task queue
|
||||
*/
|
||||
public async start() {
|
||||
this.status = 'running';
|
||||
}
|
||||
|
||||
/**
|
||||
* checks whether execution is on point
|
||||
*/
|
||||
public async checkExecution() {
|
||||
if (
|
||||
this.runningTasks.getArray().length < this.maxParallelJobs &&
|
||||
this.status === 'running' &&
|
||||
this.queuedTasks.length > 0
|
||||
) {
|
||||
const nextJob = this.queuedTasks.shift();
|
||||
this.runningTasks.add(nextJob);
|
||||
try {
|
||||
await nextJob.trigger();
|
||||
} catch (err) {
|
||||
logger.log('error', `TaskRunner: task "${nextJob.name || 'unnamed'}" failed: ${err instanceof Error ? err.message : String(err)}`);
|
||||
}
|
||||
this.runningTasks.remove(nextJob);
|
||||
this.checkExecution();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* stops the task queue
|
||||
*/
|
||||
public async stop() {
|
||||
this.status = 'stopped';
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,34 @@
|
||||
import type { ITaskStep } from './taskbuffer.classes.taskstep.js';
|
||||
import type { Task } from './taskbuffer.classes.task.js';
|
||||
|
||||
export interface IRateLimitConfig {
|
||||
maxPerWindow: number; // max completions allowed within the sliding window
|
||||
windowMs: number; // sliding window duration in ms
|
||||
}
|
||||
|
||||
export type TResultSharingMode = 'none' | 'share-latest';
|
||||
|
||||
export interface ITaskConstraintGroupOptions<TData extends Record<string, unknown> = Record<string, unknown>> {
|
||||
name: string;
|
||||
constraintKeyForExecution: (task: Task<any, any, TData>, input?: any) => string | null | undefined;
|
||||
maxConcurrent?: number; // default: Infinity
|
||||
cooldownMs?: number; // default: 0
|
||||
shouldExecute?: (task: Task<any, any, TData>, input?: any) => boolean | Promise<boolean>;
|
||||
rateLimit?: IRateLimitConfig;
|
||||
resultSharingMode?: TResultSharingMode; // default: 'none'
|
||||
}
|
||||
|
||||
export interface ITaskExecution<TData extends Record<string, unknown> = Record<string, unknown>> {
|
||||
task: Task<any, any, TData>;
|
||||
input: any;
|
||||
}
|
||||
|
||||
export interface IConstrainedTaskEntry {
|
||||
task: Task<any, any, any>;
|
||||
input: any;
|
||||
deferred: import('@push.rocks/smartpromise').Deferred<any>;
|
||||
constraintKeys: Map<string, string>; // groupName -> key
|
||||
}
|
||||
|
||||
export interface ITaskMetadata {
|
||||
name: string;
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/taskbuffer',
|
||||
version: '4.1.1',
|
||||
version: '6.1.2',
|
||||
description: 'A flexible task management library supporting TypeScript, allowing for task buffering, scheduling, and execution with dependency management.'
|
||||
}
|
||||
|
||||
@@ -8,20 +8,20 @@ import type { TaskManager, ITaskMetadata, IScheduledTaskInfo } from '../ts/index
|
||||
export class TaskbufferDashboard extends DeesElement {
|
||||
// Properties
|
||||
@property({ type: Object })
|
||||
public taskManager: TaskManager | null = null;
|
||||
accessor taskManager: TaskManager | null = null;
|
||||
|
||||
@property({ type: Number })
|
||||
public refreshInterval: number = 1000; // milliseconds
|
||||
accessor refreshInterval: number = 1000; // milliseconds
|
||||
|
||||
// Internal state
|
||||
@state()
|
||||
private tasks: ITaskMetadata[] = [];
|
||||
accessor tasks: ITaskMetadata[] = [];
|
||||
|
||||
@state()
|
||||
private scheduledTasks: IScheduledTaskInfo[] = [];
|
||||
accessor scheduledTasks: IScheduledTaskInfo[] = [];
|
||||
|
||||
@state()
|
||||
private isRunning: boolean = false;
|
||||
accessor isRunning: boolean = false;
|
||||
|
||||
private refreshTimer: any;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user