fix(core): update

This commit is contained in:
Philipp Kunz 2023-08-14 19:27:28 +02:00
parent 775a307056
commit 66676d89a5
2 changed files with 65 additions and 37 deletions

View File

@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/lik', name: '@push.rocks/lik',
version: '6.0.3', version: '6.0.4',
description: 'light little helpers for node' description: 'light little helpers for node'
} }

View File

@ -7,12 +7,14 @@ interface IExecutionSlot<T> {
mode: 'exclusive' | 'nonexclusive'; mode: 'exclusive' | 'nonexclusive';
} }
/**
* allows for avoiding race condition
*/
export class AsyncExecutionStack { export class AsyncExecutionStack {
private executionSlots: IExecutionSlot<any>[] = []; private executionSlots: IExecutionSlot<any>[] = [];
public async getExclusiveExecutionSlot<T = any>(funcArg: () => Promise<T>, timeoutArg?: number) { private isProcessing = false;
public async getExclusiveExecutionSlot<T = any>(
funcArg: () => Promise<T>,
timeoutArg?: number
): Promise<T> {
const executionDeferred = plugins.smartpromise.defer<T>(); const executionDeferred = plugins.smartpromise.defer<T>();
const executionSlot: IExecutionSlot<T> = { const executionSlot: IExecutionSlot<T> = {
funcToExecute: funcArg, funcToExecute: funcArg,
@ -24,10 +26,11 @@ export class AsyncExecutionStack {
this.processExecutionSlots(); this.processExecutionSlots();
return executionDeferred.promise; return executionDeferred.promise;
} }
public async getNonExclusiveExecutionSlot<T = any>( public async getNonExclusiveExecutionSlot<T = any>(
funcArg: () => Promise<T>, funcArg: () => Promise<T>,
timeoutArg?: number timeoutArg?: number
) { ): Promise<T> {
const executionDeferred = plugins.smartpromise.defer<T>(); const executionDeferred = plugins.smartpromise.defer<T>();
const executionSlot: IExecutionSlot<T> = { const executionSlot: IExecutionSlot<T> = {
funcToExecute: funcArg, funcToExecute: funcArg,
@ -40,43 +43,68 @@ export class AsyncExecutionStack {
return executionDeferred.promise; return executionDeferred.promise;
} }
private currentlyExecutingDeferred: plugins.smartpromise.Deferred<any>;
private async processExecutionSlots() { private async processExecutionSlots() {
if (this.currentlyExecutingDeferred) { if (this.isProcessing) {
return; return;
} }
this.currentlyExecutingDeferred = plugins.smartpromise.defer(); this.isProcessing = true;
let nonExclusiveRunningSlots: IExecutionSlot<any>[] = [];
const checkNonExclusiveRunningSlots = async (cleanArg = false) => {
if (nonExclusiveRunningSlots.length > 100 || cleanArg) {
await Promise.all(nonExclusiveRunningSlots.map(nonExclusiveRunningSlotArg => nonExclusiveRunningSlotArg.executionDeferred.promise));
nonExclusiveRunningSlots = [];
}
};
while (this.executionSlots.length > 0) {
const nextExecutionSlot = this.executionSlots.shift();
const runNextExecution = async () => {
if (nextExecutionSlot.timeout) {
const result = await Promise.race([
nextExecutionSlot.funcToExecute(),
plugins.smartdelay.delayFor(nextExecutionSlot.timeout),
]);
nextExecutionSlot.executionDeferred.resolve(result);
} else {
nextExecutionSlot.executionDeferred.resolve(await nextExecutionSlot.funcToExecute());
}
};
if (nextExecutionSlot.mode === 'exclusive') {
await checkNonExclusiveRunningSlots(true);
await runNextExecution();
} else {
nonExclusiveRunningSlots.push(nextExecutionSlot);
await checkNonExclusiveRunningSlots(false);
runNextExecution();
}
}
this.currentlyExecutingDeferred.resolve();
this.currentlyExecutingDeferred = null;
while (this.executionSlots.length > 0) {
const currentSlot = this.executionSlots[0];
if (currentSlot.mode === 'exclusive') {
await this.executeExclusiveSlot(currentSlot);
this.executionSlots.shift();
} else {
// Gather all non-exclusive slots at the front of the queue
const nonExclusiveSlots: IExecutionSlot<any>[] = [];
while (this.executionSlots.length > 0 && this.executionSlots[0].mode === 'nonexclusive') {
nonExclusiveSlots.push(this.executionSlots.shift()!);
}
await this.executeNonExclusiveSlots(nonExclusiveSlots);
}
}
this.isProcessing = false;
}
private async executeExclusiveSlot(slot: IExecutionSlot<any>) {
try {
if (slot.timeout) {
const result = await Promise.race([
slot.funcToExecute(),
plugins.smartdelay.delayFor(slot.timeout).then(() => {
throw new Error('Timeout reached');
}),
]);
slot.executionDeferred.resolve(result);
} else {
const result = await slot.funcToExecute();
slot.executionDeferred.resolve(result);
}
} catch (error) {
slot.executionDeferred.reject(error);
}
}
private async executeNonExclusiveSlots(slots: IExecutionSlot<any>[]) {
const promises = slots.map(async (slot) => {
try {
if (slot.timeout) {
const result = await Promise.race([
slot.funcToExecute(),
plugins.smartdelay.delayFor(slot.timeout).then(() => {
throw new Error('Timeout reached');
}),
]);
slot.executionDeferred.resolve(result);
} else {
const result = await slot.funcToExecute();
slot.executionDeferred.resolve(result);
}
} catch (error) {
slot.executionDeferred.reject(error);
}
});
await Promise.all(promises);
} }
} }