// lik/ts/classes.asyncexecutionstack.ts
import * as plugins from './classes.plugins.js';
2023-01-18 17:45:19 +01:00
/**
 * One queued unit of work together with the bookkeeping needed to run it
 * and to report its outcome back to the caller that requested the slot.
 */
interface IExecutionSlot<T> {
  /** `'exclusive'` slots run alone; `'nonexclusive'` slots may overlap with each other. */
  mode: 'exclusive' | 'nonexclusive';
  /** The asynchronous task to invoke once the slot is scheduled. */
  funcToExecute: () => Promise<T>;
  /** Optional time limit for the task (presumably milliseconds); a falsy value means no limit. */
  timeout?: number;
  /** Deferred that is resolved or rejected with the task's outcome. */
  executionDeferred: plugins.smartpromise.Deferred<T>;
}
2023-01-18 12:18:47 +01:00
/**
 * Runs asynchronous tasks in submission order.
 *
 * - Exclusive slots run strictly one at a time.
 * - Consecutive non-exclusive slots at the front of the queue are started as one
 *   batch and may overlap, limited by the configured max concurrency.
 *
 * NOTE(review): a non-exclusive slot submitted while a batch is already running
 * is not added to that batch; it waits until the whole batch has settled.
 */
export class AsyncExecutionStack {
  /** Slots that were requested but not yet picked up, in submission order. */
  private executionSlots: IExecutionSlot<any>[] = [];
  /** Re-entrancy guard: true while `processExecutionSlots` is draining the queue. */
  private isProcessing = false;
  /** Maximum concurrent non-exclusive tasks (Infinity = unlimited) */
  private nonExclusiveMaxConcurrency: number = Infinity;
  /** Currently running non-exclusive task count */
  private nonExclusiveCurrentCount: number = 0;
  /** Queue of resolvers waiting for a non-exclusive slot */
  private nonExclusivePendingQueue: Array<() => void> = [];

  /**
   * Queue a task that must not overlap with any other task of this stack.
   * @param funcArg the task to run
   * @param timeoutArg optional time limit in milliseconds; falsy disables it
   * @returns the task's result; rejects with the task's error or with
   *   `Error('Timeout reached')` when the time limit elapses first
   */
  public async getExclusiveExecutionSlot<T = any>(
    funcArg: () => Promise<T>,
    timeoutArg?: number
  ): Promise<T> {
    return this.enqueueSlot<T>('exclusive', funcArg, timeoutArg);
  }

  /**
   * Queue a task that may run concurrently with other non-exclusive tasks.
   * @param funcArg the task to run
   * @param timeoutArg optional time limit in milliseconds; falsy disables it
   * @returns the task's result; rejects with the task's error or with
   *   `Error('Timeout reached')` when the time limit elapses first
   */
  public async getNonExclusiveExecutionSlot<T = any>(
    funcArg: () => Promise<T>,
    timeoutArg?: number
  ): Promise<T> {
    return this.enqueueSlot<T>('nonexclusive', funcArg, timeoutArg);
  }

  /**
   * Set the maximum number of concurrent non-exclusive tasks.
   * Takes effect immediately: raising the limit starts waiting tasks right away,
   * lowering it lets running tasks finish before further ones are started.
   * @param concurrency finite number >= 1. The initial default of Infinity
   *   (unlimited) cannot be passed here.
   * @throws Error when `concurrency` is not finite or is smaller than 1
   */
  public setNonExclusiveMaxConcurrency(concurrency: number): void {
    if (!Number.isFinite(concurrency) || concurrency < 1) {
      throw new Error('nonExclusiveMaxConcurrency must be a finite number >= 1');
    }
    this.nonExclusiveMaxConcurrency = concurrency;
    // A raised limit must benefit tasks that are already waiting.
    this.wakePendingNonExclusive();
  }

  /** Get the configured max concurrency for non-exclusive tasks */
  public getNonExclusiveMaxConcurrency(): number {
    return this.nonExclusiveMaxConcurrency;
  }

  /** Number of non-exclusive tasks currently running */
  public getActiveNonExclusiveCount(): number {
    return this.nonExclusiveCurrentCount;
  }

