fix(core): update

This commit is contained in:
Philipp Kunz 2023-08-01 12:38:53 +02:00
parent fb93dce8bc
commit 49233ce45f
4 changed files with 103 additions and 1 deletions

View File

@ -3,6 +3,7 @@ import { Qenv } from '@pushrocks/qenv';
import * as elasticsearch from '../ts/index.js';
let testElasticLog: elasticsearch.ElsSmartlogDestination<any>;
let testElasticDoc: elasticsearch.ElasticDoc;
tap.test('first test', async () => {
testElasticLog = new elasticsearch.ElsSmartlogDestination({
@ -35,4 +36,27 @@ tap.test('should send a message to Elasticsearch', async () => {
});
});
tap.test('should create an ElasticDoc instance', async () => {
testElasticDoc = new elasticsearch.ElasticDoc({
index: 'testindex',
node: 'http://localhost:9200',
auth: {
username: 'elastic',
password: 'YourPassword'
}
});
expect(testElasticDoc).toBeInstanceOf(elasticsearch.ElasticDoc);
});
tap.test('should add and update documents in a piping session', async () => {
await testElasticDoc.startPipingSession();
await testElasticDoc.pipeDocument('1', { name: 'doc1' });
await testElasticDoc.pipeDocument('2', { name: 'doc2' });
await testElasticDoc.pipeDocument('1', { name: 'updated doc1' });
});
tap.test('should delete documents not part of the piping session', async () => {
await testElasticDoc.endPipingSession();
});
tap.start();

View File

@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@apiclient.xyz/elasticsearch',
version: '1.0.45',
version: '1.0.46',
description: 'log to elasticsearch in a kibana compatible format'
}

View File

@ -0,0 +1,77 @@
import { Client as ElasticClient } from '@elastic/elasticsearch';
export interface IElasticDocConstructorOptions {
index: string;
node: string;
auth?: {
username: string;
password: string;
};
}
export class ElasticDoc {
public client: ElasticClient;
public index: string;
private sessionDocs: Set<string> = new Set();
private BATCH_SIZE = 1000;
constructor(options: IElasticDocConstructorOptions) {
this.client = new ElasticClient({
node: options.node,
...(options.auth && { auth: options.auth }),
});
this.index = options.index;
}
async startPipingSession() {
// Clear the session docs set
this.sessionDocs.clear();
}
async pipeDocument(docId: string, doc: any) {
await this.client.index({
index: this.index,
id: docId,
body: doc,
});
this.sessionDocs.add(docId);
}
async endPipingSession() {
const allDocIds: string[] = [];
const responseQueue = [];
let response = await this.client.search({ index: this.index, scroll: '1m', size: this.BATCH_SIZE });
while (true) {
response.body.hits.hits.forEach((hit: any) => allDocIds.push(hit._id));
if (!response.body.hits.hits.length) {
break;
}
response = await this.client.scroll({ scroll_id: response.body._scroll_id, scroll: '1m' });
}
// Batch delete docs
for (const docId of allDocIds) {
if (!this.sessionDocs.has(docId)) {
responseQueue.push({
delete: {
_index: this.index,
_id: docId,
},
});
if (responseQueue.length >= this.BATCH_SIZE) {
await this.client.bulk({ refresh: true, body: responseQueue });
responseQueue.length = 0;
}
}
}
if (responseQueue.length > 0) {
await this.client.bulk({ refresh: true, body: responseQueue });
}
// Clear the session docs set
this.sessionDocs.clear();
}
}

View File

@ -1 +1,2 @@
export * from './els.classes.smartlogdestination.js';
export * from './els.classes.elasticdoc.js';