feat(core): Add concurrent processing support
This commit is contained in:
35
ts/classes.concurrentprocessor.ts
Normal file
35
ts/classes.concurrentprocessor.ts
Normal file
@@ -0,0 +1,35 @@
|
||||
export class ConcurrentProcessor<T> {
|
||||
private asyncFunction: (item: T) => Promise<void>;
|
||||
private concurrencyLimit: number;
|
||||
|
||||
constructor(asyncFunction: (item: T) => Promise<void>, concurrencyLimit: number) {
|
||||
this.asyncFunction = asyncFunction;
|
||||
this.concurrencyLimit = concurrencyLimit;
|
||||
}
|
||||
|
||||
async process(items: T[]): Promise<void> {
|
||||
const queue: Array<Promise<void>> = [];
|
||||
let totalProcessed = 0;
|
||||
|
||||
for (const item of items) {
|
||||
const promise = this.asyncFunction(item).then(() => {
|
||||
totalProcessed++;
|
||||
if (totalProcessed % 10000 === 0) {
|
||||
console.log(`${totalProcessed} items processed.`);
|
||||
}
|
||||
// Remove the completed promise from the queue
|
||||
queue.splice(queue.indexOf(promise), 1);
|
||||
});
|
||||
|
||||
queue.push(promise);
|
||||
|
||||
// Wait if the number of promises exceeds the concurrency limit
|
||||
if (queue.length >= this.concurrencyLimit) {
|
||||
await Promise.race(queue);
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for the remaining promises to resolve
|
||||
await Promise.all(queue);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user