  /** Number of non-exclusive tasks waiting for a free slot */
  public getPendingNonExclusiveCount(): number {
    return this.nonExclusivePendingQueue.length;
  }

  /** Create a slot for the given task, append it to the queue and kick the processor. */
  private enqueueSlot<T>(
    mode: 'exclusive' | 'nonexclusive',
    funcArg: () => Promise<T>,
    timeoutArg?: number
  ): Promise<T> {
    const executionDeferred = plugins.smartpromise.defer<T>();
    const executionSlot: IExecutionSlot<T> = {
      funcToExecute: funcArg,
      executionDeferred,
      timeout: timeoutArg,
      mode,
    };
    this.executionSlots.push(executionSlot);
    // Intentionally not awaited: the caller waits on the deferred instead.
    void this.processExecutionSlots();
    return executionDeferred.promise;
  }

  /** Drain the queue; only one drain loop is active at any time. */
  private async processExecutionSlots(): Promise<void> {
    if (this.isProcessing) {
      return;
    }
    this.isProcessing = true;
    try {
      while (this.executionSlots.length > 0) {
        if (this.executionSlots[0].mode === 'exclusive') {
          // Dequeue before running so a slot can never be executed twice.
          const exclusiveSlot = this.executionSlots.shift()!;
          await this.executeSlot(exclusiveSlot);
        } else {
          // Gather all non-exclusive slots at the front of the queue
          const nonExclusiveSlots: IExecutionSlot<any>[] = [];
          while (this.executionSlots.length > 0 && this.executionSlots[0].mode === 'nonexclusive') {
            nonExclusiveSlots.push(this.executionSlots.shift()!);
          }
          await this.executeNonExclusiveSlots(nonExclusiveSlots);
        }
      }
    } finally {
      // Always drop the guard, otherwise one unexpected error would stall the stack forever.
      this.isProcessing = false;
    }
  }

  /**
   * Run one slot's task, enforce its optional time limit and settle its deferred.
   * Never rejects: every failure is routed into the slot's deferred.
   */
  private async executeSlot(slot: IExecutionSlot<any>): Promise<void> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    try {
      const work = slot.funcToExecute();
      const result = slot.timeout
        ? await Promise.race([
            work,
            new Promise<never>((_resolve, reject) => {
              timer = setTimeout(() => reject(new Error('Timeout reached')), slot.timeout);
            }),
          ])
        : await work;
      slot.executionDeferred.resolve(result);
    } catch (error) {
      slot.executionDeferred.reject(error);
    } finally {
      // Cancel the timer so a finished task does not leave it pending.
      if (timer !== undefined) {
        clearTimeout(timer);
      }
    }
  }

  /** Run a batch of non-exclusive slots, honouring the concurrency limit. */
  private async executeNonExclusiveSlots(slots: IExecutionSlot<any>[]): Promise<void> {
    await Promise.all(
      slots.map(async (slot) => {
        // wait for an available non-exclusive slot
        await this.waitForNonExclusiveSlot();
        try {
          await this.executeSlot(slot);
        } finally {
          this.releaseNonExclusiveSlot();
        }
      })
    );
  }

  /**
   * Wait until a non-exclusive slot is available (respects max concurrency).
   */
  private waitForNonExclusiveSlot(): Promise<void> {
    if (this.nonExclusiveCurrentCount < this.nonExclusiveMaxConcurrency) {
      this.nonExclusiveCurrentCount++;
      return Promise.resolve();
    }
    return new Promise((resolve) => {
      this.nonExclusivePendingQueue.push(() => {
        this.nonExclusiveCurrentCount++;
        resolve();
      });
    });
  }

  /** Release a non-exclusive slot and wake waiters as far as the limit allows. */
  private releaseNonExclusiveSlot(): void {
    this.nonExclusiveCurrentCount--;
    this.wakePendingNonExclusive();
  }

  /** Start waiting non-exclusive tasks while the current limit has room for them. */
  private wakePendingNonExclusive(): void {
    while (this.nonExclusiveCurrentCount < this.nonExclusiveMaxConcurrency) {
      const next = this.nonExclusivePendingQueue.shift();
      if (!next) {
        return;
      }
      next();
    }
  }
}