import { Client as ElasticClient } from '@elastic/elasticsearch';

/** Connection and target-index settings for an {@link ElasticDoc}. */
export interface IElasticDocConstructorOptions {
  /** Name of the index documents are piped into. */
  index: string;
  /** Address of the Elasticsearch node; forwarded to the client as `node`. */
  node: string;
  /** Optional credentials; forwarded to the client as `auth` when present. */
  auth?: {
    username: string;
    password: string;
  };
}

/** A stored snapshot as written to / read from the `<index>_snapshots` index. */
export interface ISnapshot {
  date: string;
  aggregationData: any;
}

/**
 * Callback used by {@link ElasticDoc.takeSnapshot}. It receives an async
 * iterator over the `_source` of every document in the main index plus the
 * most recent stored snapshot (or `null`), and resolves to the snapshot that
 * should be stored next.
 */
export type SnapshotProcessor = (
  iterator: AsyncIterable<any>,
  prevSnapshot: ISnapshot | null
) => Promise<ISnapshot>;

/**
 * Pipes documents into a single Elasticsearch index in "sessions".
 *
 * A session is: `startPipingSession()` → any number of `pipeDocument()` calls
 * → `endPipingSession()`. Ending a session deletes every document in the index
 * whose id was not piped during that session.
 */
export class ElasticDoc {
  public client: ElasticClient;
  public index: string;

  /** Ids passed to `pipeDocument` during the current session. */
  private sessionDocs: Set<string> = new Set();
  /** Set once `ensureIndexExists` has checked (and possibly created) the index. */
  private indexInitialized = false;
  /** Newest `@timestamp` found in the index when an `onlyNew` session started. */
  private latestTimestamp: string | number | null = null;
  /** Whether the current session only indexes documents newer than `latestTimestamp`. */
  private onlyNew = false;
  /** Page size for scroll searches and chunk size for bulk deletes. */
  private readonly BATCH_SIZE = 1000;

  constructor(options: IElasticDocConstructorOptions) {
    this.client = new ElasticClient({
      node: options.node,
      ...(options.auth && { auth: options.auth }),
    });
    this.index = options.index;
  }

  /** Name of the companion index that stores snapshots. */
  private get snapshotIndex(): string {
    return `${this.index}_snapshots`;
  }

  /**
   * Creates the target index on first use, deriving its mappings from `doc`.
   * The existence check runs at most once per instance.
   */
  private async ensureIndexExists(doc: any): Promise<void> {
    if (this.indexInitialized) {
      return;
    }
    const indexExists = await this.client.indices.exists({ index: this.index });
    if (!indexExists) {
      const mappings = this.createMappingsFromDoc(doc);
      await this.client.indices.create({
        index: this.index,
        body: {
          mappings,
          settings: {
            // You can define the settings according to your requirements here
          },
        },
      });
    }
    this.indexInitialized = true;
  }

  /**
   * True for plain objects and for arrays containing one. Such values cannot
   * be stored in a `text` field, so they are left out of the explicit mapping.
   */
  private static needsDynamicMapping(value: unknown): boolean {
    if (Array.isArray(value)) {
      return value.some((item) => ElasticDoc.needsDynamicMapping(item));
    }
    return Object.prototype.toString.call(value) === '[object Object]';
  }

  /**
   * Builds an index mapping from a sample document: `@timestamp` becomes
   * `date`, numbers become `float`, everything else becomes `text`.
   * Nested plain objects are omitted (see `needsDynamicMapping`).
   */
  private createMappingsFromDoc(doc: any): any {
    const properties: Record<string, { type: string }> = {};
    // Own enumerable keys only: these are the keys that get serialised.
    for (const [key, value] of Object.entries(doc ?? {})) {
      if (key === '@timestamp') {
        properties[key] = { type: 'date' };
        continue;
      }
      if (ElasticDoc.needsDynamicMapping(value)) {
        continue;
      }
      properties[key] = { type: typeof value === 'number' ? 'float' : 'text' };
    }
    return { properties };
  }

  /**
   * Starts a new piping session and forgets the ids of the previous one.
   *
   * @param options.onlyNew When true, the newest `@timestamp` currently in the
   *   index is looked up and `pipeDocument` skips documents that are not newer.
   */
  async startPipingSession(options: { onlyNew?: boolean } = {}): Promise<void> {
    this.sessionDocs.clear();
    this.onlyNew = options.onlyNew ?? false;
    // Never carry a timestamp over from an earlier session.
    this.latestTimestamp = null;

    if (!this.onlyNew) {
      return;
    }

    const indexExists = await this.client.indices.exists({ index: this.index });
    if (!indexExists) {
      console.log(`Working in "onlyNew" mode, but index ${this.index} does not exist. Hence processing all documents now.`);
      return;
    }

    const response = await this.client.search({
      index: this.index,
      sort: '@timestamp:desc',
      size: 1,
    });
    const source = response.hits.hits[0]?._source as Record<string, unknown> | undefined;
    const latest = source?.['@timestamp'];
    this.latestTimestamp =
      (typeof latest === 'string' || typeof latest === 'number') && latest ? latest : null;

    if (this.latestTimestamp) {
      console.log(`Working in "onlyNew" mode. Hence we are omitting documents prior to ${this.latestTimestamp}`);
    } else {
      console.log(`Working in "onlyNew" mode, but no documents found in index ${this.index}. Hence processing all documents now.`);
    }
  }

  /**
   * Converts a timestamp to epoch milliseconds. Numbers are returned as-is;
   * strings are parsed as dates, falling back to a numeric parse. Returns
   * `NaN` when the value cannot be interpreted.
   */
  private static toEpochMillis(value: string | number): number {
    if (typeof value === 'number') {
      return value;
    }
    const parsed = Date.parse(value);
    return Number.isNaN(parsed) ? Number(value) : parsed;
  }

