164 lines
5.3 KiB
TypeScript
164 lines
5.3 KiB
TypeScript
import * as plugins from './classes.plugins.js';
|
|
|
|
/**
 * A single queued unit of work tracked by the execution stack.
 *
 * @typeParam T - value produced by `funcToExecute` and delivered through
 *   `executionDeferred`.
 */
interface IExecutionSlot<T> {
  /**
   * Scheduling mode of the slot: an `'exclusive'` slot is executed on its
   * own, while adjacent `'nonexclusive'` slots are executed together.
   */
  mode: 'exclusive' | 'nonexclusive';

  /** The async function to invoke once the slot is reached in the queue. */
  funcToExecute: () => Promise<T>;

  /**
   * Optional time limit for `funcToExecute`. A falsy value (omitted or `0`)
   * means no limit is applied.
   * NOTE(review): presumably milliseconds — confirm against the delay helper.
   */
  timeout?: number;

  /** Deferred that is settled with the outcome of `funcToExecute`. */
  executionDeferred: plugins.smartpromise.Deferred<T>;
}
|
|
|
|
/**
 * Ordered queue of async tasks with two scheduling modes.
 *
 * - An *exclusive* slot runs on its own: nothing else from the queue is
 *   started until it has settled (or timed out).
 * - Consecutive *non-exclusive* slots at the head of the queue are taken as a
 *   batch and run concurrently, limited by the configured max concurrency.
 *   The queue only advances once the whole batch has settled.
 *
 * Slots are processed strictly in the order they were requested.
 */
export class AsyncExecutionStack {
  /** Slots waiting to be processed, in request order. */
  private executionSlots: IExecutionSlot<any>[] = [];
  /** True while the processing loop is active; prevents a second loop from starting. */
  private isProcessing = false;
  /** Maximum concurrent non-exclusive tasks (Infinity = unlimited, the initial default) */
  private nonExclusiveMaxConcurrency = Infinity;
  /** Currently running non-exclusive task count */
  private nonExclusiveCurrentCount = 0;
  /** Queue of resolvers waiting for a non-exclusive slot */
  private nonExclusivePendingQueue: Array<() => void> = [];

  /**
   * Queue a task that must run without any other queued task running alongside it.
   *
   * @param funcArg the async function to execute
   * @param timeoutArg optional time limit in milliseconds; a falsy value disables it
   * @returns a promise settling with the task's result, its rejection, or an
   *   `Error('Timeout reached')` when the time limit elapses first
   */
  public async getExclusiveExecutionSlot<T = any>(
    funcArg: () => Promise<T>,
    timeoutArg?: number
  ): Promise<T> {
    return this.enqueueSlot(funcArg, timeoutArg, 'exclusive');
  }

  /**
   * Queue a task that may run concurrently with neighbouring non-exclusive tasks.
   *
   * @param funcArg the async function to execute
   * @param timeoutArg optional time limit in milliseconds; a falsy value disables it
   * @returns a promise settling with the task's result, its rejection, or an
   *   `Error('Timeout reached')` when the time limit elapses first
   */
  public async getNonExclusiveExecutionSlot<T = any>(
    funcArg: () => Promise<T>,
    timeoutArg?: number
  ): Promise<T> {
    return this.enqueueSlot(funcArg, timeoutArg, 'nonexclusive');
  }

  /**
   * Set the maximum number of concurrent non-exclusive tasks.
   *
   * Raising the limit takes effect immediately: tasks already waiting for a
   * free slot are started up to the new limit. Lowering it never interrupts
   * running tasks; no further task is started until the running count has
   * dropped below the new limit.
   *
   * @param concurrency a finite number >= 1 (the unlimited `Infinity` setting
   *   is only available as the initial default and is rejected here)
   * @throws Error when `concurrency` is not finite or is smaller than 1
   */
  public setNonExclusiveMaxConcurrency(concurrency: number): void {
    if (!Number.isFinite(concurrency) || concurrency < 1) {
      throw new Error('nonExclusiveMaxConcurrency must be a finite number >= 1');
    }
    this.nonExclusiveMaxConcurrency = concurrency;
    // A higher limit may have freed capacity for tasks that are already waiting.
    this.wakeNonExclusiveWaiters();
  }

  /** Get the configured max concurrency for non-exclusive tasks */
  public getNonExclusiveMaxConcurrency(): number {
    return this.nonExclusiveMaxConcurrency;
  }

  /** Number of non-exclusive tasks currently running */
  public getActiveNonExclusiveCount(): number {
    return this.nonExclusiveCurrentCount;
  }

