Compare commits

..

26 Commits

Author SHA1 Message Date
587cfffbe4 v8.0.2 2026-03-23 14:59:59 +00:00
820811cca2 fix(servicemanager): cancel startup timeout once service initialization completes 2026-03-23 14:59:59 +00:00
e25271f9db v8.0.1 2026-03-23 14:40:17 +00:00
709f9fa894 fix(servicemanager): cancel shutdown timeouts after services stop 2026-03-23 14:40:17 +00:00
99430fbde4 v8.0.0 2026-03-21 10:57:27 +00:00
0f93e86cc1 BREAKING CHANGE(service): expand service lifecycle management with instance-aware hooks, startup timeouts, labels, readiness waits, and auto-restart support 2026-03-21 10:57:27 +00:00
0b78b05101 v7.0.0 2026-03-20 15:24:16 +00:00
e91e782113 feat(service): add Service and ServiceManager for component lifecycle management
Adds two new classes:
- Service: long-running component with start/stop lifecycle, health checks, builder pattern and subclass support
- ServiceManager: orchestrates multiple services with dependency-ordered startup, failure isolation, retry with backoff, and reverse-order shutdown
2026-03-20 15:24:12 +00:00
6e43e2ea68 v6.1.2 2026-02-15 23:14:45 +00:00
2a345f6514 fix(deps): bump @push.rocks/smarttime to ^4.2.3 2026-02-15 23:14:45 +00:00
b536dc8ba2 v6.1.1 2026-02-15 21:56:39 +00:00
6ca6cf6bc0 fix(tests): improve buffered task tests: add chain, concurrency and queue behavior tests 2026-02-15 21:56:39 +00:00
ed3bd99406 v6.1.0 2026-02-15 21:51:55 +00:00
3ab90d9895 feat(taskbuffer): add sliding-window rate limiting and result-sharing to TaskConstraintGroup and integrate with TaskManager 2026-02-15 21:51:55 +00:00
aee7236e5f v6.0.1 2026-02-15 21:08:38 +00:00
c89da9e2b0 fix(taskbuffer): no changes to commit 2026-02-15 21:08:38 +00:00
fae13bb944 v6.0.0 2026-02-15 15:15:37 +00:00
0811b04dfd BREAKING CHANGE(constraints): make TaskConstraintGroup constraint matcher input-aware and add shouldExecute pre-execution hook 2026-02-15 15:15:37 +00:00
33d1c334c4 v5.0.1 2026-02-15 12:36:57 +00:00
b2c0553e30 fix(tests): add and tighten constraint-related tests covering return values, error propagation, concurrency, cooldown timing, and constraint removal 2026-02-15 12:36:57 +00:00
450b62fe5d v5.0.0 2026-02-15 12:20:01 +00:00
d3e8ff1a11 BREAKING CHANGE(taskbuffer): Introduce constraint-based concurrency with TaskConstraintGroup and TaskManager integration; remove legacy TaskRunner and several Task APIs (breaking); add typed Task.data and update exports and tests. 2026-02-15 12:20:01 +00:00
9d78933a46 v4.2.1 2026-02-15 10:50:30 +00:00
28d9ad1746 fix(deps): bump @push.rocks/smartlog and @types/node; update dependency list version and license link in docs 2026-02-15 10:50:30 +00:00
28312972e0 v4.2.0 2026-01-29 15:13:34 +00:00
a0abcdda90 feat(ts_web): support TC39 accessor decorators for web components; bump dependencies and devDependencies; rename browser tests to .chromium.ts; move LICENSE to license.md and update readme 2026-01-29 15:13:34 +00:00
27 changed files with 7962 additions and 4827 deletions

View File

@@ -1,5 +1,99 @@
# Changelog
## 2026-03-23 - 8.0.2 - fix(servicemanager)
cancel startup timeout once service initialization completes
- Replaces the startup timeout race delay with a cancellable Timeout instance
- Prevents the global startup timeout from lingering after startup finishes or fails
## 2026-03-23 - 8.0.1 - fix(servicemanager)
cancel shutdown timeouts after services stop
- Replace the shutdown race delay with a cancellable Timeout in ServiceManager.
- Prevent timeout handlers from lingering after a service stops successfully during shutdown.
## 2026-03-20 - 8.0.0 - BREAKING CHANGE(service)
expand service lifecycle management with instance-aware hooks, startup timeouts, labels, readiness waits, and auto-restart support
- Change service stop and health check callbacks to receive the started instance and expose it via service.instance
- Add per-service and global startup timeout handling plus waitForState, waitForRunning, and waitForStopped readiness helpers
- Support service labels, label-based manager queries, and auto-restart lifecycle events with configurable backoff
## 2026-02-15 - 6.1.2 - fix(deps)
bump @push.rocks/smarttime to ^4.2.3
- Updated @push.rocks/smarttime from ^4.1.1 to ^4.2.3
- Non-breaking dependency version bump; increment patch version
## 2026-02-15 - 6.1.1 - fix(tests)
improve buffered task tests: add chain, concurrency and queue behavior tests
- Replace tools.delayFor with @push.rocks/smartdelay for more deterministic timing in tests
- Add tests for afterTask chaining, bufferMax concurrency, queued-run limits, and re-trigger behavior
- Rename tasks to descriptive names and fix afterTask chaining order to avoid circular references
- Change test runner invocation to export default tap.start() instead of calling tap.start() directly
## 2026-02-15 - 6.1.0 - feat(taskbuffer)
add sliding-window rate limiting and result-sharing to TaskConstraintGroup and integrate with TaskManager
- Added IRateLimitConfig and TResultSharingMode types and exported them from the public index
- TaskConstraintGroup: added rateLimit and resultSharingMode options, internal completion timestamp tracking, and last-result storage
- TaskConstraintGroup: new helpers - pruneCompletionTimestamps, getRateLimitDelay, getNextAvailableDelay, recordResult, getLastResult, hasResultSharing
- TaskConstraintGroup: rate-limit logic enforces maxPerWindow (counts running + completions) and composes with cooldown/maxConcurrent
- TaskManager: records successful task results to constraint groups and resolves queued entries immediately when a shared result exists
- TaskManager: queue drain now considers unified next-available delay (cooldown + rate limit) when scheduling retries
- Documentation updated: README and hints with usage examples for sliding-window rate limiting and result sharing
- Comprehensive tests added for rate limiting, concurrency interaction, and result-sharing behavior
## 2026-02-15 - 6.0.1 - fix(taskbuffer)
no changes to commit
- Git diff shows no changes
- package.json current version is 6.0.0; no version bump required
## 2026-02-15 - 6.0.0 - BREAKING CHANGE(constraints)
make TaskConstraintGroup constraint matcher input-aware and add shouldExecute pre-execution hook
- Rename ITaskConstraintGroupOptions.constraintKeyForTask -> constraintKeyForExecution(task, input?) and update TaskConstraintGroup.getConstraintKey signature
- Add optional shouldExecute(task, input?) hook; TaskManager checks shouldExecute before immediate runs, after acquiring slots, and when draining the constraint queue (queued tasks are skipped when shouldExecute returns false)
- Export ITaskExecution type and store constraintKeys on queued entries (IConstrainedTaskEntry.constraintKeys)
- Documentation and tests updated to demonstrate input-aware constraint keys and shouldExecute pruning
## 2026-02-15 - 5.0.1 - fix(tests)
add and tighten constraint-related tests covering return values, error propagation, concurrency, cooldown timing, and constraint removal
- Tightened cooldown timing assertion from >=100ms to >=250ms to reflect 300ms cooldown with 50ms tolerance.
- Added tests for queued task return values, error propagation when catchErrors is false, and error swallowing behavior when catchErrors is true.
- Added concurrency and cooldown interaction tests to ensure maxConcurrent is respected and batch timing is correct.
- Added test verifying removing a constraint group unblocks queued tasks and drain behavior completes correctly.
## 2026-02-15 - 5.0.0 - BREAKING CHANGE(taskbuffer)
Introduce constraint-based concurrency with TaskConstraintGroup and TaskManager integration; remove legacy TaskRunner and several Task APIs (breaking); add typed Task.data and update exports and tests.
- Add TaskConstraintGroup class with per-key maxConcurrent, cooldownMs, and helper methods (canRun, acquireSlot, releaseSlot, getCooldownRemaining, getRunningCount, reset).
- Task generic signature extended to Task<T, TSteps, TData> and a new typed data property (data) with default {}.
- TaskManager now supports addConstraintGroup/removeConstraintGroup, triggerTaskConstrained, queues blocked tasks, drains queue with cooldown timers, and routes triggerTask/triggerTaskByName through the constraint system.
- Removed TaskRunner, plus Task APIs: blockingTasks, execDelay, finished promise and associated behavior have been removed (breaking changes).
- Exports and interfaces updated: TaskConstraintGroup and ITaskConstraintGroupOptions added; TaskRunner removed from public API.
- Updated README and added comprehensive tests for constraint behavior; adjusted other tests to remove TaskRunner usage and reflect new APIs.
## 2026-02-15 - 4.2.1 - fix(deps)
bump @push.rocks/smartlog and @types/node; update dependency list version and license link in docs
- package.json: update @push.rocks/smartlog from ^3.1.10 to ^3.1.11
- package.json: update @types/node from ^25.1.0 to ^25.2.3
- readme.hints.md: update 'Dependencies (as of v4.1.1)' to 'Dependencies (as of v4.2.0)' and reflect bumped dependency versions
- readme.md: change license link text to '[LICENSE](./license.md)'
## 2026-01-29 - 4.2.0 - feat(ts_web)
support TC39 'accessor' decorators for web components; bump dependencies and devDependencies; rename browser tests to .chromium.ts; move LICENSE to license.md and update readme
- Convert web component class fields to use the TC39 'accessor' keyword in ts_web/taskbuffer-dashboard.ts to be compatible with @design.estate/dees-element v2.1.6
- Bump @design.estate/dees-element to ^2.1.6 and update devDependencies (@git.zone/tsbuild, @git.zone/tsbundle, @git.zone/tsrun, @git.zone/tstest, @types/node) to newer versions
- Replace test/test.10.webcomponent.browser.ts with test/test.10.webcomponent.chromium.ts and update testing guidance in readme.hints.md to prefer .chromium.ts
- Move LICENSE file content to license.md and update readme.md to reference the new license file
- Small test cleanups: remove obsolete tslint:disable comments
## 2026-01-26 - 4.1.1 - fix(ts_web)
fix web dashboard typings and update generated commit info

View File

View File

