fix(core): update

This commit is contained in:
Philipp Kunz 2023-08-02 03:11:17 +02:00
parent 9e25494f8f
commit fcb67ec878
3 changed files with 101 additions and 4 deletions

View File

@ -59,4 +59,19 @@ tap.test('should delete documents not part of the piping session', async () => {
await testElasticDoc.endPipingSession(); await testElasticDoc.endPipingSession();
}); });
tap.test('should take and store snapshot', async () => {
await testElasticDoc.takeSnapshot(async (iterator, prevSnapshot) => {
const aggregationData = [];
for await (const doc of iterator) {
// Sample aggregation: counting documents
aggregationData.push(doc);
}
const snapshot = {
date: new Date().toISOString(),
aggregationData,
};
return snapshot;
});
});
tap.start(); tap.start();

View File

@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@apiclient.xyz/elasticsearch', name: '@apiclient.xyz/elasticsearch',
version: '1.0.47', version: '1.0.48',
description: 'log to elasticsearch in a kibana compatible format' description: 'log to elasticsearch in a kibana compatible format'
} }

View File

@ -9,6 +9,13 @@ export interface IElasticDocConstructorOptions {
}; };
} }
export interface ISnapshot {
date: string;
aggregationData: any;
}
export type SnapshotProcessor = (iterator: AsyncIterable<any>, prevSnapshot: ISnapshot | null) => Promise<ISnapshot>;
export class ElasticDoc { export class ElasticDoc {
public client: ElasticClient; public client: ElasticClient;
public index: string; public index: string;
@ -25,7 +32,6 @@ export class ElasticDoc {
} }
async startPipingSession() { async startPipingSession() {
// Clear the session docs set
this.sessionDocs.clear(); this.sessionDocs.clear();
} }
@ -50,7 +56,6 @@ export class ElasticDoc {
response = await this.client.scroll({ scroll_id: response.body._scroll_id, scroll: '1m' }); response = await this.client.scroll({ scroll_id: response.body._scroll_id, scroll: '1m' });
} }
// Batch delete docs
for (const docId of allDocIds) { for (const docId of allDocIds) {
if (!this.sessionDocs.has(docId)) { if (!this.sessionDocs.has(docId)) {
responseQueue.push({ responseQueue.push({
@ -71,7 +76,84 @@ export class ElasticDoc {
await this.client.bulk({ refresh: true, body: responseQueue }); await this.client.bulk({ refresh: true, body: responseQueue });
} }
// Clear the session docs set
this.sessionDocs.clear(); this.sessionDocs.clear();
} }
async takeSnapshot(processIterator: SnapshotProcessor) {
const snapshotIndex = `${this.index}_snapshots`;
const { body: indexExists } = await this.client.indices.exists({ index: snapshotIndex });
if (!indexExists) {
await this.client.indices.create({
index: snapshotIndex,
body: {
mappings: {
properties: {
date: {
type: 'date'
},
aggregationData: {
type: 'object',
enabled: true
}
}
}
}
});
}
const documentIterator = this.getDocumentIterator();
const newSnapshot = await processIterator(documentIterator, await this.getLastSnapshot());
await this.storeSnapshot(newSnapshot);
}
private async getLastSnapshot(): Promise<ISnapshot | null> {
const snapshotIndex = `${this.index}_snapshots`;
const { body: indexExists } = await this.client.indices.exists({ index: snapshotIndex });
if (!indexExists) {
return null;
}
const response = await this.client.search({
index: snapshotIndex,
sort: 'date:desc',
size: 1
});
if (response.body.hits.hits.length > 0) {
const hit = response.body.hits.hits[0];
return {
date: hit._source.date,
aggregationData: hit._source.aggregationData,
};
} else {
return null;
}
}
private async *getDocumentIterator() {
let response = await this.client.search({ index: this.index, scroll: '1m', size: this.BATCH_SIZE });
while (true) {
for (const hit of response.body.hits.hits) {
yield hit._source;
}
if (!response.body.hits.hits.length) {
break;
}
response = await this.client.scroll({ scroll_id: response.body._scroll_id, scroll: '1m' });
}
}
private async storeSnapshot(snapshot: ISnapshot) {
await this.client.index({
index: `${this.index}_snapshots`,
body: snapshot,
});
}
} }