diff --git a/test/test.nonci.ts b/test/test.nonci.ts
index 8f2b86d..dd5a088 100644
--- a/test/test.nonci.ts
+++ b/test/test.nonci.ts
@@ -49,7 +49,7 @@ tap.test('should create an ElasticDoc instance', async () => {
 });
 
 tap.test('should add and update documents in a piping session', async () => {
-  await testElasticDoc.startPipingSession();
+  await testElasticDoc.startPipingSession({});
   await testElasticDoc.pipeDocument({
     docId: '1',
     timestamp: new Date().toISOString(),
diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts
index f66a93e..4190546 100644
--- a/ts/00_commitinfo_data.ts
+++ b/ts/00_commitinfo_data.ts
@@ -3,6 +3,6 @@
  */
 export const commitinfo = {
   name: '@apiclient.xyz/elasticsearch',
-  version: '2.0.1',
+  version: '2.0.2',
   description: 'log to elasticsearch in a kibana compatible format'
 }
diff --git a/ts/els.classes.elasticdoc.ts b/ts/els.classes.elasticdoc.ts
index 268354f..8f87927 100644
--- a/ts/els.classes.elasticdoc.ts
+++ b/ts/els.classes.elasticdoc.ts
@@ -14,13 +14,18 @@ export interface ISnapshot {
   aggregationData: any;
 }
 
-export type SnapshotProcessor = (iterator: AsyncIterable<any>, prevSnapshot: ISnapshot | null) => Promise<ISnapshot>;
+export type SnapshotProcessor = (
+  iterator: AsyncIterable<any>,
+  prevSnapshot: ISnapshot | null
+) => Promise<ISnapshot>;
 
 export class ElasticDoc {
   public client: ElasticClient;
   public index: string;
   private sessionDocs: Set<string> = new Set();
   private indexInitialized: boolean = false;
+  private latestTimestamp: string | null = null; // Store the latest timestamp
+  private onlyNew: boolean = false; // Whether to only pipe new docs
 
   private BATCH_SIZE = 1000;
 
@@ -63,15 +68,54 @@ export class ElasticDoc {
     return { properties };
   }
 
-  async startPipingSession() {
+  /**
+   * Starts a piping session and clears the session's document set.
+   *
+   * @param options.onlyNew when true, the newest `@timestamp` in the index is
+   *   looked up once, and `pipeDocument` then skips documents that are not
+   *   newer than it. Defaults to false.
+   */
+  async startPipingSession(options: { onlyNew?: boolean } = {}) {
     this.sessionDocs.clear();
+    // `onlyNew` is optional, so fall back to false to keep the field a real boolean
+    this.onlyNew = options.onlyNew ?? false;
+    if (this.onlyNew) {
+      // Retrieve and store the latest timestamp
+      const response = await this.client.search({
+        index: this.index,
+        sort: '@timestamp:desc',
+        size: 1,
+      });
+      const hit = response.body.hits.hits[0];
+      this.latestTimestamp = hit?._source?.['@timestamp'] || null;
+      if (this.latestTimestamp) {
+        console.log(`Working in "onlyNew" mode. Hence we are omitting documents prior to ${this.latestTimestamp}`);
+      } else {
+        console.log(
+          `Working in "onlyNew" mode, but no documents found in index ${this.index}. Hence processing all documents now.`
+        );
+      }
+    }
   }
 
-  async pipeDocument(optionsArg: {
-    docId: string;
-    timestamp: string | number;
-    doc: any;
-  }) {
+  /**
+   * In "onlyNew" mode, documents that are not newer than the timestamp found
+   * by `startPipingSession` are skipped before anything else happens.
+   */
+  async pipeDocument(optionsArg: { docId: string; timestamp?: string | number; doc: any }) {
+    // If 'onlyNew' is true, compare the document timestamp with the latest timestamp
+    if (this.onlyNew && this.latestTimestamp && optionsArg.timestamp !== undefined) {
+      // Compare as epoch milliseconds: `timestamp` may be a number or a date string,
+      // and `<=` on mixed string/number operands does not compare chronologically.
+      const docTime = new Date(optionsArg.timestamp).getTime();
+      const latestTime = new Date(this.latestTimestamp).getTime();
+      // An unparsable date yields NaN, which makes the comparison false and keeps the document
+      if (docTime <= latestTime) {
+        // Omit the document
+        return;
+      }
+    }
+
     await this.ensureIndexExists(optionsArg.doc);
 
     const documentBody = {
@@ -90,7 +134,11 @@ export class ElasticDoc {
   async endPipingSession() {
     const allDocIds: string[] = [];
     const responseQueue = [];
-    let response = await this.client.search({ index: this.index, scroll: '1m', size: this.BATCH_SIZE });
+    let response = await this.client.search({
+      index: this.index,
+      scroll: '1m',
+      size: this.BATCH_SIZE,
+    });
     while (true) {
       response.body.hits.hits.forEach((hit: any) => allDocIds.push(hit._id));
       if (!response.body.hits.hits.length) {
@@ -124,35 +172,35 @@ export class ElasticDoc {
   async takeSnapshot(processIterator: SnapshotProcessor) {
     const snapshotIndex = `${this.index}_snapshots`;
-    
+
     const { body: indexExists } = await this.client.indices.exists({ index: snapshotIndex });
     if (!indexExists) {
-      await this.client.indices.create({ 
+      await this.client.indices.create({
         index: snapshotIndex,
         body: {
           mappings: {
             properties: {
               date: {
-                type: 'date'
+                type: 'date',
               },
               aggregationData: {
                 type: 'object',
-                enabled: true
-              }
-            }
-          }
-        }
+                enabled: true,
+              },
+            },
+          },
+        },
       });
     }
-    
+
     const documentIterator = this.getDocumentIterator();
-    
+
     const newSnapshot = await processIterator(documentIterator, await this.getLastSnapshot());
-    
+
     await this.storeSnapshot(newSnapshot);
   }
 
-private async getLastSnapshot(): Promise<ISnapshot | null> {
+  private async getLastSnapshot(): Promise<ISnapshot | null> {
     const snapshotIndex = `${this.index}_snapshots`;
 
     const { body: indexExists } = await this.client.indices.exists({ index: snapshotIndex });
 
@@ -163,7 +211,7 @@ private async getLastSnapshot(): Promise<ISnapshot | null> {
     const response = await this.client.search({
       index: snapshotIndex,
       sort: 'date:desc',
-      size: 1
+      size: 1,
     });
 
     if (response.body.hits.hits.length > 0) {
@@ -177,9 +225,12 @@ private async getLastSnapshot(): Promise<ISnapshot | null> {
     }
   }
 
-
   private async *getDocumentIterator() {
-    let response = await this.client.search({ index: this.index, scroll: '1m', size: this.BATCH_SIZE });
+    let response = await this.client.search({
+      index: this.index,
+      scroll: '1m',
+      size: this.BATCH_SIZE,
+    });
     while (true) {
       for (const hit of response.body.hits.hits) {
         yield hit._source;