From fcb67ec878e70c6d3827daff4fd4c181e0da9e44 Mon Sep 17 00:00:00 2001 From: Philipp Kunz Date: Wed, 2 Aug 2023 03:11:17 +0200 Subject: [PATCH] fix(core): update --- test/test.nonci.ts | 15 ++++++ ts/00_commitinfo_data.ts | 2 +- ts/els.classes.elasticdoc.ts | 88 ++++++++++++++++++++++++++++++++++-- 3 files changed, 101 insertions(+), 4 deletions(-) diff --git a/test/test.nonci.ts b/test/test.nonci.ts index 123265e..e4b3734 100644 --- a/test/test.nonci.ts +++ b/test/test.nonci.ts @@ -59,4 +59,19 @@ tap.test('should delete documents not part of the piping session', async () => { await testElasticDoc.endPipingSession(); }); +tap.test('should take and store snapshot', async () => { + await testElasticDoc.takeSnapshot(async (iterator, prevSnapshot) => { + const aggregationData = []; + for await (const doc of iterator) { + // Sample aggregation: counting documents + aggregationData.push(doc); + } + const snapshot = { + date: new Date().toISOString(), + aggregationData, + }; + return snapshot; + }); +}); + tap.start(); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 65d492a..5303fdc 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@apiclient.xyz/elasticsearch', - version: '1.0.47', + version: '1.0.48', description: 'log to elasticsearch in a kibana compatible format' } diff --git a/ts/els.classes.elasticdoc.ts b/ts/els.classes.elasticdoc.ts index ee6b476..88f20d0 100644 --- a/ts/els.classes.elasticdoc.ts +++ b/ts/els.classes.elasticdoc.ts @@ -9,6 +9,13 @@ export interface IElasticDocConstructorOptions { }; } +export interface ISnapshot { + date: string; + aggregationData: any; +} + +export type SnapshotProcessor = (iterator: AsyncIterable, prevSnapshot: ISnapshot | null) => Promise; + export class ElasticDoc { public client: ElasticClient; public index: string; @@ -25,7 +32,6 @@ export class ElasticDoc { } async startPipingSession() { - // Clear the session docs set this.sessionDocs.clear(); } @@ -50,7 +56,6 @@ export class ElasticDoc { response = await this.client.scroll({ scroll_id: response.body._scroll_id, scroll: '1m' }); } - // Batch delete docs for (const docId of allDocIds) { if (!this.sessionDocs.has(docId)) { responseQueue.push({ @@ -71,7 +76,84 @@ export class ElasticDoc { await this.client.bulk({ refresh: true, body: responseQueue }); } - // Clear the session docs set this.sessionDocs.clear(); } + + async takeSnapshot(processIterator: SnapshotProcessor) { + const snapshotIndex = `${this.index}_snapshots`; + + const { body: indexExists } = await this.client.indices.exists({ index: snapshotIndex }); + if (!indexExists) { + await this.client.indices.create({ + index: snapshotIndex, + body: { + mappings: { + properties: { + date: { + type: 'date' + }, + aggregationData: { + type: 'object', + enabled: true + } + } + } + } + }); + } + + const documentIterator = this.getDocumentIterator(); + + const newSnapshot = await processIterator(documentIterator, await this.getLastSnapshot()); + + await this.storeSnapshot(newSnapshot); + } + +private async getLastSnapshot(): Promise { + const snapshotIndex = `${this.index}_snapshots`; + const { body: indexExists } = await this.client.indices.exists({ index: snapshotIndex }); + + if (!indexExists) { + return null; + } + + const response = await this.client.search({ + index: snapshotIndex, + sort: 'date:desc', + size: 1 + }); + + if (response.body.hits.hits.length > 0) { + const hit = response.body.hits.hits[0]; + return { + date: hit._source.date, + aggregationData: hit._source.aggregationData, + }; + } else { + return null; + } + } + + + private async *getDocumentIterator() { + let response = await this.client.search({ index: this.index, scroll: '1m', size: this.BATCH_SIZE }); + while (true) { + for (const hit of response.body.hits.hits) { + yield hit._source; + } + + if (!response.body.hits.hits.length) { + break; + } + + response = await this.client.scroll({ scroll_id: response.body._scroll_id, scroll: '1m' }); + } + } + + private async storeSnapshot(snapshot: ISnapshot) { + await this.client.index({ + index: `${this.index}_snapshots`, + body: snapshot, + }); + } }