diff --git a/test/test.nonci.ts b/test/test.nonci.ts index e4b3734..8f2b86d 100644 --- a/test/test.nonci.ts +++ b/test/test.nonci.ts @@ -50,9 +50,21 @@ tap.test('should create an ElasticDoc instance', async () => { tap.test('should add and update documents in a piping session', async () => { await testElasticDoc.startPipingSession(); - await testElasticDoc.pipeDocument('1', { name: 'doc1' }); - await testElasticDoc.pipeDocument('2', { name: 'doc2' }); - await testElasticDoc.pipeDocument('1', { name: 'updated doc1' }); + await testElasticDoc.pipeDocument({ + docId: '1', + timestamp: new Date().toISOString(), + doc: { name: 'doc1' } + }); + await testElasticDoc.pipeDocument({ + docId: '2', + timestamp: new Date().toISOString(), + doc: { name: 'doc2' } + }); + await testElasticDoc.pipeDocument({ + docId: '1', + timestamp: new Date().toISOString(), + doc: { name: 'updated doc1' } + }); }); tap.test('should delete documents not part of the piping session', async () => { diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 7734a63..6d895f3 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@apiclient.xyz/elasticsearch', - version: '1.0.55', + version: '1.0.56', description: 'log to elasticsearch in a kibana compatible format' } diff --git a/ts/els.classes.elasticdoc.ts b/ts/els.classes.elasticdoc.ts index 88f20d0..268354f 100644 --- a/ts/els.classes.elasticdoc.ts +++ b/ts/els.classes.elasticdoc.ts @@ -20,7 +20,8 @@ export class ElasticDoc { public client: ElasticClient; public index: string; private sessionDocs: Set = new Set(); - + private indexInitialized: boolean = false; + private BATCH_SIZE = 1000; constructor(options: IElasticDocConstructorOptions) { @@ -31,17 +32,59 @@ export class ElasticDoc { this.index = options.index; } + private async ensureIndexExists(doc: any) { + if (!this.indexInitialized) { + const { body: indexExists } = await this.client.indices.exists({ index: this.index }); + if (!indexExists) { + const mappings = this.createMappingsFromDoc(doc); + await this.client.indices.create({ + index: this.index, + body: { + mappings, + settings: { + // You can define the settings according to your requirements here + }, + }, + }); + } + this.indexInitialized = true; + } + } + + private createMappingsFromDoc(doc: any): any { + const properties: any = {}; + for (const key in doc) { + if (key === '@timestamp') { + properties[key] = { type: 'date' }; + continue; + } + properties[key] = { type: typeof doc[key] === 'number' ? 'float' : 'text' }; + } + return { properties }; + } + async startPipingSession() { this.sessionDocs.clear(); } - async pipeDocument(docId: string, doc: any) { + async pipeDocument(optionsArg: { + docId: string; + timestamp: string | number; + doc: any; + }) { + await this.ensureIndexExists(optionsArg.doc); + + const documentBody = { + ...optionsArg.doc, + ...(optionsArg.timestamp && { '@timestamp': optionsArg.timestamp }), + }; + await this.client.index({ index: this.index, - id: docId, - body: doc, + id: optionsArg.docId, + body: documentBody, }); - this.sessionDocs.add(docId); + this.sessionDocs.add(optionsArg.docId); } async endPipingSession() {