  /**
   * Whether `timestamp` is at or before the newest timestamp found at session
   * start. Always false when either side is missing.
   */
  private isNotNewerThanLatest(timestamp: string | number | undefined): boolean {
    if (this.latestTimestamp == null || timestamp == null) {
      return false;
    }
    const candidate = ElasticDoc.toEpochMillis(timestamp);
    const latest = ElasticDoc.toEpochMillis(this.latestTimestamp);
    if (Number.isFinite(candidate) && Number.isFinite(latest)) {
      return candidate <= latest;
    }
    // Not interpretable as dates/numbers: fall back to plain string ordering.
    return String(timestamp) <= String(this.latestTimestamp);
  }

  /**
   * Indexes one document under `docId` and records the id for this session.
   *
   * In `onlyNew` mode a document whose timestamp is not newer than the newest
   * one already indexed is skipped. Its id is still recorded so that
   * `endPipingSession` does not delete the existing copy.
   *
   * @param optionsArg.docId Elasticsearch document id.
   * @param optionsArg.timestamp Stored as `@timestamp` when truthy.
   * @param optionsArg.doc Document body.
   */
  async pipeDocument(optionsArg: {
    docId: string;
    timestamp?: string | number;
    doc: any;
  }): Promise<void> {
    const documentBody = {
      ...optionsArg.doc,
      ...(optionsArg.timestamp ? { '@timestamp': optionsArg.timestamp } : {}),
    };

    // Derive mappings from the final body so `@timestamp` is mapped as a date.
    await this.ensureIndexExists(documentBody);

    const skip = this.onlyNew && this.isNotNewerThanLatest(optionsArg.timestamp);
    if (!skip) {
      await this.client.index({
        index: this.index,
        id: optionsArg.docId,
        body: documentBody,
      });
    }
    this.sessionDocs.add(optionsArg.docId);
  }

  /**
   * Scrolls through every hit of the main index, page by page, and releases
   * the scroll context when iteration ends, fails or is abandoned early.
   *
   * @param includeSource Whether hits should carry their `_source`.
   */
  private async *scrollHits(includeSource: boolean): AsyncGenerator<any> {
    let response = await this.client.search({
      index: this.index,
      scroll: '1m',
      size: this.BATCH_SIZE,
      _source: includeSource,
    });
    let scrollId = response._scroll_id;
    try {
      while (response.hits.hits.length > 0) {
        for (const hit of response.hits.hits) {
          yield hit;
        }
        if (!scrollId) {
          break;
        }
        response = await this.client.scroll({ scroll_id: scrollId, scroll: '1m' });
        scrollId = response._scroll_id ?? scrollId;
      }
    } finally {
      if (scrollId) {
        try {
          await this.client.clearScroll({ scroll_id: scrollId });
        } catch {
          // Best-effort cleanup: a failed clear must not mask the real outcome.
          // The scroll was opened with a '1m' keep-alive.
        }
      }
    }
  }

  /**
   * Ends the session: deletes every document in the index whose id was not
   * piped since `startPipingSession`, then clears the recorded ids.
   */
  async endPipingSession(): Promise<void> {
    // Only ids are needed here, so hits are fetched without `_source`.
    const staleIds: string[] = [];
    for await (const hit of this.scrollHits(false)) {
      if (!this.sessionDocs.has(hit._id)) {
        staleIds.push(hit._id);
      }
    }

    for (let start = 0; start < staleIds.length; start += this.BATCH_SIZE) {
      const operations = staleIds
        .slice(start, start + this.BATCH_SIZE)
        .map((docId) => ({ delete: { _index: this.index, _id: docId } }));
      await this.client.bulk({ refresh: true, body: operations });
    }

    this.sessionDocs.clear();
  }

  /**
   * Produces and stores a new snapshot. The snapshot index is created on
   * demand; `processIterator` is given all current documents and the last
   * stored snapshot, and its result is written to the snapshot index.
   */
  async takeSnapshot(processIterator: SnapshotProcessor): Promise<void> {
    const indexExists = await this.client.indices.exists({ index: this.snapshotIndex });
    if (!indexExists) {
      await this.client.indices.create({
        index: this.snapshotIndex,
        body: {
          mappings: {
            properties: {
              date: {
                type: 'date',
              },
              aggregationData: {
                type: 'object',
                enabled: true,
              },
            },
          },
        },
      });
    }

    const documentIterator = this.getDocumentIterator();
    const previousSnapshot = await this.getLastSnapshot();
    const newSnapshot = await processIterator(documentIterator, previousSnapshot);
    await this.storeSnapshot(newSnapshot);
  }

  /**
   * Returns the snapshot with the newest `date`, or `null` when the snapshot
   * index does not exist or holds no usable hit.
   */
  private async getLastSnapshot(): Promise<ISnapshot | null> {
    const indexExists = await this.client.indices.exists({ index: this.snapshotIndex });
    if (!indexExists) {
      return null;
    }

    const response = await this.client.search({
      index: this.snapshotIndex,
      sort: 'date:desc',
      size: 1,
    });
    const source = response.hits.hits[0]?._source as Record<string, any> | undefined;
    if (!source) {
      return null;
    }
    return {
      date: source['date'],
      aggregationData: source['aggregationData'],
    };
  }

  /** Yields the `_source` of every document in the main index. */
  private async *getDocumentIterator() {
    for await (const hit of this.scrollHits(true)) {
      yield hit._source;
    }
  }

  /** Appends `snapshot` to the snapshot index under an auto-generated id. */
  private async storeSnapshot(snapshot: ISnapshot): Promise<void> {
    await this.client.index({
      index: this.snapshotIndex,
      body: snapshot,
    });
  }
}