35 lines
1.0 KiB
TypeScript
35 lines
1.0 KiB
TypeScript
|
/**
 * Runs an async function over a list of items while keeping the number of
 * simultaneously running invocations bounded.
 *
 * @typeParam T - Type of the items handed to the async function.
 */
export class ConcurrentProcessor<T> {
  private readonly asyncFunction: (item: T) => Promise<void>;
  private readonly concurrencyLimit: number;

  /**
   * @param asyncFunction - Invoked once per item; its returned promise marks
   *   the item as finished (fulfilled) or failed (rejected).
   * @param concurrencyLimit - Maximum number of invocations in flight at the
   *   same time. Fractional values are rounded up and values below 1 behave
   *   like 1. `Infinity` disables the limit.
   * @throws RangeError if `concurrencyLimit` is `NaN`, which would otherwise
   *   silently disable the limit.
   */
  constructor(asyncFunction: (item: T) => Promise<void>, concurrencyLimit: number) {
    if (Number.isNaN(concurrencyLimit)) {
      throw new RangeError('concurrencyLimit must not be NaN');
    }
    this.asyncFunction = asyncFunction;
    this.concurrencyLimit = concurrencyLimit;
  }

  /**
   * Processes every item, starting them in array order, with at most
   * `concurrencyLimit` invocations running at once. Logs a progress line to
   * the console after every 10000 successfully processed items.
   *
   * If an invocation fails (rejects or throws synchronously), no further
   * items are started. The invocations that are already running are allowed
   * to settle, and only then does the returned promise reject with the first
   * failure that was observed. This guarantees that no work is still running
   * in the background once the returned promise has settled.
   *
   * @param items - Items to process.
   * @returns A promise that fulfils once every item has been processed.
   */
  async process(items: T[]): Promise<void> {
    // All workers pull from this single iterator, so each item is handed out
    // exactly once and in order.
    const pending = items[Symbol.iterator]();
    const workerCount = Math.min(
      items.length,
      Math.max(1, Math.ceil(this.concurrencyLimit)),
    );

    let totalProcessed = 0;
    // Failures in the order they were observed; only the first is rethrown.
    const failures: unknown[] = [];

    const worker = async (): Promise<void> => {
      // Stop picking up new items as soon as any worker has recorded a failure.
      while (failures.length === 0) {
        const next = pending.next();
        if (next.done) {
          return;
        }

        try {
          // Inside the try so that a synchronous throw is handled like a rejection.
          await this.asyncFunction(next.value);
        } catch (error: unknown) {
          failures.push(error);
          return;
        }

        totalProcessed++;
        if (totalProcessed % 10000 === 0) {
          console.log(`${totalProcessed} items processed.`);
        }
      }
    };

    // Workers never reject (they record failures instead), so this waits for
    // every in-flight invocation to settle before the outcome is reported.
    await Promise.all(Array.from({ length: workerCount }, () => worker()));

    if (failures.length > 0) {
      throw failures[0];
    }
  }
}
|