  /** Number of non-exclusive tasks waiting for a free slot */
  public getPendingNonExclusiveCount(): number {
    return this.nonExclusivePendingQueue.length;
  }

  /** Create a slot, append it to the queue and make sure the queue is being processed. */
  private enqueueSlot<T>(
    funcArg: () => Promise<T>,
    timeoutArg: number | undefined,
    mode: 'exclusive' | 'nonexclusive'
  ): Promise<T> {
    const executionDeferred = plugins.smartpromise.defer<T>();
    const executionSlot: IExecutionSlot<T> = {
      funcToExecute: funcArg,
      executionDeferred,
      timeout: timeoutArg,
      mode,
    };
    this.executionSlots.push(executionSlot);
    // Fire and forget: the outcome is reported through the slot's deferred.
    void this.processExecutionSlots();
    return executionDeferred.promise;
  }

  /** Drain the queue in order; a no-op when a drain loop is already running. */
  private async processExecutionSlots(): Promise<void> {
    if (this.isProcessing) {
      return;
    }
    this.isProcessing = true;
    try {
      while (this.executionSlots.length > 0) {
        const currentSlot = this.executionSlots[0];
        if (currentSlot.mode === 'exclusive') {
          await this.executeExclusiveSlot(currentSlot);
          this.executionSlots.shift();
        } else {
          // Gather all non-exclusive slots at the front of the queue
          const nonExclusiveSlots: IExecutionSlot<any>[] = [];
          while (this.executionSlots.length > 0 && this.executionSlots[0].mode === 'nonexclusive') {
            nonExclusiveSlots.push(this.executionSlots.shift()!);
          }
          await this.executeNonExclusiveSlots(nonExclusiveSlots);
        }
      }
    } finally {
      // Always release the flag so an unexpected failure cannot stall the queue forever.
      this.isProcessing = false;
    }
  }

  /** Run one exclusive slot to completion (or timeout). */
  private async executeExclusiveSlot(slot: IExecutionSlot<any>): Promise<void> {
    await this.runSlot(slot);
  }

  /** Run a batch of non-exclusive slots concurrently, honouring the concurrency limit. */
  private async executeNonExclusiveSlots(slots: IExecutionSlot<any>[]): Promise<void> {
    await Promise.all(
      slots.map(async (slot) => {
        // wait for an available non-exclusive slot
        await this.waitForNonExclusiveSlot();
        try {
          await this.runSlot(slot);
        } finally {
          this.releaseNonExclusiveSlot();
        }
      })
    );
  }

  /**
   * Execute a slot's function and settle its deferred with the outcome.
   *
   * When the slot has a truthy timeout, the function is raced against a timer
   * that rejects with `Error('Timeout reached')`. The timer is cleared as soon
   * as the race is decided so it cannot keep the event loop alive. A function
   * that loses the race is not cancelled; it keeps running in the background.
   * This method never rejects.
   */
  private async runSlot(slot: IExecutionSlot<any>): Promise<void> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    try {
      let result: any;
      if (slot.timeout) {
        const work = slot.funcToExecute();
        const timeoutReached = new Promise<never>((_resolve, reject) => {
          timer = setTimeout(() => reject(new Error('Timeout reached')), slot.timeout);
        });
        result = await Promise.race([work, timeoutReached]);
      } else {
        result = await slot.funcToExecute();
      }
      slot.executionDeferred.resolve(result);
    } catch (error) {
      slot.executionDeferred.reject(error);
    } finally {
      if (timer !== undefined) {
        clearTimeout(timer);
      }
    }
  }

  /**
   * Wait until a non-exclusive slot is available (respects max concurrency).
   */
  private waitForNonExclusiveSlot(): Promise<void> {
    if (this.nonExclusiveCurrentCount < this.nonExclusiveMaxConcurrency) {
      this.nonExclusiveCurrentCount++;
      return Promise.resolve();
    }
    return new Promise((resolve) => {
      this.nonExclusivePendingQueue.push(() => {
        this.nonExclusiveCurrentCount++;
        resolve();
      });
    });
  }

  /** Release a non-exclusive slot and wake waiters while capacity allows. */
  private releaseNonExclusiveSlot(): void {
    this.nonExclusiveCurrentCount--;
    this.wakeNonExclusiveWaiters();
  }

  /**
   * Start waiting non-exclusive tasks, oldest first, for as long as the
   * running count stays below the configured limit.
   */
  private wakeNonExclusiveWaiters(): void {
    while (this.nonExclusiveCurrentCount < this.nonExclusiveMaxConcurrency) {
      const next = this.nonExclusivePendingQueue.shift();
      if (!next) {
        break;
      }
      // Each waiter increments the running count itself before resolving.
      next();
    }
  }
}
|