fix(core): update

This commit is contained in:
Philipp Kunz 2023-08-01 12:24:22 +02:00
parent 2a595a1a9a
commit 30cbc05aa2
7 changed files with 82 additions and 22 deletions

0
test/00tapwrap.ts Normal file
View File

View File

@ -2,10 +2,10 @@ import { expect, tap } from '@pushrocks/tapbundle';
import { Qenv } from '@pushrocks/qenv';
import * as elasticsearch from '../ts/index.js';
let testElasticLog: elasticsearch.ElasticSearch<any>;
let testElasticLog: elasticsearch.ElsSmartlogDestination<any>;
tap.test('first test', async () => {
testElasticLog = new elasticsearch.ElasticSearch({
testElasticLog = new elasticsearch.ElsSmartlogDestination({
indexPrefix: 'testprefix',
indexRetention: 7,
node: 'http://localhost:9200',
@ -14,7 +14,7 @@ tap.test('first test', async () => {
password: 'YourPassword'
}
});
expect(testElasticLog).toBeInstanceOf(elasticsearch.ElasticSearch);
expect(testElasticLog).toBeInstanceOf(elasticsearch.ElsSmartlogDestination);
});
tap.test('should send a message to Elasticsearch', async () => {

View File

@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@apiclient.xyz/elasticsearch',
version: '1.0.44',
version: '1.0.45',
description: 'log to elasticsearch in a kibana compatible format'
}

View File

@ -1,14 +1,13 @@
import * as plugins from './elasticsearch.plugins.js';
import { ElasticSearch } from './elasticsearch.classes.elasticsearch.js';
import { ElsSmartlogDestination } from './els.classes.smartlogdestination.js';
import { type ILogPackage } from '@pushrocks/smartlog-interfaces';
import { Stringmap } from '@pushrocks/lik';
export class ElasticIndex {
private stringmap = new Stringmap();
private elasticSearchRef: ElasticSearch<any>;
private elasticSearchRef: ElsSmartlogDestination<any>;
constructor(elasticSearchInstanceArg: ElasticSearch<ILogPackage>) {
constructor(elasticSearchInstanceArg: ElsSmartlogDestination<ILogPackage>) {
this.elasticSearchRef = elasticSearchInstanceArg;
}
@ -16,6 +15,7 @@ export class ElasticIndex {
if (this.stringmap.checkString(indexNameArg)) {
return indexNameArg;
}
const responseArg = await this.elasticSearchRef.client.cat.indices({
format: 'json',
bytes: 'm',
@ -27,7 +27,6 @@ export class ElasticIndex {
throw new Error('Could not get valid response from elastic search');
}
// lets delete indexes that violate the retention
if (Array.isArray(responseArg.body)) {
const filteredIndices = responseArg.body.filter((indexObjectArg) => {
return indexObjectArg.index.startsWith(prefixArg);
@ -49,6 +48,7 @@ export class ElasticIndex {
if (!index) {
await this.createNewIndex(indexNameArg);
}
this.stringmap.addString(indexNameArg);
return index;
}
@ -57,6 +57,23 @@ export class ElasticIndex {
const response = await this.elasticSearchRef.client.indices.create({
wait_for_active_shards: '1',
index: indexNameArg,
body: {
mappings: {
properties: {
'@timestamp': {
type: 'date',
},
logPackageArg: {
properties: {
payload: {
type: 'object',
dynamic: true
}
}
},
},
},
},
});
}

View File

@ -1,29 +1,48 @@
import { ElasticSearch, type IStandardLogParams } from './elasticsearch.classes.elasticsearch.js';
import { ElsSmartlogDestination, type IStandardLogParams } from './els.classes.smartlogdestination.js';
export class ElasticScheduler {
elasticSearchRef: ElasticSearch<any>;
elasticSearchRef: ElsSmartlogDestination<any>;
docsScheduled = false;
docsStorage: any[] = [];
constructor(elasticLogRefArg: ElasticSearch<any>) {
// maximum size of the buffer
maxBufferSize = 500;
constructor(elasticLogRefArg: ElsSmartlogDestination<any>) {
this.elasticSearchRef = elasticLogRefArg;
}
public addFailedDoc(objectArg: any | IStandardLogParams) {
this.docsStorage.push(objectArg);
this.addToStorage(objectArg);
this.setRetry();
}
public scheduleDoc(logObject: any) {
this.addToStorage(logObject);
}
private addToStorage(logObject: any) {
this.docsStorage.push(logObject);
// if buffer is full, send logs immediately
if (this.docsStorage.length >= this.maxBufferSize) {
this.flushLogsToElasticSearch();
}
}
private flushLogsToElasticSearch() {
const oldStorage = this.docsStorage;
this.docsStorage = [];
for (let logObject of oldStorage) {
this.elasticSearchRef.log(logObject, true);
}
}
public setRetry() {
setTimeout(() => {
const oldStorage = this.docsStorage;
this.docsStorage = [];
for (let logObject of oldStorage) {
this.elasticSearchRef.log(logObject, true);
}
this.flushLogsToElasticSearch();
if (this.docsStorage.length === 0) {
console.log('ElasticLog retry success!!!');
this.docsScheduled = false;

View File

@ -21,7 +21,7 @@ export interface IElasticSearchConstructorOptions {
};
}
export class ElasticSearch<T> {
export class ElsSmartlogDestination<T> {
public client: ElasticClient;
public elasticScheduler = new ElasticScheduler(this);
public elasticIndex: ElasticIndex = new ElasticIndex(this);
@ -38,7 +38,7 @@ export class ElasticSearch<T> {
node: optionsArg.node,
...(optionsArg.auth && { auth: optionsArg.auth }),
});
this.indexPrefix = optionsArg.indexPrefix;
this.indexPrefix = `${optionsArg.indexPrefix}`;
this.indexRetention = optionsArg.indexRetention;
}
@ -51,7 +51,31 @@ export class ElasticSearch<T> {
return;
}
const resultIndexName = await this.elasticIndex.ensureIndex(this.indexPrefix, indexToUse);
// Make sure the index is created with a mapping for dynamic JSON
const indexExists = await this.client.indices.exists({ index: indexToUse });
if (!indexExists.body) {
await this.client.indices.create({
index: indexToUse,
body: {
mappings: {
properties: {
'@timestamp': {
type: 'date',
},
logPackageArg: {
properties: {
payload: {
type: 'object',
dynamic: true
}
}
},
},
},
},
});
}
this.client.index(
{
index: indexToUse,

View File

@ -1 +1 @@
export * from './elasticsearch.classes.elasticsearch.js';
export * from './els.classes.smartlogdestination.js';