BREAKING CHANGE(taskbuffer): Introduce constraint-based concurrency with TaskConstraintGroup and TaskManager integration; remove legacy TaskRunner and several Task APIs (breaking); add typed Task.data and update exports and tests.

This commit is contained in:
2026-02-15 12:20:01 +00:00
parent 9d78933a46
commit d3e8ff1a11
16 changed files with 924 additions and 296 deletions

View File

@@ -1,10 +1,11 @@
import * as plugins from './taskbuffer.plugins.js';
import { Task } from './taskbuffer.classes.task.js';
import { TaskConstraintGroup } from './taskbuffer.classes.taskconstraintgroup.js';
import {
AbstractDistributedCoordinator,
type IDistributedTaskRequestResult,
} from './taskbuffer.classes.distributedcoordinator.js';
import type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent } from './taskbuffer.interfaces.js';
import type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent, IConstrainedTaskEntry } from './taskbuffer.interfaces.js';
import { logger } from './taskbuffer.logging.js';
export interface ICronJob {
@@ -19,23 +20,28 @@ export interface ITaskManagerConstructorOptions {
export class TaskManager {
public randomId = plugins.smartunique.shortId();
public taskMap = new plugins.lik.ObjectMap<Task<any, any>>();
public taskMap = new plugins.lik.ObjectMap<Task<any, any, any>>();
public readonly taskSubject = new plugins.smartrx.rxjs.Subject<ITaskEvent>();
private taskSubscriptions = new Map<Task<any, any>, plugins.smartrx.rxjs.Subscription>();
private taskSubscriptions = new Map<Task<any, any, any>, plugins.smartrx.rxjs.Subscription>();
private cronJobManager = new plugins.smarttime.CronManager();
public options: ITaskManagerConstructorOptions = {
distributedCoordinator: null,
};
// Constraint system
public constraintGroups: TaskConstraintGroup<any>[] = [];
private constraintQueue: IConstrainedTaskEntry[] = [];
private drainTimer: ReturnType<typeof setTimeout> | null = null;
constructor(options: ITaskManagerConstructorOptions = {}) {
this.options = Object.assign(this.options, options);
}
public getTaskByName(taskName: string): Task<any, any> {
public getTaskByName(taskName: string): Task<any, any, any> {
return this.taskMap.findSync((task) => task.name === taskName);
}
public addTask(task: Task<any, any>): void {
public addTask(task: Task<any, any, any>): void {
if (!task.name) {
throw new Error('Task must have a name to be added to taskManager');
}
@@ -46,7 +52,7 @@ export class TaskManager {
this.taskSubscriptions.set(task, subscription);
}
public removeTask(task: Task<any, any>): void {
public removeTask(task: Task<any, any, any>): void {
this.taskMap.remove(task);
const subscription = this.taskSubscriptions.get(task);
if (subscription) {
@@ -55,21 +61,134 @@ export class TaskManager {
}
}
public addAndScheduleTask(task: Task<any, any>, cronString: string) {
public addAndScheduleTask(task: Task<any, any, any>, cronString: string) {
this.addTask(task);
this.scheduleTaskByName(task.name, cronString);
}
// Constraint group management
public addConstraintGroup(group: TaskConstraintGroup<any>): void {
this.constraintGroups.push(group);
}
public removeConstraintGroup(name: string): void {
this.constraintGroups = this.constraintGroups.filter((g) => g.name !== name);
}
// Core constraint evaluation
public async triggerTaskConstrained(task: Task<any, any, any>, input?: any): Promise<any> {
// Gather applicable constraints
const applicableGroups: Array<{ group: TaskConstraintGroup<any>; key: string }> = [];
for (const group of this.constraintGroups) {
const key = group.getConstraintKey(task);
if (key !== null) {
applicableGroups.push({ group, key });
}
}
// No constraints apply → trigger directly
if (applicableGroups.length === 0) {
return task.trigger(input);
}
// Check if all constraints allow running
const allCanRun = applicableGroups.every(({ group, key }) => group.canRun(key));
if (allCanRun) {
return this.executeWithConstraintTracking(task, input, applicableGroups);
}
// Blocked → enqueue with deferred promise
const deferred = plugins.smartpromise.defer<any>();
this.constraintQueue.push({ task, input, deferred });
return deferred.promise;
}
private async executeWithConstraintTracking(
task: Task<any, any, any>,
input: any,
groups: Array<{ group: TaskConstraintGroup<any>; key: string }>,
): Promise<any> {
// Acquire slots
for (const { group, key } of groups) {
group.acquireSlot(key);
}
try {
return await task.trigger(input);
} finally {
// Release slots
for (const { group, key } of groups) {
group.releaseSlot(key);
}
this.drainConstraintQueue();
}
}
private drainConstraintQueue(): void {
let shortestCooldown = Infinity;
const stillQueued: IConstrainedTaskEntry[] = [];
for (const entry of this.constraintQueue) {
const applicableGroups: Array<{ group: TaskConstraintGroup<any>; key: string }> = [];
for (const group of this.constraintGroups) {
const key = group.getConstraintKey(entry.task);
if (key !== null) {
applicableGroups.push({ group, key });
}
}
// No constraints apply anymore (group removed?) → run directly
if (applicableGroups.length === 0) {
entry.task.trigger(entry.input).then(
(result) => entry.deferred.resolve(result),
(err) => entry.deferred.reject(err),
);
continue;
}
const allCanRun = applicableGroups.every(({ group, key }) => group.canRun(key));
if (allCanRun) {
this.executeWithConstraintTracking(entry.task, entry.input, applicableGroups).then(
(result) => entry.deferred.resolve(result),
(err) => entry.deferred.reject(err),
);
} else {
stillQueued.push(entry);
// Track shortest cooldown for timer scheduling
for (const { group, key } of applicableGroups) {
const remaining = group.getCooldownRemaining(key);
if (remaining > 0 && remaining < shortestCooldown) {
shortestCooldown = remaining;
}
}
}
}
this.constraintQueue = stillQueued;
// Schedule next drain if there are cooldown-blocked entries
if (this.drainTimer) {
clearTimeout(this.drainTimer);
this.drainTimer = null;
}
if (stillQueued.length > 0 && shortestCooldown < Infinity) {
this.drainTimer = setTimeout(() => {
this.drainTimer = null;
this.drainConstraintQueue();
}, shortestCooldown + 1);
}
}
public async triggerTaskByName(taskName: string): Promise<any> {
const taskToTrigger = this.getTaskByName(taskName);
if (!taskToTrigger) {
throw new Error(`No task with the name ${taskName} found.`);
}
return taskToTrigger.trigger();
return this.triggerTaskConstrained(taskToTrigger);
}
public async triggerTask(task: Task<any, any>) {
return task.trigger();
public async triggerTask(task: Task<any, any, any>) {
return this.triggerTaskConstrained(task);
}
public scheduleTaskByName(taskName: string, cronString: string) {
@@ -80,7 +199,7 @@ export class TaskManager {
this.handleTaskScheduling(taskToSchedule, cronString);
}
private handleTaskScheduling(task: Task<any, any>, cronString: string) {
private handleTaskScheduling(task: Task<any, any, any>, cronString: string) {
const cronJob = this.cronJobManager.addCronjob(
cronString,
async (triggerTime: number) => {
@@ -98,7 +217,7 @@ export class TaskManager {
}
}
try {
await task.trigger();
await this.triggerTaskConstrained(task);
} catch (err) {
logger.log('error', `TaskManager: scheduled task "${task.name || 'unnamed'}" failed: ${err instanceof Error ? err.message : String(err)}`);
}
@@ -107,7 +226,7 @@ export class TaskManager {
task.cronJob = cronJob;
}
private logTaskState(task: Task<any, any>) {
private logTaskState(task: Task<any, any, any>) {
logger.log('info', `Taskbuffer schedule triggered task >>${task.name}<<`);
const bufferState = task.buffered
? `buffered with max ${task.bufferMax} buffered calls`
@@ -116,7 +235,7 @@ export class TaskManager {
}
private async performDistributedConsultation(
task: Task<any, any>,
task: Task<any, any, any>,
triggerTime: number,
): Promise<IDistributedTaskRequestResult> {
logger.log('info', 'Found a distributed coordinator, performing consultation.');
@@ -144,7 +263,7 @@ export class TaskManager {
}
}
public async descheduleTask(task: Task<any, any>) {
public async descheduleTask(task: Task<any, any, any>) {
await this.descheduleTaskByName(task.name);
}
@@ -169,6 +288,10 @@ export class TaskManager {
subscription.unsubscribe();
}
this.taskSubscriptions.clear();
if (this.drainTimer) {
clearTimeout(this.drainTimer);
this.drainTimer = null;
}
}
// Get metadata for a specific task
@@ -186,7 +309,7 @@ export class TaskManager {
// Get scheduled tasks with their schedules and next run times
public getScheduledTasks(): IScheduledTaskInfo[] {
const scheduledTasks: IScheduledTaskInfo[] = [];
for (const task of this.taskMap.getArray()) {
if (task.cronJob) {
scheduledTasks.push({
@@ -199,7 +322,7 @@ export class TaskManager {
});
}
}
return scheduledTasks;
}
@@ -213,11 +336,11 @@ export class TaskManager {
}))
.sort((a, b) => a.nextRun.getTime() - b.nextRun.getTime())
.slice(0, limit);
return scheduledRuns;
}
public getTasksByLabel(key: string, value: string): Task<any, any>[] {
public getTasksByLabel(key: string, value: string): Task<any, any, any>[] {
return this.taskMap.getArray().filter(task => task.labels[key] === value);
}
@@ -227,7 +350,7 @@ export class TaskManager {
// Add, execute, and remove a task while collecting metadata
public async addExecuteRemoveTask<T, TSteps extends ReadonlyArray<{ name: string; description: string; percentage: number }>>(
task: Task<T, TSteps>,
task: Task<T, TSteps, any>,
options?: {
schedule?: string;
trackProgress?: boolean;
@@ -235,19 +358,18 @@ export class TaskManager {
): Promise<ITaskExecutionReport> {
// Add task to manager
this.addTask(task);
// Optionally schedule it
if (options?.schedule) {
this.scheduleTaskByName(task.name!, options.schedule);
}
const startTime = Date.now();
const progressUpdates: Array<{ stepName: string; timestamp: number }> = [];
try {
// Execute the task
const result = await task.trigger();
// Execute the task through constraints
const result = await this.triggerTaskConstrained(task);
// Collect execution report
const report: ITaskExecutionReport = {
taskName: task.name || 'unnamed',
@@ -261,15 +383,15 @@ export class TaskManager {
progress: task.getProgress(),
result,
};
// Remove task from manager
this.removeTask(task);
// Deschedule if it was scheduled
if (options?.schedule && task.name) {
this.descheduleTaskByName(task.name);
}
return report;
} catch (error) {
// Create error report
@@ -285,15 +407,15 @@ export class TaskManager {
progress: task.getProgress(),
error: error as Error,
};
// Remove task from manager even on error
this.removeTask(task);
// Deschedule if it was scheduled
if (options?.schedule && task.name) {
this.descheduleTaskByName(task.name);
}
throw errorReport;
}
}