fix(core): update
This commit is contained in:
		@@ -49,7 +49,7 @@ tap.test('should create an ElasticDoc instance', async () => {
 | 
			
		||||
});
 | 
			
		||||
 | 
			
		||||
tap.test('should add and update documents in a piping session', async () => {
 | 
			
		||||
  await testElasticDoc.startPipingSession();
 | 
			
		||||
  await testElasticDoc.startPipingSession({});
 | 
			
		||||
  await testElasticDoc.pipeDocument({
 | 
			
		||||
    docId: '1',
 | 
			
		||||
    timestamp: new Date().toISOString(),
 | 
			
		||||
 
 | 
			
		||||
@@ -3,6 +3,6 @@
 | 
			
		||||
 */
 | 
			
		||||
export const commitinfo = {
 | 
			
		||||
  name: '@apiclient.xyz/elasticsearch',
 | 
			
		||||
  version: '2.0.1',
 | 
			
		||||
  version: '2.0.2',
 | 
			
		||||
  description: 'log to elasticsearch in a kibana compatible format'
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -14,13 +14,18 @@ export interface ISnapshot {
 | 
			
		||||
  aggregationData: any;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export type SnapshotProcessor = (iterator: AsyncIterable<any>, prevSnapshot: ISnapshot | null) => Promise<ISnapshot>;
 | 
			
		||||
export type SnapshotProcessor = (
 | 
			
		||||
  iterator: AsyncIterable<any>,
 | 
			
		||||
  prevSnapshot: ISnapshot | null
 | 
			
		||||
) => Promise<ISnapshot>;
 | 
			
		||||
 | 
			
		||||
export class ElasticDoc {
 | 
			
		||||
  public client: ElasticClient;
 | 
			
		||||
  public index: string;
 | 
			
		||||
  private sessionDocs: Set<string> = new Set();
 | 
			
		||||
  private indexInitialized: boolean = false;
 | 
			
		||||
  private latestTimestamp: string | null = null; // Store the latest timestamp
 | 
			
		||||
  private onlyNew: boolean = false; // Whether to only pipe new docs
 | 
			
		||||
 | 
			
		||||
  private BATCH_SIZE = 1000;
 | 
			
		||||
 | 
			
		||||
@@ -63,15 +68,35 @@ export class ElasticDoc {
 | 
			
		||||
    return { properties };
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async startPipingSession() {
 | 
			
		||||
  async startPipingSession(options: { onlyNew?: boolean }) {
 | 
			
		||||
    this.sessionDocs.clear();
 | 
			
		||||
    this.onlyNew = options.onlyNew;
 | 
			
		||||
    if (this.onlyNew) {
 | 
			
		||||
      // Retrieve and store the latest timestamp
 | 
			
		||||
      const response = await this.client.search({
 | 
			
		||||
        index: this.index,
 | 
			
		||||
        sort: '@timestamp:desc',
 | 
			
		||||
        size: 1,
 | 
			
		||||
      });
 | 
			
		||||
      const hit = response.body.hits.hits[0];
 | 
			
		||||
      this.latestTimestamp = hit?._source?.['@timestamp'] || null;
 | 
			
		||||
      if (this.latestTimestamp) {
 | 
			
		||||
        console.log(`Working in "onlyNew" mode. Hence we are omitting documents prior to ${this.latestTimestamp}`);
 | 
			
		||||
      } else {
 | 
			
		||||
        `Working in "onlyNew" mode, but no documents found in index ${this.index}. Hence processing all documents now.`
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async pipeDocument(optionsArg: {
 | 
			
		||||
    docId: string;
 | 
			
		||||
    timestamp: string | number;
 | 
			
		||||
    doc: any;
 | 
			
		||||
  }) {
 | 
			
		||||
  async pipeDocument(optionsArg: { docId: string; timestamp?: string | number; doc: any }) {
 | 
			
		||||
    // If 'onlyNew' is true, compare the document timestamp with the latest timestamp
 | 
			
		||||
    if (this.onlyNew) {
 | 
			
		||||
      if (this.latestTimestamp && optionsArg.timestamp <= this.latestTimestamp) {
 | 
			
		||||
        // Omit the document
 | 
			
		||||
        return;
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    await this.ensureIndexExists(optionsArg.doc);
 | 
			
		||||
 | 
			
		||||
    const documentBody = {
 | 
			
		||||
@@ -90,7 +115,11 @@ export class ElasticDoc {
 | 
			
		||||
  async endPipingSession() {
 | 
			
		||||
    const allDocIds: string[] = [];
 | 
			
		||||
    const responseQueue = [];
 | 
			
		||||
    let response = await this.client.search({ index: this.index, scroll: '1m', size: this.BATCH_SIZE });
 | 
			
		||||
    let response = await this.client.search({
 | 
			
		||||
      index: this.index,
 | 
			
		||||
      scroll: '1m',
 | 
			
		||||
      size: this.BATCH_SIZE,
 | 
			
		||||
    });
 | 
			
		||||
    while (true) {
 | 
			
		||||
      response.body.hits.hits.forEach((hit: any) => allDocIds.push(hit._id));
 | 
			
		||||
      if (!response.body.hits.hits.length) {
 | 
			
		||||
@@ -124,35 +153,35 @@ export class ElasticDoc {
 | 
			
		||||
 | 
			
		||||
  async takeSnapshot(processIterator: SnapshotProcessor) {
 | 
			
		||||
    const snapshotIndex = `${this.index}_snapshots`;
 | 
			
		||||
  
 | 
			
		||||
 | 
			
		||||
    const { body: indexExists } = await this.client.indices.exists({ index: snapshotIndex });
 | 
			
		||||
    if (!indexExists) {
 | 
			
		||||
      await this.client.indices.create({ 
 | 
			
		||||
      await this.client.indices.create({
 | 
			
		||||
        index: snapshotIndex,
 | 
			
		||||
        body: {
 | 
			
		||||
          mappings: {
 | 
			
		||||
            properties: {
 | 
			
		||||
              date: {
 | 
			
		||||
                type: 'date'
 | 
			
		||||
                type: 'date',
 | 
			
		||||
              },
 | 
			
		||||
              aggregationData: {
 | 
			
		||||
                type: 'object',
 | 
			
		||||
                enabled: true
 | 
			
		||||
              }
 | 
			
		||||
            }
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
                enabled: true,
 | 
			
		||||
              },
 | 
			
		||||
            },
 | 
			
		||||
          },
 | 
			
		||||
        },
 | 
			
		||||
      });
 | 
			
		||||
    }
 | 
			
		||||
  
 | 
			
		||||
 | 
			
		||||
    const documentIterator = this.getDocumentIterator();
 | 
			
		||||
  
 | 
			
		||||
 | 
			
		||||
    const newSnapshot = await processIterator(documentIterator, await this.getLastSnapshot());
 | 
			
		||||
  
 | 
			
		||||
 | 
			
		||||
    await this.storeSnapshot(newSnapshot);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
private async getLastSnapshot(): Promise<ISnapshot | null> {
 | 
			
		||||
  private async getLastSnapshot(): Promise<ISnapshot | null> {
 | 
			
		||||
    const snapshotIndex = `${this.index}_snapshots`;
 | 
			
		||||
    const { body: indexExists } = await this.client.indices.exists({ index: snapshotIndex });
 | 
			
		||||
 | 
			
		||||
@@ -163,7 +192,7 @@ private async getLastSnapshot(): Promise<ISnapshot | null> {
 | 
			
		||||
    const response = await this.client.search({
 | 
			
		||||
      index: snapshotIndex,
 | 
			
		||||
      sort: 'date:desc',
 | 
			
		||||
      size: 1
 | 
			
		||||
      size: 1,
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    if (response.body.hits.hits.length > 0) {
 | 
			
		||||
@@ -177,9 +206,12 @@ private async getLastSnapshot(): Promise<ISnapshot | null> {
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
  private async *getDocumentIterator() {
 | 
			
		||||
    let response = await this.client.search({ index: this.index, scroll: '1m', size: this.BATCH_SIZE });
 | 
			
		||||
    let response = await this.client.search({
 | 
			
		||||
      index: this.index,
 | 
			
		||||
      scroll: '1m',
 | 
			
		||||
      size: this.BATCH_SIZE,
 | 
			
		||||
    });
 | 
			
		||||
    while (true) {
 | 
			
		||||
      for (const hit of response.body.hits.hits) {
 | 
			
		||||
        yield hit._source;
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user