From 3ab90d9895171829b7974a4850d7b42f620ec83d Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Sun, 15 Feb 2026 21:51:55 +0000 Subject: [PATCH] feat(taskbuffer): add sliding-window rate limiting and result-sharing to TaskConstraintGroup and integrate with TaskManager --- changelog.md | 12 + readme.hints.md | 23 +- readme.md | 121 +++- test/test.13.constraints.ts | 562 +++++++++++++++++++ ts/00_commitinfo_data.ts | 2 +- ts/index.ts | 2 +- ts/taskbuffer.classes.taskconstraintgroup.ts | 77 ++- ts/taskbuffer.classes.taskmanager.ts | 25 +- ts/taskbuffer.interfaces.ts | 9 + ts_web/00_commitinfo_data.ts | 2 +- 10 files changed, 819 insertions(+), 16 deletions(-) diff --git a/changelog.md b/changelog.md index dfb2b1e..de7f039 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,17 @@ # Changelog +## 2026-02-15 - 6.1.0 - feat(taskbuffer) +add sliding-window rate limiting and result-sharing to TaskConstraintGroup and integrate with TaskManager + +- Added IRateLimitConfig and TResultSharingMode types and exported them from the public index +- TaskConstraintGroup: added rateLimit and resultSharingMode options, internal completion timestamp tracking, and last-result storage +- TaskConstraintGroup: new helpers - pruneCompletionTimestamps, getRateLimitDelay, getNextAvailableDelay, recordResult, getLastResult, hasResultSharing +- TaskConstraintGroup: rate-limit logic enforces maxPerWindow (counts running + completions) and composes with cooldown/maxConcurrent +- TaskManager: records successful task results to constraint groups and resolves queued entries immediately when a shared result exists +- TaskManager: queue drain now considers unified next-available delay (cooldown + rate limit) when scheduling retries +- Documentation updated: README and hints with usage examples for sliding-window rate limiting and result sharing +- Comprehensive tests added for rate limiting, concurrency interaction, and result-sharing behavior + ## 2026-02-15 - 6.0.1 - fix(taskbuffer) no changes to commit diff --git a/readme.hints.md b/readme.hints.md index e10886d..274fd7c 100644 --- a/readme.hints.md +++ b/readme.hints.md @@ -12,14 +12,31 @@ - Typed data bag accessible as `task.data` ### TaskConstraintGroup -- `new TaskConstraintGroup({ name, constraintKeyForExecution, maxConcurrent?, cooldownMs?, shouldExecute? })` +- `new TaskConstraintGroup({ name, constraintKeyForExecution, maxConcurrent?, cooldownMs?, shouldExecute?, rateLimit?, resultSharingMode? })` - `constraintKeyForExecution(task, input?)` returns a string key (constraint applies) or `null` (skip). Receives both task and runtime input. - `shouldExecute(task, input?)` — optional pre-execution check. Returns `false` to skip (deferred resolves `undefined`). Can be async. - `maxConcurrent` (default: `Infinity`) — max concurrent tasks per key - `cooldownMs` (default: `0`) — minimum ms gap between completions per key -- Methods: `getConstraintKey(task, input?)`, `checkShouldExecute(task, input?)`, `canRun(key)`, `acquireSlot(key)`, `releaseSlot(key)`, `getCooldownRemaining(key)`, `getRunningCount(key)`, `reset()` +- `rateLimit` (optional) — `{ maxPerWindow: number, windowMs: number }` sliding window rate limiter. Counts both running + completed tasks in window. +- `resultSharingMode` (default: `'none'`) — `'none'` | `'share-latest'`. When `'share-latest'`, queued tasks for the same key resolve with the first task's result without executing. +- Methods: `getConstraintKey(task, input?)`, `checkShouldExecute(task, input?)`, `canRun(key)`, `acquireSlot(key)`, `releaseSlot(key)`, `getCooldownRemaining(key)`, `getRateLimitDelay(key)`, `getNextAvailableDelay(key)`, `getRunningCount(key)`, `recordResult(key, result)`, `getLastResult(key)`, `hasResultSharing()`, `reset()` - `ITaskExecution` type exported from index — `{ task, input }` tuple +### Rate Limiting (v6.1.0+) +- Sliding window rate limiter: `rateLimit: { maxPerWindow: N, windowMs: ms }` +- Counts running + completed tasks against the window cap +- Per-key independence: saturating key A doesn't block key B +- Composable with `maxConcurrent` and `cooldownMs` +- `getNextAvailableDelay(key)` returns `Math.max(cooldownRemaining, rateLimitDelay)` — unified "how long until I can run" answer +- Drain timer auto-schedules based on shortest delay across all constraints + +### Result Sharing (v6.1.0+) +- `resultSharingMode: 'share-latest'` — queued tasks for the same key get the first task's result without executing +- Only successful results are shared (errors from `catchErrors: true` or thrown errors are NOT shared) +- `shouldExecute` is NOT called for shared results (the task's purpose was already fulfilled) +- `lastResults` persists until `reset()` — for time-bounded sharing, use `shouldExecute` to control staleness +- Composable with rate limiting: rate-limited waiters get shared result without waiting for the window + ### TaskManager Constraint Integration - `manager.addConstraintGroup(group)` / `manager.removeConstraintGroup(name)` - `triggerTaskByName()`, `triggerTask()`, `addExecuteRemoveTask()`, cron callbacks all route through `triggerTaskConstrained()` @@ -28,7 +45,7 @@ ### Exported from index.ts - `TaskConstraintGroup` class -- `ITaskConstraintGroupOptions` type +- `ITaskConstraintGroupOptions`, `IRateLimitConfig`, `TResultSharingMode` types ## Error Handling (v3.6.0+) - `Task` now has `catchErrors` constructor option (default: `false`) diff --git a/readme.md b/readme.md index 99577bd..2423d2d 100644 --- a/readme.md +++ b/readme.md @@ -13,7 +13,7 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community ## 🌟 Features - **🎯 Type-Safe Task Management** — Full TypeScript support with generics and type inference -- **🔒 Constraint-Based Concurrency** — Per-key mutual exclusion, group concurrency limits, and cooldown enforcement via `TaskConstraintGroup` +- **🔒 Constraint-Based Concurrency** — Per-key mutual exclusion, group concurrency limits, cooldown enforcement, sliding-window rate limiting, and result sharing via `TaskConstraintGroup` - **📊 Real-Time Progress Tracking** — Step-based progress with percentage weights - **⚡ Smart Buffering** — Intelligent request debouncing and batching - **⏰ Cron Scheduling** — Schedule tasks with cron expressions @@ -311,6 +311,105 @@ const [cert1, cert2, cert3] = await Promise.all([r1, r2, r3]); - Has closure access to external state modified by prior executions - If multiple constraint groups have `shouldExecute`, **all** must return `true` +### Sliding Window Rate Limiting + +Enforce "N completions per time window" with burst capability. Unlike `cooldownMs` (which forces even spacing between executions), `rateLimit` allows bursts up to the cap, then blocks until the window slides: + +```typescript +// Let's Encrypt style: 300 new orders per 3 hours +const acmeRateLimit = new TaskConstraintGroup({ + name: 'acme-rate', + constraintKeyForExecution: () => 'acme-account', + rateLimit: { + maxPerWindow: 300, + windowMs: 3 * 60 * 60 * 1000, // 3 hours + }, +}); + +manager.addConstraintGroup(acmeRateLimit); + +// All 300 can burst immediately. The 301st waits until the oldest +// completion falls out of the 3-hour window. +for (const domain of domains) { + manager.triggerTaskConstrained(certTask, { domain }); +} +``` + +Compose multiple rate limits for layered protection: + +```typescript +// Per-domain weekly cap AND global order rate +const perDomainWeekly = new TaskConstraintGroup({ + name: 'per-domain-weekly', + constraintKeyForExecution: (task, input) => input.registeredDomain, + rateLimit: { maxPerWindow: 50, windowMs: 7 * 24 * 60 * 60 * 1000 }, +}); + +const globalOrderRate = new TaskConstraintGroup({ + name: 'global-order-rate', + constraintKeyForExecution: () => 'global', + rateLimit: { maxPerWindow: 300, windowMs: 3 * 60 * 60 * 1000 }, +}); + +manager.addConstraintGroup(perDomainWeekly); +manager.addConstraintGroup(globalOrderRate); +``` + +Combine with `maxConcurrent` and `cooldownMs` for fine-grained control: + +```typescript +const throttled = new TaskConstraintGroup({ + name: 'acme-throttle', + constraintKeyForExecution: () => 'acme', + maxConcurrent: 5, // max 5 concurrent requests + cooldownMs: 1000, // 1s gap after each completion + rateLimit: { + maxPerWindow: 300, + windowMs: 3 * 60 * 60 * 1000, + }, +}); +``` + +### Result Sharing — Deduplication for Concurrent Requests + +When multiple callers request the same resource concurrently, `resultSharingMode: 'share-latest'` ensures only one execution occurs. All queued waiters receive the same result: + +```typescript +const certMutex = new TaskConstraintGroup({ + name: 'cert-per-tld', + constraintKeyForExecution: (task, input) => extractTld(input.domain), + maxConcurrent: 1, + resultSharingMode: 'share-latest', +}); + +manager.addConstraintGroup(certMutex); + +const certTask = new Task({ + name: 'obtain-cert', + taskFunction: async (input) => { + return await acmeClient.obtainWildcard(input.domain); + }, +}); +manager.addTask(certTask); + +// Three requests for *.example.com arrive simultaneously +const [cert1, cert2, cert3] = await Promise.all([ + manager.triggerTaskConstrained(certTask, { domain: 'api.example.com' }), + manager.triggerTaskConstrained(certTask, { domain: 'www.example.com' }), + manager.triggerTaskConstrained(certTask, { domain: 'mail.example.com' }), +]); + +// Only ONE ACME request was made. +// cert1 === cert2 === cert3 — all callers got the same cert object. +``` + +**Result sharing semantics:** + +- `shouldExecute` is NOT called for shared results (the task's purpose was already fulfilled) +- Error results are NOT shared — queued tasks execute independently after a failure +- `lastResults` persists until `reset()` — for time-bounded sharing, use `shouldExecute` to control staleness +- Composable with rate limiting: rate-limited waiters get shared results without waiting for the window + ### How It Works When you trigger a task through `TaskManager` (via `triggerTask`, `triggerTaskByName`, `addExecuteRemoveTask`, or cron), the manager: @@ -319,8 +418,9 @@ When you trigger a task through `TaskManager` (via `triggerTask`, `triggerTaskBy 2. If no constraints apply (all matchers return `null`) → checks `shouldExecute` → runs or skips 3. If all applicable constraints have capacity → acquires slots → checks `shouldExecute` → runs or skips 4. If any constraint blocks → enqueues the task; when a running task completes, the queue is drained -5. Cooldown-blocked tasks auto-retry after the shortest remaining cooldown expires -6. Queued tasks re-check `shouldExecute` when their turn comes — stale work is automatically pruned +5. Cooldown/rate-limit-blocked tasks auto-retry after the shortest remaining delay expires +6. Queued tasks check for shared results first (if any group has `resultSharingMode: 'share-latest'`) +7. Queued tasks re-check `shouldExecute` when their turn comes — stale work is automatically pruned ## 🎯 Core Concepts @@ -926,6 +1026,8 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme'); | `maxConcurrent` | `number` | `Infinity` | Max concurrent tasks per key | | `cooldownMs` | `number` | `0` | Minimum ms between completions per key | | `shouldExecute` | `(task, input?) => boolean \| Promise` | — | Pre-execution check. Return `false` to skip; deferred resolves `undefined`. | +| `rateLimit` | `IRateLimitConfig` | — | Sliding window: `{ maxPerWindow, windowMs }`. Counts running + completed tasks. | +| `resultSharingMode` | `TResultSharingMode` | `'none'` | `'none'` or `'share-latest'`. Queued tasks get first task's result without executing. | ### TaskConstraintGroup Methods @@ -933,12 +1035,17 @@ const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme'); | --- | --- | --- | | `getConstraintKey(task, input?)` | `string \| null` | Get the constraint key for a task + input | | `checkShouldExecute(task, input?)` | `Promise` | Run the `shouldExecute` callback (defaults to `true`) | -| `canRun(key)` | `boolean` | Check if a slot is available | +| `canRun(key)` | `boolean` | Check if a slot is available (considers concurrency, cooldown, and rate limit) | | `acquireSlot(key)` | `void` | Claim a running slot | -| `releaseSlot(key)` | `void` | Release a slot and record completion time | +| `releaseSlot(key)` | `void` | Release a slot and record completion time + rate-limit timestamp | | `getCooldownRemaining(key)` | `number` | Milliseconds until cooldown expires | +| `getRateLimitDelay(key)` | `number` | Milliseconds until a rate-limit slot opens | +| `getNextAvailableDelay(key)` | `number` | Max of cooldown + rate-limit delay — unified "when can I run" | | `getRunningCount(key)` | `number` | Current running count for key | -| `reset()` | `void` | Clear all state | +| `recordResult(key, result)` | `void` | Store result for sharing (no-op if mode is `'none'`) | +| `getLastResult(key)` | `{result, timestamp} \| undefined` | Get last shared result for key | +| `hasResultSharing()` | `boolean` | Whether result sharing is enabled | +| `reset()` | `void` | Clear all state (running counts, cooldowns, rate-limit timestamps, shared results) | ### TaskManager Methods @@ -986,6 +1093,8 @@ import type { ITaskStep, ITaskFunction, ITaskConstraintGroupOptions, + IRateLimitConfig, + TResultSharingMode, StepNames, } from '@push.rocks/taskbuffer'; ``` diff --git a/test/test.13.constraints.ts b/test/test.13.constraints.ts index ff4498f..199e4fb 100644 --- a/test/test.13.constraints.ts +++ b/test/test.13.constraints.ts @@ -870,4 +870,566 @@ tap.test('should use both task.data and input in constraint key', async () => { await manager.stop(); }); +// ============================================================================= +// Rate Limiting Tests +// ============================================================================= + +// Test 24: Basic N-per-window rate limiting +tap.test('should enforce N-per-window rate limit', async () => { + const manager = new taskbuffer.TaskManager(); + const execTimestamps: number[] = []; + + const constraint = new taskbuffer.TaskConstraintGroup({ + name: 'rate-limit-basic', + maxConcurrent: Infinity, + constraintKeyForExecution: () => 'shared', + rateLimit: { + maxPerWindow: 3, + windowMs: 1000, + }, + }); + manager.addConstraintGroup(constraint); + + const makeTask = (id: number) => + new taskbuffer.Task({ + name: `rl-${id}`, + taskFunction: async () => { + execTimestamps.push(Date.now()); + return `done-${id}`; + }, + }); + + const tasks = [makeTask(1), makeTask(2), makeTask(3), makeTask(4), makeTask(5)]; + for (const t of tasks) manager.addTask(t); + + const results = await Promise.all(tasks.map((t) => manager.triggerTaskConstrained(t))); + + // All 5 should eventually complete + expect(results).toEqual(['done-1', 'done-2', 'done-3', 'done-4', 'done-5']); + + // First 3 should execute nearly simultaneously + const firstBatchSpread = execTimestamps[2] - execTimestamps[0]; + expect(firstBatchSpread).toBeLessThan(100); + + // 4th and 5th should wait for the window to slide (at least ~900ms after first) + const fourthDelay = execTimestamps[3] - execTimestamps[0]; + expect(fourthDelay).toBeGreaterThanOrEqual(900); + + await manager.stop(); +}); + +// Test 25: Rate limit + maxConcurrent interaction +tap.test('should enforce both rate limit and maxConcurrent independently', async () => { + const manager = new taskbuffer.TaskManager(); + let running = 0; + let maxRunning = 0; + const execTimestamps: number[] = []; + + const constraint = new taskbuffer.TaskConstraintGroup({ + name: 'rate-concurrent', + maxConcurrent: 2, + constraintKeyForExecution: () => 'shared', + rateLimit: { + maxPerWindow: 3, + windowMs: 2000, + }, + }); + manager.addConstraintGroup(constraint); + + const makeTask = (id: number) => + new taskbuffer.Task({ + name: `rc-${id}`, + taskFunction: async () => { + running++; + maxRunning = Math.max(maxRunning, running); + execTimestamps.push(Date.now()); + await smartdelay.delayFor(50); + running--; + }, + }); + + const tasks = [makeTask(1), makeTask(2), makeTask(3), makeTask(4)]; + for (const t of tasks) manager.addTask(t); + + await Promise.all(tasks.map((t) => manager.triggerTaskConstrained(t))); + + // Concurrency limit should be enforced + expect(maxRunning).toBeLessThanOrEqual(2); + + // 4th task should wait for rate limit window (only 3 allowed per 2s) + const fourthDelay = execTimestamps[3] - execTimestamps[0]; + expect(fourthDelay).toBeGreaterThanOrEqual(1900); + + await manager.stop(); +}); + +// Test 26: Rate limit + cooldownMs interaction +tap.test('should enforce both rate limit and cooldown together', async () => { + const manager = new taskbuffer.TaskManager(); + const execTimestamps: number[] = []; + + const constraint = new taskbuffer.TaskConstraintGroup({ + name: 'rate-cooldown', + maxConcurrent: 1, + cooldownMs: 200, + constraintKeyForExecution: () => 'shared', + rateLimit: { + maxPerWindow: 2, + windowMs: 2000, + }, + }); + manager.addConstraintGroup(constraint); + + const makeTask = (id: number) => + new taskbuffer.Task({ + name: `rcd-${id}`, + taskFunction: async () => { + execTimestamps.push(Date.now()); + }, + }); + + const tasks = [makeTask(1), makeTask(2), makeTask(3)]; + for (const t of tasks) manager.addTask(t); + + await Promise.all(tasks.map((t) => manager.triggerTaskConstrained(t))); + + // Cooldown between first and second: at least 200ms + const gap12 = execTimestamps[1] - execTimestamps[0]; + expect(gap12).toBeGreaterThanOrEqual(150); + + // Third task blocked by rate limit (only 2 per 2000ms window) AND cooldown + const gap13 = execTimestamps[2] - execTimestamps[0]; + expect(gap13).toBeGreaterThanOrEqual(1900); + + await manager.stop(); +}); + +// Test 27: Per-key rate limit independence +tap.test('should apply rate limit per key independently', async () => { + const manager = new taskbuffer.TaskManager(); + const execLog: string[] = []; + + const constraint = new taskbuffer.TaskConstraintGroup({ + name: 'rate-per-key', + constraintKeyForExecution: (_task, input?: string) => input, + rateLimit: { + maxPerWindow: 1, + windowMs: 2000, + }, + }); + manager.addConstraintGroup(constraint); + + const task = new taskbuffer.Task({ + name: 'rate-key-task', + taskFunction: async (input: string) => { + execLog.push(input); + }, + }); + manager.addTask(task); + + // Trigger 2 for key-A and 1 for key-B + const [r1, r2, r3] = await Promise.all([ + manager.triggerTaskConstrained(task, 'key-A'), + manager.triggerTaskConstrained(task, 'key-B'), + manager.triggerTaskConstrained(task, 'key-A'), // should wait for window + ]); + + // key-A and key-B first calls should both execute immediately + expect(execLog[0]).toEqual('key-A'); + expect(execLog[1]).toEqual('key-B'); + // key-A second call eventually executes + expect(execLog).toContain('key-A'); + expect(execLog.length).toEqual(3); + + await manager.stop(); +}); + +// Test 28: getNextAvailableDelay returns correct value +tap.test('should return correct getNextAvailableDelay and canRun after waiting', async () => { + const constraint = new taskbuffer.TaskConstraintGroup({ + name: 'delay-check', + constraintKeyForExecution: () => 'key', + rateLimit: { + maxPerWindow: 1, + windowMs: 500, + }, + }); + + // Initially: can run, no delay + expect(constraint.canRun('key')).toBeTrue(); + expect(constraint.getNextAvailableDelay('key')).toEqual(0); + + // Acquire and release to record a completion + constraint.acquireSlot('key'); + constraint.releaseSlot('key'); + + // Now: rate limit saturated + expect(constraint.canRun('key')).toBeFalse(); + const delay = constraint.getNextAvailableDelay('key'); + expect(delay).toBeGreaterThan(0); + expect(delay).toBeLessThanOrEqual(500); + + // Wait for window to slide + await smartdelay.delayFor(delay + 50); + + expect(constraint.canRun('key')).toBeTrue(); + expect(constraint.getNextAvailableDelay('key')).toEqual(0); +}); + +// Test 29: reset() clears rate-limit timestamps +tap.test('should clear rate limit timestamps on reset', async () => { + const constraint = new taskbuffer.TaskConstraintGroup({ + name: 'reset-rate', + constraintKeyForExecution: () => 'key', + rateLimit: { + maxPerWindow: 1, + windowMs: 60000, + }, + }); + + constraint.acquireSlot('key'); + constraint.releaseSlot('key'); + expect(constraint.canRun('key')).toBeFalse(); + + constraint.reset(); + expect(constraint.canRun('key')).toBeTrue(); + expect(constraint.getRateLimitDelay('key')).toEqual(0); +}); + +// ============================================================================= +// Result Sharing Tests +// ============================================================================= + +// Test 30: Basic result sharing — multiple waiters get first task's result +tap.test('should share result with queued tasks (share-latest mode)', async () => { + const manager = new taskbuffer.TaskManager(); + let execCount = 0; + + const constraint = new taskbuffer.TaskConstraintGroup({ + name: 'share-basic', + maxConcurrent: 1, + constraintKeyForExecution: () => 'shared', + resultSharingMode: 'share-latest', + }); + manager.addConstraintGroup(constraint); + + const makeTask = (id: number) => + new taskbuffer.Task({ + name: `share-${id}`, + taskFunction: async () => { + execCount++; + await smartdelay.delayFor(100); + return 'shared-result'; + }, + }); + + const t1 = makeTask(1); + const t2 = makeTask(2); + const t3 = makeTask(3); + + manager.addTask(t1); + manager.addTask(t2); + manager.addTask(t3); + + const [r1, r2, r3] = await Promise.all([ + manager.triggerTaskConstrained(t1), + manager.triggerTaskConstrained(t2), + manager.triggerTaskConstrained(t3), + ]); + + // Only 1 execution, all get same result + expect(execCount).toEqual(1); + expect(r1).toEqual('shared-result'); + expect(r2).toEqual('shared-result'); + expect(r3).toEqual('shared-result'); + + await manager.stop(); +}); + +// Test 31: Different keys get independent results +tap.test('should share results independently per key', async () => { + const manager = new taskbuffer.TaskManager(); + let execCount = 0; + + const constraint = new taskbuffer.TaskConstraintGroup({ + name: 'share-per-key', + maxConcurrent: 1, + constraintKeyForExecution: (_task, input?: string) => input, + resultSharingMode: 'share-latest', + }); + manager.addConstraintGroup(constraint); + + const task = new taskbuffer.Task({ + name: 'keyed-share', + taskFunction: async (input: string) => { + execCount++; + await smartdelay.delayFor(50); + return `result-for-${input}`; + }, + }); + manager.addTask(task); + + const [r1, r2] = await Promise.all([ + manager.triggerTaskConstrained(task, 'key-A'), + manager.triggerTaskConstrained(task, 'key-B'), + ]); + + // Different keys → both execute independently + expect(execCount).toEqual(2); + expect(r1).toEqual('result-for-key-A'); + expect(r2).toEqual('result-for-key-B'); + + await manager.stop(); +}); + +// Test 32: Default mode ('none') — no sharing +tap.test('should not share results when mode is none (default)', async () => { + const manager = new taskbuffer.TaskManager(); + let execCount = 0; + + const constraint = new taskbuffer.TaskConstraintGroup({ + name: 'no-share', + maxConcurrent: 1, + constraintKeyForExecution: () => 'shared', + // resultSharingMode defaults to 'none' + }); + manager.addConstraintGroup(constraint); + + const makeTask = (id: number) => + new taskbuffer.Task({ + name: `noshare-${id}`, + taskFunction: async () => { + execCount++; + await smartdelay.delayFor(50); + return `result-${execCount}`; + }, + }); + + const t1 = makeTask(1); + const t2 = makeTask(2); + + manager.addTask(t1); + manager.addTask(t2); + + const [r1, r2] = await Promise.all([ + manager.triggerTaskConstrained(t1), + manager.triggerTaskConstrained(t2), + ]); + + // Both should execute independently + expect(execCount).toEqual(2); + expect(r1).toEqual('result-1'); + expect(r2).toEqual('result-2'); + + await manager.stop(); +}); + +// Test 33: Sharing takes priority over shouldExecute for queued tasks +tap.test('should not call shouldExecute for shared results', async () => { + const manager = new taskbuffer.TaskManager(); + let shouldExecuteCalls = 0; + let execCount = 0; + + const constraint = new taskbuffer.TaskConstraintGroup({ + name: 'share-vs-should', + maxConcurrent: 1, + constraintKeyForExecution: () => 'shared', + resultSharingMode: 'share-latest', + shouldExecute: () => { + shouldExecuteCalls++; + return true; + }, + }); + manager.addConstraintGroup(constraint); + + const makeTask = (id: number) => + new taskbuffer.Task({ + name: `svs-${id}`, + taskFunction: async () => { + execCount++; + await smartdelay.delayFor(100); + return 'shared-value'; + }, + }); + + const t1 = makeTask(1); + const t2 = makeTask(2); + const t3 = makeTask(3); + + manager.addTask(t1); + manager.addTask(t2); + manager.addTask(t3); + + const initialShouldExecuteCalls = shouldExecuteCalls; + + await Promise.all([ + manager.triggerTaskConstrained(t1), + manager.triggerTaskConstrained(t2), + manager.triggerTaskConstrained(t3), + ]); + + // Only 1 execution + expect(execCount).toEqual(1); + + // shouldExecute called once for the first task, but not for shared results + // (t2 and t3 get shared result without going through executeWithConstraintTracking) + const totalShouldExecuteCalls = shouldExecuteCalls - initialShouldExecuteCalls; + expect(totalShouldExecuteCalls).toEqual(1); + + await manager.stop(); +}); + +// Test 34: Error results NOT shared — queued task executes after failure +tap.test('should not share error results with queued tasks', async () => { + const manager = new taskbuffer.TaskManager(); + let execCount = 0; + + const constraint = new taskbuffer.TaskConstraintGroup({ + name: 'share-error', + maxConcurrent: 1, + constraintKeyForExecution: () => 'shared', + resultSharingMode: 'share-latest', + }); + manager.addConstraintGroup(constraint); + + const failTask = new taskbuffer.Task({ + name: 'fail-share', + catchErrors: true, + taskFunction: async () => { + execCount++; + await smartdelay.delayFor(50); + throw new Error('fail'); + }, + }); + + const successTask = new taskbuffer.Task({ + name: 'success-share', + taskFunction: async () => { + execCount++; + await smartdelay.delayFor(50); + return 'success-result'; + }, + }); + + manager.addTask(failTask); + manager.addTask(successTask); + + const [r1, r2] = await Promise.all([ + manager.triggerTaskConstrained(failTask), + manager.triggerTaskConstrained(successTask), + ]); + + // Both should have executed (error result not shared) + expect(execCount).toEqual(2); + expect(r2).toEqual('success-result'); + + await manager.stop(); +}); + +// Test 35: Multiple constraint groups — sharing from one group applies +tap.test('should share result when any applicable group has sharing enabled', async () => { + const manager = new taskbuffer.TaskManager(); + let execCount = 0; + + const sharingGroup = new taskbuffer.TaskConstraintGroup({ + name: 'sharing-group', + maxConcurrent: 1, + constraintKeyForExecution: () => 'shared', + resultSharingMode: 'share-latest', + }); + + const nonSharingGroup = new taskbuffer.TaskConstraintGroup({ + name: 'non-sharing-group', + maxConcurrent: 5, + constraintKeyForExecution: () => 'all', + // resultSharingMode defaults to 'none' + }); + + manager.addConstraintGroup(sharingGroup); + manager.addConstraintGroup(nonSharingGroup); + + const makeTask = (id: number) => + new taskbuffer.Task({ + name: `multi-share-${id}`, + taskFunction: async () => { + execCount++; + await smartdelay.delayFor(100); + return 'multi-group-result'; + }, + }); + + const t1 = makeTask(1); + const t2 = makeTask(2); + + manager.addTask(t1); + manager.addTask(t2); + + const [r1, r2] = await Promise.all([ + manager.triggerTaskConstrained(t1), + manager.triggerTaskConstrained(t2), + ]); + + // Only 1 execution due to sharing from the sharing group + expect(execCount).toEqual(1); + expect(r1).toEqual('multi-group-result'); + expect(r2).toEqual('multi-group-result'); + + await manager.stop(); +}); + +// Test 36: Result sharing + rate limit combo +tap.test('should resolve rate-limited waiters with shared result', async () => { + const manager = new taskbuffer.TaskManager(); + let execCount = 0; + + const constraint = new taskbuffer.TaskConstraintGroup({ + name: 'share-rate', + maxConcurrent: 1, + constraintKeyForExecution: () => 'shared', + resultSharingMode: 'share-latest', + rateLimit: { + maxPerWindow: 1, + windowMs: 5000, + }, + }); + manager.addConstraintGroup(constraint); + + const makeTask = (id: number) => + new taskbuffer.Task({ + name: `sr-${id}`, + taskFunction: async () => { + execCount++; + await smartdelay.delayFor(50); + return 'rate-shared-result'; + }, + }); + + const t1 = makeTask(1); + const t2 = makeTask(2); + const t3 = makeTask(3); + + manager.addTask(t1); + manager.addTask(t2); + manager.addTask(t3); + + const startTime = Date.now(); + const [r1, r2, r3] = await Promise.all([ + manager.triggerTaskConstrained(t1), + manager.triggerTaskConstrained(t2), + manager.triggerTaskConstrained(t3), + ]); + const elapsed = Date.now() - startTime; + + // Only 1 execution; waiters get shared result without waiting for rate limit window + expect(execCount).toEqual(1); + expect(r1).toEqual('rate-shared-result'); + expect(r2).toEqual('rate-shared-result'); + expect(r3).toEqual('rate-shared-result'); + + // Should complete quickly (not waiting 5s for rate limit window) + expect(elapsed).toBeLessThan(1000); + + await manager.stop(); +}); + export default tap.start(); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 782323b..31cd368 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/taskbuffer', - version: '6.0.1', + version: '6.1.0', description: 'A flexible task management library supporting TypeScript, allowing for task buffering, scheduling, and execution with dependency management.' } diff --git a/ts/index.ts b/ts/index.ts index 69c8300..007266b 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -12,7 +12,7 @@ export { TaskStep } from './taskbuffer.classes.taskstep.js'; export type { ITaskStep } from './taskbuffer.classes.taskstep.js'; // Metadata interfaces -export type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent, TTaskEventType, ITaskConstraintGroupOptions, ITaskExecution } from './taskbuffer.interfaces.js'; +export type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent, TTaskEventType, ITaskConstraintGroupOptions, ITaskExecution, IRateLimitConfig, TResultSharingMode } from './taskbuffer.interfaces.js'; import * as distributedCoordination from './taskbuffer.classes.distributedcoordinator.js'; export { distributedCoordination }; diff --git a/ts/taskbuffer.classes.taskconstraintgroup.ts b/ts/taskbuffer.classes.taskconstraintgroup.ts index 651303f..ae7d837 100644 --- a/ts/taskbuffer.classes.taskconstraintgroup.ts +++ b/ts/taskbuffer.classes.taskconstraintgroup.ts @@ -1,15 +1,19 @@ import type { Task } from './taskbuffer.classes.task.js'; -import type { ITaskConstraintGroupOptions } from './taskbuffer.interfaces.js'; +import type { ITaskConstraintGroupOptions, IRateLimitConfig, TResultSharingMode } from './taskbuffer.interfaces.js'; export class TaskConstraintGroup = Record> { public name: string; public maxConcurrent: number; public cooldownMs: number; + public rateLimit: IRateLimitConfig | null; + public resultSharingMode: TResultSharingMode; private constraintKeyForExecution: (task: Task, input?: any) => string | null | undefined; private shouldExecuteFn?: (task: Task, input?: any) => boolean | Promise; private runningCounts = new Map(); private lastCompletionTimes = new Map(); + private completionTimestamps = new Map(); + private lastResults = new Map(); constructor(options: ITaskConstraintGroupOptions) { this.name = options.name; @@ -17,6 +21,8 @@ export class TaskConstraintGroup = Record< this.maxConcurrent = options.maxConcurrent ?? Infinity; this.cooldownMs = options.cooldownMs ?? 0; this.shouldExecuteFn = options.shouldExecute; + this.rateLimit = options.rateLimit ?? null; + this.resultSharingMode = options.resultSharingMode ?? 'none'; } public getConstraintKey(task: Task, input?: any): string | null { @@ -47,6 +53,16 @@ export class TaskConstraintGroup = Record< } } + if (this.rateLimit) { + this.pruneCompletionTimestamps(subGroupKey); + const timestamps = this.completionTimestamps.get(subGroupKey); + const completedInWindow = timestamps ? timestamps.length : 0; + const running = this.runningCounts.get(subGroupKey) ?? 0; + if (completedInWindow + running >= this.rateLimit.maxPerWindow) { + return false; + } + } + return true; } @@ -64,6 +80,12 @@ export class TaskConstraintGroup = Record< this.runningCounts.set(subGroupKey, next); } this.lastCompletionTimes.set(subGroupKey, Date.now()); + + if (this.rateLimit) { + const timestamps = this.completionTimestamps.get(subGroupKey) ?? []; + timestamps.push(Date.now()); + this.completionTimestamps.set(subGroupKey, timestamps); + } } public getCooldownRemaining(subGroupKey: string): number { @@ -82,8 +104,61 @@ export class TaskConstraintGroup = Record< return this.runningCounts.get(subGroupKey) ?? 0; } + // Rate limit helpers + private pruneCompletionTimestamps(subGroupKey: string): void { + const timestamps = this.completionTimestamps.get(subGroupKey); + if (!timestamps || !this.rateLimit) return; + const cutoff = Date.now() - this.rateLimit.windowMs; + let i = 0; + while (i < timestamps.length && timestamps[i] <= cutoff) { + i++; + } + if (i > 0) { + timestamps.splice(0, i); + } + } + + public getRateLimitDelay(subGroupKey: string): number { + if (!this.rateLimit) return 0; + this.pruneCompletionTimestamps(subGroupKey); + const timestamps = this.completionTimestamps.get(subGroupKey); + const completedInWindow = timestamps ? timestamps.length : 0; + const running = this.runningCounts.get(subGroupKey) ?? 0; + if (completedInWindow + running < this.rateLimit.maxPerWindow) { + return 0; + } + // If only running tasks fill the window (no completions yet), we can't compute a delay + if (!timestamps || timestamps.length === 0) { + return 1; // minimal delay; drain will re-check after running tasks complete + } + // The oldest timestamp in the window determines when a slot opens + const oldestInWindow = timestamps[0]; + const expiry = oldestInWindow + this.rateLimit.windowMs; + return Math.max(0, expiry - Date.now()); + } + + public getNextAvailableDelay(subGroupKey: string): number { + return Math.max(this.getCooldownRemaining(subGroupKey), this.getRateLimitDelay(subGroupKey)); + } + + // Result sharing helpers + public recordResult(subGroupKey: string, result: any): void { + if (this.resultSharingMode === 'none') return; + this.lastResults.set(subGroupKey, { result, timestamp: Date.now() }); + } + + public getLastResult(subGroupKey: string): { result: any; timestamp: number } | undefined { + return this.lastResults.get(subGroupKey); + } + + public hasResultSharing(): boolean { + return this.resultSharingMode !== 'none'; + } + public reset(): void { this.runningCounts.clear(); this.lastCompletionTimes.clear(); + this.completionTimestamps.clear(); + this.lastResults.clear(); } } diff --git a/ts/taskbuffer.classes.taskmanager.ts b/ts/taskbuffer.classes.taskmanager.ts index bb81448..31303ae 100644 --- a/ts/taskbuffer.classes.taskmanager.ts +++ b/ts/taskbuffer.classes.taskmanager.ts @@ -143,7 +143,14 @@ export class TaskManager { } try { - return await task.trigger(input); + const result = await task.trigger(input); + // Record result for groups with result sharing (only on true success, not caught errors) + if (!task.lastError) { + for (const { group, key } of groups) { + group.recordResult(key, result); + } + } + return result; } finally { // Release slots for (const { group, key } of groups) { @@ -181,6 +188,18 @@ export class TaskManager { continue; } + // Check result sharing — if any applicable group has a shared result, resolve immediately + const sharingGroups = applicableGroups.filter(({ group }) => group.hasResultSharing()); + if (sharingGroups.length > 0) { + const groupWithResult = sharingGroups.find(({ group, key }) => + group.getLastResult(key) !== undefined + ); + if (groupWithResult) { + entry.deferred.resolve(groupWithResult.group.getLastResult(groupWithResult.key)!.result); + continue; + } + } + const allCanRun = applicableGroups.every(({ group, key }) => group.canRun(key)); if (allCanRun) { // executeWithConstraintTracking handles shouldExecute check internally @@ -190,9 +209,9 @@ export class TaskManager { ); } else { stillQueued.push(entry); - // Track shortest cooldown for timer scheduling + // Track shortest delay for timer scheduling (cooldown + rate limit) for (const { group, key } of applicableGroups) { - const remaining = group.getCooldownRemaining(key); + const remaining = group.getNextAvailableDelay(key); if (remaining > 0 && remaining < shortestCooldown) { shortestCooldown = remaining; } diff --git a/ts/taskbuffer.interfaces.ts b/ts/taskbuffer.interfaces.ts index 1f56597..a863d8b 100644 --- a/ts/taskbuffer.interfaces.ts +++ b/ts/taskbuffer.interfaces.ts @@ -1,12 +1,21 @@ import type { ITaskStep } from './taskbuffer.classes.taskstep.js'; import type { Task } from './taskbuffer.classes.task.js'; +export interface IRateLimitConfig { + maxPerWindow: number; // max completions allowed within the sliding window + windowMs: number; // sliding window duration in ms +} + +export type TResultSharingMode = 'none' | 'share-latest'; + export interface ITaskConstraintGroupOptions = Record> { name: string; constraintKeyForExecution: (task: Task, input?: any) => string | null | undefined; maxConcurrent?: number; // default: Infinity cooldownMs?: number; // default: 0 shouldExecute?: (task: Task, input?: any) => boolean | Promise; + rateLimit?: IRateLimitConfig; + resultSharingMode?: TResultSharingMode; // default: 'none' } export interface ITaskExecution = Record> { diff --git a/ts_web/00_commitinfo_data.ts b/ts_web/00_commitinfo_data.ts index 782323b..31cd368 100644 --- a/ts_web/00_commitinfo_data.ts +++ b/ts_web/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/taskbuffer', - version: '6.0.1', + version: '6.1.0', description: 'A flexible task management library supporting TypeScript, allowing for task buffering, scheduling, and execution with dependency management.' }