import { Client as ElasticClient } from '@elastic/elasticsearch';

/** Connection settings and target index for an {@link ElasticDoc} instance. */
export interface IElasticDocConstructorOptions {
  /** Name of the index that documents are piped into. */
  index: string;
  /** URL of the Elasticsearch node to connect to. */
  node: string;
  /** Optional basic-auth credentials forwarded to the client. */
  auth?: {
    username: string;
    password: string;
  };
}

/** A stored snapshot: the date it refers to plus caller-defined aggregation data. */
export interface ISnapshot {
  date: string;
  aggregationData: any;
}

/**
 * Callback that builds a new snapshot from an iterator over the `_source` of
 * every document in the index and the most recently stored snapshot
 * (`null` when none exists).
 */
export type SnapshotProcessor = (
  iterator: AsyncIterable<any>,
  prevSnapshot: ISnapshot | null,
) => Promise<ISnapshot>;

/**
 * Wrapper around one Elasticsearch index that supports "piping sessions"
 * (index a set of documents, then delete everything that was not part of the
 * session) and snapshots stored in a companion `<index>_snapshots` index.
 */
export class ElasticDoc {
  public client: ElasticClient;
  public index: string;

  /** Ids of the documents indexed since the current piping session started. */
  private sessionDocs: Set<string> = new Set<string>();
  /** Set once the index has been checked/created so the check runs only once per instance. */
  private indexInitialized = false;
  /** Page size for scroll searches and maximum number of actions per bulk request. */
  private readonly BATCH_SIZE = 1000;

  /**
   * @param options - node URL, optional credentials and the index to operate on.
   */
  constructor(options: IElasticDocConstructorOptions) {
    this.client = new ElasticClient({
      node: options.node,
      ...(options.auth && { auth: options.auth }),
    });
    this.index = options.index;
  }

  /** Name of the companion index that holds snapshots of `this.index`. */
  private get snapshotIndex(): string {
    return `${this.index}_snapshots`;
  }

  /**
   * Creates `this.index` with mappings derived from `doc` if it does not exist
   * yet. The existence check is performed only once per instance.
   *
   * @param doc - sample document used to derive the initial field mappings.
   */
  private async ensureIndexExists(doc: any): Promise<void> {
    if (this.indexInitialized) {
      return;
    }
    const { body: indexExists } = await this.client.indices.exists({ index: this.index });
    if (!indexExists) {
      const mappings = this.createMappingsFromDoc(doc);
      await this.client.indices.create({
        index: this.index,
        body: {
          mappings,
          settings: {
            // You can define the settings according to your requirements here
          },
        },
      });
    }
    this.indexInitialized = true;
  }

  /**
   * Derives a flat mapping from the top-level keys of `doc`: `@timestamp`
   * becomes `date`, numeric values become `float`, everything else `text`.
   *
   * @param doc - sample document.
   * @returns an object of the form `{ properties: { ... } }`.
   */
  private createMappingsFromDoc(doc: any): any {
    const properties: any = {};
    // `?? {}` keeps the original tolerance for a null/undefined doc (empty mapping).
    for (const key of Object.keys(doc ?? {})) {
      if (key === '@timestamp') {
        properties[key] = { type: 'date' };
        continue;
      }
      properties[key] = { type: typeof doc[key] === 'number' ? 'float' : 'text' };
    }
    return { properties };
  }

  /**
   * Iterates over every hit in `this.index` using the scroll API.
   *
   * The scroll context is released in `finally`, so it is also cleaned up when
   * the consumer stops iterating early or an error is thrown mid-way.
   */
  private async *scrollHits(): AsyncGenerator<any, void, undefined> {
    let scrollId: string | undefined;
    try {
      let response = await this.client.search({
        index: this.index,
        scroll: '1m',
        size: this.BATCH_SIZE,
      });
      while (true) {
        scrollId = response.body._scroll_id;
        const hits = response.body.hits.hits;
        if (!hits.length) {
          break;
        }
        for (const hit of hits) {
          yield hit;
        }
        response = await this.client.scroll({ scroll_id: scrollId, scroll: '1m' });
      }
    } finally {
      if (scrollId !== undefined) {
        try {
          await this.client.clearScroll({ scroll_id: scrollId });
        } catch {
          // Best-effort cleanup: the context expires on its own after the
          // scroll timeout, and a failure here must not mask the real result.
        }
      }
    }
  }

