BREAKING CHANGE(constraints): make TaskConstraintGroup constraint matcher input-aware and add shouldExecute pre-execution hook

This commit is contained in:
2026-02-15 15:15:37 +00:00
parent 33d1c334c4
commit 0811b04dfd
10 changed files with 389 additions and 51 deletions

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/taskbuffer',
version: '5.0.1',
version: '6.0.0',
description: 'A flexible task management library supporting TypeScript, allowing for task buffering, scheduling, and execution with dependency management.'
}

View File

@@ -12,7 +12,7 @@ export { TaskStep } from './taskbuffer.classes.taskstep.js';
export type { ITaskStep } from './taskbuffer.classes.taskstep.js';
// Metadata interfaces
export type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent, TTaskEventType, ITaskConstraintGroupOptions } from './taskbuffer.interfaces.js';
export type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent, TTaskEventType, ITaskConstraintGroupOptions, ITaskExecution } from './taskbuffer.interfaces.js';
import * as distributedCoordination from './taskbuffer.classes.distributedcoordinator.js';
export { distributedCoordination };

View File

@@ -5,23 +5,32 @@ export class TaskConstraintGroup<TData extends Record<string, unknown> = Record<
public name: string;
public maxConcurrent: number;
public cooldownMs: number;
private constraintKeyForTask: (task: Task<any, any, TData>) => string | null | undefined;
private constraintKeyForExecution: (task: Task<any, any, TData>, input?: any) => string | null | undefined;
private shouldExecuteFn?: (task: Task<any, any, TData>, input?: any) => boolean | Promise<boolean>;
private runningCounts = new Map<string, number>();
private lastCompletionTimes = new Map<string, number>();
constructor(options: ITaskConstraintGroupOptions<TData>) {
this.name = options.name;
this.constraintKeyForTask = options.constraintKeyForTask;
this.constraintKeyForExecution = options.constraintKeyForExecution;
this.maxConcurrent = options.maxConcurrent ?? Infinity;
this.cooldownMs = options.cooldownMs ?? 0;
this.shouldExecuteFn = options.shouldExecute;
}
public getConstraintKey(task: Task<any, any, TData>): string | null {
const key = this.constraintKeyForTask(task);
public getConstraintKey(task: Task<any, any, TData>, input?: any): string | null {
const key = this.constraintKeyForExecution(task, input);
return key ?? null;
}
public async checkShouldExecute(task: Task<any, any, TData>, input?: any): Promise<boolean> {
if (!this.shouldExecuteFn) {
return true;
}
return this.shouldExecuteFn(task, input);
}
public canRun(subGroupKey: string): boolean {
const running = this.runningCounts.get(subGroupKey) ?? 0;
if (running >= this.maxConcurrent) {

View File

@@ -80,14 +80,18 @@ export class TaskManager {
// Gather applicable constraints
const applicableGroups: Array<{ group: TaskConstraintGroup<any>; key: string }> = [];
for (const group of this.constraintGroups) {
const key = group.getConstraintKey(task);
const key = group.getConstraintKey(task, input);
if (key !== null) {
applicableGroups.push({ group, key });
}
}
// No constraints apply → trigger directly
// No constraints apply → check shouldExecute then trigger directly
if (applicableGroups.length === 0) {
const shouldRun = await this.checkAllShouldExecute(task, input);
if (!shouldRun) {
return undefined;
}
return task.trigger(input);
}
@@ -97,22 +101,47 @@ export class TaskManager {
return this.executeWithConstraintTracking(task, input, applicableGroups);
}
// Blocked → enqueue with deferred promise
// Blocked → enqueue with deferred promise and cached constraint keys
const deferred = plugins.smartpromise.defer<any>();
this.constraintQueue.push({ task, input, deferred });
const constraintKeys = new Map<string, string>();
for (const { group, key } of applicableGroups) {
constraintKeys.set(group.name, key);
}
this.constraintQueue.push({ task, input, deferred, constraintKeys });
return deferred.promise;
}
private async checkAllShouldExecute(task: Task<any, any, any>, input?: any): Promise<boolean> {
for (const group of this.constraintGroups) {
const shouldRun = await group.checkShouldExecute(task, input);
if (!shouldRun) {
return false;
}
}
return true;
}
private async executeWithConstraintTracking(
task: Task<any, any, any>,
input: any,
groups: Array<{ group: TaskConstraintGroup<any>; key: string }>,
): Promise<any> {
// Acquire slots
// Acquire slots synchronously to prevent race conditions
for (const { group, key } of groups) {
group.acquireSlot(key);
}
// Check shouldExecute after acquiring slots
const shouldRun = await this.checkAllShouldExecute(task, input);
if (!shouldRun) {
// Release slots and drain queue
for (const { group, key } of groups) {
group.releaseSlot(key);
}
this.drainConstraintQueue();
return undefined;
}
try {
return await task.trigger(input);
} finally {
@@ -131,23 +160,30 @@ export class TaskManager {
for (const entry of this.constraintQueue) {
const applicableGroups: Array<{ group: TaskConstraintGroup<any>; key: string }> = [];
for (const group of this.constraintGroups) {
const key = group.getConstraintKey(entry.task);
const key = group.getConstraintKey(entry.task, entry.input);
if (key !== null) {
applicableGroups.push({ group, key });
}
}
// No constraints apply anymore (group removed?) → run directly
// No constraints apply anymore (group removed?) → check shouldExecute then run
if (applicableGroups.length === 0) {
entry.task.trigger(entry.input).then(
(result) => entry.deferred.resolve(result),
(err) => entry.deferred.reject(err),
);
this.checkAllShouldExecute(entry.task, entry.input).then((shouldRun) => {
if (!shouldRun) {
entry.deferred.resolve(undefined);
return;
}
entry.task.trigger(entry.input).then(
(result) => entry.deferred.resolve(result),
(err) => entry.deferred.reject(err),
);
});
continue;
}
const allCanRun = applicableGroups.every(({ group, key }) => group.canRun(key));
if (allCanRun) {
// executeWithConstraintTracking handles shouldExecute check internally
this.executeWithConstraintTracking(entry.task, entry.input, applicableGroups).then(
(result) => entry.deferred.resolve(result),
(err) => entry.deferred.reject(err),

View File

@@ -3,15 +3,22 @@ import type { Task } from './taskbuffer.classes.task.js';
export interface ITaskConstraintGroupOptions<TData extends Record<string, unknown> = Record<string, unknown>> {
name: string;
constraintKeyForTask: (task: Task<any, any, TData>) => string | null | undefined;
constraintKeyForExecution: (task: Task<any, any, TData>, input?: any) => string | null | undefined;
maxConcurrent?: number; // default: Infinity
cooldownMs?: number; // default: 0
shouldExecute?: (task: Task<any, any, TData>, input?: any) => boolean | Promise<boolean>;
}
export interface ITaskExecution<TData extends Record<string, unknown> = Record<string, unknown>> {
task: Task<any, any, TData>;
input: any;
}
export interface IConstrainedTaskEntry {
task: Task<any, any, any>;
input: any;
deferred: import('@push.rocks/smartpromise').Deferred<any>;
constraintKeys: Map<string, string>; // groupName -> key
}
export interface ITaskMetadata {