diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index c15bbba..1968d97 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/lik', - version: '6.0.3', + version: '6.0.4', description: 'light little helpers for node' } diff --git a/ts/lik.asyncexecutionstack.ts b/ts/lik.asyncexecutionstack.ts index aee3cf1..4cb76cc 100644 --- a/ts/lik.asyncexecutionstack.ts +++ b/ts/lik.asyncexecutionstack.ts @@ -7,12 +7,14 @@ interface IExecutionSlot { mode: 'exclusive' | 'nonexclusive'; } -/** - * allows for avoiding race condition - */ export class AsyncExecutionStack { private executionSlots: IExecutionSlot[] = []; - public async getExclusiveExecutionSlot(funcArg: () => Promise, timeoutArg?: number) { + private isProcessing = false; + + public async getExclusiveExecutionSlot( + funcArg: () => Promise, + timeoutArg?: number + ): Promise { const executionDeferred = plugins.smartpromise.defer(); const executionSlot: IExecutionSlot = { funcToExecute: funcArg, @@ -24,10 +26,11 @@ export class AsyncExecutionStack { this.processExecutionSlots(); return executionDeferred.promise; } + public async getNonExclusiveExecutionSlot( funcArg: () => Promise, timeoutArg?: number - ) { + ): Promise { const executionDeferred = plugins.smartpromise.defer(); const executionSlot: IExecutionSlot = { funcToExecute: funcArg, @@ -40,43 +43,68 @@ export class AsyncExecutionStack { return executionDeferred.promise; } - private currentlyExecutingDeferred: plugins.smartpromise.Deferred; private async processExecutionSlots() { - if (this.currentlyExecutingDeferred) { + if (this.isProcessing) { return; } - this.currentlyExecutingDeferred = plugins.smartpromise.defer(); - let nonExclusiveRunningSlots: IExecutionSlot[] = []; - const checkNonExclusiveRunningSlots = async (cleanArg = false) => { - if (nonExclusiveRunningSlots.length > 100 || cleanArg) { - await Promise.all(nonExclusiveRunningSlots.map(nonExclusiveRunningSlotArg => nonExclusiveRunningSlotArg.executionDeferred.promise)); - nonExclusiveRunningSlots = []; - } - }; + this.isProcessing = true; + while (this.executionSlots.length > 0) { - const nextExecutionSlot = this.executionSlots.shift(); - const runNextExecution = async () => { - if (nextExecutionSlot.timeout) { - const result = await Promise.race([ - nextExecutionSlot.funcToExecute(), - plugins.smartdelay.delayFor(nextExecutionSlot.timeout), - ]); - nextExecutionSlot.executionDeferred.resolve(result); - } else { - nextExecutionSlot.executionDeferred.resolve(await nextExecutionSlot.funcToExecute()); - } - }; - if (nextExecutionSlot.mode === 'exclusive') { - await checkNonExclusiveRunningSlots(true); - await runNextExecution(); + const currentSlot = this.executionSlots[0]; + if (currentSlot.mode === 'exclusive') { + await this.executeExclusiveSlot(currentSlot); + this.executionSlots.shift(); } else { - nonExclusiveRunningSlots.push(nextExecutionSlot); - await checkNonExclusiveRunningSlots(false); - runNextExecution(); + // Gather all non-exclusive slots at the front of the queue + const nonExclusiveSlots: IExecutionSlot[] = []; + while (this.executionSlots.length > 0 && this.executionSlots[0].mode === 'nonexclusive') { + nonExclusiveSlots.push(this.executionSlots.shift()!); + } + await this.executeNonExclusiveSlots(nonExclusiveSlots); } } - this.currentlyExecutingDeferred.resolve(); - this.currentlyExecutingDeferred = null; - + this.isProcessing = false; + } + + private async executeExclusiveSlot(slot: IExecutionSlot) { + try { + if (slot.timeout) { + const result = await Promise.race([ + slot.funcToExecute(), + plugins.smartdelay.delayFor(slot.timeout).then(() => { + throw new Error('Timeout reached'); + }), + ]); + slot.executionDeferred.resolve(result); + } else { + const result = await slot.funcToExecute(); + slot.executionDeferred.resolve(result); + } + } catch (error) { + slot.executionDeferred.reject(error); + } + } + + private async executeNonExclusiveSlots(slots: IExecutionSlot[]) { + const promises = slots.map(async (slot) => { + try { + if (slot.timeout) { + const result = await Promise.race([ + slot.funcToExecute(), + plugins.smartdelay.delayFor(slot.timeout).then(() => { + throw new Error('Timeout reached'); + }), + ]); + slot.executionDeferred.resolve(result); + } else { + const result = await slot.funcToExecute(); + slot.executionDeferred.resolve(result); + } + } catch (error) { + slot.executionDeferred.reject(error); + } + }); + + await Promise.all(promises); } }