feat(taskbuffer): add sliding-window rate limiting and result-sharing to TaskConstraintGroup and integrate with TaskManager
This commit is contained in:
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/taskbuffer',
|
||||
version: '6.0.1',
|
||||
version: '6.1.0',
|
||||
description: 'A flexible task management library supporting TypeScript, allowing for task buffering, scheduling, and execution with dependency management.'
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ export { TaskStep } from './taskbuffer.classes.taskstep.js';
|
||||
export type { ITaskStep } from './taskbuffer.classes.taskstep.js';
|
||||
|
||||
// Metadata interfaces
|
||||
export type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent, TTaskEventType, ITaskConstraintGroupOptions, ITaskExecution } from './taskbuffer.interfaces.js';
|
||||
export type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent, TTaskEventType, ITaskConstraintGroupOptions, ITaskExecution, IRateLimitConfig, TResultSharingMode } from './taskbuffer.interfaces.js';
|
||||
|
||||
import * as distributedCoordination from './taskbuffer.classes.distributedcoordinator.js';
|
||||
export { distributedCoordination };
|
||||
|
||||
@@ -1,15 +1,19 @@
|
||||
import type { Task } from './taskbuffer.classes.task.js';
|
||||
import type { ITaskConstraintGroupOptions } from './taskbuffer.interfaces.js';
|
||||
import type { ITaskConstraintGroupOptions, IRateLimitConfig, TResultSharingMode } from './taskbuffer.interfaces.js';
|
||||
|
||||
export class TaskConstraintGroup<TData extends Record<string, unknown> = Record<string, unknown>> {
|
||||
public name: string;
|
||||
public maxConcurrent: number;
|
||||
public cooldownMs: number;
|
||||
public rateLimit: IRateLimitConfig | null;
|
||||
public resultSharingMode: TResultSharingMode;
|
||||
private constraintKeyForExecution: (task: Task<any, any, TData>, input?: any) => string | null | undefined;
|
||||
private shouldExecuteFn?: (task: Task<any, any, TData>, input?: any) => boolean | Promise<boolean>;
|
||||
|
||||
private runningCounts = new Map<string, number>();
|
||||
private lastCompletionTimes = new Map<string, number>();
|
||||
private completionTimestamps = new Map<string, number[]>();
|
||||
private lastResults = new Map<string, { result: any; timestamp: number }>();
|
||||
|
||||
constructor(options: ITaskConstraintGroupOptions<TData>) {
|
||||
this.name = options.name;
|
||||
@@ -17,6 +21,8 @@ export class TaskConstraintGroup<TData extends Record<string, unknown> = Record<
|
||||
this.maxConcurrent = options.maxConcurrent ?? Infinity;
|
||||
this.cooldownMs = options.cooldownMs ?? 0;
|
||||
this.shouldExecuteFn = options.shouldExecute;
|
||||
this.rateLimit = options.rateLimit ?? null;
|
||||
this.resultSharingMode = options.resultSharingMode ?? 'none';
|
||||
}
|
||||
|
||||
public getConstraintKey(task: Task<any, any, TData>, input?: any): string | null {
|
||||
@@ -47,6 +53,16 @@ export class TaskConstraintGroup<TData extends Record<string, unknown> = Record<
|
||||
}
|
||||
}
|
||||
|
||||
if (this.rateLimit) {
|
||||
this.pruneCompletionTimestamps(subGroupKey);
|
||||
const timestamps = this.completionTimestamps.get(subGroupKey);
|
||||
const completedInWindow = timestamps ? timestamps.length : 0;
|
||||
const running = this.runningCounts.get(subGroupKey) ?? 0;
|
||||
if (completedInWindow + running >= this.rateLimit.maxPerWindow) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -64,6 +80,12 @@ export class TaskConstraintGroup<TData extends Record<string, unknown> = Record<
|
||||
this.runningCounts.set(subGroupKey, next);
|
||||
}
|
||||
this.lastCompletionTimes.set(subGroupKey, Date.now());
|
||||
|
||||
if (this.rateLimit) {
|
||||
const timestamps = this.completionTimestamps.get(subGroupKey) ?? [];
|
||||
timestamps.push(Date.now());
|
||||
this.completionTimestamps.set(subGroupKey, timestamps);
|
||||
}
|
||||
}
|
||||
|
||||
public getCooldownRemaining(subGroupKey: string): number {
|
||||
@@ -82,8 +104,61 @@ export class TaskConstraintGroup<TData extends Record<string, unknown> = Record<
|
||||
return this.runningCounts.get(subGroupKey) ?? 0;
|
||||
}
|
||||
|
||||
// Rate limit helpers
|
||||
private pruneCompletionTimestamps(subGroupKey: string): void {
|
||||
const timestamps = this.completionTimestamps.get(subGroupKey);
|
||||
if (!timestamps || !this.rateLimit) return;
|
||||
const cutoff = Date.now() - this.rateLimit.windowMs;
|
||||
let i = 0;
|
||||
while (i < timestamps.length && timestamps[i] <= cutoff) {
|
||||
i++;
|
||||
}
|
||||
if (i > 0) {
|
||||
timestamps.splice(0, i);
|
||||
}
|
||||
}
|
||||
|
||||
public getRateLimitDelay(subGroupKey: string): number {
|
||||
if (!this.rateLimit) return 0;
|
||||
this.pruneCompletionTimestamps(subGroupKey);
|
||||
const timestamps = this.completionTimestamps.get(subGroupKey);
|
||||
const completedInWindow = timestamps ? timestamps.length : 0;
|
||||
const running = this.runningCounts.get(subGroupKey) ?? 0;
|
||||
if (completedInWindow + running < this.rateLimit.maxPerWindow) {
|
||||
return 0;
|
||||
}
|
||||
// If only running tasks fill the window (no completions yet), we can't compute a delay
|
||||
if (!timestamps || timestamps.length === 0) {
|
||||
return 1; // minimal delay; drain will re-check after running tasks complete
|
||||
}
|
||||
// The oldest timestamp in the window determines when a slot opens
|
||||
const oldestInWindow = timestamps[0];
|
||||
const expiry = oldestInWindow + this.rateLimit.windowMs;
|
||||
return Math.max(0, expiry - Date.now());
|
||||
}
|
||||
|
||||
public getNextAvailableDelay(subGroupKey: string): number {
|
||||
return Math.max(this.getCooldownRemaining(subGroupKey), this.getRateLimitDelay(subGroupKey));
|
||||
}
|
||||
|
||||
// Result sharing helpers
|
||||
public recordResult(subGroupKey: string, result: any): void {
|
||||
if (this.resultSharingMode === 'none') return;
|
||||
this.lastResults.set(subGroupKey, { result, timestamp: Date.now() });
|
||||
}
|
||||
|
||||
public getLastResult(subGroupKey: string): { result: any; timestamp: number } | undefined {
|
||||
return this.lastResults.get(subGroupKey);
|
||||
}
|
||||
|
||||
public hasResultSharing(): boolean {
|
||||
return this.resultSharingMode !== 'none';
|
||||
}
|
||||
|
||||
public reset(): void {
|
||||
this.runningCounts.clear();
|
||||
this.lastCompletionTimes.clear();
|
||||
this.completionTimestamps.clear();
|
||||
this.lastResults.clear();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -143,7 +143,14 @@ export class TaskManager {
|
||||
}
|
||||
|
||||
try {
|
||||
return await task.trigger(input);
|
||||
const result = await task.trigger(input);
|
||||
// Record result for groups with result sharing (only on true success, not caught errors)
|
||||
if (!task.lastError) {
|
||||
for (const { group, key } of groups) {
|
||||
group.recordResult(key, result);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
} finally {
|
||||
// Release slots
|
||||
for (const { group, key } of groups) {
|
||||
@@ -181,6 +188,18 @@ export class TaskManager {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check result sharing — if any applicable group has a shared result, resolve immediately
|
||||
const sharingGroups = applicableGroups.filter(({ group }) => group.hasResultSharing());
|
||||
if (sharingGroups.length > 0) {
|
||||
const groupWithResult = sharingGroups.find(({ group, key }) =>
|
||||
group.getLastResult(key) !== undefined
|
||||
);
|
||||
if (groupWithResult) {
|
||||
entry.deferred.resolve(groupWithResult.group.getLastResult(groupWithResult.key)!.result);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
const allCanRun = applicableGroups.every(({ group, key }) => group.canRun(key));
|
||||
if (allCanRun) {
|
||||
// executeWithConstraintTracking handles shouldExecute check internally
|
||||
@@ -190,9 +209,9 @@ export class TaskManager {
|
||||
);
|
||||
} else {
|
||||
stillQueued.push(entry);
|
||||
// Track shortest cooldown for timer scheduling
|
||||
// Track shortest delay for timer scheduling (cooldown + rate limit)
|
||||
for (const { group, key } of applicableGroups) {
|
||||
const remaining = group.getCooldownRemaining(key);
|
||||
const remaining = group.getNextAvailableDelay(key);
|
||||
if (remaining > 0 && remaining < shortestCooldown) {
|
||||
shortestCooldown = remaining;
|
||||
}
|
||||
|
||||
@@ -1,12 +1,21 @@
|
||||
import type { ITaskStep } from './taskbuffer.classes.taskstep.js';
|
||||
import type { Task } from './taskbuffer.classes.task.js';
|
||||
|
||||
export interface IRateLimitConfig {
|
||||
maxPerWindow: number; // max completions allowed within the sliding window
|
||||
windowMs: number; // sliding window duration in ms
|
||||
}
|
||||
|
||||
export type TResultSharingMode = 'none' | 'share-latest';
|
||||
|
||||
export interface ITaskConstraintGroupOptions<TData extends Record<string, unknown> = Record<string, unknown>> {
|
||||
name: string;
|
||||
constraintKeyForExecution: (task: Task<any, any, TData>, input?: any) => string | null | undefined;
|
||||
maxConcurrent?: number; // default: Infinity
|
||||
cooldownMs?: number; // default: 0
|
||||
shouldExecute?: (task: Task<any, any, TData>, input?: any) => boolean | Promise<boolean>;
|
||||
rateLimit?: IRateLimitConfig;
|
||||
resultSharingMode?: TResultSharingMode; // default: 'none'
|
||||
}
|
||||
|
||||
export interface ITaskExecution<TData extends Record<string, unknown> = Record<string, unknown>> {
|
||||
|
||||
Reference in New Issue
Block a user