From 0811b04dfd061b92f6897de609125952f3fb12fc Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Sun, 15 Feb 2026 15:15:37 +0000 Subject: [PATCH] BREAKING CHANGE(constraints): make TaskConstraintGroup constraint matcher input-aware and add shouldExecute pre-execution hook --- changelog.md | 8 + readme.hints.md | 8 +- readme.md | 120 +++++++++-- test/test.13.constraints.ts | 214 +++++++++++++++++-- ts/00_commitinfo_data.ts | 2 +- ts/index.ts | 2 +- ts/taskbuffer.classes.taskconstraintgroup.ts | 17 +- ts/taskbuffer.classes.taskmanager.ts | 58 ++++- ts/taskbuffer.interfaces.ts | 9 +- ts_web/00_commitinfo_data.ts | 2 +- 10 files changed, 389 insertions(+), 51 deletions(-) diff --git a/changelog.md b/changelog.md index 4250554..7947b5e 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,13 @@ # Changelog +## 2026-02-15 - 6.0.0 - BREAKING CHANGE(constraints) +make TaskConstraintGroup constraint matcher input-aware and add shouldExecute pre-execution hook + +- Rename ITaskConstraintGroupOptions.constraintKeyForTask -> constraintKeyForExecution(task, input?) and update TaskConstraintGroup.getConstraintKey signature +- Add optional shouldExecute(task, input?) hook; TaskManager checks shouldExecute before immediate runs, after acquiring slots, and when draining the constraint queue (queued tasks are skipped when shouldExecute returns false) +- Export ITaskExecution type and store constraintKeys on queued entries (IConstrainedTaskEntry.constraintKeys) +- Documentation and tests updated to demonstrate input-aware constraint keys and shouldExecute pruning + ## 2026-02-15 - 5.0.1 - fix(tests) add and tighten constraint-related tests covering return values, error propagation, concurrency, cooldown timing, and constraint removal diff --git a/readme.hints.md b/readme.hints.md index acbbd93..e10886d 100644 --- a/readme.hints.md +++ b/readme.hints.md @@ -12,11 +12,13 @@ - Typed data bag accessible as `task.data` ### TaskConstraintGroup -- `new TaskConstraintGroup({ name, constraintKeyForTask, maxConcurrent?, cooldownMs? })` -- `constraintKeyForTask(task)` returns a string key (constraint applies) or `null` (skip) +- `new TaskConstraintGroup({ name, constraintKeyForExecution, maxConcurrent?, cooldownMs?, shouldExecute? })` +- `constraintKeyForExecution(task, input?)` returns a string key (constraint applies) or `null` (skip). Receives both task and runtime input. +- `shouldExecute(task, input?)` — optional pre-execution check. Returns `false` to skip (deferred resolves `undefined`). Can be async. - `maxConcurrent` (default: `Infinity`) — max concurrent tasks per key - `cooldownMs` (default: `0`) — minimum ms gap between completions per key -- Methods: `canRun(key)`, `acquireSlot(key)`, `releaseSlot(key)`, `getCooldownRemaining(key)`, `getRunningCount(key)`, `reset()` +- Methods: `getConstraintKey(task, input?)`, `checkShouldExecute(task, input?)`, `canRun(key)`, `acquireSlot(key)`, `releaseSlot(key)`, `getCooldownRemaining(key)`, `getRunningCount(key)`, `reset()` +- `ITaskExecution` type exported from index — `{ task, input }` tuple ### TaskManager Constraint Integration - `manager.addConstraintGroup(group)` / `manager.removeConstraintGroup(name)` diff --git a/readme.md b/readme.md index a864e1e..99577bd 100644 --- a/readme.md +++ b/readme.md @@ -120,7 +120,7 @@ const manager = new TaskManager(); const domainMutex = new TaskConstraintGroup<{ domain: string }>({ name: 'domain-mutex', maxConcurrent: 1, - constraintKeyForTask: (task) => task.data.domain, + constraintKeyForExecution: (task, input?) => task.data.domain, }); manager.addConstraintGroup(domainMutex); @@ -156,7 +156,7 @@ Cap how many tasks can run concurrently across a group: const dnsLimit = new TaskConstraintGroup<{ group: string }>({ name: 'dns-concurrency', maxConcurrent: 3, - constraintKeyForTask: (task) => + constraintKeyForExecution: (task) => task.data.group === 'dns' ? 'dns' : null, // null = skip constraint }); @@ -173,7 +173,7 @@ const rateLimiter = new TaskConstraintGroup<{ domain: string }>({ name: 'api-rate-limit', maxConcurrent: 1, cooldownMs: 11000, - constraintKeyForTask: (task) => task.data.domain, + constraintKeyForExecution: (task) => task.data.domain, }); manager.addConstraintGroup(rateLimiter); @@ -187,7 +187,7 @@ Limit total concurrent tasks system-wide: const globalCap = new TaskConstraintGroup({ name: 'global-cap', maxConcurrent: 10, - constraintKeyForTask: () => 'all', // same key = shared limit + constraintKeyForExecution: () => 'all', // same key = shared limit }); manager.addConstraintGroup(globalCap); @@ -208,26 +208,119 @@ await manager.triggerTask(dnsTask); ### Selective Constraints -Return `null` from `constraintKeyForTask` to exempt a task from a constraint group: +Return `null` from `constraintKeyForExecution` to exempt a task from a constraint group: ```typescript const constraint = new TaskConstraintGroup<{ priority: string }>({ name: 'low-priority-limit', maxConcurrent: 2, - constraintKeyForTask: (task) => + constraintKeyForExecution: (task) => task.data.priority === 'low' ? 'low-priority' : null, // high priority tasks skip this constraint }); ``` +### Input-Aware Constraints 🎯 + +The `constraintKeyForExecution` function receives both the **task** and the **runtime input** passed to `trigger(input)`. This means the same task triggered with different inputs can be constrained independently: + +```typescript +const extractTLD = (domain: string) => { + const parts = domain.split('.'); + return parts.slice(-2).join('.'); +}; + +// Same TLD → serialized. Different TLDs → parallel. +const tldMutex = new TaskConstraintGroup({ + name: 'tld-mutex', + maxConcurrent: 1, + constraintKeyForExecution: (task, input?: string) => { + if (!input) return null; + return extractTLD(input); // "example.com", "other.org", etc. + }, +}); + +manager.addConstraintGroup(tldMutex); + +// These two serialize (same TLD "example.com") +const p1 = manager.triggerTaskConstrained(getCert, 'app.example.com'); +const p2 = manager.triggerTaskConstrained(getCert, 'api.example.com'); + +// This runs in parallel (different TLD "other.org") +const p3 = manager.triggerTaskConstrained(getCert, 'my.other.org'); +``` + +You can also combine `task.data` and `input` for composite keys: + +```typescript +const providerDomain = new TaskConstraintGroup<{ provider: string }>({ + name: 'provider-domain', + maxConcurrent: 1, + constraintKeyForExecution: (task, input?: string) => { + return `${task.data.provider}:${input || 'default'}`; + }, +}); +``` + +### Pre-Execution Check with `shouldExecute` ✅ + +The `shouldExecute` callback runs right before a queued task executes. If it returns `false`, the task is skipped and its promise resolves with `undefined`. This is perfect for scenarios where a prior execution's outcome makes subsequent queued tasks unnecessary: + +```typescript +const certCache = new Map(); + +const certConstraint = new TaskConstraintGroup({ + name: 'cert-mutex', + maxConcurrent: 1, + constraintKeyForExecution: (task, input?: string) => { + if (!input) return null; + return extractTLD(input); + }, + shouldExecute: (task, input?: string) => { + if (!input) return true; + // Skip if a wildcard cert already covers this TLD + return certCache.get(extractTLD(input)) !== 'wildcard'; + }, +}); + +const getCert = new Task({ + name: 'get-certificate', + taskFunction: async (domain: string) => { + const cert = await acme.getCert(domain); + if (cert.isWildcard) certCache.set(extractTLD(domain), 'wildcard'); + return cert; + }, +}); + +manager.addConstraintGroup(certConstraint); +manager.addTask(getCert); + +const r1 = manager.triggerTaskConstrained(getCert, 'app.example.com'); // runs, gets wildcard +const r2 = manager.triggerTaskConstrained(getCert, 'api.example.com'); // queued → skipped! +const r3 = manager.triggerTaskConstrained(getCert, 'my.other.org'); // parallel (different TLD) + +const [cert1, cert2, cert3] = await Promise.all([r1, r2, r3]); +// cert2 === undefined (skipped because wildcard already covers example.com) +``` + +**`shouldExecute` semantics:** + +- Runs right before execution (after slot acquisition, before `trigger()`) +- Also checked on immediate (non-queued) triggers +- Returns `false` → skip execution, deferred resolves with `undefined` +- Can be async (return `Promise`) +- Has closure access to external state modified by prior executions +- If multiple constraint groups have `shouldExecute`, **all** must return `true` + ### How It Works When you trigger a task through `TaskManager` (via `triggerTask`, `triggerTaskByName`, `addExecuteRemoveTask`, or cron), the manager: -1. Evaluates all registered constraint groups against the task -2. If no constraints apply (all matchers return `null`) → runs immediately -3. If all applicable constraints have capacity → acquires slots and runs +1. Evaluates all registered constraint groups against the task and input +2. If no constraints apply (all matchers return `null`) → checks `shouldExecute` → runs or skips +3. If all applicable constraints have capacity → acquires slots → checks `shouldExecute` → runs or skips 4. If any constraint blocks → enqueues the task; when a running task completes, the queue is drained 5. Cooldown-blocked tasks auto-retry after the shortest remaining cooldown expires +6. Queued tasks re-check `shouldExecute` when their turn comes — stale work is automatically pruned ## 🎯 Core Concepts @@ -732,7 +825,7 @@ const manager = new TaskManager(); const tenantLimit = new TaskConstraintGroup<{ tenantId: string }>({ name: 'tenant-concurrency', maxConcurrent: 2, - constraintKeyForTask: (task) => task.data.tenantId, + constraintKeyForExecution: (task, input?) => task.data.tenantId, }); manager.addConstraintGroup(tenantLimit); @@ -829,15 +922,17 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme'); | Option | Type | Default | Description | | --- | --- | --- | --- | | `name` | `string` | *required* | Constraint group identifier | -| `constraintKeyForTask` | `(task) => string \| null` | *required* | Returns key for grouping, or `null` to skip | +| `constraintKeyForExecution` | `(task, input?) => string \| null` | *required* | Returns key for grouping, or `null` to skip. Receives both the task and runtime input. | | `maxConcurrent` | `number` | `Infinity` | Max concurrent tasks per key | | `cooldownMs` | `number` | `0` | Minimum ms between completions per key | +| `shouldExecute` | `(task, input?) => boolean \| Promise` | — | Pre-execution check. Return `false` to skip; deferred resolves `undefined`. | ### TaskConstraintGroup Methods | Method | Returns | Description | | --- | --- | --- | -| `getConstraintKey(task)` | `string \| null` | Get the constraint key for a task | +| `getConstraintKey(task, input?)` | `string \| null` | Get the constraint key for a task + input | +| `checkShouldExecute(task, input?)` | `Promise` | Run the `shouldExecute` callback (defaults to `true`) | | `canRun(key)` | `boolean` | Check if a slot is available | | `acquireSlot(key)` | `void` | Claim a running slot | | `releaseSlot(key)` | `void` | Release a slot and record completion time | @@ -884,6 +979,7 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme'); import type { ITaskMetadata, ITaskExecutionReport, + ITaskExecution, IScheduledTaskInfo, ITaskEvent, TTaskEventType, diff --git a/test/test.13.constraints.ts b/test/test.13.constraints.ts index c90c81d..ff4498f 100644 --- a/test/test.13.constraints.ts +++ b/test/test.13.constraints.ts @@ -54,7 +54,7 @@ tap.test('should enforce group concurrency limit', async () => { const constraint = new taskbuffer.TaskConstraintGroup<{ group: string }>({ name: 'concurrency-test', maxConcurrent: 2, - constraintKeyForTask: (task) => + constraintKeyForExecution: (task) => task.data.group === 'workers' ? 'workers' : null, }); manager.addConstraintGroup(constraint); @@ -97,7 +97,7 @@ tap.test('should enforce key-based mutual exclusion', async () => { const constraint = new taskbuffer.TaskConstraintGroup<{ domain: string }>({ name: 'domain-mutex', maxConcurrent: 1, - constraintKeyForTask: (task) => task.data.domain, + constraintKeyForExecution: (task) => task.data.domain, }); manager.addConstraintGroup(constraint); @@ -149,7 +149,7 @@ tap.test('should enforce cooldown between task executions', async () => { name: 'cooldown-test', maxConcurrent: 1, cooldownMs: 300, - constraintKeyForTask: (task) => task.data.key, + constraintKeyForExecution: (task) => task.data.key, }); manager.addConstraintGroup(constraint); @@ -194,13 +194,13 @@ tap.test('should apply multiple constraint groups to one task', async () => { const globalConstraint = new taskbuffer.TaskConstraintGroup({ name: 'global', maxConcurrent: 3, - constraintKeyForTask: () => 'all', + constraintKeyForExecution: () => 'all', }); const groupConstraint = new taskbuffer.TaskConstraintGroup<{ group: string }>({ name: 'group', maxConcurrent: 1, - constraintKeyForTask: (task) => task.data.group, + constraintKeyForExecution: (task) => task.data.group, }); manager.addConstraintGroup(globalConstraint); @@ -242,7 +242,7 @@ tap.test('should run task unconstrained when matcher returns null', async () => const constraint = new taskbuffer.TaskConstraintGroup<{ skip: boolean }>({ name: 'selective', maxConcurrent: 1, - constraintKeyForTask: (task) => (task.data.skip ? null : 'constrained'), + constraintKeyForExecution: (task) => (task.data.skip ? null : 'constrained'), }); manager.addConstraintGroup(constraint); @@ -269,7 +269,7 @@ tap.test('should release slot and drain queue when task fails', async () => { const constraint = new taskbuffer.TaskConstraintGroup<{ key: string }>({ name: 'error-drain', maxConcurrent: 1, - constraintKeyForTask: (task) => task.data.key, + constraintKeyForExecution: (task) => task.data.key, }); manager.addConstraintGroup(constraint); @@ -313,7 +313,7 @@ tap.test('should route triggerTaskByName through constraints', async () => { const constraint = new taskbuffer.TaskConstraintGroup({ name: 'manager-integration', maxConcurrent: 1, - constraintKeyForTask: () => 'all', + constraintKeyForExecution: () => 'all', }); manager.addConstraintGroup(constraint); @@ -356,7 +356,7 @@ tap.test('should remove a constraint group by name', async () => { const constraint = new taskbuffer.TaskConstraintGroup({ name: 'removable', maxConcurrent: 1, - constraintKeyForTask: () => 'all', + constraintKeyForExecution: () => 'all', }); manager.addConstraintGroup(constraint); @@ -373,7 +373,7 @@ tap.test('should reset constraint group state', async () => { name: 'resettable', maxConcurrent: 2, cooldownMs: 1000, - constraintKeyForTask: () => 'key', + constraintKeyForExecution: () => 'key', }); // Simulate usage @@ -395,7 +395,7 @@ tap.test('should return correct result from queued tasks', async () => { const constraint = new taskbuffer.TaskConstraintGroup({ name: 'return-value-test', maxConcurrent: 1, - constraintKeyForTask: () => 'shared', + constraintKeyForExecution: () => 'shared', }); manager.addConstraintGroup(constraint); @@ -434,7 +434,7 @@ tap.test('should propagate errors from queued tasks (catchErrors: false)', async const constraint = new taskbuffer.TaskConstraintGroup({ name: 'error-propagation', maxConcurrent: 1, - constraintKeyForTask: () => 'shared', + constraintKeyForExecution: () => 'shared', }); manager.addConstraintGroup(constraint); @@ -484,7 +484,7 @@ tap.test('should route triggerTask() through constraints', async () => { const constraint = new taskbuffer.TaskConstraintGroup({ name: 'trigger-task-test', maxConcurrent: 1, - constraintKeyForTask: () => 'all', + constraintKeyForExecution: () => 'all', }); manager.addConstraintGroup(constraint); @@ -523,7 +523,7 @@ tap.test('should route addExecuteRemoveTask() through constraints', async () => const constraint = new taskbuffer.TaskConstraintGroup({ name: 'add-execute-remove-test', maxConcurrent: 1, - constraintKeyForTask: () => 'all', + constraintKeyForExecution: () => 'all', }); manager.addConstraintGroup(constraint); @@ -561,7 +561,7 @@ tap.test('should execute queued tasks in FIFO order', async () => { const constraint = new taskbuffer.TaskConstraintGroup({ name: 'fifo-test', maxConcurrent: 1, - constraintKeyForTask: () => 'shared', + constraintKeyForExecution: () => 'shared', }); manager.addConstraintGroup(constraint); @@ -603,7 +603,7 @@ tap.test('should enforce both concurrency and cooldown together', async () => { name: 'combined-test', maxConcurrent: 2, cooldownMs: 200, - constraintKeyForTask: () => 'shared', + constraintKeyForExecution: () => 'shared', }); manager.addConstraintGroup(constraint); @@ -645,7 +645,7 @@ tap.test('should unblock queued tasks when constraint group is removed', async ( const constraint = new taskbuffer.TaskConstraintGroup({ name: 'removable-constraint', maxConcurrent: 1, - constraintKeyForTask: () => 'shared', + constraintKeyForExecution: () => 'shared', }); manager.addConstraintGroup(constraint); @@ -690,4 +690,184 @@ tap.test('should unblock queued tasks when constraint group is removed', async ( await manager.stop(); }); +// Test 20: Intra-task concurrency by input — same task, different inputs, key extracts TLD +tap.test('should serialize same-TLD inputs and parallelize different-TLD inputs', async () => { + const manager = new taskbuffer.TaskManager(); + const log: string[] = []; + + const extractTLD = (domain: string) => { + const parts = domain.split('.'); + return parts.slice(-2).join('.'); + }; + + const constraint = new taskbuffer.TaskConstraintGroup({ + name: 'tld-mutex', + maxConcurrent: 1, + constraintKeyForExecution: (_task, input?: string) => { + if (!input) return null; + return extractTLD(input); + }, + }); + manager.addConstraintGroup(constraint); + + const getCert = new taskbuffer.Task({ + name: 'get-cert', + taskFunction: async (domain: string) => { + log.push(`${domain}-start`); + await smartdelay.delayFor(100); + log.push(`${domain}-end`); + }, + }); + manager.addTask(getCert); + + await Promise.all([ + manager.triggerTaskConstrained(getCert, 'a.example.com'), + manager.triggerTaskConstrained(getCert, 'b.example.com'), + manager.triggerTaskConstrained(getCert, 'c.other.org'), + ]); + + // a.example.com and b.example.com share TLD "example.com" → serialized + const aEndIdx = log.indexOf('a.example.com-end'); + const bStartIdx = log.indexOf('b.example.com-start'); + expect(bStartIdx).toBeGreaterThanOrEqual(aEndIdx); + + // c.other.org has different TLD → runs in parallel with a.example.com + const aStartIdx = log.indexOf('a.example.com-start'); + const cStartIdx = log.indexOf('c.other.org-start'); + expect(cStartIdx).toBeLessThan(aEndIdx); + + await manager.stop(); +}); + +// Test 21: shouldExecute skips queued task based on external state +tap.test('should skip queued task when shouldExecute returns false', async () => { + const manager = new taskbuffer.TaskManager(); + const execLog: string[] = []; + const certCache = new Map(); + + const extractTLD = (domain: string) => { + const parts = domain.split('.'); + return parts.slice(-2).join('.'); + }; + + const constraint = new taskbuffer.TaskConstraintGroup({ + name: 'cert-mutex', + maxConcurrent: 1, + constraintKeyForExecution: (_task, input?: string) => { + if (!input) return null; + return extractTLD(input); + }, + shouldExecute: (_task, input?: string) => { + if (!input) return true; + return certCache.get(extractTLD(input)) !== 'wildcard'; + }, + }); + manager.addConstraintGroup(constraint); + + const getCert = new taskbuffer.Task({ + name: 'get-cert-skip', + taskFunction: async (domain: string) => { + execLog.push(domain); + // First execution sets wildcard in cache + certCache.set(extractTLD(domain), 'wildcard'); + await smartdelay.delayFor(100); + return `cert-for-${domain}`; + }, + }); + manager.addTask(getCert); + + const [r1, r2] = await Promise.all([ + manager.triggerTaskConstrained(getCert, 'app.example.com'), + manager.triggerTaskConstrained(getCert, 'api.example.com'), + ]); + + // First ran, second was skipped + expect(execLog).toEqual(['app.example.com']); + expect(r1).toEqual('cert-for-app.example.com'); + expect(r2).toEqual(undefined); + + await manager.stop(); +}); + +// Test 22: shouldExecute on immediate (non-queued) trigger +tap.test('should skip immediate trigger when shouldExecute returns false', async () => { + const manager = new taskbuffer.TaskManager(); + let executed = false; + + const constraint = new taskbuffer.TaskConstraintGroup({ + name: 'always-skip', + maxConcurrent: 10, + constraintKeyForExecution: () => 'all', + shouldExecute: () => false, + }); + manager.addConstraintGroup(constraint); + + const task = new taskbuffer.Task({ + name: 'skip-immediate', + taskFunction: async () => { + executed = true; + return 'should-not-see'; + }, + }); + manager.addTask(task); + + const result = await manager.triggerTaskConstrained(task); + expect(executed).toBeFalse(); + expect(result).toEqual(undefined); + + await manager.stop(); +}); + +// Test 23: Mixed task.data + input constraint key +tap.test('should use both task.data and input in constraint key', async () => { + const manager = new taskbuffer.TaskManager(); + let running = 0; + let maxRunning = 0; + + const constraint = new taskbuffer.TaskConstraintGroup<{ provider: string }>({ + name: 'provider-domain', + maxConcurrent: 1, + constraintKeyForExecution: (task, input?: string) => { + return `${task.data.provider}:${input || 'default'}`; + }, + }); + manager.addConstraintGroup(constraint); + + const makeTask = (name: string, provider: string) => + new taskbuffer.Task({ + name, + data: { provider }, + taskFunction: async () => { + running++; + maxRunning = Math.max(maxRunning, running); + await smartdelay.delayFor(100); + running--; + }, + }); + + // Same provider + same domain input → should serialize + const t1 = makeTask('mixed-1', 'acme'); + const t2 = makeTask('mixed-2', 'acme'); + // Different provider + same domain → parallel + const t3 = makeTask('mixed-3', 'cloudflare'); + + manager.addTask(t1); + manager.addTask(t2); + manager.addTask(t3); + + await Promise.all([ + manager.triggerTaskConstrained(t1, 'example.com'), + manager.triggerTaskConstrained(t2, 'example.com'), + manager.triggerTaskConstrained(t3, 'example.com'), + ]); + + // t1 and t2 share key "acme:example.com" → serialized (max 1 at a time) + // t3 has key "cloudflare:example.com" → parallel with t1 + // So maxRunning should be exactly 2 (t1 + t3, or t3 + t2) + expect(maxRunning).toBeLessThanOrEqual(2); + expect(maxRunning).toBeGreaterThanOrEqual(2); + + await manager.stop(); +}); + export default tap.start(); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 959a6f8..e009d9e 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/taskbuffer', - version: '5.0.1', + version: '6.0.0', description: 'A flexible task management library supporting TypeScript, allowing for task buffering, scheduling, and execution with dependency management.' } diff --git a/ts/index.ts b/ts/index.ts index 26060ea..69c8300 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -12,7 +12,7 @@ export { TaskStep } from './taskbuffer.classes.taskstep.js'; export type { ITaskStep } from './taskbuffer.classes.taskstep.js'; // Metadata interfaces -export type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent, TTaskEventType, ITaskConstraintGroupOptions } from './taskbuffer.interfaces.js'; +export type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent, TTaskEventType, ITaskConstraintGroupOptions, ITaskExecution } from './taskbuffer.interfaces.js'; import * as distributedCoordination from './taskbuffer.classes.distributedcoordinator.js'; export { distributedCoordination }; diff --git a/ts/taskbuffer.classes.taskconstraintgroup.ts b/ts/taskbuffer.classes.taskconstraintgroup.ts index 67bfc8e..651303f 100644 --- a/ts/taskbuffer.classes.taskconstraintgroup.ts +++ b/ts/taskbuffer.classes.taskconstraintgroup.ts @@ -5,23 +5,32 @@ export class TaskConstraintGroup = Record< public name: string; public maxConcurrent: number; public cooldownMs: number; - private constraintKeyForTask: (task: Task) => string | null | undefined; + private constraintKeyForExecution: (task: Task, input?: any) => string | null | undefined; + private shouldExecuteFn?: (task: Task, input?: any) => boolean | Promise; private runningCounts = new Map(); private lastCompletionTimes = new Map(); constructor(options: ITaskConstraintGroupOptions) { this.name = options.name; - this.constraintKeyForTask = options.constraintKeyForTask; + this.constraintKeyForExecution = options.constraintKeyForExecution; this.maxConcurrent = options.maxConcurrent ?? Infinity; this.cooldownMs = options.cooldownMs ?? 0; + this.shouldExecuteFn = options.shouldExecute; } - public getConstraintKey(task: Task): string | null { - const key = this.constraintKeyForTask(task); + public getConstraintKey(task: Task, input?: any): string | null { + const key = this.constraintKeyForExecution(task, input); return key ?? null; } + public async checkShouldExecute(task: Task, input?: any): Promise { + if (!this.shouldExecuteFn) { + return true; + } + return this.shouldExecuteFn(task, input); + } + public canRun(subGroupKey: string): boolean { const running = this.runningCounts.get(subGroupKey) ?? 0; if (running >= this.maxConcurrent) { diff --git a/ts/taskbuffer.classes.taskmanager.ts b/ts/taskbuffer.classes.taskmanager.ts index 8f0dd16..bb81448 100644 --- a/ts/taskbuffer.classes.taskmanager.ts +++ b/ts/taskbuffer.classes.taskmanager.ts @@ -80,14 +80,18 @@ export class TaskManager { // Gather applicable constraints const applicableGroups: Array<{ group: TaskConstraintGroup; key: string }> = []; for (const group of this.constraintGroups) { - const key = group.getConstraintKey(task); + const key = group.getConstraintKey(task, input); if (key !== null) { applicableGroups.push({ group, key }); } } - // No constraints apply → trigger directly + // No constraints apply → check shouldExecute then trigger directly if (applicableGroups.length === 0) { + const shouldRun = await this.checkAllShouldExecute(task, input); + if (!shouldRun) { + return undefined; + } return task.trigger(input); } @@ -97,22 +101,47 @@ export class TaskManager { return this.executeWithConstraintTracking(task, input, applicableGroups); } - // Blocked → enqueue with deferred promise + // Blocked → enqueue with deferred promise and cached constraint keys const deferred = plugins.smartpromise.defer(); - this.constraintQueue.push({ task, input, deferred }); + const constraintKeys = new Map(); + for (const { group, key } of applicableGroups) { + constraintKeys.set(group.name, key); + } + this.constraintQueue.push({ task, input, deferred, constraintKeys }); return deferred.promise; } + private async checkAllShouldExecute(task: Task, input?: any): Promise { + for (const group of this.constraintGroups) { + const shouldRun = await group.checkShouldExecute(task, input); + if (!shouldRun) { + return false; + } + } + return true; + } + private async executeWithConstraintTracking( task: Task, input: any, groups: Array<{ group: TaskConstraintGroup; key: string }>, ): Promise { - // Acquire slots + // Acquire slots synchronously to prevent race conditions for (const { group, key } of groups) { group.acquireSlot(key); } + // Check shouldExecute after acquiring slots + const shouldRun = await this.checkAllShouldExecute(task, input); + if (!shouldRun) { + // Release slots and drain queue + for (const { group, key } of groups) { + group.releaseSlot(key); + } + this.drainConstraintQueue(); + return undefined; + } + try { return await task.trigger(input); } finally { @@ -131,23 +160,30 @@ export class TaskManager { for (const entry of this.constraintQueue) { const applicableGroups: Array<{ group: TaskConstraintGroup; key: string }> = []; for (const group of this.constraintGroups) { - const key = group.getConstraintKey(entry.task); + const key = group.getConstraintKey(entry.task, entry.input); if (key !== null) { applicableGroups.push({ group, key }); } } - // No constraints apply anymore (group removed?) → run directly + // No constraints apply anymore (group removed?) → check shouldExecute then run if (applicableGroups.length === 0) { - entry.task.trigger(entry.input).then( - (result) => entry.deferred.resolve(result), - (err) => entry.deferred.reject(err), - ); + this.checkAllShouldExecute(entry.task, entry.input).then((shouldRun) => { + if (!shouldRun) { + entry.deferred.resolve(undefined); + return; + } + entry.task.trigger(entry.input).then( + (result) => entry.deferred.resolve(result), + (err) => entry.deferred.reject(err), + ); + }); continue; } const allCanRun = applicableGroups.every(({ group, key }) => group.canRun(key)); if (allCanRun) { + // executeWithConstraintTracking handles shouldExecute check internally this.executeWithConstraintTracking(entry.task, entry.input, applicableGroups).then( (result) => entry.deferred.resolve(result), (err) => entry.deferred.reject(err), diff --git a/ts/taskbuffer.interfaces.ts b/ts/taskbuffer.interfaces.ts index 4c1215f..1f56597 100644 --- a/ts/taskbuffer.interfaces.ts +++ b/ts/taskbuffer.interfaces.ts @@ -3,15 +3,22 @@ import type { Task } from './taskbuffer.classes.task.js'; export interface ITaskConstraintGroupOptions = Record> { name: string; - constraintKeyForTask: (task: Task) => string | null | undefined; + constraintKeyForExecution: (task: Task, input?: any) => string | null | undefined; maxConcurrent?: number; // default: Infinity cooldownMs?: number; // default: 0 + shouldExecute?: (task: Task, input?: any) => boolean | Promise; +} + +export interface ITaskExecution = Record> { + task: Task; + input: any; } export interface IConstrainedTaskEntry { task: Task; input: any; deferred: import('@push.rocks/smartpromise').Deferred; + constraintKeys: Map; // groupName -> key } export interface ITaskMetadata { diff --git a/ts_web/00_commitinfo_data.ts b/ts_web/00_commitinfo_data.ts index 959a6f8..e009d9e 100644 --- a/ts_web/00_commitinfo_data.ts +++ b/ts_web/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/taskbuffer', - version: '5.0.1', + version: '6.0.0', description: 'A flexible task management library supporting TypeScript, allowing for task buffering, scheduling, and execution with dependency management.' }