smartarray/ts/classes.concurrentprocessor.ts

35 lines
1.0 KiB
TypeScript
Raw Permalink Normal View History

export class ConcurrentProcessor<T> {
private asyncFunction: (item: T) => Promise<void>;
private concurrencyLimit: number;
constructor(asyncFunction: (item: T) => Promise<void>, concurrencyLimit: number) {
this.asyncFunction = asyncFunction;
this.concurrencyLimit = concurrencyLimit;
}
async process(items: T[]): Promise<void> {
const queue: Array<Promise<void>> = [];
let totalProcessed = 0;
for (const item of items) {
const promise = this.asyncFunction(item).then(() => {
totalProcessed++;
if (totalProcessed % 10000 === 0) {
console.log(`${totalProcessed} items processed.`);
}
// Remove the completed promise from the queue
queue.splice(queue.indexOf(promise), 1);
});
queue.push(promise);
// Wait if the number of promises exceeds the concurrency limit
if (queue.length >= this.concurrencyLimit) {
await Promise.race(queue);
}
}
// Wait for the remaining promises to resolve
await Promise.all(queue);
}
}