165 lines
5.8 KiB
TypeScript
165 lines
5.8 KiB
TypeScript
import type { Task } from './taskbuffer.classes.task.js';
|
|
import type { ITaskConstraintGroupOptions, IRateLimitConfig, TResultSharingMode } from './taskbuffer.interfaces.js';
|
|
|
|
export class TaskConstraintGroup<TData extends Record<string, unknown> = Record<string, unknown>> {
|
|
public name: string;
|
|
public maxConcurrent: number;
|
|
public cooldownMs: number;
|
|
public rateLimit: IRateLimitConfig | null;
|
|
public resultSharingMode: TResultSharingMode;
|
|
private constraintKeyForExecution: (task: Task<any, any, TData>, input?: any) => string | null | undefined;
|
|
private shouldExecuteFn?: (task: Task<any, any, TData>, input?: any) => boolean | Promise<boolean>;
|
|
|
|
private runningCounts = new Map<string, number>();
|
|
private lastCompletionTimes = new Map<string, number>();
|
|
private completionTimestamps = new Map<string, number[]>();
|
|
private lastResults = new Map<string, { result: any; timestamp: number }>();
|
|
|
|
constructor(options: ITaskConstraintGroupOptions<TData>) {
|
|
this.name = options.name;
|
|
this.constraintKeyForExecution = options.constraintKeyForExecution;
|
|
this.maxConcurrent = options.maxConcurrent ?? Infinity;
|
|
this.cooldownMs = options.cooldownMs ?? 0;
|
|
this.shouldExecuteFn = options.shouldExecute;
|
|
this.rateLimit = options.rateLimit ?? null;
|
|
this.resultSharingMode = options.resultSharingMode ?? 'none';
|
|
}
|
|
|
|
public getConstraintKey(task: Task<any, any, TData>, input?: any): string | null {
|
|
const key = this.constraintKeyForExecution(task, input);
|
|
return key ?? null;
|
|
}
|
|
|
|
public async checkShouldExecute(task: Task<any, any, TData>, input?: any): Promise<boolean> {
|
|
if (!this.shouldExecuteFn) {
|
|
return true;
|
|
}
|
|
return this.shouldExecuteFn(task, input);
|
|
}
|
|
|
|
public canRun(subGroupKey: string): boolean {
|
|
const running = this.runningCounts.get(subGroupKey) ?? 0;
|
|
if (running >= this.maxConcurrent) {
|
|
return false;
|
|
}
|
|
|
|
if (this.cooldownMs > 0) {
|
|
const lastCompletion = this.lastCompletionTimes.get(subGroupKey);
|
|
if (lastCompletion !== undefined) {
|
|
const elapsed = Date.now() - lastCompletion;
|
|
if (elapsed < this.cooldownMs) {
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (this.rateLimit) {
|
|
this.pruneCompletionTimestamps(subGroupKey);
|
|
const timestamps = this.completionTimestamps.get(subGroupKey);
|
|
const completedInWindow = timestamps ? timestamps.length : 0;
|
|
const running = this.runningCounts.get(subGroupKey) ?? 0;
|
|
if (completedInWindow + running >= this.rateLimit.maxPerWindow) {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
public acquireSlot(subGroupKey: string): void {
|
|
const current = this.runningCounts.get(subGroupKey) ?? 0;
|
|
this.runningCounts.set(subGroupKey, current + 1);
|
|
}
|
|
|
|
public releaseSlot(subGroupKey: string): void {
|
|
const current = this.runningCounts.get(subGroupKey) ?? 0;
|
|
const next = Math.max(0, current - 1);
|
|
if (next === 0) {
|
|
this.runningCounts.delete(subGroupKey);
|
|
} else {
|
|
this.runningCounts.set(subGroupKey, next);
|
|
}
|
|
this.lastCompletionTimes.set(subGroupKey, Date.now());
|
|
|
|
if (this.rateLimit) {
|
|
const timestamps = this.completionTimestamps.get(subGroupKey) ?? [];
|
|
timestamps.push(Date.now());
|
|
this.completionTimestamps.set(subGroupKey, timestamps);
|
|
}
|
|
}
|
|
|
|
public getCooldownRemaining(subGroupKey: string): number {
|
|
if (this.cooldownMs <= 0) {
|
|
return 0;
|
|
}
|
|
const lastCompletion = this.lastCompletionTimes.get(subGroupKey);
|
|
if (lastCompletion === undefined) {
|
|
return 0;
|
|
}
|
|
const elapsed = Date.now() - lastCompletion;
|
|
return Math.max(0, this.cooldownMs - elapsed);
|
|
}
|
|
|
|
public getRunningCount(subGroupKey: string): number {
|
|
return this.runningCounts.get(subGroupKey) ?? 0;
|
|
}
|
|
|
|
// Rate limit helpers
|
|
private pruneCompletionTimestamps(subGroupKey: string): void {
|
|
const timestamps = this.completionTimestamps.get(subGroupKey);
|
|
if (!timestamps || !this.rateLimit) return;
|
|
const cutoff = Date.now() - this.rateLimit.windowMs;
|
|
let i = 0;
|
|
while (i < timestamps.length && timestamps[i] <= cutoff) {
|
|
i++;
|
|
}
|
|
if (i > 0) {
|
|
timestamps.splice(0, i);
|
|
}
|
|
}
|
|
|
|
public getRateLimitDelay(subGroupKey: string): number {
|
|
if (!this.rateLimit) return 0;
|
|
this.pruneCompletionTimestamps(subGroupKey);
|
|
const timestamps = this.completionTimestamps.get(subGroupKey);
|
|
const completedInWindow = timestamps ? timestamps.length : 0;
|
|
const running = this.runningCounts.get(subGroupKey) ?? 0;
|
|
if (completedInWindow + running < this.rateLimit.maxPerWindow) {
|
|
return 0;
|
|
}
|
|
// If only running tasks fill the window (no completions yet), we can't compute a delay
|
|
if (!timestamps || timestamps.length === 0) {
|
|
return 1; // minimal delay; drain will re-check after running tasks complete
|
|
}
|
|
// The oldest timestamp in the window determines when a slot opens
|
|
const oldestInWindow = timestamps[0];
|
|
const expiry = oldestInWindow + this.rateLimit.windowMs;
|
|
return Math.max(0, expiry - Date.now());
|
|
}
|
|
|
|
public getNextAvailableDelay(subGroupKey: string): number {
|
|
return Math.max(this.getCooldownRemaining(subGroupKey), this.getRateLimitDelay(subGroupKey));
|
|
}
|
|
|
|
// Result sharing helpers
|
|
public recordResult(subGroupKey: string, result: any): void {
|
|
if (this.resultSharingMode === 'none') return;
|
|
this.lastResults.set(subGroupKey, { result, timestamp: Date.now() });
|
|
}
|
|
|
|
public getLastResult(subGroupKey: string): { result: any; timestamp: number } | undefined {
|
|
return this.lastResults.get(subGroupKey);
|
|
}
|
|
|
|
public hasResultSharing(): boolean {
|
|
return this.resultSharingMode !== 'none';
|
|
}
|
|
|
|
public reset(): void {
|
|
this.runningCounts.clear();
|
|
this.lastCompletionTimes.clear();
|
|
this.completionTimestamps.clear();
|
|
this.lastResults.clear();
|
|
}
|
|
}
|