  /** Starts a new piping session by forgetting all previously tracked document ids. */
  async startPipingSession(): Promise<void> {
    this.sessionDocs.clear();
  }

  /**
   * Indexes one document under `docId` and records it as part of the current
   * piping session.
   *
   * @param optionsArg.docId - id to index the document under.
   * @param optionsArg.timestamp - stored as `@timestamp`; an empty string is skipped.
   * @param optionsArg.doc - document body.
   */
  async pipeDocument(optionsArg: {
    docId: string;
    timestamp: string | number;
    doc: any;
  }): Promise<void> {
    const documentBody: Record<string, any> = { ...optionsArg.doc };
    // A plain truthiness test would drop a timestamp of 0, so accept it explicitly.
    if (optionsArg.timestamp === 0 || optionsArg.timestamp) {
      documentBody['@timestamp'] = optionsArg.timestamp;
    }

    // Derive the initial mapping from the final body so that `@timestamp`
    // is mapped as `date` instead of being left to dynamic mapping.
    await this.ensureIndexExists(documentBody);

    await this.client.index({
      index: this.index,
      id: optionsArg.docId,
      body: documentBody,
    });
    this.sessionDocs.add(optionsArg.docId);
  }

  /**
   * Ends the piping session: deletes every document in the index whose id was
   * not piped during the session, then clears the session state.
   */
  async endPipingSession(): Promise<void> {
    // Collect only the ids that must go, rather than every id in the index.
    const staleDocIds: string[] = [];
    for await (const hit of this.scrollHits()) {
      if (!this.sessionDocs.has(hit._id)) {
        staleDocIds.push(hit._id);
      }
    }

    for (let offset = 0; offset < staleDocIds.length; offset += this.BATCH_SIZE) {
      const deleteActions = staleDocIds
        .slice(offset, offset + this.BATCH_SIZE)
        .map((docId) => ({ delete: { _index: this.index, _id: docId } }));
      await this.client.bulk({ refresh: true, body: deleteActions });
    }

    this.sessionDocs.clear();
  }

  /**
   * Builds and stores a new snapshot. Ensures the snapshot index exists, then
   * hands `processIterator` an iterator over all documents plus the latest
   * stored snapshot, and indexes whatever it resolves to.
   *
   * @param processIterator - callback that produces the new snapshot.
   */
  async takeSnapshot(processIterator: SnapshotProcessor): Promise<void> {
    const snapshotIndex = this.snapshotIndex;
    const { body: indexExists } = await this.client.indices.exists({ index: snapshotIndex });
    if (!indexExists) {
      await this.client.indices.create({
        index: snapshotIndex,
        body: {
          mappings: {
            properties: {
              date: { type: 'date' },
              aggregationData: { type: 'object', enabled: true },
            },
          },
        },
      });
    }

    const documentIterator = this.getDocumentIterator();
    const newSnapshot = await processIterator(documentIterator, await this.getLastSnapshot());
    await this.storeSnapshot(newSnapshot);
  }

  /**
   * @returns the snapshot with the most recent `date`, or `null` when the
   * snapshot index does not exist or is empty.
   */
  private async getLastSnapshot(): Promise<ISnapshot | null> {
    const snapshotIndex = this.snapshotIndex;
    const { body: indexExists } = await this.client.indices.exists({ index: snapshotIndex });
    if (!indexExists) {
      return null;
    }

    const response = await this.client.search({
      index: snapshotIndex,
      sort: 'date:desc',
      size: 1,
    });
    const hits = response.body.hits.hits;
    if (hits.length === 0) {
      return null;
    }
    const source = hits[0]._source;
    return {
      date: source.date,
      aggregationData: source.aggregationData,
    };
  }

  /** Yields the `_source` of every document in `this.index`. */
  private async *getDocumentIterator(): AsyncGenerator<any, void, undefined> {
    for await (const hit of this.scrollHits()) {
      yield hit._source;
    }
  }

  /**
   * Indexes `snapshot` into the snapshot index with an auto-generated id.
   *
   * @param snapshot - snapshot to persist.
   */
  private async storeSnapshot(snapshot: ISnapshot): Promise<void> {
    await this.client.index({
      index: this.snapshotIndex,
      body: snapshot,
    });
  }
}