@@ -1,13 +1,13 @@
{
"name": "@push.rocks/taskbuffer",
"version": "4.1.1",
"version": "8.0.2",
"private": false,
"description": "A flexible task management library supporting TypeScript, allowing for task buffering, scheduling, and execution with dependency management.",
"main": "dist_ts/index.js",
"typings": "dist_ts/index.d.ts",
"type": "module",
"scripts": {
"test": "(tstest test/ --verbose --logfile --timeout 120)",
"test": "(tstest test/ --verbose --logfile --timeout 300)",
"build": "(tsbuild tsfolders)",
"buildDocs": "tsdoc"
},
@@ -27,28 +27,28 @@
"debounced tasks",
"distributed coordination"
],
"author": "Lossless GmbH",
"author": "Task Venture Capital GmbH <hello@task.vc>",
"license": "MIT",
"bugs": {
"url": "https://code.foss.global/push.rocks/taskbuffer/issues"
},
"homepage": "https://code.foss.global/push.rocks/taskbuffer#readme",
"dependencies": {
"@design.estate/dees-element": "^2.1.3",
"@push.rocks/lik": "^6.2.2",
"@design.estate/dees-element": "^2.2.3",
"@push.rocks/lik": "^6.3.1",
"@push.rocks/smartdelay": "^3.0.5",
"@push.rocks/smartlog": "^3.1.10",
"@push.rocks/smartlog": "^3.2.1",
"@push.rocks/smartpromise": "^4.2.3",
"@push.rocks/smartrx": "^3.0.10",
"@push.rocks/smarttime": "^4.1.1",
"@push.rocks/smarttime": "^4.2.3",
"@push.rocks/smartunique": "^3.0.9"
},
"devDependencies": {
"@git.zone/tsbuild": "^3.1.2",
"@git.zone/tsbundle": "^2.6.3",
"@git.zone/tsrun": "^2.0.0",
"@git.zone/tstest": "^3.1.3",
"@types/node": "^24.10.1"
"@git.zone/tsbuild": "^4.3.0",
"@git.zone/tsbundle": "^2.9.1",
"@git.zone/tsrun": "^2.0.1",
"@git.zone/tstest": "^3.5.0",
"@types/node": "^25.5.0"
},
"files": [
"ts/**/*",

7544
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,20 +1,61 @@
# Taskbuffer Hints
## Task Constraint System (v5.0.0+) — Breaking Changes
- **`TaskRunner` removed** — replaced by `TaskManager` + `TaskConstraintGroup`
- **`blockingTasks` removed** from `Task` — use `TaskConstraintGroup` with `maxConcurrent: 1`
- **`execDelay` removed** from `Task` — use `TaskConstraintGroup` with `cooldownMs`
- **`finished` promise removed** from `Task` — no longer needed
- **`Task` generic signature**: `Task<T, TSteps, TData>` (3rd param added for typed data)
### Task.data
- `Task` constructor accepts optional `data?: TData` (defaults to `{}`)
- Typed data bag accessible as `task.data`
### TaskConstraintGroup
- `new TaskConstraintGroup<TData>({ name, constraintKeyForExecution, maxConcurrent?, cooldownMs?, shouldExecute?, rateLimit?, resultSharingMode? })`
- `constraintKeyForExecution(task, input?)` returns a string key (constraint applies) or `null` (skip). Receives both task and runtime input.
- `shouldExecute(task, input?)` — optional pre-execution check. Returns `false` to skip (deferred resolves `undefined`). Can be async.
- `maxConcurrent` (default: `Infinity`) — max concurrent tasks per key
- `cooldownMs` (default: `0`) — minimum ms gap between completions per key
- `rateLimit` (optional) — `{ maxPerWindow: number, windowMs: number }` sliding window rate limiter. Counts both running + completed tasks in window.
- `resultSharingMode` (default: `'none'`) — `'none'` | `'share-latest'`. When `'share-latest'`, queued tasks for the same key resolve with the first task's result without executing.
- Methods: `getConstraintKey(task, input?)`, `checkShouldExecute(task, input?)`, `canRun(key)`, `acquireSlot(key)`, `releaseSlot(key)`, `getCooldownRemaining(key)`, `getRateLimitDelay(key)`, `getNextAvailableDelay(key)`, `getRunningCount(key)`, `recordResult(key, result)`, `getLastResult(key)`, `hasResultSharing()`, `reset()`
- `ITaskExecution<TData>` type exported from index — `{ task, input }` tuple
### Rate Limiting (v6.1.0+)
- Sliding window rate limiter: `rateLimit: { maxPerWindow: N, windowMs: ms }`
- Counts running + completed tasks against the window cap
- Per-key independence: saturating key A doesn't block key B
- Composable with `maxConcurrent` and `cooldownMs`
- `getNextAvailableDelay(key)` returns `Math.max(cooldownRemaining, rateLimitDelay)` — unified "how long until I can run" answer
- Drain timer auto-schedules based on shortest delay across all constraints
### Result Sharing (v6.1.0+)
- `resultSharingMode: 'share-latest'` — queued tasks for the same key get the first task's result without executing
- Only successful results are shared (errors from `catchErrors: true` or thrown errors are NOT shared)
- `shouldExecute` is NOT called for shared results (the task's purpose was already fulfilled)
- `lastResults` persists until `reset()` — for time-bounded sharing, use `shouldExecute` to control staleness
- Composable with rate limiting: rate-limited waiters get shared result without waiting for the window
### TaskManager Constraint Integration
- `manager.addConstraintGroup(group)` / `manager.removeConstraintGroup(name)`
- `triggerTaskByName()`, `triggerTask()`, `addExecuteRemoveTask()`, cron callbacks all route through `triggerTaskConstrained()`
- `triggerTaskConstrained(task, input?)` — evaluates constraints, queues if blocked, drains after completion
- Cooldown-blocked entries auto-drain via timer
### Exported from index.ts
- `TaskConstraintGroup` class
- `ITaskConstraintGroupOptions`, `IRateLimitConfig`, `TResultSharingMode` types
## Error Handling (v3.6.0+)
- `Task` now has `catchErrors` constructor option (default: `false`)
- Default behavior: `trigger()` rejects when taskFunction throws (breaking change from pre-3.6)
- Set `catchErrors: true` to swallow errors (old behavior) - returns `undefined` on error
- Error state tracked via `lastError?: Error`, `errorCount: number`, `clearError()`
- `getMetadata()` status uses all four values: `'idle'` | `'running'` | `'completed'` | `'failed'`
- All peripheral classes (Taskchain, Taskparallel, TaskRunner, BufferRunner, TaskDebounced, TaskManager) have proper error propagation/handling
- All peripheral classes (Taskchain, Taskparallel, BufferRunner, TaskDebounced, TaskManager) have proper error propagation/handling
- `console.log` calls replaced with `logger.log()` throughout
## Breaking API Rename (TaskRunner)
- `maxParrallelJobs``maxParallelJobs`
- `qeuedTasks``queuedTasks`
- JSDoc typos fixed: "qeue" → "queue", "wether" → "whether", "loose" → "lose"
- The `setMaxParallelJobs()` parameter also renamed from `maxParrallelJobsArg` to `maxParallelJobsArg`
## Error Context Improvements
- **TaskChain**: Errors now wrap the original with context: chain name, failing task name, and task index. Original error preserved via `.cause`
- **BufferRunner**: When `catchErrors: false`, buffered task errors now reject the trigger promise (via `CycleCounter.informOfCycleError`) instead of silently resolving with `undefined`
@@ -36,7 +77,62 @@
## Project Structure
- Source in `ts/`, web components in `ts_web/`
- Tests in `test/` - naming: `*.node.ts`, `*.browser.ts`, `*.both.ts`
- Tests in `test/` - naming: `*.node.ts`, `*.chromium.ts` (preferred), `*.both.ts`
- Note: `*.browser.ts` is deprecated, use `*.chromium.ts` for browser tests
- Logger: `ts/taskbuffer.logging.ts` exports `logger` (ConsoleLog from smartlog)
- Build: `pnpm build` (tsbuild tsfolders)
- Test: `pnpm test` or `tstest test/test.XX.name.ts --verbose`
## Web Components (ts_web/)
- Uses `@design.estate/dees-element` with TC39 decorators
- Decorators require the `accessor` keyword:
```typescript
@property({ type: String })
accessor myProp = 'default';
@state()
accessor count = 0;
```
## Service Lifecycle System (v7.0.0+)
### Service<T>
- `new Service<T>(name)` or `new Service<T>(options: IServiceOptions<T>)`
- Builder pattern: `.critical()`, `.optional()`, `.dependsOn(...)`, `.withStart(fn)`, `.withStop(fn)`, `.withHealthCheck(fn, config?)`, `.withRetry(config)`, `.withStartupTimeout(ms)`, `.withLabels(labels)`
- Subclass pattern: override `serviceStart()`, `serviceStop()`, `serviceHealthCheck()`
- State machine: `stopped` → `starting` → `running` → `degraded` → `failed` → `stopping`
- `service.instance` — stores the value returned from `start()` (e.g. a database pool)
- `withStop(fn)` and `withHealthCheck(fn)` receive the instance as argument: `(instance: T) => Promise<void>`
- `waitForState(target, timeoutMs?)`, `waitForRunning(timeoutMs?)`, `waitForStopped(timeoutMs?)` — programmatic readiness gates
- Per-service startup timeout via `withStartupTimeout(ms)` or `startupTimeoutMs` in options
- Labels: `setLabel`, `getLabel`, `removeLabel`, `hasLabel`, `withLabels`
- Health checks: configurable via `IHealthCheckConfig` with `intervalMs`, `timeoutMs`, `failuresBeforeDegraded`, `failuresBeforeFailed`
- Auto-restart on health failure: `autoRestart: true` in `IHealthCheckConfig` with `maxAutoRestarts`, `autoRestartDelayMs`, `autoRestartBackoffFactor`
- Events via `eventSubject`: `'started'`, `'stopped'`, `'failed'`, `'degraded'`, `'recovered'`, `'retrying'`, `'healthCheck'`, `'autoRestarting'`
### ServiceManager
- `manager.addService(service)` / `manager.addServiceFromOptions(options)` / `manager.removeService(name)`
- Dependency-ordered startup via topological sort (Kahn's algorithm), level-by-level parallel
- Critical service failure aborts startup with rollback; optional service failure continues
- Retry with exponential backoff + jitter
- Reverse-dependency-ordered shutdown with per-service timeout
- `restartService(name)` — cascade stops dependents, restarts target, restarts dependents
- `getHealth()` — aggregated `'healthy' | 'degraded' | 'unhealthy'` status
- `getServicesByLabel(key, value)` / `getServicesStatusByLabel(key, value)` — label-based queries
- Circular dependency detection
- Global startup timeout enforcement via `startupTimeoutMs` in `IServiceManagerOptions`
## Dependencies (as of v7.0.0)
- `@design.estate/dees-element` ^2.2.3 - TC39 decorators with `accessor` keyword
- `@push.rocks/lik` ^6.3.1 - Data structures
- `@push.rocks/smartdelay` ^3.0.5 - Delay utilities
- `@push.rocks/smartlog` ^3.2.1 - Logging
- `@push.rocks/smartpromise` ^4.2.3 - Promise utilities
- `@push.rocks/smartrx` ^3.0.10 - RxJS wrapper
- `@push.rocks/smarttime` ^4.2.3 - Time/cron utilities
- `@push.rocks/smartunique` ^3.0.9 - Unique ID generation
- `@git.zone/tsbuild` ^4.3.0 - Build tool
- `@git.zone/tsbundle` ^2.9.1 - Bundler (for browser tests)
- `@git.zone/tsrun` ^2.0.1 - TypeScript runner
- `@git.zone/tstest` ^3.5.0 - Test runner (supports `.chromium.ts` files)
- `@types/node` ^25.5.0 - Node.js type definitions

751
readme.md
View File

@@ -1,6 +1,6 @@
# @push.rocks/taskbuffer 🚀
> **Modern TypeScript task orchestration with smart buffering, scheduling, labels, and real-time event streaming**
> **Modern TypeScript task orchestration and service lifecycle management with constraint-based concurrency, smart buffering, scheduling, health checks, and real-time event streaming**
[![npm version](https://img.shields.io/npm/v/@push.rocks/taskbuffer.svg)](https://www.npmjs.com/package/@push.rocks/taskbuffer)
[![TypeScript](https://img.shields.io/badge/TypeScript-5.x-blue.svg)](https://www.typescriptlang.org/)
@@ -13,6 +13,7 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community
## 🌟 Features
- **🎯 Type-Safe Task Management** — Full TypeScript support with generics and type inference
- **🔒 Constraint-Based Concurrency** — Per-key mutual exclusion, group concurrency limits, cooldown enforcement, sliding-window rate limiting, and result sharing via `TaskConstraintGroup`
- **📊 Real-Time Progress Tracking** — Step-based progress with percentage weights
- **⚡ Smart Buffering** — Intelligent request debouncing and batching
- **⏰ Cron Scheduling** — Schedule tasks with cron expressions
@@ -20,6 +21,7 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community
- **🏷️ Labels** — Attach arbitrary `Record<string, string>` metadata (userId, tenantId, etc.) for multi-tenant filtering
- **📡 Push-Based Events** — rxjs `Subject<ITaskEvent>` on every Task and TaskManager for real-time state change notifications
- **🛡️ Error Handling** — Configurable error propagation with `catchErrors`, error tracking, and clear error state
- **🩺 Service Lifecycle Management** — `Service` and `ServiceManager` for long-running components (databases, servers, queues) with health checks, auto-restart, dependency ordering, and instance access
- **🎨 Web Component Dashboard** — Built-in Lit-based dashboard for real-time task visualization
- **🌐 Distributed Coordination** — Abstract coordinator for multi-instance task deduplication
@@ -49,6 +51,24 @@ const result = await greetTask.trigger('World');
console.log(result); // "Hello, World!"
```
### Task with Typed Data 📦
Every task can carry a typed data bag — perfect for constraint matching, routing, and metadata:
```typescript
const task = new Task<undefined, [], { domain: string; priority: number }>({
name: 'update-dns',
data: { domain: 'example.com', priority: 1 },
taskFunction: async () => {
// task.data is fully typed here
console.log(`Updating DNS for ${task.data.domain}`);
},
});
task.data.domain; // string — fully typed
task.data.priority; // number — fully typed
```
### Task with Steps & Progress 📊
```typescript
@@ -84,6 +104,325 @@ console.log(deployTask.getStepsMetadata()); // Step details with status
> **Note:** `notifyStep()` is fully type-safe — TypeScript only accepts step names you declared in the `steps` array when you use `as const`.
## 🔒 Task Constraints — Concurrency, Mutual Exclusion & Cooldowns
`TaskConstraintGroup` is the unified mechanism for controlling how tasks run relative to each other. It replaces older patterns like task runners, blocking tasks, and execution delays with a single, composable, key-based constraint system.
### Per-Key Mutual Exclusion
Ensure only one task runs at a time for a given key (e.g. per domain, per tenant, per resource):
```typescript
import { Task, TaskManager, TaskConstraintGroup } from '@push.rocks/taskbuffer';
const manager = new TaskManager();
// Only one DNS update per domain at a time
const domainMutex = new TaskConstraintGroup<{ domain: string }>({
name: 'domain-mutex',
maxConcurrent: 1,
constraintKeyForExecution: (task, input?) => task.data.domain,
});
manager.addConstraintGroup(domainMutex);
const task1 = new Task<undefined, [], { domain: string }>({
name: 'update-a.com',
data: { domain: 'a.com' },
taskFunction: async () => { /* update DNS for a.com */ },
});
const task2 = new Task<undefined, [], { domain: string }>({
name: 'update-a.com-2',
data: { domain: 'a.com' },
taskFunction: async () => { /* another update for a.com */ },
});
manager.addTask(task1);
manager.addTask(task2);
// task2 waits until task1 finishes (same domain key)
await Promise.all([
manager.triggerTask(task1),
manager.triggerTask(task2),
]);
```
### Group Concurrency Limits
Cap how many tasks can run concurrently across a group:
```typescript
// Max 3 DNS updaters running globally at once
const dnsLimit = new TaskConstraintGroup<{ group: string }>({
name: 'dns-concurrency',
maxConcurrent: 3,
constraintKeyForExecution: (task) =>
task.data.group === 'dns' ? 'dns' : null, // null = skip constraint
});
manager.addConstraintGroup(dnsLimit);
```
### Cooldowns (Rate Limiting)
Enforce a minimum time gap between consecutive executions for the same key:
```typescript
// No more than one API call per domain every 11 seconds
const rateLimiter = new TaskConstraintGroup<{ domain: string }>({
name: 'api-rate-limit',
maxConcurrent: 1,
cooldownMs: 11000,
constraintKeyForExecution: (task) => task.data.domain,
});
manager.addConstraintGroup(rateLimiter);
```
### Global Concurrency Cap
Limit total concurrent tasks system-wide:
```typescript
const globalCap = new TaskConstraintGroup({
name: 'global-cap',
maxConcurrent: 10,
constraintKeyForExecution: () => 'all', // same key = shared limit
});
manager.addConstraintGroup(globalCap);
```
### Composing Multiple Constraints
Multiple constraint groups stack — a task only runs when **all** applicable constraints allow it:
```typescript
manager.addConstraintGroup(globalCap); // max 10 globally
manager.addConstraintGroup(domainMutex); // max 1 per domain
manager.addConstraintGroup(rateLimiter); // 11s cooldown per domain
// A task must satisfy ALL three constraints before it starts
await manager.triggerTask(dnsTask);
```
### Selective Constraints
Return `null` from `constraintKeyForExecution` to exempt a task from a constraint group:
```typescript
const constraint = new TaskConstraintGroup<{ priority: string }>({
name: 'low-priority-limit',
maxConcurrent: 2,
constraintKeyForExecution: (task) =>
task.data.priority === 'low' ? 'low-priority' : null, // high priority tasks skip this constraint
});
```
### Input-Aware Constraints 🎯
The `constraintKeyForExecution` function receives both the **task** and the **runtime input** passed to `trigger(input)`. This means the same task triggered with different inputs can be constrained independently:
```typescript
const extractTLD = (domain: string) => {
const parts = domain.split('.');
return parts.slice(-2).join('.');
};
// Same TLD → serialized. Different TLDs → parallel.
const tldMutex = new TaskConstraintGroup({
name: 'tld-mutex',
maxConcurrent: 1,
constraintKeyForExecution: (task, input?: string) => {
if (!input) return null;
return extractTLD(input); // "example.com", "other.org", etc.
},
});
manager.addConstraintGroup(tldMutex);
// These two serialize (same TLD "example.com")
const p1 = manager.triggerTaskConstrained(getCert, 'app.example.com');
const p2 = manager.triggerTaskConstrained(getCert, 'api.example.com');
// This runs in parallel (different TLD "other.org")
const p3 = manager.triggerTaskConstrained(getCert, 'my.other.org');
```
You can also combine `task.data` and `input` for composite keys:
```typescript
const providerDomain = new TaskConstraintGroup<{ provider: string }>({
name: 'provider-domain',
maxConcurrent: 1,
constraintKeyForExecution: (task, input?: string) => {
return `${task.data.provider}:${input || 'default'}`;
},
});
```
### Pre-Execution Check with `shouldExecute` ✅
The `shouldExecute` callback runs right before a queued task executes. If it returns `false`, the task is skipped and its promise resolves with `undefined`. This is perfect for scenarios where a prior execution's outcome makes subsequent queued tasks unnecessary:
```typescript
const certCache = new Map<string, string>();
const certConstraint = new TaskConstraintGroup({
name: 'cert-mutex',
maxConcurrent: 1,
constraintKeyForExecution: (task, input?: string) => {
if (!input) return null;
return extractTLD(input);
},
shouldExecute: (task, input?: string) => {
if (!input) return true;
// Skip if a wildcard cert already covers this TLD
return certCache.get(extractTLD(input)) !== 'wildcard';
},
});
const getCert = new Task({
name: 'get-certificate',
taskFunction: async (domain: string) => {
const cert = await acme.getCert(domain);
if (cert.isWildcard) certCache.set(extractTLD(domain), 'wildcard');
return cert;
},
});
manager.addConstraintGroup(certConstraint);
manager.addTask(getCert);
const r1 = manager.triggerTaskConstrained(getCert, 'app.example.com'); // runs, gets wildcard
const r2 = manager.triggerTaskConstrained(getCert, 'api.example.com'); // queued → skipped!
const r3 = manager.triggerTaskConstrained(getCert, 'my.other.org'); // parallel (different TLD)
const [cert1, cert2, cert3] = await Promise.all([r1, r2, r3]);
// cert2 === undefined (skipped because wildcard already covers example.com)
```
**`shouldExecute` semantics:**
- Runs right before execution (after slot acquisition, before `trigger()`)
- Also checked on immediate (non-queued) triggers
- Returns `false` → skip execution, deferred resolves with `undefined`
- Can be async (return `Promise<boolean>`)
- Has closure access to external state modified by prior executions
- If multiple constraint groups have `shouldExecute`, **all** must return `true`
### Sliding Window Rate Limiting
Enforce "N completions per time window" with burst capability. Unlike `cooldownMs` (which forces even spacing between executions), `rateLimit` allows bursts up to the cap, then blocks until the window slides:
```typescript
// Let's Encrypt style: 300 new orders per 3 hours
const acmeRateLimit = new TaskConstraintGroup({
name: 'acme-rate',
constraintKeyForExecution: () => 'acme-account',
rateLimit: {
maxPerWindow: 300,
windowMs: 3 * 60 * 60 * 1000, // 3 hours
},
});
manager.addConstraintGroup(acmeRateLimit);
// All 300 can burst immediately. The 301st waits until the oldest
// completion falls out of the 3-hour window.
for (const domain of domains) {
manager.triggerTaskConstrained(certTask, { domain });
}
```
Compose multiple rate limits for layered protection:
```typescript
// Per-domain weekly cap AND global order rate
const perDomainWeekly = new TaskConstraintGroup({
name: 'per-domain-weekly',
constraintKeyForExecution: (task, input) => input.registeredDomain,
rateLimit: { maxPerWindow: 50, windowMs: 7 * 24 * 60 * 60 * 1000 },
});
const globalOrderRate = new TaskConstraintGroup({
name: 'global-order-rate',
constraintKeyForExecution: () => 'global',
rateLimit: { maxPerWindow: 300, windowMs: 3 * 60 * 60 * 1000 },
});
manager.addConstraintGroup(perDomainWeekly);
manager.addConstraintGroup(globalOrderRate);
```
Combine with `maxConcurrent` and `cooldownMs` for fine-grained control:
```typescript
const throttled = new TaskConstraintGroup({
name: 'acme-throttle',
constraintKeyForExecution: () => 'acme',
maxConcurrent: 5, // max 5 concurrent requests
cooldownMs: 1000, // 1s gap after each completion
rateLimit: {
maxPerWindow: 300,
windowMs: 3 * 60 * 60 * 1000,
},
});
```
### Result Sharing — Deduplication for Concurrent Requests
When multiple callers request the same resource concurrently, `resultSharingMode: 'share-latest'` ensures only one execution occurs. All queued waiters receive the same result:
```typescript
const certMutex = new TaskConstraintGroup({
name: 'cert-per-tld',
constraintKeyForExecution: (task, input) => extractTld(input.domain),
maxConcurrent: 1,
resultSharingMode: 'share-latest',
});
manager.addConstraintGroup(certMutex);
const certTask = new Task({
name: 'obtain-cert',
taskFunction: async (input) => {
return await acmeClient.obtainWildcard(input.domain);
},
});
manager.addTask(certTask);
// Three requests for *.example.com arrive simultaneously
const [cert1, cert2, cert3] = await Promise.all([
manager.triggerTaskConstrained(certTask, { domain: 'api.example.com' }),
manager.triggerTaskConstrained(certTask, { domain: 'www.example.com' }),
manager.triggerTaskConstrained(certTask, { domain: 'mail.example.com' }),
]);
// Only ONE ACME request was made.
// cert1 === cert2 === cert3 — all callers got the same cert object.
```
**Result sharing semantics:**
- `shouldExecute` is NOT called for shared results (the task's purpose was already fulfilled)
- Error results are NOT shared — queued tasks execute independently after a failure
- `lastResults` persists until `reset()` — for time-bounded sharing, use `shouldExecute` to control staleness
- Composable with rate limiting: rate-limited waiters get shared results without waiting for the window
### How It Works
When you trigger a task through `TaskManager` (via `triggerTask`, `triggerTaskByName`, `addExecuteRemoveTask`, or cron), the manager:
1. Evaluates all registered constraint groups against the task and input
2. If no constraints apply (all matchers return `null`) → checks `shouldExecute` → runs or skips
3. If all applicable constraints have capacity → acquires slots → checks `shouldExecute` → runs or skips
4. If any constraint blocks → enqueues the task; when a running task completes, the queue is drained
5. Cooldown/rate-limit-blocked tasks auto-retry after the shortest remaining delay expires
6. Queued tasks check for shared results first (if any group has `resultSharingMode: 'share-latest'`)
7. Queued tasks re-check `shouldExecute` when their turn comes — stale work is automatically pruned
## 🎯 Core Concepts
### Task Buffering — Intelligent Request Management
@@ -95,7 +434,6 @@ const apiTask = new Task({
name: 'APIRequest',
buffered: true,
bufferMax: 5, // Maximum 5 concurrent executions
execDelay: 100, // Minimum 100ms between executions
taskFunction: async (endpoint) => {
return await fetch(endpoint).then((r) => r.json());
},
@@ -156,9 +494,9 @@ console.log(`Saved ${savedCount} items`);
Taskchain also supports dynamic mutation:
```typescript
pipeline.addTask(newTask); // Append to chain
pipeline.removeTask(oldTask); // Remove by reference (returns boolean)
pipeline.shiftTask(); // Remove & return first task
pipeline.addTask(newTask); // Append to chain
pipeline.removeTask(oldTask); // Remove by reference (returns boolean)
pipeline.shiftTask(); // Remove & return first task
```
Error context is rich — a chain failure includes the chain name, failing task name, task index, and preserves the original error as `.cause`.
@@ -220,27 +558,6 @@ await initTask.trigger(); // No-op
console.log(initTask.hasTriggered); // true
```
### TaskRunner — Managed Queue with Concurrency Control
Process a queue of tasks with a configurable parallelism limit:
```typescript
import { TaskRunner } from '@push.rocks/taskbuffer';
const runner = new TaskRunner();
runner.setMaxParallelJobs(3); // Run up to 3 tasks concurrently
await runner.start();
runner.addTask(taskA);
runner.addTask(taskB);
runner.addTask(taskC);
runner.addTask(taskD); // Queued until a slot opens
// When done:
await runner.stop();
```
## 🏷️ Labels — Multi-Tenant Task Filtering
Attach arbitrary key-value labels to any task for filtering, grouping, or multi-tenant isolation:
@@ -411,6 +728,10 @@ manager.addTask(deployTask);
manager.addAndScheduleTask(backupTask, '0 2 * * *'); // Daily at 2 AM
manager.addAndScheduleTask(healthCheck, '*/5 * * * *'); // Every 5 minutes
// Register constraint groups
manager.addConstraintGroup(globalCap);
manager.addConstraintGroup(perDomainMutex);
// Query metadata
const meta = manager.getTaskMetadata('Deploy');
console.log(meta);
@@ -433,7 +754,7 @@ const allMeta = manager.getAllTasksMetadata();
const scheduled = manager.getScheduledTasks();
const nextRuns = manager.getNextScheduledRuns(5);
// Trigger by name
// Trigger by name (routes through constraints)
await manager.triggerTaskByName('Deploy');
// One-shot: add, execute, collect report, remove
@@ -462,6 +783,202 @@ manager.removeTask(task); // Removes from map and unsubscribes event forwarding
manager.descheduleTaskByName('Deploy'); // Remove cron schedule only
```
### Remove Constraint Groups
```typescript
manager.removeConstraintGroup('domain-mutex'); // By name
```
## 🩺 Service Lifecycle Management
For long-running components like database connections, HTTP servers, and message queues, taskbuffer provides `Service` and `ServiceManager` — a complete lifecycle management system with health checks, dependency ordering, retry, auto-restart, and typed instance access.
### Basic Service — Builder Pattern
```typescript
import { Service, ServiceManager } from '@push.rocks/taskbuffer';
const dbService = new Service<DatabasePool>('Database')
.critical()
.withStart(async () => {
const pool = new DatabasePool({ host: 'localhost', port: 5432 });
await pool.connect();
return pool; // stored as service.instance
})
.withStop(async (pool) => {
await pool.disconnect(); // receives the instance from start
})
.withHealthCheck(async (pool) => {
return await pool.ping(); // receives the instance too
});
await dbService.start();
dbService.instance!.query('SELECT 1'); // typed access to the pool
await dbService.stop();
```
The `start()` return value is stored as `service.instance` and automatically passed to `stop()` and `healthCheck()` functions — no need for external closures or shared variables.
### Service with Dependencies & Health Checks
```typescript
const cacheService = new Service('Redis')
.optional()
.withStart(async () => new RedisClient())
.withStop(async (client) => client.quit())
.withHealthCheck(async (client) => client.isReady, {
intervalMs: 10000, // check every 10s
timeoutMs: 3000, // 3s timeout per check
failuresBeforeDegraded: 3, // 3 consecutive failures → 'degraded'
failuresBeforeFailed: 5, // 5 consecutive failures → 'failed'
autoRestart: true, // auto-restart when failed
maxAutoRestarts: 5, // give up after 5 restart attempts
autoRestartDelayMs: 2000, // start with 2s delay
autoRestartBackoffFactor: 2, // double delay each attempt
});
const apiService = new Service('API')
.critical()
.dependsOn('Database', 'Redis')
.withStart(async () => {
const server = createServer();
await server.listen(3000);
return server;
})
.withStop(async (server) => server.close())
.withStartupTimeout(10000); // fail if start takes > 10s
```
### ServiceManager — Orchestration
`ServiceManager` handles dependency-ordered startup, failure isolation, and aggregated health reporting:
```typescript
const manager = new ServiceManager({
name: 'MyApp',
startupTimeoutMs: 60000, // global startup timeout
shutdownTimeoutMs: 15000, // per-service shutdown timeout
defaultRetry: { maxRetries: 3, baseDelayMs: 1000, backoffFactor: 2 },
});
manager.addService(dbService);
manager.addService(cacheService);
manager.addService(apiService);
await manager.start();
// ✅ Starts Database first, then Redis (parallel with DB since independent),
// then API (after both deps are running)
// ❌ If Database (critical) fails → rollback, stop everything, throw
// ⚠️ If Redis (optional) fails → log warning, continue, health = 'degraded'
// Health aggregation
const health = manager.getHealth();
// { overall: 'healthy', services: [...], startedAt: 1706284800000, uptime: 42000 }
// Cascade restart — stops dependents first, restarts target, then restarts dependents
await manager.restartService('Database');
// Graceful reverse-order shutdown
await manager.stop();
```
### Subclass Pattern
For complex services, extend `Service` and override the lifecycle hooks:
```typescript
class PostgresService extends Service<Pool> {
constructor(private config: PoolConfig) {
super('Postgres');
this.critical();
}
protected async serviceStart(): Promise<Pool> {
const pool = new Pool(this.config);
await pool.connect();
return pool;
}
protected async serviceStop(): Promise<void> {
await this.instance?.end();
}
protected async serviceHealthCheck(): Promise<boolean> {
const result = await this.instance?.query('SELECT 1');
return result?.rows.length === 1;
}
}
```
### Waiting for Service Readiness
Programmatically wait for a service to reach a specific state:
```typescript
// Wait for the service to be running (with timeout)
await dbService.waitForRunning(10000);
// Wait for any state
await service.waitForState(['running', 'degraded'], 5000);
// Wait for shutdown
await service.waitForStopped();
```
### Service Labels
Tag services with metadata for filtering and grouping:
```typescript
const service = new Service('Redis')
.withLabels({ type: 'cache', env: 'production', region: 'eu-west' })
.withStart(async () => new RedisClient())
.withStop(async (client) => client.quit());
// Query by label in ServiceManager
const caches = manager.getServicesByLabel('type', 'cache');
const prodStatuses = manager.getServicesStatusByLabel('env', 'production');
```
### Service Events
Every `Service` emits events via an rxjs `Subject<IServiceEvent>`:
```typescript
service.eventSubject.subscribe((event) => {
console.log(`[${event.type}] ${event.serviceName}${event.state}`);
});
// [started] Database → running
// [healthCheck] Database → running
// [degraded] Database → degraded
// [autoRestarting] Database → failed
// [started] Database → running
// [recovered] Database → running
// [stopped] Database → stopped
```
| Event Type | When |
| --- | --- |
| `'started'` | Service started successfully |
| `'stopped'` | Service stopped |
| `'failed'` | Service start failed or health check threshold exceeded |
| `'degraded'` | Health check failures exceeded `failuresBeforeDegraded` |
| `'recovered'` | Health check succeeded while in degraded state |
| `'retrying'` | ServiceManager retrying a failed start attempt |
| `'healthCheck'` | Health check completed (success or failure) |
| `'autoRestarting'` | Auto-restart scheduled after health check failure |
`ServiceManager.serviceSubject` aggregates events from all registered services.
### Service State Machine
```
stopped → starting → running → degraded → failed
↑ ↓ ↓ ↓
└── stopping ←───────────────────┴─────────┘
(auto-restart)
```
## 🎨 Web Component Dashboard
Visualize your tasks in real-time with the included Lit-based web component:
@@ -570,32 +1087,6 @@ await task.trigger('SELECT * FROM users'); // Setup runs here
await task.trigger('SELECT * FROM orders'); // Setup skipped, pool reused
```
### Blocking Tasks
Make one task wait for another to finish before executing:
```typescript
const initTask = new Task({
name: 'Init',
taskFunction: async () => {
await initializeSystem();
},
});
const workerTask = new Task({
name: 'Worker',
taskFunction: async () => {
await doWork();
},
});
workerTask.blockingTasks.push(initTask);
// Triggering worker will automatically wait for init to complete
initTask.trigger();
workerTask.trigger(); // Waits until initTask.finished resolves
```
### Database Migration Pipeline
```typescript
@@ -616,15 +1107,24 @@ try {
### Multi-Tenant SaaS Monitoring
Combine labels + events for a real-time multi-tenant dashboard:
Combine labels + events + constraints for a real-time multi-tenant system:
```typescript
const manager = new TaskManager();
// Per-tenant concurrency limit
const tenantLimit = new TaskConstraintGroup<{ tenantId: string }>({
name: 'tenant-concurrency',
maxConcurrent: 2,
constraintKeyForExecution: (task, input?) => task.data.tenantId,
});
manager.addConstraintGroup(tenantLimit);
// Create tenant-scoped tasks
function createTenantTask(tenantId: string, taskName: string, fn: () => Promise<any>) {
const task = new Task({
const task = new Task<undefined, [], { tenantId: string }>({
name: `${tenantId}:${taskName}`,
data: { tenantId },
labels: { tenantId },
taskFunction: fn,
});
@@ -653,14 +1153,32 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme');
| Class | Description |
| --- | --- |
| `Task<T, TSteps>` | Core task unit with optional step tracking, labels, and event streaming |
| `TaskManager` | Centralized orchestrator with scheduling, label queries, and aggregated events |
| `Task<T, TSteps, TData>` | Core task unit with typed data, optional step tracking, labels, and event streaming |
| `TaskManager` | Centralized orchestrator with constraint groups, scheduling, label queries, and aggregated events |
| `TaskConstraintGroup<TData>` | Concurrency, mutual exclusion, and cooldown constraints with key-based grouping |
| `Taskchain` | Sequential task executor with data flow between tasks |
| `Taskparallel` | Concurrent task executor via `Promise.all()` |
| `TaskOnce` | Single-execution guard |
| `TaskDebounced` | Debounced task using rxjs |
| `TaskRunner` | Sequential queue with configurable parallelism |
| `TaskStep` | Step tracking unit (internal, exposed via metadata) |
| `Service<T>` | Long-running component with start/stop lifecycle, health checks, auto-restart, and typed instance access |
| `ServiceManager` | Service orchestrator with dependency ordering, failure isolation, retry, and health aggregation |
### Task Constructor Options
| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `taskFunction` | `ITaskFunction<T>` | *required* | The async function to execute |
| `name` | `string` | — | Task identifier (required for TaskManager) |
| `data` | `TData` | `{}` | Typed data bag for constraint matching and routing |
| `steps` | `ReadonlyArray<{name, description, percentage}>` | — | Step definitions for progress tracking |
| `buffered` | `boolean` | — | Enable request buffering |
| `bufferMax` | `number` | — | Max buffered calls |
| `preTask` | `Task \| () => Task` | — | Task to run before |
| `afterTask` | `Task \| () => Task` | — | Task to run after |
| `taskSetup` | `() => Promise<T>` | — | One-time setup function |
| `catchErrors` | `boolean` | `false` | Swallow errors instead of rejecting |
| `labels` | `Record<string, string>` | `{}` | Initial labels |
### Task Methods
@@ -682,6 +1200,7 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme');
| Property | Type | Description |
| --- | --- | --- |
| `name` | `string` | Task identifier |
| `data` | `TData` | Typed data bag |
| `running` | `boolean` | Whether the task is currently executing |
| `idle` | `boolean` | Inverse of `running` |
| `labels` | `Record<string, string>` | Attached labels |
@@ -690,7 +1209,36 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme');
| `errorCount` | `number` | Total error count across all runs |
| `runCount` | `number` | Total execution count |
| `lastRun` | `Date \| undefined` | Timestamp of last execution |
| `blockingTasks` | `Task[]` | Tasks that must finish before this one starts |
### TaskConstraintGroup Constructor Options
| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `name` | `string` | *required* | Constraint group identifier |
| `constraintKeyForExecution` | `(task, input?) => string \| null` | *required* | Returns key for grouping, or `null` to skip. Receives both the task and runtime input. |
| `maxConcurrent` | `number` | `Infinity` | Max concurrent tasks per key |
| `cooldownMs` | `number` | `0` | Minimum ms between completions per key |
| `shouldExecute` | `(task, input?) => boolean \| Promise<boolean>` | — | Pre-execution check. Return `false` to skip; deferred resolves `undefined`. |
| `rateLimit` | `IRateLimitConfig` | — | Sliding window: `{ maxPerWindow, windowMs }`. Counts running + completed tasks. |
| `resultSharingMode` | `TResultSharingMode` | `'none'` | `'none'` or `'share-latest'`. Queued tasks get first task's result without executing. |
### TaskConstraintGroup Methods
| Method | Returns | Description |
| --- | --- | --- |
| `getConstraintKey(task, input?)` | `string \| null` | Get the constraint key for a task + input |
| `checkShouldExecute(task, input?)` | `Promise<boolean>` | Run the `shouldExecute` callback (defaults to `true`) |
| `canRun(key)` | `boolean` | Check if a slot is available (considers concurrency, cooldown, and rate limit) |
| `acquireSlot(key)` | `void` | Claim a running slot |
| `releaseSlot(key)` | `void` | Release a slot and record completion time + rate-limit timestamp |
| `getCooldownRemaining(key)` | `number` | Milliseconds until cooldown expires |
| `getRateLimitDelay(key)` | `number` | Milliseconds until a rate-limit slot opens |
| `getNextAvailableDelay(key)` | `number` | Max of cooldown + rate-limit delay — unified "when can I run" |
| `getRunningCount(key)` | `number` | Current running count for key |
| `recordResult(key, result)` | `void` | Store result for sharing (no-op if mode is `'none'`) |
| `getLastResult(key)` | `{result, timestamp} \| undefined` | Get last shared result for key |
| `hasResultSharing()` | `boolean` | Whether result sharing is enabled |
| `reset()` | `void` | Clear all state (running counts, cooldowns, rate-limit timestamps, shared results) |
### TaskManager Methods
@@ -699,7 +1247,11 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme');
| `addTask(task)` | `void` | Register a task (wires event forwarding) |
| `removeTask(task)` | `void` | Remove task and unsubscribe events |
| `getTaskByName(name)` | `Task \| undefined` | Look up by name |
| `triggerTaskByName(name)` | `Promise<any>` | Trigger by name |
| `triggerTaskByName(name)` | `Promise<any>` | Trigger by name (routes through constraints) |
| `triggerTask(task)` | `Promise<any>` | Trigger directly (routes through constraints) |
| `triggerTaskConstrained(task, input?)` | `Promise<any>` | Core constraint evaluation entry point |
| `addConstraintGroup(group)` | `void` | Register a constraint group |
| `removeConstraintGroup(name)` | `void` | Remove a constraint group by name |
| `addAndScheduleTask(task, cron)` | `void` | Register + schedule |
| `scheduleTaskByName(name, cron)` | `void` | Schedule existing task |
| `descheduleTaskByName(name)` | `void` | Remove schedule |
@@ -719,19 +1271,98 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme');
| --- | --- | --- |
| `taskSubject` | `Subject<ITaskEvent>` | Aggregated events from all added tasks |
| `taskMap` | `ObjectMap<Task>` | Internal task registry |
| `constraintGroups` | `TaskConstraintGroup[]` | Registered constraint groups |
### Service Builder Methods
| Method | Returns | Description |
| --- | --- | --- |
| `critical()` | `this` | Mark as critical (startup failure aborts ServiceManager) |
| `optional()` | `this` | Mark as optional (startup failure is tolerated) |
| `dependsOn(...names)` | `this` | Declare dependencies by service name |
| `withStart(fn)` | `this` | Set start function: `() => Promise<T>` |
| `withStop(fn)` | `this` | Set stop function: `(instance: T) => Promise<void>` |
| `withHealthCheck(fn, config?)` | `this` | Set health check: `(instance: T) => Promise<boolean>` |
| `withRetry(config)` | `this` | Set retry config: `{ maxRetries, baseDelayMs, maxDelayMs, backoffFactor }` |
| `withStartupTimeout(ms)` | `this` | Per-service startup timeout |
| `withLabels(labels)` | `this` | Attach key-value labels |
### Service Methods
| Method | Returns | Description |
| --- | --- | --- |
| `start()` | `Promise<T>` | Start the service (no-op if already running) |
| `stop()` | `Promise<void>` | Stop the service (no-op if already stopped) |
| `checkHealth()` | `Promise<boolean \| undefined>` | Run health check manually |
| `waitForState(target, timeoutMs?)` | `Promise<void>` | Wait for service to reach a state |
| `waitForRunning(timeoutMs?)` | `Promise<void>` | Wait for `'running'` state |
| `waitForStopped(timeoutMs?)` | `Promise<void>` | Wait for `'stopped'` state |
| `getStatus()` | `IServiceStatus` | Full status snapshot |
| `setLabel(key, value)` | `void` | Set a label |
| `getLabel(key)` | `string \| undefined` | Get a label value |
| `removeLabel(key)` | `boolean` | Remove a label |
| `hasLabel(key, value?)` | `boolean` | Check label existence / value |
### Service Properties
| Property | Type | Description |
| --- | --- | --- |
| `name` | `string` | Service identifier |
| `state` | `TServiceState` | Current state (`stopped`, `starting`, `running`, `degraded`, `failed`, `stopping`) |
| `instance` | `T \| undefined` | The value returned from `start()` |
| `criticality` | `TServiceCriticality` | `'critical'` or `'optional'` |
| `dependencies` | `string[]` | Dependency names |
| `labels` | `Record<string, string>` | Attached labels |
| `eventSubject` | `Subject<IServiceEvent>` | rxjs Subject emitting lifecycle events |
| `errorCount` | `number` | Total error count |
| `retryCount` | `number` | Retry attempts during last startup |
### ServiceManager Methods
| Method | Returns | Description |
| --- | --- | --- |
| `addService(service)` | `void` | Register a service |
| `addServiceFromOptions(options)` | `Service<T>` | Create and register from options |
| `removeService(name)` | `void` | Remove service (throws if others depend on it) |
| `start()` | `Promise<void>` | Start all services in dependency order |
| `stop()` | `Promise<void>` | Stop all services in reverse order |
| `restartService(name)` | `Promise<void>` | Cascade restart with dependents |
| `getService(name)` | `Service \| undefined` | Look up by name |
| `getServiceStatus(name)` | `IServiceStatus \| undefined` | Single service status |
| `getAllStatuses()` | `IServiceStatus[]` | All service statuses |
| `getHealth()` | `IServiceManagerHealth` | Aggregated health report |
| `getServicesByLabel(key, value)` | `Service[]` | Filter services by label |
| `getServicesStatusByLabel(key, value)` | `IServiceStatus[]` | Filter statuses by label |
### Exported Types
```typescript
import type {
// Task types
ITaskMetadata,
ITaskExecutionReport,
ITaskExecution,
IScheduledTaskInfo,
ITaskEvent,
TTaskEventType,
ITaskStep,
ITaskFunction,
ITaskConstraintGroupOptions,
IRateLimitConfig,
TResultSharingMode,
StepNames,
// Service types
IServiceOptions,
IServiceStatus,
IServiceEvent,
IServiceManagerOptions,
IServiceManagerHealth,
IRetryConfig,
IHealthCheckConfig,
TServiceState,
TServiceCriticality,
TServiceEventType,
TOverallHealth,
} from '@push.rocks/taskbuffer';
```

View File

@@ -137,38 +137,7 @@ tap.test('should reject Taskparallel when a child task throws', async () => {
expect(didReject).toBeTrue();
});
// Test 7: TaskRunner continues processing after a task error
tap.test('should continue TaskRunner queue after a task error', async () => {
const runner = new taskbuffer.TaskRunner();
const executionOrder: string[] = [];
const badTask = new taskbuffer.Task({
name: 'runner-bad-task',
taskFunction: async () => {
executionOrder.push('bad');
throw new Error('runner task failure');
},
});
const goodTask = new taskbuffer.Task({
name: 'runner-good-task',
taskFunction: async () => {
executionOrder.push('good');
},
});
await runner.start();
runner.addTask(badTask);
runner.addTask(goodTask);
// Wait for both tasks to be processed
await smartdelay.delayFor(500);
await runner.stop();
expect(executionOrder).toContain('bad');
expect(executionOrder).toContain('good');
});
// Test 8: BufferRunner handles errors without hanging
// Test 7: BufferRunner handles errors without hanging
tap.test('should handle BufferRunner errors without hanging', async () => {
let callCount = 0;
const bufferedTask = new taskbuffer.Task({

1435
test/test.13.constraints.ts Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,894 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as taskbuffer from '../ts/index.js';
import * as smartdelay from '@push.rocks/smartdelay';
// ── Test 1: Basic service start/stop lifecycle ─────────────
tap.test('should start and stop a simple service', async () => {
let started = false;
let stopped = false;
const service = new taskbuffer.Service('TestService')
.withStart(async () => { started = true; })
.withStop(async () => { stopped = true; });
expect(service.state).toEqual('stopped');
await service.start();
expect(service.state).toEqual('running');
expect(started).toBeTrue();
await service.stop();
expect(service.state).toEqual('stopped');
expect(stopped).toBeTrue();
});
// ── Test 2: Builder pattern chaining ───────────────────────
tap.test('should support builder pattern chaining', async () => {
const service = new taskbuffer.Service('ChainedService')
.critical()
.dependsOn('Dep1', 'Dep2')
.withStart(async () => 'result')
.withStop(async () => {})
.withRetry({ maxRetries: 5, baseDelayMs: 100 });
expect(service.criticality).toEqual('critical');
expect(service.dependencies).toEqual(['Dep1', 'Dep2']);
expect(service.retryConfig).toBeTruthy();
expect(service.retryConfig!.maxRetries).toEqual(5);
});
// ── Test 3: Subclass pattern ───────────────────────────────
tap.test('should support subclass pattern', async () => {
class MyService extends taskbuffer.Service<string> {
public startCalled = false;
public stopCalled = false;
constructor() {
super('MySubclassService');
this.optional();
}
protected async serviceStart(): Promise<string> {
this.startCalled = true;
return 'hello';
}
protected async serviceStop(): Promise<void> {
this.stopCalled = true;
}
}
const service = new MyService();
const result = await service.start();
expect(service.startCalled).toBeTrue();
expect(service.state).toEqual('running');
await service.stop();
expect(service.stopCalled).toBeTrue();
expect(service.state).toEqual('stopped');
});
// ── Test 4: Constructor with options object ────────────────
tap.test('should accept options object in constructor', async () => {
let started = false;
const service = new taskbuffer.Service({
name: 'OptionsService',
criticality: 'critical',
dependencies: ['A', 'B'],
start: async () => { started = true; },
stop: async () => {},
retry: { maxRetries: 10 },
});
expect(service.name).toEqual('OptionsService');
expect(service.criticality).toEqual('critical');
expect(service.dependencies).toEqual(['A', 'B']);
await service.start();
expect(started).toBeTrue();
await service.stop();
});
// ── Test 5: ServiceManager dependency ordering ─────────────
tap.test('should start services in dependency order', async () => {
const order: string[] = [];
const manager = new taskbuffer.ServiceManager({ name: 'TestManager' });
manager.addService(
new taskbuffer.Service('C')
.dependsOn('B')
.withStart(async () => { order.push('C'); })
.withStop(async () => {}),
);
manager.addService(
new taskbuffer.Service('A')
.withStart(async () => { order.push('A'); })
.withStop(async () => {}),
);
manager.addService(
new taskbuffer.Service('B')
.dependsOn('A')
.withStart(async () => { order.push('B'); })
.withStop(async () => {}),
);
await manager.start();
// A must come before B, B must come before C
expect(order.indexOf('A')).toBeLessThan(order.indexOf('B'));
expect(order.indexOf('B')).toBeLessThan(order.indexOf('C'));
await manager.stop();
});
// ── Test 6: Critical service failure aborts startup ────────
tap.test('should abort startup when a critical service fails', async () => {
const manager = new taskbuffer.ServiceManager({ name: 'CriticalTest' });
let serviceCStopped = false;
manager.addService(
new taskbuffer.Service('Working')
.critical()
.withStart(async () => {})
.withStop(async () => {}),
);
manager.addService(
new taskbuffer.Service('Broken')
.critical()
.withStart(async () => { throw new Error('boom'); })
.withStop(async () => {})
.withRetry({ maxRetries: 0 }),
);
manager.addService(
new taskbuffer.Service('AfterBroken')
.dependsOn('Broken')
.withStart(async () => { serviceCStopped = true; })
.withStop(async () => {}),
);
let caught = false;
try {
await manager.start();
} catch (err) {
caught = true;
expect((err as Error).message).toInclude('Broken');
}
expect(caught).toBeTrue();
// AfterBroken should never have started because Broken (its dep) failed
expect(serviceCStopped).toBeFalse();
});
// ── Test 7: Optional service failure continues startup ─────
tap.test('should continue startup when an optional service fails', async () => {
const manager = new taskbuffer.ServiceManager({ name: 'OptionalTest' });
let criticalStarted = false;
manager.addService(
new taskbuffer.Service('Critical')
.critical()
.withStart(async () => { criticalStarted = true; })
.withStop(async () => {})
.withRetry({ maxRetries: 0 }),
);
manager.addService(
new taskbuffer.Service('Optional')
.optional()
.withStart(async () => { throw new Error('oops'); })
.withStop(async () => {})
.withRetry({ maxRetries: 0 }),
);
// Should NOT throw
await manager.start();
expect(criticalStarted).toBeTrue();
const health = manager.getHealth();
expect(health.overall).toEqual('degraded');
const optionalStatus = manager.getServiceStatus('Optional');
expect(optionalStatus!.state).toEqual('failed');
await manager.stop();
});
// ── Test 8: Retry with backoff for optional services ───────
tap.test('should retry failed optional services', async () => {
const manager = new taskbuffer.ServiceManager({ name: 'RetryTest' });
let attempts = 0;
manager.addService(
new taskbuffer.Service('Flaky')
.optional()
.withStart(async () => {
attempts++;
if (attempts < 3) {
throw new Error(`attempt ${attempts} failed`);
}
})
.withStop(async () => {})
.withRetry({ maxRetries: 5, baseDelayMs: 50, maxDelayMs: 100, backoffFactor: 1 }),
);
await manager.start();
expect(attempts).toEqual(3);
const status = manager.getServiceStatus('Flaky');
expect(status!.state).toEqual('running');
await manager.stop();
});
// ── Test 9: Reverse-order shutdown ─────────────────────────
tap.test('should stop services in reverse dependency order', async () => {
const order: string[] = [];
const manager = new taskbuffer.ServiceManager({ name: 'ShutdownTest' });
manager.addService(
new taskbuffer.Service('Base')
.withStart(async () => {})
.withStop(async () => { order.push('Base'); }),
);
manager.addService(
new taskbuffer.Service('Middle')
.dependsOn('Base')
.withStart(async () => {})
.withStop(async () => { order.push('Middle'); }),
);
manager.addService(
new taskbuffer.Service('Top')
.dependsOn('Middle')
.withStart(async () => {})
.withStop(async () => { order.push('Top'); }),
);
await manager.start();
await manager.stop();
// Top should stop before Middle, Middle before Base
expect(order.indexOf('Top')).toBeLessThan(order.indexOf('Middle'));
expect(order.indexOf('Middle')).toBeLessThan(order.indexOf('Base'));
});
// ── Test 10: Circular dependency detection ─────────────────
tap.test('should throw on circular dependency', async () => {
const manager = new taskbuffer.ServiceManager({ name: 'CycleTest' });
manager.addService(
new taskbuffer.Service('A')
.dependsOn('B')
.withStart(async () => {})
.withStop(async () => {}),
);
manager.addService(
new taskbuffer.Service('B')
.dependsOn('A')
.withStart(async () => {})
.withStop(async () => {}),
);
let caught = false;
try {
await manager.start();
} catch (err) {
caught = true;
expect((err as Error).message).toInclude('Circular dependency');
}
expect(caught).toBeTrue();
});
// ── Test 11: restartService cascades to dependents ─────────
tap.test('should restart service and its dependents', async () => {
const startOrder: string[] = [];
const stopOrder: string[] = [];
const manager = new taskbuffer.ServiceManager({ name: 'RestartTest' });
manager.addService(
new taskbuffer.Service('Base')
.withStart(async () => { startOrder.push('Base'); })
.withStop(async () => { stopOrder.push('Base'); })
.withRetry({ maxRetries: 0 }),
);
manager.addService(
new taskbuffer.Service('Dep')
.dependsOn('Base')
.withStart(async () => { startOrder.push('Dep'); })
.withStop(async () => { stopOrder.push('Dep'); })
.withRetry({ maxRetries: 0 }),
);
await manager.start();
expect(startOrder).toEqual(['Base', 'Dep']);
// Clear tracking
startOrder.length = 0;
stopOrder.length = 0;
await manager.restartService('Base');
// Dep should be stopped first, then Base, then Base restarted, then Dep
expect(stopOrder).toEqual(['Dep', 'Base']);
expect(startOrder).toEqual(['Base', 'Dep']);
await manager.stop();
});
// ── Test 12: getHealth returns correct aggregated status ───
tap.test('should return correct health aggregation', async () => {
const manager = new taskbuffer.ServiceManager({ name: 'HealthTest' });
manager.addService(
new taskbuffer.Service('OK')
.critical()
.withStart(async () => {})
.withStop(async () => {})
.withRetry({ maxRetries: 0 }),
);
manager.addService(
new taskbuffer.Service('AlsoOK')
.optional()
.withStart(async () => {})
.withStop(async () => {})
.withRetry({ maxRetries: 0 }),
);
await manager.start();
const health = manager.getHealth();
expect(health.overall).toEqual('healthy');
expect(health.services.length).toEqual(2);
expect(health.startedAt).toBeTruthy();
expect(health.uptime).toBeGreaterThanOrEqual(0);
await manager.stop();
});
// ── Test 13: Events emitted on state transitions ───────────
tap.test('should emit events on state transitions', async () => {
const events: taskbuffer.IServiceEvent[] = [];
const manager = new taskbuffer.ServiceManager({ name: 'EventTest' });
manager.addService(
new taskbuffer.Service('Svc')
.withStart(async () => {})
.withStop(async () => {})
.withRetry({ maxRetries: 0 }),
);
manager.serviceSubject.subscribe((event) => {
events.push(event);
});
await manager.start();
await manager.stop();
const types = events.map((e) => e.type);
expect(types).toContain('started');
expect(types).toContain('stopped');
});
// ── Test 14: Parallel startup of independent services ──────
tap.test('should start independent services in parallel', async () => {
const manager = new taskbuffer.ServiceManager({ name: 'ParallelTest' });
const startTimes: Record<string, number> = {};
manager.addService(
new taskbuffer.Service('A')
.withStart(async () => {
startTimes['A'] = Date.now();
await smartdelay.delayFor(100);
})
.withStop(async () => {}),
);
manager.addService(
new taskbuffer.Service('B')
.withStart(async () => {
startTimes['B'] = Date.now();
await smartdelay.delayFor(100);
})
.withStop(async () => {}),
);
await manager.start();
// Both should start at roughly the same time (within 50ms)
const diff = Math.abs(startTimes['A'] - startTimes['B']);
expect(diff).toBeLessThan(50);
await manager.stop();
});
// ── Test 15: getStatus snapshot ────────────────────────────
tap.test('should return accurate status snapshot', async () => {
const service = new taskbuffer.Service('StatusTest')
.critical()
.dependsOn('X')
.withStart(async () => {})
.withStop(async () => {});
const statusBefore = service.getStatus();
expect(statusBefore.state).toEqual('stopped');
expect(statusBefore.name).toEqual('StatusTest');
expect(statusBefore.criticality).toEqual('critical');
expect(statusBefore.dependencies).toEqual(['X']);
expect(statusBefore.errorCount).toEqual(0);
await service.start();
const statusAfter = service.getStatus();
expect(statusAfter.state).toEqual('running');
expect(statusAfter.startedAt).toBeTruthy();
expect(statusAfter.uptime).toBeGreaterThanOrEqual(0);
await service.stop();
});
// ── Test 16: Missing dependency detection ──────────────────
tap.test('should throw when a dependency is not registered', async () => {
const manager = new taskbuffer.ServiceManager({ name: 'MissingDepTest' });
manager.addService(
new taskbuffer.Service('Lonely')
.dependsOn('Ghost')
.withStart(async () => {})
.withStop(async () => {}),
);
let caught = false;
try {
await manager.start();
} catch (err) {
caught = true;
expect((err as Error).message).toInclude('Ghost');
expect((err as Error).message).toInclude('not registered');
}
expect(caught).toBeTrue();
});
// ── Test 17: No-op on double start/stop ────────────────────
tap.test('should be a no-op when starting an already-running service', async () => {
let startCount = 0;
const service = new taskbuffer.Service('DoubleStart')
.withStart(async () => { startCount++; })
.withStop(async () => {});
await service.start();
await service.start(); // should be no-op
expect(startCount).toEqual(1);
await service.stop();
await service.stop(); // should be no-op
});
// ── Test 18: addServiceFromOptions convenience ─────────────
tap.test('should support addServiceFromOptions', async () => {
const manager = new taskbuffer.ServiceManager({ name: 'ConvenienceTest' });
let started = false;
const service = manager.addServiceFromOptions({
name: 'Easy',
start: async () => { started = true; },
stop: async () => {},
});
expect(service).toBeInstanceOf(taskbuffer.Service);
expect(service.name).toEqual('Easy');
await manager.start();
expect(started).toBeTrue();
await manager.stop();
});
// ═══════════════════════════════════════════════════════════
// NEW TESTS: Improvements 1-5
// ═══════════════════════════════════════════════════════════
// ── Test 19: service.instance stores start result ──────────
tap.test('should store start result as service.instance', async () => {
const pool = { query: (sql: string) => `result: ${sql}` };
const service = new taskbuffer.Service<typeof pool>('DB')
.withStart(async () => pool)
.withStop(async (inst) => {});
expect(service.instance).toBeUndefined();
const result = await service.start();
expect(result).toEqual(pool);
expect(service.instance).toEqual(pool);
expect(service.instance!.query('SELECT 1')).toEqual('result: SELECT 1');
// Status should report hasInstance
expect(service.getStatus().hasInstance).toBeTrue();
await service.stop();
expect(service.instance).toBeUndefined();
expect(service.getStatus().hasInstance).toBeFalse();
});
// ── Test 20: stop receives the instance ────────────────────
tap.test('should pass instance to stop function', async () => {
let receivedInstance: any = null;
const service = new taskbuffer.Service<{ id: number }>('InstanceStop')
.withStart(async () => ({ id: 42 }))
.withStop(async (inst) => { receivedInstance = inst; });
await service.start();
await service.stop();
expect(receivedInstance).toBeTruthy();
expect(receivedInstance.id).toEqual(42);
});
// ── Test 21: healthCheck receives the instance ─────────────
tap.test('should pass instance to health check function', async () => {
let checkedInstance: any = null;
const service = new taskbuffer.Service<{ healthy: boolean }>('InstanceHealth')
.withStart(async () => ({ healthy: true }))
.withStop(async () => {})
.withHealthCheck(async (inst) => {
checkedInstance = inst;
return inst.healthy;
}, { intervalMs: 60000 }); // long interval so it doesn't auto-run
await service.start();
// Manually trigger health check
const ok = await service.checkHealth();
expect(ok).toBeTrue();
expect(checkedInstance).toBeTruthy();
expect(checkedInstance.healthy).toBeTrue();
await service.stop();
});
// ── Test 22: double start returns existing instance ────────
tap.test('should return existing instance on double start', async () => {
let callCount = 0;
const service = new taskbuffer.Service<number>('DoubleInstance')
.withStart(async () => { callCount++; return callCount; })
.withStop(async () => {});
const first = await service.start();
const second = await service.start();
expect(first).toEqual(1);
expect(second).toEqual(1); // should return same instance, not call start again
expect(callCount).toEqual(1);
await service.stop();
});
// ── Test 23: Labels on Service ─────────────────────────────
tap.test('should support labels on services', async () => {
const service = new taskbuffer.Service('LabeledService')
.withLabels({ type: 'database', env: 'production' })
.withStart(async () => {})
.withStop(async () => {});
expect(service.hasLabel('type')).toBeTrue();
expect(service.hasLabel('type', 'database')).toBeTrue();
expect(service.hasLabel('type', 'cache')).toBeFalse();
expect(service.getLabel('env')).toEqual('production');
service.setLabel('region', 'eu-west');
expect(service.hasLabel('region')).toBeTrue();
service.removeLabel('env');
expect(service.hasLabel('env')).toBeFalse();
// Labels in status
const status = service.getStatus();
expect(status.labels).toBeTruthy();
expect(status.labels!.type).toEqual('database');
expect(status.labels!.region).toEqual('eu-west');
});
// ── Test 24: Labels via IServiceOptions ────────────────────
tap.test('should accept labels in options constructor', async () => {
const service = new taskbuffer.Service({
name: 'OptionsLabeled',
start: async () => {},
stop: async () => {},
labels: { kind: 'cache' },
});
expect(service.hasLabel('kind', 'cache')).toBeTrue();
});
// ── Test 25: ServiceManager.getServicesByLabel ─────────────
tap.test('should query services by label in ServiceManager', async () => {
const manager = new taskbuffer.ServiceManager({ name: 'LabelQueryTest' });
manager.addService(
new taskbuffer.Service('Redis')
.withLabels({ type: 'cache', tier: 'fast' })
.withStart(async () => {})
.withStop(async () => {}),
);
manager.addService(
new taskbuffer.Service('Memcached')
.withLabels({ type: 'cache', tier: 'fast' })
.withStart(async () => {})
.withStop(async () => {}),
);
manager.addService(
new taskbuffer.Service('Postgres')
.withLabels({ type: 'database', tier: 'slow' })
.withStart(async () => {})
.withStop(async () => {}),
);
const caches = manager.getServicesByLabel('type', 'cache');
expect(caches.length).toEqual(2);
const databases = manager.getServicesByLabel('type', 'database');
expect(databases.length).toEqual(1);
expect(databases[0].name).toEqual('Postgres');
const statuses = manager.getServicesStatusByLabel('tier', 'fast');
expect(statuses.length).toEqual(2);
await manager.stop();
});
// ── Test 26: waitForState / waitForRunning ──────────────────
tap.test('should support waitForState and waitForRunning', async () => {
const service = new taskbuffer.Service('WaitService')
.withStart(async () => {
await smartdelay.delayFor(50);
})
.withStop(async () => {});
// Start in background
const startPromise = service.start();
// Wait for running state
await service.waitForRunning(2000);
expect(service.state).toEqual('running');
await startPromise;
// Now wait for stopped
const stopPromise = service.stop();
await service.waitForStopped(2000);
expect(service.state).toEqual('stopped');
await stopPromise;
});
// ── Test 27: waitForState timeout ──────────────────────────
tap.test('should timeout when waiting for a state that never comes', async () => {
const service = new taskbuffer.Service('TimeoutWait')
.withStart(async () => {})
.withStop(async () => {});
// Service is stopped, wait for 'running' with a short timeout
let caught = false;
try {
await service.waitForState('running', 100);
} catch (err) {
caught = true;
expect((err as Error).message).toInclude('timed out');
expect((err as Error).message).toInclude('running');
}
expect(caught).toBeTrue();
});
// ── Test 28: waitForState already in target state ──────────
tap.test('should resolve immediately if already in target state', async () => {
const service = new taskbuffer.Service('AlreadyThere')
.withStart(async () => {})
.withStop(async () => {});
// Already stopped
await service.waitForState('stopped', 100); // should not throw
await service.start();
await service.waitForState('running', 100); // should not throw
await service.stop();
});
// ── Test 29: Per-service startup timeout ───────────────────
tap.test('should timeout when serviceStart hangs', async () => {
const service = new taskbuffer.Service('SlowStart')
.withStart(async () => {
await smartdelay.delayFor(5000); // very slow
})
.withStop(async () => {})
.withStartupTimeout(100); // 100ms timeout
let caught = false;
try {
await service.start();
} catch (err) {
caught = true;
expect((err as Error).message).toInclude('startup timed out');
expect((err as Error).message).toInclude('100ms');
}
expect(caught).toBeTrue();
expect(service.state).toEqual('failed');
});
// ── Test 30: Startup timeout via options ────────────────────
tap.test('should accept startupTimeoutMs in options constructor', async () => {
const service = new taskbuffer.Service({
name: 'TimeoutOptions',
start: async () => { await smartdelay.delayFor(5000); },
stop: async () => {},
startupTimeoutMs: 100,
});
let caught = false;
try {
await service.start();
} catch (err) {
caught = true;
expect((err as Error).message).toInclude('startup timed out');
}
expect(caught).toBeTrue();
});
// ── Test 31: Auto-restart on health check failure ──────────
tap.test('should auto-restart when health checks fail', async () => {
let startCount = 0;
let healthy = false;
const service = new taskbuffer.Service('AutoRestart')
.withStart(async () => {
startCount++;
// After first restart, become healthy
if (startCount >= 2) {
healthy = true;
}
})
.withStop(async () => {})
.withHealthCheck(async () => healthy, {
intervalMs: 30000, // we'll call checkHealth manually
failuresBeforeDegraded: 1,
failuresBeforeFailed: 2,
autoRestart: true,
maxAutoRestarts: 3,
autoRestartDelayMs: 50,
autoRestartBackoffFactor: 1,
});
await service.start();
expect(startCount).toEqual(1);
// Fail health checks manually to push to failed
await service.checkHealth(); // 1 failure -> degraded
expect(service.state).toEqual('degraded');
await service.checkHealth(); // 2 failures -> failed + auto-restart scheduled
expect(service.state).toEqual('failed');
// Wait for auto-restart to complete
await service.waitForRunning(2000);
expect(service.state).toEqual('running');
expect(startCount).toEqual(2);
await service.stop();
});
// ── Test 32: Auto-restart max attempts ─────────────────────
tap.test('should stop auto-restarting after max attempts', async () => {
let startCount = 0;
const events: string[] = [];
const service = new taskbuffer.Service('MaxRestart')
.withStart(async () => {
startCount++;
if (startCount > 1) {
throw new Error('always fails');
}
})
.withStop(async () => {})
.withHealthCheck(async () => false, {
intervalMs: 60000,
failuresBeforeDegraded: 1,
failuresBeforeFailed: 2,
autoRestart: true,
maxAutoRestarts: 2,
autoRestartDelayMs: 50,
autoRestartBackoffFactor: 1,
});
service.eventSubject.subscribe((e) => events.push(e.type));
await service.start();
expect(startCount).toEqual(1);
// Push to failed
await service.checkHealth();
await service.checkHealth();
// Wait for auto-restart attempts to exhaust
await smartdelay.delayFor(500);
// Should have emitted autoRestarting events
expect(events).toContain('autoRestarting');
// Service should be in failed state (restarts exhausted)
expect(service.state).toEqual('failed');
// Clean up
await service.stop();
});
// ── Test 33: Builder chaining with new methods ─────────────
tap.test('should chain all new builder methods', async () => {
const service = new taskbuffer.Service('FullBuilder')
.critical()
.dependsOn('A')
.withStart(async () => ({ conn: true }))
.withStop(async (inst) => {})
.withHealthCheck(async (inst) => inst.conn)
.withRetry({ maxRetries: 3 })
.withStartupTimeout(5000)
.withLabels({ env: 'test' });
expect(service.criticality).toEqual('critical');
expect(service.startupTimeoutMs).toEqual(5000);
expect(service.hasLabel('env', 'test')).toBeTrue();
});
export default tap.start();

View File

@@ -36,12 +36,9 @@ tap.test('expect run tasks in sequence', async () => {
});
const testPromise = testTaskchain.trigger();
await smartdelay.delayFor(2100);
// tslint:disable-next-line:no-unused-expression
expect(task1Executed).toBeTrue();
// tslint:disable-next-line:no-unused-expression
expect(task2Executed).toBeFalse();
await smartdelay.delayFor(2100);
// tslint:disable-next-line:no-unused-expression
expect(task2Executed).toBeTrue();
await testPromise;
});

View File

@@ -33,7 +33,6 @@ tap.test('should run the task as expected', async () => {
);
myTaskManager.start();
await myTaskManager.triggerTaskByName('myTask');
// tslint:disable-next-line:no-unused-expression
expect(referenceBoolean).toBeTrue();
});

View File

@@ -1,52 +1,151 @@
import { tap, expect } from '@git.zone/tstest/tapbundle';
import * as taskbuffer from '../ts/index.js';
import * as smartdelay from '@push.rocks/smartdelay';
let counter1 = 0;
let counter2 = 0;
let counter3 = 0;
// Test 1: Basic buffered execution with afterTask chain
tap.test('should run buffered tasks with afterTask chain', async () => {
let counter1 = 0;
let counter2 = 0;
let counter3 = 0;
tap.test('should run buffered', async (tools) => {
const task = new taskbuffer.Task({
name: 'a buffered task',
taskFunction: async () => {
counter1++;
await tools.delayFor(2000);
console.log(`task 1 ran ${counter1} times`);
},
buffered: true,
bufferMax: 1,
afterTask: () => {
return task2;
},
});
const task2 = new taskbuffer.Task({
name: 'a buffered task',
taskFunction: async () => {
counter2++;
await tools.delayFor(2000);
console.log(`task2 ran ${counter2} times`);
},
buffered: true,
bufferMax: 1,
afterTask: () => {
return task3;
},
});
const task3 = new taskbuffer.Task({
name: 'a buffered task',
name: 'buffered-chain-3',
taskFunction: async () => {
counter3++;
await tools.delayFor(2000);
console.log(`task3 ran ${counter3} times`);
await smartdelay.delayFor(50);
},
buffered: true,
bufferMax: 1,
});
while (counter1 < 10) {
await tools.delayFor(5000);
task.trigger();
const task2 = new taskbuffer.Task({
name: 'buffered-chain-2',
taskFunction: async () => {
counter2++;
await smartdelay.delayFor(50);
},
buffered: true,
bufferMax: 1,
afterTask: () => task3,
});
const task1 = new taskbuffer.Task({
name: 'buffered-chain-1',
taskFunction: async () => {
counter1++;
await smartdelay.delayFor(50);
},
buffered: true,
bufferMax: 1,
afterTask: () => task2,
});
// Trigger 3 times with enough spacing for the chain to complete
for (let i = 0; i < 3; i++) {
task1.trigger();
await smartdelay.delayFor(250); // enough for chain of 3 x 50ms tasks
}
// Wait for final chain to finish
await smartdelay.delayFor(500);
// Each task in the chain should have run at least once
expect(counter1).toBeGreaterThanOrEqual(1);
expect(counter2).toBeGreaterThanOrEqual(1);
expect(counter3).toBeGreaterThanOrEqual(1);
// afterTask chain means task2 count should match task1 (each trigger chains)
expect(counter2).toEqual(counter1);
expect(counter3).toEqual(counter1);
});
tap.start();
// Test 2: bufferMax limits concurrent buffered executions
tap.test('should respect bufferMax for concurrent buffered calls', async () => {
let running = 0;
let maxRunning = 0;
let totalRuns = 0;
const task = new taskbuffer.Task({
name: 'buffer-max-test',
taskFunction: async () => {
running++;
maxRunning = Math.max(maxRunning, running);
totalRuns++;
await smartdelay.delayFor(100);
running--;
},
buffered: true,
bufferMax: 2,
});
// Fire many triggers rapidly — only bufferMax should run concurrently
for (let i = 0; i < 10; i++) {
task.trigger();
}
// Wait for all buffered executions to complete
await smartdelay.delayFor(1000);
expect(maxRunning).toBeLessThanOrEqual(2);
expect(totalRuns).toBeGreaterThanOrEqual(1);
});
// Test 3: bufferMax limits how many runs are queued during execution
tap.test('should limit queued runs to bufferMax during execution', async () => {
let runCount = 0;
const task = new taskbuffer.Task({
name: 'buffer-queue-test',
taskFunction: async () => {
runCount++;
await smartdelay.delayFor(100);
},
buffered: true,
bufferMax: 2,
});
// Rapid-fire 5 triggers — bufferMax:2 means counter caps at 2
// so only 2 runs will happen (the initial run + 1 buffered rerun)
task.trigger();
task.trigger();
task.trigger();
task.trigger();
task.trigger();
await smartdelay.delayFor(500);
expect(runCount).toEqual(2);
});
// Test 4: Triggers spaced after completion queue new runs
tap.test('should re-trigger after previous buffered run completes', async () => {
let runCount = 0;
const task = new taskbuffer.Task({
name: 'retrigger-test',
taskFunction: async () => {
runCount++;
await smartdelay.delayFor(50);
},
buffered: true,
bufferMax: 1,
});
// First trigger starts execution
task.trigger();
// Wait for it to complete
await smartdelay.delayFor(100);
// Second trigger starts a new execution (task is now idle)
task.trigger();
await smartdelay.delayFor(100);
// Third trigger
task.trigger();
await smartdelay.delayFor(100);
expect(runCount).toEqual(3);
});
export default tap.start();

View File

@@ -1,34 +0,0 @@
import { tap, expect } from '@git.zone/tstest/tapbundle';
import * as taskbuffer from '../ts/index.js';
let testTaskRunner: taskbuffer.TaskRunner;
tap.test('should create a valid taskrunner', async () => {
testTaskRunner = new taskbuffer.TaskRunner();
await testTaskRunner.start();
});
tap.test('should execute task when its scheduled', async (tools) => {
const done = tools.defer();
testTaskRunner.addTask(
new taskbuffer.Task({
taskFunction: async () => {
console.log('hi');
},
}),
);
testTaskRunner.addTask(
new taskbuffer.Task({
taskFunction: async () => {
console.log('there');
done.resolve();
},
}),
);
await done.promise;
});
tap.start();

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/taskbuffer',
version: '4.1.1',
version: '8.0.2',
description: 'A flexible task management library supporting TypeScript, allowing for task buffering, scheduling, and execution with dependency management.'
}

View File

@@ -4,15 +4,32 @@ export { Taskchain } from './taskbuffer.classes.taskchain.js';
export { Taskparallel } from './taskbuffer.classes.taskparallel.js';
export { TaskManager } from './taskbuffer.classes.taskmanager.js';
export { TaskOnce } from './taskbuffer.classes.taskonce.js';
export { TaskRunner } from './taskbuffer.classes.taskrunner.js';
export { TaskDebounced } from './taskbuffer.classes.taskdebounced.js';
export { TaskConstraintGroup } from './taskbuffer.classes.taskconstraintgroup.js';
// Task step system
export { TaskStep } from './taskbuffer.classes.taskstep.js';
export type { ITaskStep } from './taskbuffer.classes.taskstep.js';
// Metadata interfaces
export type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent, TTaskEventType } from './taskbuffer.interfaces.js';
export type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent, TTaskEventType, ITaskConstraintGroupOptions, ITaskExecution, IRateLimitConfig, TResultSharingMode } from './taskbuffer.interfaces.js';
// Service lifecycle system
export { Service } from './taskbuffer.classes.service.js';
export { ServiceManager } from './taskbuffer.classes.servicemanager.js';
export type {
IServiceOptions,
IServiceStatus,
IServiceEvent,
IServiceManagerOptions,
IServiceManagerHealth,
IRetryConfig,
IHealthCheckConfig,
TServiceState,
TServiceCriticality,
TServiceEventType,
TOverallHealth,
} from './taskbuffer.interfaces.js';
import * as distributedCoordination from './taskbuffer.classes.distributedcoordinator.js';
export { distributedCoordination };

View File

@@ -6,7 +6,7 @@ export class BufferRunner {
// initialize by default
public bufferCounter: number = 0;
constructor(taskArg: Task<any>) {
constructor(taskArg: Task<any, any, any>) {
this.task = taskArg;
}

View File

@@ -9,7 +9,7 @@ export interface ICycleObject {
export class CycleCounter {
public task: Task;
public cycleObjectArray: ICycleObject[] = [];
constructor(taskArg: Task<any>) {
constructor(taskArg: Task<any, any, any>) {
this.task = taskArg;
}
public getPromiseForCycle(cycleCountArg: number) {

View File

@@ -0,0 +1,528 @@
import * as plugins from './taskbuffer.plugins.js';
import { logger } from './taskbuffer.logging.js';
import type {
TServiceState,
TServiceCriticality,
IServiceEvent,
IServiceStatus,
IRetryConfig,
IHealthCheckConfig,
IServiceOptions,
} from './taskbuffer.interfaces.js';
/**
* Service represents a long-running component with start/stop lifecycle,
* health checking, and retry capabilities.
*
* Use via builder pattern:
* new Service('MyService')
* .critical()
* .dependsOn('Database')
* .withStart(async () => { ... })
* .withStop(async (instance) => { ... })
* .withHealthCheck(async (instance) => { ... })
*
* Or extend for complex services:
* class MyService extends Service {
* protected async serviceStart() { ... }
* protected async serviceStop() { ... }
* }
*/
export class Service<T = any> {
public readonly name: string;
public readonly eventSubject = new plugins.smartrx.rxjs.Subject<IServiceEvent>();
// ── Internal state ─────────────────────────────────
private _state: TServiceState = 'stopped';
private _criticality: TServiceCriticality = 'optional';
private _dependencies: string[] = [];
private _retryConfig: IRetryConfig | undefined;
private _healthCheckConfig: IHealthCheckConfig | undefined;
private _startupTimeoutMs: number | undefined;
// Builder-provided functions
private _startFn: (() => Promise<T>) | undefined;
private _stopFn: ((instance: T) => Promise<void>) | undefined;
private _healthCheckFn: ((instance: T) => Promise<boolean>) | undefined;
// Instance: the resolved start result
private _instance: T | undefined;
// Labels
public labels: Record<string, string> = {};
// Runtime tracking
private _startedAt: number | undefined;
private _stoppedAt: number | undefined;
private _errorCount = 0;
private _lastError: string | undefined;
private _retryCount = 0;
// Health check tracking
private _healthCheckTimer: ReturnType<typeof setTimeout> | undefined;
private _lastHealthCheck: number | undefined;
private _healthCheckOk: boolean | undefined;
private _consecutiveHealthFailures = 0;
// Auto-restart tracking
private _autoRestartCount = 0;
private _autoRestartTimer: ReturnType<typeof setTimeout> | undefined;
constructor(nameOrOptions: string | IServiceOptions<T>) {
if (typeof nameOrOptions === 'string') {
this.name = nameOrOptions;
} else {
this.name = nameOrOptions.name;
this._startFn = nameOrOptions.start;
this._stopFn = nameOrOptions.stop;
this._healthCheckFn = nameOrOptions.healthCheck;
this._criticality = nameOrOptions.criticality || 'optional';
this._dependencies = nameOrOptions.dependencies || [];
this._retryConfig = nameOrOptions.retry;
this._healthCheckConfig = nameOrOptions.healthCheckConfig;
this._startupTimeoutMs = nameOrOptions.startupTimeoutMs;
if (nameOrOptions.labels) {
this.labels = { ...nameOrOptions.labels };
}
}
}
// ── Builder methods ──────────────────────────────────
public critical(): this {
this._criticality = 'critical';
return this;
}
public optional(): this {
this._criticality = 'optional';
return this;
}
public dependsOn(...serviceNames: string[]): this {
this._dependencies.push(...serviceNames);
return this;
}
public withStart(fn: () => Promise<T>): this {
this._startFn = fn;
return this;
}
public withStop(fn: (instance: T) => Promise<void>): this {
this._stopFn = fn;
return this;
}
public withHealthCheck(fn: (instance: T) => Promise<boolean>, config?: IHealthCheckConfig): this {
this._healthCheckFn = fn;
if (config) {
this._healthCheckConfig = config;
}
return this;
}
public withRetry(config: IRetryConfig): this {
this._retryConfig = config;
return this;
}
public withStartupTimeout(ms: number): this {
this._startupTimeoutMs = ms;
return this;
}
public withLabels(labelsArg: Record<string, string>): this {
Object.assign(this.labels, labelsArg);
return this;
}
// ── Label helpers ──────────────────────────────────
public setLabel(key: string, value: string): void {
this.labels[key] = value;
}
public getLabel(key: string): string | undefined {
return this.labels[key];
}
public removeLabel(key: string): boolean {
if (key in this.labels) {
delete this.labels[key];
return true;
}
return false;
}
public hasLabel(key: string, value?: string): boolean {
if (value !== undefined) {
return this.labels[key] === value;
}
return key in this.labels;
}
// ── Overridable hooks (for subclassing) ──────────────
protected async serviceStart(): Promise<T> {
if (this._startFn) {
return this._startFn();
}
throw new Error(`Service '${this.name}': no start function provided. Use withStart() or override serviceStart().`);
}
protected async serviceStop(): Promise<void> {
if (this._stopFn) {
return this._stopFn(this._instance as T);
}
// Default: no-op stop is fine (some services don't need explicit cleanup)
}
protected async serviceHealthCheck(): Promise<boolean> {
if (this._healthCheckFn) {
return this._healthCheckFn(this._instance as T);
}
// No health check configured — assume healthy if running
return this._state === 'running';
}
// ── Lifecycle (called by ServiceManager) ─────────────
public async start(): Promise<T> {
if (this._state === 'running') {
return this._instance as T;
}
this.setState('starting');
try {
let result: T;
if (this._startupTimeoutMs) {
result = await Promise.race([
this.serviceStart(),
new Promise<never>((_, reject) =>
setTimeout(() => reject(new Error(`Service '${this.name}': startup timed out after ${this._startupTimeoutMs}ms`)), this._startupTimeoutMs)
),
]);
} else {
result = await this.serviceStart();
}
this._instance = result;
this._startedAt = Date.now();
this._stoppedAt = undefined;
this._consecutiveHealthFailures = 0;
this._healthCheckOk = true;
this._autoRestartCount = 0;
this.setState('running');
this.emitEvent('started');
this.startHealthCheckTimer();
return result;
} catch (err) {
this._errorCount++;
this._lastError = err instanceof Error ? err.message : String(err);
this.setState('failed');
this.emitEvent('failed', { error: this._lastError });
throw err;
}
}
public async stop(): Promise<void> {
if (this._state === 'stopped' || this._state === 'stopping') {
return;
}
this.stopHealthCheckTimer();
this.clearAutoRestartTimer();
this.setState('stopping');
try {
await this.serviceStop();
} catch (err) {
logger.log('warn', `Service '${this.name}' error during stop: ${err instanceof Error ? err.message : String(err)}`);
}
this._instance = undefined;
this._stoppedAt = Date.now();
this.setState('stopped');
this.emitEvent('stopped');
}
public async checkHealth(): Promise<boolean | undefined> {
if (!this._healthCheckFn && !this.hasOverriddenHealthCheck()) {
return undefined;
}
try {
const config = this._healthCheckConfig;
const timeoutMs = config?.timeoutMs ?? 5000;
const result = await Promise.race([
this.serviceHealthCheck(),
new Promise<boolean>((_, reject) =>
setTimeout(() => reject(new Error('Health check timed out')), timeoutMs)
),
]);
this._lastHealthCheck = Date.now();
this._healthCheckOk = result;
if (result) {
this._consecutiveHealthFailures = 0;
if (this._state === 'degraded') {
this.setState('running');
this.emitEvent('recovered');
}
} else {
this._consecutiveHealthFailures++;
this.handleHealthFailure();
}
this.emitEvent('healthCheck');
return result;
} catch (err) {
this._lastHealthCheck = Date.now();
this._healthCheckOk = false;
this._consecutiveHealthFailures++;
this.handleHealthFailure();
this.emitEvent('healthCheck');
return false;
}
}
// ── Wait / readiness ──────────────────────────────────
public async waitForState(
targetState: TServiceState | TServiceState[],
timeoutMs?: number,
): Promise<void> {
const states = Array.isArray(targetState) ? targetState : [targetState];
// Already in target state
if (states.includes(this._state)) {
return;
}
return new Promise<void>((resolve, reject) => {
let timer: ReturnType<typeof setTimeout> | undefined;
let settled = false;
const settle = (fn: () => void) => {
if (settled) return;
settled = true;
subscription.unsubscribe();
if (timer) clearTimeout(timer);
fn();
};
const subscription = this.eventSubject.subscribe((event) => {
if (states.includes(event.state)) {
settle(resolve);
}
});
// Re-check after subscribing to close the race window
if (states.includes(this._state)) {
settle(resolve);
return;
}
if (timeoutMs !== undefined) {
timer = setTimeout(() => {
settle(() =>
reject(
new Error(
`Service '${this.name}': timed out waiting for state [${states.join(', ')}] after ${timeoutMs}ms (current: ${this._state})`,
),
),
);
}, timeoutMs);
}
});
}
public async waitForRunning(timeoutMs?: number): Promise<void> {
return this.waitForState('running', timeoutMs);
}
public async waitForStopped(timeoutMs?: number): Promise<void> {
return this.waitForState('stopped', timeoutMs);
}
// ── State ────────────────────────────────────────────
public get state(): TServiceState {
return this._state;
}
public get criticality(): TServiceCriticality {
return this._criticality;
}
public get dependencies(): string[] {
return [...this._dependencies];
}
public get retryConfig(): IRetryConfig | undefined {
return this._retryConfig;
}
public get startupTimeoutMs(): number | undefined {
return this._startupTimeoutMs;
}
public get instance(): T | undefined {
return this._instance;
}
public get errorCount(): number {
return this._errorCount;
}
public get retryCount(): number {
return this._retryCount;
}
public set retryCount(value: number) {
this._retryCount = value;
}
public getStatus(): IServiceStatus {
return {
name: this.name,
state: this._state,
criticality: this._criticality,
startedAt: this._startedAt,
stoppedAt: this._stoppedAt,
lastHealthCheck: this._lastHealthCheck,
healthCheckOk: this._healthCheckOk,
uptime: this._startedAt && this._state === 'running'
? Date.now() - this._startedAt
: undefined,
errorCount: this._errorCount,
lastError: this._lastError,
retryCount: this._retryCount,
dependencies: [...this._dependencies],
labels: { ...this.labels },
hasInstance: this._instance !== undefined,
};
}
// ── Internal helpers ─────────────────────────────────
private setState(state: TServiceState): void {
this._state = state;
}
private emitEvent(type: IServiceEvent['type'], extra?: Partial<IServiceEvent>): void {
this.eventSubject.next({
type,
serviceName: this.name,
state: this._state,
timestamp: Date.now(),
...extra,
});
}
private handleHealthFailure(): void {
const config = this._healthCheckConfig;
const failuresBeforeDegraded = config?.failuresBeforeDegraded ?? 3;
const failuresBeforeFailed = config?.failuresBeforeFailed ?? 5;
if (this._state === 'running' && this._consecutiveHealthFailures >= failuresBeforeDegraded) {
this.setState('degraded');
this.emitEvent('degraded');
}
if (this._consecutiveHealthFailures >= failuresBeforeFailed) {
this.setState('failed');
this._lastError = `Health check failed ${this._consecutiveHealthFailures} consecutive times`;
this.emitEvent('failed', { error: this._lastError });
this.stopHealthCheckTimer();
// Auto-restart if configured
if (config?.autoRestart) {
this.scheduleAutoRestart();
}
}
}
private scheduleAutoRestart(): void {
const config = this._healthCheckConfig;
const maxRestarts = config?.maxAutoRestarts ?? 3;
if (maxRestarts > 0 && this._autoRestartCount >= maxRestarts) {
logger.log('warn', `Service '${this.name}': max auto-restarts (${maxRestarts}) exceeded`);
return;
}
const baseDelay = config?.autoRestartDelayMs ?? 5000;
const factor = config?.autoRestartBackoffFactor ?? 2;
const delay = Math.min(baseDelay * Math.pow(factor, this._autoRestartCount), 60000);
this._autoRestartCount++;
this.emitEvent('autoRestarting', { attempt: this._autoRestartCount });
this._autoRestartTimer = setTimeout(async () => {
this._autoRestartTimer = undefined;
try {
// Stop first to clean up, then start fresh
this._instance = undefined;
this._stoppedAt = Date.now();
this.setState('stopped');
await this.start();
// Success — reset counter
this._autoRestartCount = 0;
} catch (err) {
logger.log('warn', `Service '${this.name}': auto-restart attempt ${this._autoRestartCount} failed: ${err instanceof Error ? err.message : String(err)}`);
// Schedule another attempt
this.scheduleAutoRestart();
}
}, delay);
if (this._autoRestartTimer && typeof this._autoRestartTimer === 'object' && 'unref' in this._autoRestartTimer) {
(this._autoRestartTimer as any).unref();
}
}
private clearAutoRestartTimer(): void {
if (this._autoRestartTimer) {
clearTimeout(this._autoRestartTimer);
this._autoRestartTimer = undefined;
}
}
private startHealthCheckTimer(): void {
if (!this._healthCheckFn && !this.hasOverriddenHealthCheck()) {
return;
}
const config = this._healthCheckConfig;
const intervalMs = config?.intervalMs ?? 30000;
this.stopHealthCheckTimer();
const tick = () => {
if (this._state !== 'running' && this._state !== 'degraded') {
return;
}
this.checkHealth().catch(() => {});
this._healthCheckTimer = setTimeout(tick, intervalMs);
if (this._healthCheckTimer && typeof this._healthCheckTimer === 'object' && 'unref' in this._healthCheckTimer) {
(this._healthCheckTimer as any).unref();
}
};
this._healthCheckTimer = setTimeout(tick, intervalMs);
if (this._healthCheckTimer && typeof this._healthCheckTimer === 'object' && 'unref' in this._healthCheckTimer) {
(this._healthCheckTimer as any).unref();
}
}
private stopHealthCheckTimer(): void {
if (this._healthCheckTimer) {
clearTimeout(this._healthCheckTimer);
this._healthCheckTimer = undefined;
}
}
private hasOverriddenHealthCheck(): boolean {
return this.serviceHealthCheck !== Service.prototype.serviceHealthCheck;
}
}

View File

@@ -0,0 +1,436 @@
import * as plugins from './taskbuffer.plugins.js';
import { logger } from './taskbuffer.logging.js';
import { Service } from './taskbuffer.classes.service.js';
import type {
IServiceEvent,
IServiceStatus,
IServiceManagerOptions,
IServiceManagerHealth,
IServiceOptions,
IRetryConfig,
TOverallHealth,
} from './taskbuffer.interfaces.js';
/**
* ServiceManager orchestrates multiple Service instances with:
* - Dependency-ordered startup (topological sort, level-by-level parallel)
* - Failure isolation (optional services don't crash the system)
* - Retry with exponential backoff for failed optional services
* - Reverse-dependency-ordered shutdown
* - Aggregated health status
*/
export class ServiceManager {
public readonly name: string;
public readonly serviceSubject = new plugins.smartrx.rxjs.Subject<IServiceEvent>();
private services = new Map<string, Service>();
private startupOrder: string[][] = []; // levels of service names
private options: Required<IServiceManagerOptions>;
private _startedAt: number | undefined;
private subscriptions: plugins.smartrx.rxjs.Subscription[] = [];
constructor(options?: IServiceManagerOptions) {
this.name = options?.name || 'ServiceManager';
this.options = {
name: this.name,
defaultRetry: options?.defaultRetry || { maxRetries: 3, baseDelayMs: 1000, maxDelayMs: 30000, backoffFactor: 2 },
defaultHealthCheck: options?.defaultHealthCheck || {},
startupTimeoutMs: options?.startupTimeoutMs ?? 120000,
shutdownTimeoutMs: options?.shutdownTimeoutMs ?? 30000,
};
}
// ── Service Registration ─────────────────────────────
public addService(service: Service): void {
if (this.services.has(service.name)) {
throw new Error(`Service '${service.name}' is already registered`);
}
this.services.set(service.name, service);
// Forward service events to the aggregated subject
const sub = service.eventSubject.subscribe((event) => {
this.serviceSubject.next(event);
});
this.subscriptions.push(sub);
}
public addServiceFromOptions<T>(options: IServiceOptions<T>): Service<T> {
const service = new Service<T>(options);
this.addService(service);
return service;
}
public removeService(name: string): void {
// Check no other service depends on this one
for (const [svcName, svc] of this.services) {
if (svc.dependencies.includes(name)) {
throw new Error(`Cannot remove service '${name}': service '${svcName}' depends on it`);
}
}
this.services.delete(name);
}
// ── Lifecycle ────────────────────────────────────────
public async start(): Promise<void> {
// Build startup order via topological sort
this.startupOrder = this.topologicalSort();
this._startedAt = Date.now();
const startupPromise = this.startAllLevels();
// Enforce global startup timeout
if (this.options.startupTimeoutMs) {
const timeout = new plugins.smartdelay.Timeout(this.options.startupTimeoutMs);
const timeoutPromise = timeout.promise.then(() => {
throw new Error(`${this.name}: global startup timeout exceeded (${this.options.startupTimeoutMs}ms)`);
});
try {
await Promise.race([startupPromise, timeoutPromise]);
} finally {
timeout.cancel();
}
} else {
await startupPromise;
}
}
private async startAllLevels(): Promise<void> {
const startedServices: string[] = [];
logger.log('info', `${this.name}: starting ${this.services.size} services in ${this.startupOrder.length} levels`);
for (let levelIdx = 0; levelIdx < this.startupOrder.length; levelIdx++) {
const level = this.startupOrder[levelIdx];
logger.log('info', `${this.name}: starting level ${levelIdx}: [${level.join(', ')}]`);
const results = await Promise.allSettled(
level.map(async (name) => {
const service = this.services.get(name)!;
try {
await this.startServiceWithRetry(service);
startedServices.push(name);
} catch (err) {
if (service.criticality === 'critical') {
throw err;
}
// Optional service — log and continue
logger.log('warn', `${this.name}: optional service '${name}' failed to start: ${err instanceof Error ? err.message : String(err)}`);
}
}),
);
// Check if any critical service failed
for (let i = 0; i < results.length; i++) {
const result = results[i];
if (result.status === 'rejected') {
const name = level[i];
const service = this.services.get(name);
if (service && service.criticality === 'critical') {
logger.log('error', `${this.name}: critical service '${name}' failed, aborting startup`);
// Rollback: stop all started services in reverse order
await this.stopServices(startedServices.reverse());
throw new Error(`Critical service '${name}' failed to start: ${result.reason instanceof Error ? result.reason.message : String(result.reason)}`);
}
}
}
}
const statuses = this.getAllStatuses();
const running = statuses.filter((s) => s.state === 'running').length;
const failed = statuses.filter((s) => s.state === 'failed').length;
logger.log('info', `${this.name}: startup complete — ${running} running, ${failed} failed`);
}
public async stop(): Promise<void> {
logger.log('info', `${this.name}: stopping all services`);
// Stop in reverse startup order
const reversedLevels = [...this.startupOrder].reverse();
for (const level of reversedLevels) {
const runningInLevel = level.filter((name) => {
const svc = this.services.get(name);
return svc && svc.state !== 'stopped';
});
if (runningInLevel.length === 0) continue;
await Promise.allSettled(
runningInLevel.map(async (name) => {
const service = this.services.get(name)!;
try {
const timeout = new plugins.smartdelay.Timeout(this.options.shutdownTimeoutMs);
const timeoutPromise = timeout.promise.then(() => {
throw new Error(`Timeout stopping service '${name}'`);
});
try {
await Promise.race([service.stop(), timeoutPromise]);
} finally {
timeout.cancel();
}
} catch (err) {
logger.log('warn', `${this.name}: error stopping '${name}': ${err instanceof Error ? err.message : String(err)}`);
}
}),
);
}
// Clean up subscriptions
for (const sub of this.subscriptions) {
sub.unsubscribe();
}
this.subscriptions = [];
this._startedAt = undefined;
logger.log('info', `${this.name}: all services stopped`);
}
// ── Querying ─────────────────────────────────────────
public getService(name: string): Service | undefined {
return this.services.get(name);
}
public getServiceStatus(name: string): IServiceStatus | undefined {
return this.services.get(name)?.getStatus();
}
public getAllStatuses(): IServiceStatus[] {
return Array.from(this.services.values()).map((s) => s.getStatus());
}
public getServicesByLabel(key: string, value: string): Service[] {
return Array.from(this.services.values()).filter((s) => s.labels[key] === value);
}
public getServicesStatusByLabel(key: string, value: string): IServiceStatus[] {
return this.getServicesByLabel(key, value).map((s) => s.getStatus());
}
public getHealth(): IServiceManagerHealth {
const statuses = this.getAllStatuses();
let overall: TOverallHealth = 'healthy';
const hasCriticalDown = statuses.some(
(s) => s.criticality === 'critical' && s.state !== 'running' && s.state !== 'stopped',
);
const hasAnyDown = statuses.some(
(s) => s.state !== 'running' && s.state !== 'stopped',
);
if (hasCriticalDown) {
overall = 'unhealthy';
} else if (hasAnyDown) {
overall = 'degraded';
}
return {
overall,
services: statuses,
startedAt: this._startedAt,
uptime: this._startedAt ? Date.now() - this._startedAt : undefined,
};
}
// ── Runtime Operations ───────────────────────────────
public async restartService(name: string): Promise<void> {
const service = this.services.get(name);
if (!service) {
throw new Error(`Service '${name}' not found`);
}
// Find all transitive dependents
const dependents = this.getTransitiveDependents(name);
// Stop dependents in reverse dependency order
const dependentLevels = this.getLevelsForServices(dependents).reverse();
for (const level of dependentLevels) {
await Promise.allSettled(
level.map((depName) => {
const svc = this.services.get(depName);
return svc && svc.state !== 'stopped' ? svc.stop() : Promise.resolve();
}),
);
}
// Stop the target service
await service.stop();
// Start the target service
await this.startServiceWithRetry(service);
// Restart dependents in dependency order
const dependentLevelsForward = this.getLevelsForServices(dependents);
for (const level of dependentLevelsForward) {
await Promise.allSettled(
level.map(async (depName) => {
const svc = this.services.get(depName)!;
try {
await this.startServiceWithRetry(svc);
} catch (err) {
logger.log('warn', `${this.name}: failed to restart dependent '${depName}': ${err instanceof Error ? err.message : String(err)}`);
}
}),
);
}
}
// ── Internal helpers ─────────────────────────────────
private async startServiceWithRetry(service: Service): Promise<void> {
const retryConfig = service.retryConfig || this.options.defaultRetry;
const maxRetries = retryConfig.maxRetries ?? 3;
const baseDelay = retryConfig.baseDelayMs ?? 1000;
const maxDelay = retryConfig.maxDelayMs ?? 30000;
const factor = retryConfig.backoffFactor ?? 2;
let lastError: Error | undefined;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
await service.start();
return;
} catch (err) {
lastError = err instanceof Error ? err : new Error(String(err));
service.retryCount = attempt + 1;
if (attempt < maxRetries) {
const delay = Math.min(baseDelay * Math.pow(factor, attempt), maxDelay);
const jitter = 0.8 + Math.random() * 0.4;
const actualDelay = Math.floor(delay * jitter);
service.eventSubject.next({
type: 'retrying',
serviceName: service.name,
state: service.state,
timestamp: Date.now(),
error: lastError.message,
attempt: attempt + 1,
});
logger.log('info', `${this.name}: retrying '${service.name}' in ${actualDelay}ms (attempt ${attempt + 1}/${maxRetries})`);
await plugins.smartdelay.delayFor(actualDelay);
}
}
}
throw lastError!;
}
private async stopServices(names: string[]): Promise<void> {
await Promise.allSettled(
names.map(async (name) => {
const service = this.services.get(name);
if (service && service.state !== 'stopped') {
try {
await service.stop();
} catch (err) {
logger.log('warn', `${this.name}: error stopping '${name}' during rollback: ${err instanceof Error ? err.message : String(err)}`);
}
}
}),
);
}
/**
* Topological sort using Kahn's algorithm.
* Returns levels of service names — services within a level can start in parallel.
*/
private topologicalSort(): string[][] {
const names = new Set(this.services.keys());
const inDegree = new Map<string, number>();
const dependents = new Map<string, string[]>(); // dep -> services that depend on it
// Initialize
for (const name of names) {
inDegree.set(name, 0);
dependents.set(name, []);
}
// Build graph
for (const [name, service] of this.services) {
for (const dep of service.dependencies) {
if (!names.has(dep)) {
throw new Error(`Service '${name}' depends on '${dep}', which is not registered`);
}
inDegree.set(name, (inDegree.get(name) || 0) + 1);
dependents.get(dep)!.push(name);
}
}
// Process level by level
const levels: string[][] = [];
const remaining = new Set(names);
while (remaining.size > 0) {
const level: string[] = [];
for (const name of remaining) {
if ((inDegree.get(name) || 0) === 0) {
level.push(name);
}
}
if (level.length === 0) {
// Cycle detected
const cycleNodes = Array.from(remaining).join(', ');
throw new Error(`Circular dependency detected involving services: ${cycleNodes}`);
}
// Remove this level's nodes and update in-degrees
for (const name of level) {
remaining.delete(name);
for (const dependent of dependents.get(name) || []) {
inDegree.set(dependent, (inDegree.get(dependent) || 0) - 1);
}
}
levels.push(level);
}
return levels;
}
/**
* Get all services that transitively depend on the given service.
*/
private getTransitiveDependents(name: string): string[] {
const result = new Set<string>();
const queue = [name];
while (queue.length > 0) {
const current = queue.shift()!;
for (const [svcName, svc] of this.services) {
if (svc.dependencies.includes(current) && !result.has(svcName)) {
result.add(svcName);
queue.push(svcName);
}
}
}
return Array.from(result);
}
/**
* Organize a set of service names into dependency-ordered levels.
*/
private getLevelsForServices(names: string[]): string[][] {
if (names.length === 0) return [];
const nameSet = new Set(names);
const levels: string[][] = [];
// Simple approach: filter the global startup order to include only the given names
for (const level of this.startupOrder) {
const filtered = level.filter((n) => nameSet.has(n));
if (filtered.length > 0) {
levels.push(filtered);
}
}
return levels;
}
}

View File

@@ -19,18 +19,18 @@ export type TPreOrAfterTaskFunction = () => Task<any>;
// Type helper to extract step names from array
export type StepNames<T> = T extends ReadonlyArray<{ name: infer N }> ? N : never;
export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }> = []> {
export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }> = [], TData extends Record<string, unknown> = Record<string, unknown>> {
public static extractTask<T = undefined, TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }> = []>(
preOrAfterTaskArg: Task<T, TSteps> | TPreOrAfterTaskFunction,
): Task<T, TSteps> {
preOrAfterTaskArg: Task<T, TSteps, any> | TPreOrAfterTaskFunction,
): Task<T, TSteps, any> {
switch (true) {
case !preOrAfterTaskArg:
return null;
case preOrAfterTaskArg instanceof Task:
return preOrAfterTaskArg as Task<T, TSteps>;
return preOrAfterTaskArg as Task<T, TSteps, any>;
case typeof preOrAfterTaskArg === 'function':
const taskFunction = preOrAfterTaskArg as TPreOrAfterTaskFunction;
return taskFunction() as unknown as Task<T, TSteps>;
return taskFunction() as unknown as Task<T, TSteps, any>;
default:
return null;
}
@@ -42,7 +42,7 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
return done.promise;
};
public static isTask = (taskArg: Task<any>): boolean => {
public static isTask = (taskArg: Task<any, any, any>): boolean => {
if (taskArg instanceof Task && typeof taskArg.taskFunction === 'function') {
return true;
} else {
@@ -51,8 +51,8 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
};
public static isTaskTouched<T = undefined, TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }> = []>(
taskArg: Task<T, TSteps> | TPreOrAfterTaskFunction,
touchedTasksArray: Task<T, TSteps>[],
taskArg: Task<T, TSteps, any> | TPreOrAfterTaskFunction,
touchedTasksArray: Task<T, TSteps, any>[],
): boolean {
const taskToCheck = Task.extractTask(taskArg);
let result = false;
@@ -65,25 +65,16 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
}
public static runTask = async <T, TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }> = []>(
taskArg: Task<T, TSteps> | TPreOrAfterTaskFunction,
optionsArg: { x?: any; touchedTasksArray?: Task<T, TSteps>[] },
taskArg: Task<T, TSteps, any> | TPreOrAfterTaskFunction,
optionsArg: { x?: any; touchedTasksArray?: Task<T, TSteps, any>[] },
) => {
const taskToRun = Task.extractTask(taskArg);
const done = plugins.smartpromise.defer();
// Wait for all blocking tasks to finish
for (const task of taskToRun.blockingTasks) {
await task.finished;
}
if (!taskToRun.setupValue && taskToRun.taskSetup) {
taskToRun.setupValue = await taskToRun.taskSetup();
}
if (taskToRun.execDelay) {
await plugins.smartdelay.delayFor(taskToRun.execDelay);
}
taskToRun.running = true;
taskToRun.runCount++;
taskToRun.lastRun = new Date();
@@ -100,26 +91,10 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
// Complete all steps when task finishes
taskToRun.completeAllSteps();
taskToRun.emitEvent(taskToRun.lastError ? 'failed' : 'completed');
// When the task has finished running, resolve the finished promise
taskToRun.resolveFinished();
// Create a new finished promise for the next run
taskToRun.finished = new Promise((resolve) => {
taskToRun.resolveFinished = resolve;
});
})
.catch((err) => {
taskToRun.running = false;
taskToRun.emitEvent('failed', { error: err instanceof Error ? err.message : String(err) });
// Resolve finished so blocking dependants don't hang
taskToRun.resolveFinished();
// Create a new finished promise for the next run
taskToRun.finished = new Promise((resolve) => {
taskToRun.resolveFinished = resolve;
});
});
const options = {
@@ -127,7 +102,7 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
...optionsArg,
};
const x = options.x;
const touchedTasksArray: Task<T, TSteps>[] = options.touchedTasksArray;
const touchedTasksArray: Task<T, TSteps, any>[] = options.touchedTasksArray;
touchedTasksArray.push(taskToRun);
@@ -198,18 +173,12 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
public cronJob: plugins.smarttime.CronJob;
public bufferMax: number;
public execDelay: number;
public timeout: number;
public preTask: Task<T, any> | TPreOrAfterTaskFunction;
public afterTask: Task<T, any> | TPreOrAfterTaskFunction;
public data: TData;
// Add a list to store the blocking tasks
public blockingTasks: Task[] = [];
// Add a promise that will resolve when the task has finished
private finished: Promise<void>;
private resolveFinished: () => void;
public preTask: Task<T, any, any> | TPreOrAfterTaskFunction;
public afterTask: Task<T, any, any> | TPreOrAfterTaskFunction;
public running: boolean = false;
public bufferRunner = new BufferRunner(this);
@@ -275,12 +244,12 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
constructor(optionsArg: {
taskFunction: ITaskFunction<T>;
preTask?: Task<T, any> | TPreOrAfterTaskFunction;
afterTask?: Task<T, any> | TPreOrAfterTaskFunction;
preTask?: Task<T, any, any> | TPreOrAfterTaskFunction;
afterTask?: Task<T, any, any> | TPreOrAfterTaskFunction;
buffered?: boolean;
bufferMax?: number;
execDelay?: number;
name?: string;
data?: TData;
taskSetup?: ITaskSetupFunction<T>;
steps?: TSteps;
catchErrors?: boolean;
@@ -291,8 +260,8 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
this.afterTask = optionsArg.afterTask;
this.buffered = optionsArg.buffered;
this.bufferMax = optionsArg.bufferMax;
this.execDelay = optionsArg.execDelay;
this.name = optionsArg.name;
this.data = optionsArg.data ?? ({} as TData);
this.taskSetup = optionsArg.taskSetup;
this.catchErrors = optionsArg.catchErrors ?? false;
this.labels = optionsArg.labels ? { ...optionsArg.labels } : {};
@@ -309,11 +278,6 @@ export class Task<T = undefined, TSteps extends ReadonlyArray<{ name: string; de
this.steps.set(stepConfig.name, step);
}
}
// Create the finished promise
this.finished = new Promise((resolve) => {
this.resolveFinished = resolve;
});
}
public trigger(x?: any): Promise<any> {

View File

@@ -0,0 +1,164 @@
import type { Task } from './taskbuffer.classes.task.js';
import type { ITaskConstraintGroupOptions, IRateLimitConfig, TResultSharingMode } from './taskbuffer.interfaces.js';
export class TaskConstraintGroup<TData extends Record<string, unknown> = Record<string, unknown>> {
public name: string;
public maxConcurrent: number;
public cooldownMs: number;
public rateLimit: IRateLimitConfig | null;
public resultSharingMode: TResultSharingMode;
private constraintKeyForExecution: (task: Task<any, any, TData>, input?: any) => string | null | undefined;
private shouldExecuteFn?: (task: Task<any, any, TData>, input?: any) => boolean | Promise<boolean>;
private runningCounts = new Map<string, number>();
private lastCompletionTimes = new Map<string, number>();
private completionTimestamps = new Map<string, number[]>();
private lastResults = new Map<string, { result: any; timestamp: number }>();
constructor(options: ITaskConstraintGroupOptions<TData>) {
this.name = options.name;
this.constraintKeyForExecution = options.constraintKeyForExecution;
this.maxConcurrent = options.maxConcurrent ?? Infinity;
this.cooldownMs = options.cooldownMs ?? 0;
this.shouldExecuteFn = options.shouldExecute;
this.rateLimit = options.rateLimit ?? null;
this.resultSharingMode = options.resultSharingMode ?? 'none';
}
public getConstraintKey(task: Task<any, any, TData>, input?: any): string | null {
const key = this.constraintKeyForExecution(task, input);
return key ?? null;
}
public async checkShouldExecute(task: Task<any, any, TData>, input?: any): Promise<boolean> {
if (!this.shouldExecuteFn) {
return true;
}
return this.shouldExecuteFn(task, input);
}
public canRun(subGroupKey: string): boolean {
const running = this.runningCounts.get(subGroupKey) ?? 0;
if (running >= this.maxConcurrent) {
return false;
}
if (this.cooldownMs > 0) {
const lastCompletion = this.lastCompletionTimes.get(subGroupKey);
if (lastCompletion !== undefined) {
const elapsed = Date.now() - lastCompletion;
if (elapsed < this.cooldownMs) {
return false;
}
}
}
if (this.rateLimit) {
this.pruneCompletionTimestamps(subGroupKey);
const timestamps = this.completionTimestamps.get(subGroupKey);
const completedInWindow = timestamps ? timestamps.length : 0;
const running = this.runningCounts.get(subGroupKey) ?? 0;
if (completedInWindow + running >= this.rateLimit.maxPerWindow) {
return false;
}
}
return true;
}
public acquireSlot(subGroupKey: string): void {
const current = this.runningCounts.get(subGroupKey) ?? 0;
this.runningCounts.set(subGroupKey, current + 1);
}
public releaseSlot(subGroupKey: string): void {
const current = this.runningCounts.get(subGroupKey) ?? 0;
const next = Math.max(0, current - 1);
if (next === 0) {
this.runningCounts.delete(subGroupKey);
} else {
this.runningCounts.set(subGroupKey, next);
}
this.lastCompletionTimes.set(subGroupKey, Date.now());
if (this.rateLimit) {
const timestamps = this.completionTimestamps.get(subGroupKey) ?? [];
timestamps.push(Date.now());
this.completionTimestamps.set(subGroupKey, timestamps);
}
}
public getCooldownRemaining(subGroupKey: string): number {
if (this.cooldownMs <= 0) {
return 0;
}
const lastCompletion = this.lastCompletionTimes.get(subGroupKey);
if (lastCompletion === undefined) {
return 0;
}
const elapsed = Date.now() - lastCompletion;
return Math.max(0, this.cooldownMs - elapsed);
}
public getRunningCount(subGroupKey: string): number {
return this.runningCounts.get(subGroupKey) ?? 0;
}
// Rate limit helpers
private pruneCompletionTimestamps(subGroupKey: string): void {
const timestamps = this.completionTimestamps.get(subGroupKey);
if (!timestamps || !this.rateLimit) return;
const cutoff = Date.now() - this.rateLimit.windowMs;
let i = 0;
while (i < timestamps.length && timestamps[i] <= cutoff) {
i++;
}
if (i > 0) {
timestamps.splice(0, i);
}
}
public getRateLimitDelay(subGroupKey: string): number {
if (!this.rateLimit) return 0;
this.pruneCompletionTimestamps(subGroupKey);
const timestamps = this.completionTimestamps.get(subGroupKey);
const completedInWindow = timestamps ? timestamps.length : 0;
const running = this.runningCounts.get(subGroupKey) ?? 0;
if (completedInWindow + running < this.rateLimit.maxPerWindow) {
return 0;
}
// If only running tasks fill the window (no completions yet), we can't compute a delay
if (!timestamps || timestamps.length === 0) {
return 1; // minimal delay; drain will re-check after running tasks complete
}
// The oldest timestamp in the window determines when a slot opens
const oldestInWindow = timestamps[0];
const expiry = oldestInWindow + this.rateLimit.windowMs;
return Math.max(0, expiry - Date.now());
}
public getNextAvailableDelay(subGroupKey: string): number {
return Math.max(this.getCooldownRemaining(subGroupKey), this.getRateLimitDelay(subGroupKey));
}
// Result sharing helpers
public recordResult(subGroupKey: string, result: any): void {
if (this.resultSharingMode === 'none') return;
this.lastResults.set(subGroupKey, { result, timestamp: Date.now() });
}
public getLastResult(subGroupKey: string): { result: any; timestamp: number } | undefined {
return this.lastResults.get(subGroupKey);
}
public hasResultSharing(): boolean {
return this.resultSharingMode !== 'none';
}
public reset(): void {
this.runningCounts.clear();
this.lastCompletionTimes.clear();
this.completionTimestamps.clear();
this.lastResults.clear();
}
}

View File

@@ -1,10 +1,11 @@
import * as plugins from './taskbuffer.plugins.js';
import { Task } from './taskbuffer.classes.task.js';
import { TaskConstraintGroup } from './taskbuffer.classes.taskconstraintgroup.js';
import {
AbstractDistributedCoordinator,
type IDistributedTaskRequestResult,
} from './taskbuffer.classes.distributedcoordinator.js';
import type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent } from './taskbuffer.interfaces.js';
import type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent, IConstrainedTaskEntry } from './taskbuffer.interfaces.js';
import { logger } from './taskbuffer.logging.js';
export interface ICronJob {
@@ -19,23 +20,28 @@ export interface ITaskManagerConstructorOptions {
export class TaskManager {
public randomId = plugins.smartunique.shortId();
public taskMap = new plugins.lik.ObjectMap<Task<any, any>>();
public taskMap = new plugins.lik.ObjectMap<Task<any, any, any>>();
public readonly taskSubject = new plugins.smartrx.rxjs.Subject<ITaskEvent>();
private taskSubscriptions = new Map<Task<any, any>, plugins.smartrx.rxjs.Subscription>();
private taskSubscriptions = new Map<Task<any, any, any>, plugins.smartrx.rxjs.Subscription>();
private cronJobManager = new plugins.smarttime.CronManager();
public options: ITaskManagerConstructorOptions = {
distributedCoordinator: null,
};
// Constraint system
public constraintGroups: TaskConstraintGroup<any>[] = [];
private constraintQueue: IConstrainedTaskEntry[] = [];
private drainTimer: ReturnType<typeof setTimeout> | null = null;
constructor(options: ITaskManagerConstructorOptions = {}) {
this.options = Object.assign(this.options, options);
}
public getTaskByName(taskName: string): Task<any, any> {
public getTaskByName(taskName: string): Task<any, any, any> {
return this.taskMap.findSync((task) => task.name === taskName);
}
public addTask(task: Task<any, any>): void {
public addTask(task: Task<any, any, any>): void {
if (!task.name) {
throw new Error('Task must have a name to be added to taskManager');
}
@@ -46,7 +52,7 @@ export class TaskManager {
this.taskSubscriptions.set(task, subscription);
}
public removeTask(task: Task<any, any>): void {
public removeTask(task: Task<any, any, any>): void {
this.taskMap.remove(task);
const subscription = this.taskSubscriptions.get(task);
if (subscription) {
@@ -55,21 +61,189 @@ export class TaskManager {
}
}
public addAndScheduleTask(task: Task<any, any>, cronString: string) {
public addAndScheduleTask(task: Task<any, any, any>, cronString: string) {
this.addTask(task);
this.scheduleTaskByName(task.name, cronString);
}
// Constraint group management
public addConstraintGroup(group: TaskConstraintGroup<any>): void {
this.constraintGroups.push(group);
}
public removeConstraintGroup(name: string): void {
this.constraintGroups = this.constraintGroups.filter((g) => g.name !== name);
}
// Core constraint evaluation
public async triggerTaskConstrained(task: Task<any, any, any>, input?: any): Promise<any> {
// Gather applicable constraints
const applicableGroups: Array<{ group: TaskConstraintGroup<any>; key: string }> = [];
for (const group of this.constraintGroups) {
const key = group.getConstraintKey(task, input);
if (key !== null) {
applicableGroups.push({ group, key });
}
}
// No constraints apply → check shouldExecute then trigger directly
if (applicableGroups.length === 0) {
const shouldRun = await this.checkAllShouldExecute(task, input);
if (!shouldRun) {
return undefined;
}
return task.trigger(input);
}
// Check if all constraints allow running
const allCanRun = applicableGroups.every(({ group, key }) => group.canRun(key));
if (allCanRun) {
return this.executeWithConstraintTracking(task, input, applicableGroups);
}
// Blocked → enqueue with deferred promise and cached constraint keys
const deferred = plugins.smartpromise.defer<any>();
const constraintKeys = new Map<string, string>();
for (const { group, key } of applicableGroups) {
constraintKeys.set(group.name, key);
}
this.constraintQueue.push({ task, input, deferred, constraintKeys });
return deferred.promise;
}
private async checkAllShouldExecute(task: Task<any, any, any>, input?: any): Promise<boolean> {
for (const group of this.constraintGroups) {
const shouldRun = await group.checkShouldExecute(task, input);
if (!shouldRun) {
return false;
}
}
return true;
}
private async executeWithConstraintTracking(
task: Task<any, any, any>,
input: any,
groups: Array<{ group: TaskConstraintGroup<any>; key: string }>,
): Promise<any> {
// Acquire slots synchronously to prevent race conditions
for (const { group, key } of groups) {
group.acquireSlot(key);
}
// Check shouldExecute after acquiring slots
const shouldRun = await this.checkAllShouldExecute(task, input);
if (!shouldRun) {
// Release slots and drain queue
for (const { group, key } of groups) {
group.releaseSlot(key);
}
this.drainConstraintQueue();
return undefined;
}
try {
const result = await task.trigger(input);
// Record result for groups with result sharing (only on true success, not caught errors)
if (!task.lastError) {
for (const { group, key } of groups) {
group.recordResult(key, result);
}
}
return result;
} finally {
// Release slots
for (const { group, key } of groups) {
group.releaseSlot(key);
}
this.drainConstraintQueue();
}
}
private drainConstraintQueue(): void {
let shortestCooldown = Infinity;
const stillQueued: IConstrainedTaskEntry[] = [];
for (const entry of this.constraintQueue) {
const applicableGroups: Array<{ group: TaskConstraintGroup<any>; key: string }> = [];
for (const group of this.constraintGroups) {
const key = group.getConstraintKey(entry.task, entry.input);
if (key !== null) {
applicableGroups.push({ group, key });
}
}
// No constraints apply anymore (group removed?) → check shouldExecute then run
if (applicableGroups.length === 0) {
this.checkAllShouldExecute(entry.task, entry.input).then((shouldRun) => {
if (!shouldRun) {
entry.deferred.resolve(undefined);
return;
}
entry.task.trigger(entry.input).then(
(result) => entry.deferred.resolve(result),
(err) => entry.deferred.reject(err),
);
});
continue;
}
// Check result sharing — if any applicable group has a shared result, resolve immediately
const sharingGroups = applicableGroups.filter(({ group }) => group.hasResultSharing());
if (sharingGroups.length > 0) {
const groupWithResult = sharingGroups.find(({ group, key }) =>
group.getLastResult(key) !== undefined
);
if (groupWithResult) {
entry.deferred.resolve(groupWithResult.group.getLastResult(groupWithResult.key)!.result);
continue;
}
}
const allCanRun = applicableGroups.every(({ group, key }) => group.canRun(key));
if (allCanRun) {
// executeWithConstraintTracking handles shouldExecute check internally
this.executeWithConstraintTracking(entry.task, entry.input, applicableGroups).then(
(result) => entry.deferred.resolve(result),
(err) => entry.deferred.reject(err),
);
} else {
stillQueued.push(entry);
// Track shortest delay for timer scheduling (cooldown + rate limit)
for (const { group, key } of applicableGroups) {
const remaining = group.getNextAvailableDelay(key);
if (remaining > 0 && remaining < shortestCooldown) {
shortestCooldown = remaining;
}
}
}
}
this.constraintQueue = stillQueued;
// Schedule next drain if there are cooldown-blocked entries
if (this.drainTimer) {
clearTimeout(this.drainTimer);
this.drainTimer = null;
}
if (stillQueued.length > 0 && shortestCooldown < Infinity) {
this.drainTimer = setTimeout(() => {
this.drainTimer = null;
this.drainConstraintQueue();
}, shortestCooldown + 1);
}
}
public async triggerTaskByName(taskName: string): Promise<any> {
const taskToTrigger = this.getTaskByName(taskName);
if (!taskToTrigger) {
throw new Error(`No task with the name ${taskName} found.`);
}
return taskToTrigger.trigger();
return this.triggerTaskConstrained(taskToTrigger);
}
public async triggerTask(task: Task<any, any>) {
return task.trigger();
public async triggerTask(task: Task<any, any, any>) {
return this.triggerTaskConstrained(task);
}
public scheduleTaskByName(taskName: string, cronString: string) {
@@ -80,7 +254,7 @@ export class TaskManager {
this.handleTaskScheduling(taskToSchedule, cronString);
}
private handleTaskScheduling(task: Task<any, any>, cronString: string) {
private handleTaskScheduling(task: Task<any, any, any>, cronString: string) {
const cronJob = this.cronJobManager.addCronjob(
cronString,
async (triggerTime: number) => {
@@ -98,7 +272,7 @@ export class TaskManager {
}
}
try {
await task.trigger();
await this.triggerTaskConstrained(task);
} catch (err) {
logger.log('error', `TaskManager: scheduled task "${task.name || 'unnamed'}" failed: ${err instanceof Error ? err.message : String(err)}`);
}
@@ -107,7 +281,7 @@ export class TaskManager {
task.cronJob = cronJob;
}
private logTaskState(task: Task<any, any>) {
private logTaskState(task: Task<any, any, any>) {
logger.log('info', `Taskbuffer schedule triggered task >>${task.name}<<`);
const bufferState = task.buffered
? `buffered with max ${task.bufferMax} buffered calls`
@@ -116,7 +290,7 @@ export class TaskManager {
}
private async performDistributedConsultation(
task: Task<any, any>,
task: Task<any, any, any>,
triggerTime: number,
): Promise<IDistributedTaskRequestResult> {
logger.log('info', 'Found a distributed coordinator, performing consultation.');
@@ -144,7 +318,7 @@ export class TaskManager {
}
}
public async descheduleTask(task: Task<any, any>) {
public async descheduleTask(task: Task<any, any, any>) {
await this.descheduleTaskByName(task.name);
}
@@ -169,6 +343,10 @@ export class TaskManager {
subscription.unsubscribe();
}
this.taskSubscriptions.clear();
if (this.drainTimer) {
clearTimeout(this.drainTimer);
this.drainTimer = null;
}
}
// Get metadata for a specific task
@@ -186,7 +364,7 @@ export class TaskManager {
// Get scheduled tasks with their schedules and next run times
public getScheduledTasks(): IScheduledTaskInfo[] {
const scheduledTasks: IScheduledTaskInfo[] = [];
for (const task of this.taskMap.getArray()) {
if (task.cronJob) {
scheduledTasks.push({
@@ -199,7 +377,7 @@ export class TaskManager {
});
}
}
return scheduledTasks;
}
@@ -213,11 +391,11 @@ export class TaskManager {
}))
.sort((a, b) => a.nextRun.getTime() - b.nextRun.getTime())
.slice(0, limit);
return scheduledRuns;
}
public getTasksByLabel(key: string, value: string): Task<any, any>[] {
public getTasksByLabel(key: string, value: string): Task<any, any, any>[] {
return this.taskMap.getArray().filter(task => task.labels[key] === value);
}
@@ -227,7 +405,7 @@ export class TaskManager {
// Add, execute, and remove a task while collecting metadata
public async addExecuteRemoveTask<T, TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }>>(
task: Task<T, TSteps>,
task: Task<T, TSteps, any>,
options?: {
schedule?: string;
trackProgress?: boolean;
@@ -235,19 +413,18 @@ export class TaskManager {
): Promise<ITaskExecutionReport> {
// Add task to manager
this.addTask(task);
// Optionally schedule it
if (options?.schedule) {
this.scheduleTaskByName(task.name!, options.schedule);
}
const startTime = Date.now();
const progressUpdates: Array<{ stepName: string; timestamp: number }> = [];
try {
// Execute the task
const result = await task.trigger();
// Execute the task through constraints
const result = await this.triggerTaskConstrained(task);
// Collect execution report
const report: ITaskExecutionReport = {
taskName: task.name || 'unnamed',
@@ -261,15 +438,15 @@ export class TaskManager {
progress: task.getProgress(),
result,
};
// Remove task from manager
this.removeTask(task);
// Deschedule if it was scheduled
if (options?.schedule && task.name) {
this.descheduleTaskByName(task.name);
}
return report;
} catch (error) {
// Create error report
@@ -285,15 +462,15 @@ export class TaskManager {
progress: task.getProgress(),
error: error as Error,
};
// Remove task from manager even on error
this.removeTask(task);
// Deschedule if it was scheduled
if (options?.schedule && task.name) {
this.descheduleTaskByName(task.name);
}
throw errorReport;
}
}

View File

@@ -1,69 +0,0 @@
import * as plugins from './taskbuffer.plugins.js';
import { Task } from './taskbuffer.classes.task.js';
import { logger } from './taskbuffer.logging.js';
export class TaskRunner {
public maxParallelJobs: number = 1;
public status: 'stopped' | 'running' = 'stopped';
public runningTasks: plugins.lik.ObjectMap<Task> =
new plugins.lik.ObjectMap<Task>();
public queuedTasks: Task[] = [];
constructor() {
this.runningTasks.eventSubject.subscribe(async (eventArg) => {
this.checkExecution();
});
}
/**
* adds a task to the queue
*/
public addTask(taskArg: Task) {
this.queuedTasks.push(taskArg);
this.checkExecution();
}
/**
* set amount of parallel tasks
* be careful, you might lose dependability of tasks
*/
public setMaxParallelJobs(maxParallelJobsArg: number) {
this.maxParallelJobs = maxParallelJobsArg;
}
/**
* starts the task queue
*/
public async start() {
this.status = 'running';
}
/**
* checks whether execution is on point
*/
public async checkExecution() {
if (
this.runningTasks.getArray().length < this.maxParallelJobs &&
this.status === 'running' &&
this.queuedTasks.length > 0
) {
const nextJob = this.queuedTasks.shift();
this.runningTasks.add(nextJob);
try {
await nextJob.trigger();
} catch (err) {
logger.log('error', `TaskRunner: task "${nextJob.name || 'unnamed'}" failed: ${err instanceof Error ? err.message : String(err)}`);
}
this.runningTasks.remove(nextJob);
this.checkExecution();
}
}
/**
* stops the task queue
*/
public async stop() {
this.status = 'stopped';
}
}

View File

@@ -1,4 +1,34 @@
import type { ITaskStep } from './taskbuffer.classes.taskstep.js';
import type { Task } from './taskbuffer.classes.task.js';
export interface IRateLimitConfig {
maxPerWindow: number; // max completions allowed within the sliding window
windowMs: number; // sliding window duration in ms
}
export type TResultSharingMode = 'none' | 'share-latest';
export interface ITaskConstraintGroupOptions<TData extends Record<string, unknown> = Record<string, unknown>> {
name: string;
constraintKeyForExecution: (task: Task<any, any, TData>, input?: any) => string | null | undefined;
maxConcurrent?: number; // default: Infinity
cooldownMs?: number; // default: 0
shouldExecute?: (task: Task<any, any, TData>, input?: any) => boolean | Promise<boolean>;
rateLimit?: IRateLimitConfig;
resultSharingMode?: TResultSharingMode; // default: 'none'
}
export interface ITaskExecution<TData extends Record<string, unknown> = Record<string, unknown>> {
task: Task<any, any, TData>;
input: any;
}
export interface IConstrainedTaskEntry {
task: Task<any, any, any>;
input: any;
deferred: import('@push.rocks/smartpromise').Deferred<any>;
constraintKeys: Map<string, string>; // groupName -> key
}
export interface ITaskMetadata {
name: string;
@@ -49,4 +79,114 @@ export interface ITaskEvent {
timestamp: number;
stepName?: string; // present when type === 'step'
error?: string; // present when type === 'failed'
}
// ── Service Lifecycle Types ──────────────────────────────────────
export type TServiceState =
| 'stopped'
| 'starting'
| 'running'
| 'degraded'
| 'failed'
| 'stopping';
export type TServiceCriticality = 'critical' | 'optional';
export type TServiceEventType =
| 'started'
| 'stopped'
| 'failed'
| 'degraded'
| 'recovered'
| 'retrying'
| 'healthCheck'
| 'autoRestarting';
export interface IServiceEvent {
type: TServiceEventType;
serviceName: string;
state: TServiceState;
timestamp: number;
error?: string;
attempt?: number;
}
export interface IServiceStatus {
name: string;
state: TServiceState;
criticality: TServiceCriticality;
startedAt?: number;
stoppedAt?: number;
lastHealthCheck?: number;
healthCheckOk?: boolean;
uptime?: number;
errorCount: number;
lastError?: string;
retryCount: number;
dependencies: string[];
labels?: Record<string, string>;
hasInstance?: boolean;
}
export interface IRetryConfig {
/** Maximum retry attempts. 0 = no retries. Default: 3 */
maxRetries?: number;
/** Base delay in ms. Default: 1000 */
baseDelayMs?: number;
/** Maximum delay cap in ms. Default: 30000 */
maxDelayMs?: number;
/** Multiplier per attempt. Default: 2 */
backoffFactor?: number;
}
export interface IHealthCheckConfig {
/** Interval in ms between health checks. Default: 30000 */
intervalMs?: number;
/** Timeout for a single health check call. Default: 5000 */
timeoutMs?: number;
/** Consecutive failures before marking degraded. Default: 3 */
failuresBeforeDegraded?: number;
/** Consecutive failures before marking failed. Default: 5 */
failuresBeforeFailed?: number;
/** Auto-restart the service when it transitions to failed. Default: false */
autoRestart?: boolean;
/** Maximum number of auto-restart attempts. 0 = unlimited. Default: 3 */
maxAutoRestarts?: number;
/** Base delay in ms before first auto-restart. Default: 5000 */
autoRestartDelayMs?: number;
/** Backoff multiplier per auto-restart attempt. Default: 2 */
autoRestartBackoffFactor?: number;
}
export interface IServiceOptions<T = any> {
name: string;
start: () => Promise<T>;
stop: (instance: T) => Promise<void>;
healthCheck?: (instance: T) => Promise<boolean>;
criticality?: TServiceCriticality;
dependencies?: string[];
retry?: IRetryConfig;
healthCheckConfig?: IHealthCheckConfig;
startupTimeoutMs?: number;
labels?: Record<string, string>;
}
export interface IServiceManagerOptions {
name?: string;
defaultRetry?: IRetryConfig;
defaultHealthCheck?: IHealthCheckConfig;
/** Timeout in ms for the entire startup sequence. Default: 120000 */
startupTimeoutMs?: number;
/** Timeout in ms for the entire shutdown sequence. Default: 30000 */
shutdownTimeoutMs?: number;
}
export type TOverallHealth = 'healthy' | 'degraded' | 'unhealthy';
export interface IServiceManagerHealth {
overall: TOverallHealth;
services: IServiceStatus[];
startedAt?: number;
uptime?: number;
}

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/taskbuffer',
version: '4.1.1',
version: '8.0.2',
description: 'A flexible task management library supporting TypeScript, allowing for task buffering, scheduling, and execution with dependency management.'
}

View File

@@ -8,20 +8,20 @@ import type { TaskManager, ITaskMetadata, IScheduledTaskInfo } from '../ts/index
export class TaskbufferDashboard extends DeesElement {
// Properties
@property({ type: Object })
public taskManager: TaskManager | null = null;
accessor taskManager: TaskManager | null = null;
@property({ type: Number })
public refreshInterval: number = 1000; // milliseconds
accessor refreshInterval: number = 1000; // milliseconds
// Internal state
@state()
private tasks: ITaskMetadata[] = [];
accessor tasks: ITaskMetadata[] = [];
@state()
private scheduledTasks: IScheduledTaskInfo[] = [];
accessor scheduledTasks: IScheduledTaskInfo[] = [];
@state()
private isRunning: boolean = false;
accessor isRunning: boolean = false;
private refreshTimer: any;