import { Client as ElasticClient } from '@elastic/elasticsearch';

/** Connection settings accepted by the {@link ElasticDoc} constructor. */
export interface IElasticDocConstructorOptions {
  /** Name of the index that documents are written to and pruned from. */
  index: string;
  /** Node address handed to the Elasticsearch client. */
  node: string;
  /** Optional credentials; forwarded to the client only when provided. */
  auth?: {
    username: string;
    password: string;
  };
}

/**
 * Mirrors a set of documents into a single Elasticsearch index.
 *
 * Usage is session based: call {@link ElasticDoc.startPipingSession}, index
 * every document that should exist via {@link ElasticDoc.pipeDocument}, then
 * call {@link ElasticDoc.endPipingSession} to delete every document in the
 * index that was not piped during the session.
 */
export class ElasticDoc {
  public client: ElasticClient;
  public index: string;

  /** Ids of the documents indexed since the current session started. */
  private sessionDocs: Set<string> = new Set();

  /** Page size for the scroll scan and maximum operations per bulk request. */
  private readonly BATCH_SIZE = 1000;

  /**
   * Creates the underlying client for `options.node` and remembers the
   * target index. `auth` is passed to the client only when it is set.
   */
  constructor(options: IElasticDocConstructorOptions) {
    this.client = new ElasticClient({
      node: options.node,
      ...(options.auth && { auth: options.auth }),
    });
    this.index = options.index;
  }

  /** Starts a new session by forgetting every previously tracked document id. */
  async startPipingSession(): Promise<void> {
    this.sessionDocs.clear();
  }

  /**
   * Indexes `doc` under `docId` and records the id as part of the session.
   *
   * The id is recorded only after the index request resolved, so a failed
   * write does not protect a document from pruning.
   *
   * @param docId - Id to store the document under.
   * @param doc - Document body sent to Elasticsearch as-is.
   */
  async pipeDocument(docId: string, doc: any): Promise<void> {
    await this.client.index({
      index: this.index,
      id: docId,
      body: doc,
    });
    this.sessionDocs.add(docId);
  }

  /**
   * Ends the session: deletes every document in the index whose id was not
   * piped during the session, then clears the tracked ids.
   *
   * If scanning or deleting throws, the tracked ids are left untouched so the
   * call can be retried.
   */
  async endPipingSession(): Promise<void> {
    const staleDocIds = await this.collectStaleDocIds();

    // Delete in chunks of at most BATCH_SIZE operations per bulk request.
    // NOTE(review): the bulk API reports per-item failures in the response
    // body instead of throwing; those are not inspected here.
    for (let start = 0; start < staleDocIds.length; start += this.BATCH_SIZE) {
      const operations = staleDocIds
        .slice(start, start + this.BATCH_SIZE)
        .map((docId) => ({
          delete: {
            _index: this.index,
            _id: docId,
          },
        }));
      await this.client.bulk({ refresh: true, body: operations });
    }

    this.sessionDocs.clear();
  }

  /**
   * Scrolls through the whole index and returns the ids that are not part of
   * the current session.
   */
  private async collectStaleDocIds(): Promise<string[]> {
    const staleDocIds: string[] = [];

    let response = await this.client.search({
      index: this.index,
      scroll: '1m',
      size: this.BATCH_SIZE,
      // Only `_id` is needed, so skip the document bodies; `_doc` order is
      // the cheapest ordering for a full scan.
      body: { _source: false, sort: ['_doc'] },
    });
    let scrollId: string | undefined = response.body._scroll_id;

    try {
      while (response.body.hits.hits.length > 0) {
        for (const hit of response.body.hits.hits) {
          if (!this.sessionDocs.has(hit._id)) {
            staleDocIds.push(hit._id);
          }
        }
        response = await this.client.scroll({
          scroll_id: response.body._scroll_id,
          scroll: '1m',
        });
        scrollId = response.body._scroll_id;
      }
    } finally {
      // Release the server-side scroll context instead of letting it linger
      // until its keep-alive runs out.
      if (scrollId) {
        try {
          await this.client.clearScroll({ scroll_id: scrollId });
        } catch {
          // Best-effort cleanup: the context expires on its own, and a
          // failure here must not hide an error thrown while scanning.
        }
      }
    }

    return staleDocIds;
  }
}