feat(AsyncExecutionStack): Improve non-exclusive task management with concurrency limit controls and enhanced monitoring in AsyncExecutionStack.

This commit is contained in:
2025-04-25 18:57:26 +00:00
parent 5d9624bd56
commit 84babb3cd4
6 changed files with 8405 additions and 3256 deletions

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/lik',
version: '6.1.0',
version: '6.2.0',
description: 'Provides a collection of lightweight helpers and utilities for Node.js projects.'
}

View File

@@ -10,6 +10,12 @@ interface IExecutionSlot<T> {
export class AsyncExecutionStack {
private executionSlots: IExecutionSlot<any>[] = [];
private isProcessing = false;
/** Maximum concurrent non-exclusive tasks (Infinity = unlimited) */
private nonExclusiveMaxConcurrency: number = Infinity;
/** Currently running non-exclusive task count */
private nonExclusiveCurrentCount: number = 0;
/** Queue of resolvers waiting for a non-exclusive slot */
private nonExclusivePendingQueue: Array<() => void> = [];
public async getExclusiveExecutionSlot<T = any>(
funcArg: () => Promise<T>,
@@ -42,6 +48,28 @@ export class AsyncExecutionStack {
this.processExecutionSlots();
return executionDeferred.promise;
}
/**
* Set the maximum number of concurrent non-exclusive tasks.
* @param concurrency minimum 1 (Infinity means unlimited)
*/
public setNonExclusiveMaxConcurrency(concurrency: number): void {
if (!Number.isFinite(concurrency) || concurrency < 1) {
throw new Error('nonExclusiveMaxConcurrency must be a finite number >= 1');
}
this.nonExclusiveMaxConcurrency = concurrency;
}
/** Get the configured max concurrency for non-exclusive tasks */
public getNonExclusiveMaxConcurrency(): number {
return this.nonExclusiveMaxConcurrency;
}
/** Number of non-exclusive tasks currently running */
public getActiveNonExclusiveCount(): number {
return this.nonExclusiveCurrentCount;
}
/** Number of non-exclusive tasks waiting for a free slot */
public getPendingNonExclusiveCount(): number {
return this.nonExclusivePendingQueue.length;
}
private async processExecutionSlots() {
if (this.isProcessing) {
@@ -87,13 +115,14 @@ export class AsyncExecutionStack {
private async executeNonExclusiveSlots(slots: IExecutionSlot<any>[]) {
const promises = slots.map(async (slot) => {
// wait for an available non-exclusive slot
await this.waitForNonExclusiveSlot();
try {
// execute with optional timeout
if (slot.timeout) {
const result = await Promise.race([
slot.funcToExecute(),
plugins.smartdelay.delayFor(slot.timeout).then(() => {
throw new Error('Timeout reached');
}),
plugins.smartdelay.delayFor(slot.timeout).then(() => { throw new Error('Timeout reached'); }),
]);
slot.executionDeferred.resolve(result);
} else {
@@ -102,9 +131,33 @@ export class AsyncExecutionStack {
}
} catch (error) {
slot.executionDeferred.reject(error);
} finally {
this.releaseNonExclusiveSlot();
}
});
await Promise.all(promises);
}
/**
* Wait until a non-exclusive slot is available (respects max concurrency).
*/
private waitForNonExclusiveSlot(): Promise<void> {
if (this.nonExclusiveCurrentCount < this.nonExclusiveMaxConcurrency) {
this.nonExclusiveCurrentCount++;
return Promise.resolve();
}
return new Promise((resolve) => {
this.nonExclusivePendingQueue.push(() => {
this.nonExclusiveCurrentCount++;
resolve();
});
});
}
/** Release a non-exclusive slot and wake the next waiter, if any. */
private releaseNonExclusiveSlot(): void {
this.nonExclusiveCurrentCount--;
const next = this.nonExclusivePendingQueue.shift();
if (next) {
next();
}
}
}