Compare commits

..

14 Commits

Author SHA1 Message Date
fb93dce8bc 1.0.45 2023-08-01 12:24:22 +02:00
30cbc05aa2 fix(core): update 2023-08-01 12:24:22 +02:00
2a595a1a9a 1.0.44 2023-07-05 23:45:48 +02:00
d62b18e93c fix(core): update 2023-07-05 23:45:48 +02:00
d6176f820a 1.0.43 2023-07-05 17:27:24 +02:00
0ebc1d5288 fix(core): update 2023-07-05 17:27:24 +02:00
2b0003546a 1.0.42 2023-07-05 15:31:01 +02:00
60617f2fca fix(core): update 2023-07-05 15:31:00 +02:00
9c767d07e4 1.0.41 2023-07-05 10:26:00 +02:00
f3aa94dcb7 fix(core): update 2023-07-05 10:25:59 +02:00
a0be0edd9d 1.0.40 2023-07-05 10:22:53 +02:00
ad24ba2f5d fix(core): update 2023-07-05 10:22:53 +02:00
b0cf4bb27f 1.0.39 2023-07-05 09:38:43 +02:00
fd29ceab80 fix(core): update 2023-07-05 09:38:43 +02:00
11 changed files with 148 additions and 139 deletions

View File

@ -1,6 +1,6 @@
{ {
"name": "@apiclient.xyz/elasticsearch", "name": "@apiclient.xyz/elasticsearch",
"version": "1.0.38", "version": "1.0.45",
"private": false, "private": false,
"description": "log to elasticsearch in a kibana compatible format", "description": "log to elasticsearch in a kibana compatible format",
"main": "dist_ts/index.js", "main": "dist_ts/index.js",
@ -27,8 +27,7 @@
"@pushrocks/smartdelay": "^3.0.1", "@pushrocks/smartdelay": "^3.0.1",
"@pushrocks/smartlog-interfaces": "^3.0.0", "@pushrocks/smartlog-interfaces": "^3.0.0",
"@pushrocks/smartpromise": "^4.0.2", "@pushrocks/smartpromise": "^4.0.2",
"@pushrocks/smarttime": "^4.0.1", "@pushrocks/smarttime": "^4.0.1"
"@types/elasticsearch": "^5.0.40"
}, },
"files": [ "files": [
"ts/**/*", "ts/**/*",

7
pnpm-lock.yaml generated
View File

@ -23,9 +23,6 @@ dependencies:
'@pushrocks/smarttime': '@pushrocks/smarttime':
specifier: ^4.0.1 specifier: ^4.0.1
version: 4.0.1 version: 4.0.1
'@types/elasticsearch':
specifier: ^5.0.40
version: 5.0.40
devDependencies: devDependencies:
'@gitzone/tsbuild': '@gitzone/tsbuild':
@ -1244,10 +1241,6 @@ packages:
resolution: {integrity: sha512-tpu0hp+AOIzwdAHyZPzLE5pCf9uT0pb+xZ76T4S7MrY2YTVq918Q7Q2VQ3KCVQqYxM7nxuCK/SL3X97jBEIeKQ==} resolution: {integrity: sha512-tpu0hp+AOIzwdAHyZPzLE5pCf9uT0pb+xZ76T4S7MrY2YTVq918Q7Q2VQ3KCVQqYxM7nxuCK/SL3X97jBEIeKQ==}
dev: true dev: true
/@types/elasticsearch@5.0.40:
resolution: {integrity: sha512-lhnbkC0XorAD7Dt7X+94cXUSHEdDNnEVk/DgFLHgIZQNhixV631Lj4+KpXunTT5rCHyj9RqK3TfO7QrOiwEeUQ==}
dev: false
/@types/express-serve-static-core@4.17.35: /@types/express-serve-static-core@4.17.35:
resolution: {integrity: sha512-wALWQwrgiB2AWTT91CB62b6Yt0sNHpznUXeZEcnPU3DRdlDIz74x8Qg1UUYKSVFi+va5vKOLYRBI1bRKiLLKIg==} resolution: {integrity: sha512-wALWQwrgiB2AWTT91CB62b6Yt0sNHpznUXeZEcnPU3DRdlDIz74x8Qg1UUYKSVFi+va5vKOLYRBI1bRKiLLKIg==}
dependencies: dependencies:

View File

@ -1,5 +1,2 @@
required: required:
- ELK_DOMAIN
- ELK_PORT
- ELK_USER
- ELK_PASS

0
test/00tapwrap.ts Normal file
View File

View File

@ -2,23 +2,23 @@ import { expect, tap } from '@pushrocks/tapbundle';
import { Qenv } from '@pushrocks/qenv'; import { Qenv } from '@pushrocks/qenv';
import * as elasticsearch from '../ts/index.js'; import * as elasticsearch from '../ts/index.js';
const testQenv = new Qenv('./', './.nogit/'); let testElasticLog: elasticsearch.ElsSmartlogDestination<any>;
let testElasticLog: elasticsearch.ElasticSearch<any>;
tap.test('first test', async () => { tap.test('first test', async () => {
testElasticLog = new elasticsearch.ElasticSearch({ testElasticLog = new elasticsearch.ElsSmartlogDestination({
indexPrefix: 'smartlog', indexPrefix: 'testprefix',
indexRetention: 7, indexRetention: 7,
domain: testQenv.getEnvVarOnDemand('ELK_DOMAIN'), node: 'http://localhost:9200',
port: parseInt(testQenv.getEnvVarOnDemand('ELK_PORT'), 10), auth: {
ssl: true, username: 'elastic',
password: 'YourPassword'
}
}); });
expect(testElasticLog).toBeInstanceOf(elasticsearch.ElasticSearch); expect(testElasticLog).toBeInstanceOf(elasticsearch.ElsSmartlogDestination);
}); });
tap.test('should send a message to Elasticsearch', async () => { tap.test('should send a message to Elasticsearch', async () => {
testElasticLog.log({ await testElasticLog.log({
timestamp: Date.now(), timestamp: Date.now(),
type: 'increment', type: 'increment',
level: 'info', level: 'info',

View File

@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@apiclient.xyz/elasticsearch', name: '@apiclient.xyz/elasticsearch',
version: '1.0.38', version: '1.0.45',
description: 'log to elasticsearch in a kibana compatible format' description: 'log to elasticsearch in a kibana compatible format'
} }

View File

@ -1,102 +1,102 @@
import * as plugins from './elasticsearch.plugins.js'; import * as plugins from './elasticsearch.plugins.js';
import { ElasticSearch } from './elasticsearch.classes.elasticsearch.js'; import { ElsSmartlogDestination } from './els.classes.smartlogdestination.js';
import { type ILogPackage } from '@pushrocks/smartlog-interfaces'; import { type ILogPackage } from '@pushrocks/smartlog-interfaces';
import { Stringmap } from '@pushrocks/lik'; import { Stringmap } from '@pushrocks/lik';
export class ElasticIndex { export class ElasticIndex {
private stringmap = new Stringmap(); private stringmap = new Stringmap();
private elasticSearchRef: ElasticSearch<any>; private elasticSearchRef: ElsSmartlogDestination<any>;
constructor(elasticSearchInstanceArg: ElasticSearch<ILogPackage>) { constructor(elasticSearchInstanceArg: ElsSmartlogDestination<ILogPackage>) {
this.elasticSearchRef = elasticSearchInstanceArg; this.elasticSearchRef = elasticSearchInstanceArg;
} }
public async ensureIndex(indexArg: string) { public async ensureIndex(prefixArg: string, indexNameArg: string) {
const done = plugins.smartpromise.defer(); if (this.stringmap.checkString(indexNameArg)) {
if (this.stringmap.checkString(indexArg)) { return indexNameArg;
done.resolve();
return;
} }
this.elasticSearchRef.client.cat.indices(
{
format: 'json',
bytes: 'm',
},
async (err, responseArg: any[]) => {
if (err) {
console.log(err);
return;
}
// lets delete indexes that violate the retention const responseArg = await this.elasticSearchRef.client.cat.indices({
if (Array.isArray(responseArg)) { format: 'json',
const filteredIndices = responseArg.filter((indexObjectArg) => { bytes: 'm',
return indexObjectArg.index.startsWith('smartlog'); }).catch(err => {
}); console.log(err);
const filteredIndexNames = filteredIndices.map((indexObjectArg) => { });
return indexObjectArg.index;
});
this.deleteOldIndices(filteredIndexNames);
}
let index = null; if (!responseArg) {
throw new Error('Could not get valid response from elastic search');
}
if (Array.isArray(responseArg)) { if (Array.isArray(responseArg.body)) {
index = responseArg.find((indexObject) => { const filteredIndices = responseArg.body.filter((indexObjectArg) => {
return indexObject.index === indexArg; return indexObjectArg.index.startsWith(prefixArg);
}); });
} const filteredIndexNames = filteredIndices.map((indexObjectArg) => {
return indexObjectArg.index;
});
await this.deleteOldIndices(prefixArg, filteredIndexNames);
}
if (!index) { let index = null;
const done2 = plugins.smartpromise.defer();
this.elasticSearchRef.client.indices.create( if (Array.isArray(responseArg.body)) {
{ index = responseArg.body.find((indexItemArg) => {
waitForActiveShards: '1', return indexItemArg.index === indexNameArg;
index: indexArg, });
}, }
(error, response) => {
// console.lof(response) if (!index) {
done2.resolve(); await this.createNewIndex(indexNameArg);
} }
);
await done2.promise; this.stringmap.addString(indexNameArg);
} return index;
this.stringmap.addString(indexArg);
done.resolve();
}
);
await done.promise;
} }
public createNewIndex(indexNameArg: string) {} public async createNewIndex(indexNameArg: string) {
const response = await this.elasticSearchRef.client.indices.create({
wait_for_active_shards: '1',
index: indexNameArg,
body: {
mappings: {
properties: {
'@timestamp': {
type: 'date',
},
logPackageArg: {
properties: {
payload: {
type: 'object',
dynamic: true
}
}
},
},
},
},
});
}
public async deleteOldIndices(indicesArray: string[]) { public async deleteOldIndices(prefixArg: string, indicesArray: string[]) {
const todayAsUnix: number = Date.now(); const todayAsUnix: number = Date.now();
const rententionPeriodAsUnix: number = plugins.smarttime.units.days( const rententionPeriodAsUnix: number = plugins.smarttime.units.days(
this.elasticSearchRef.indexRetention this.elasticSearchRef.indexRetention
); );
for (const indexName of indicesArray) { for (const indexName of indicesArray) {
const regexResult = /^smartlog-([0-9]*)\.([0-9]*)\.([0-9]*)$/.exec(indexName); if (!indexName.startsWith(prefixArg)) continue;
const indexRegex = new RegExp(`^${prefixArg}-([0-9]*)-([0-9]*)-([0-9]*)$`)
const regexResult = indexRegex.exec(indexName);
const dateAsUnix: number = new Date( const dateAsUnix: number = new Date(
`${regexResult[1]}-${regexResult[2]}-${regexResult[3]}` `${regexResult[1]}-${regexResult[2]}-${regexResult[3]}`
).getTime(); ).getTime();
if (todayAsUnix - rententionPeriodAsUnix > dateAsUnix) { if (todayAsUnix - rententionPeriodAsUnix > dateAsUnix) {
console.log(`found old index ${indexName}`); console.log(`found old index ${indexName}`);
const done2 = plugins.smartpromise.defer(); const response = await this.elasticSearchRef.client.indices.delete(
this.elasticSearchRef.client.indices.delete(
{ {
index: indexName, index: indexName,
}, }).catch(err => {
(err2, response2) => { console.log(err);
if (err2) { });
console.log(err2);
}
console.log(`deleted ${indexName}`);
done2.resolve();
}
);
await done2.promise;
} }
} }
} }

View File

@ -1,29 +1,48 @@
import { ElasticSearch, type IStandardLogParams } from './elasticsearch.classes.elasticsearch.js'; import { ElsSmartlogDestination, type IStandardLogParams } from './els.classes.smartlogdestination.js';
export class ElasticScheduler { export class ElasticScheduler {
elasticSearchRef: ElasticSearch<any>; elasticSearchRef: ElsSmartlogDestination<any>;
docsScheduled = false; docsScheduled = false;
docsStorage: any[] = []; docsStorage: any[] = [];
constructor(elasticLogRefArg: ElasticSearch<any>) { // maximum size of the buffer
maxBufferSize = 500;
constructor(elasticLogRefArg: ElsSmartlogDestination<any>) {
this.elasticSearchRef = elasticLogRefArg; this.elasticSearchRef = elasticLogRefArg;
} }
public addFailedDoc(objectArg: any | IStandardLogParams) { public addFailedDoc(objectArg: any | IStandardLogParams) {
this.docsStorage.push(objectArg); this.addToStorage(objectArg);
this.setRetry(); this.setRetry();
} }
public scheduleDoc(logObject: any) { public scheduleDoc(logObject: any) {
this.addToStorage(logObject);
}
private addToStorage(logObject: any) {
this.docsStorage.push(logObject); this.docsStorage.push(logObject);
// if buffer is full, send logs immediately
if (this.docsStorage.length >= this.maxBufferSize) {
this.flushLogsToElasticSearch();
}
}
private flushLogsToElasticSearch() {
const oldStorage = this.docsStorage;
this.docsStorage = [];
for (let logObject of oldStorage) {
this.elasticSearchRef.log(logObject, true);
}
} }
public setRetry() { public setRetry() {
setTimeout(() => { setTimeout(() => {
const oldStorage = this.docsStorage; this.flushLogsToElasticSearch();
this.docsStorage = [];
for (let logObject of oldStorage) {
this.elasticSearchRef.log(logObject, true);
}
if (this.docsStorage.length === 0) { if (this.docsStorage.length === 0) {
console.log('ElasticLog retry success!!!'); console.log('ElasticLog retry success!!!');
this.docsScheduled = false; this.docsScheduled = false;

View File

@ -1,4 +1,4 @@
import * as elasticsearch from 'elasticsearch'; import * as elasticsearch from '@elastic/elasticsearch';
import * as lik from '@pushrocks/lik'; import * as lik from '@pushrocks/lik';
import * as smartdelay from '@pushrocks/smartdelay'; import * as smartdelay from '@pushrocks/smartdelay';
import * as smartlogInterfaces from '@pushrocks/smartlog-interfaces'; import * as smartlogInterfaces from '@pushrocks/smartlog-interfaces';

View File

@ -1,5 +1,5 @@
// interfaces // interfaces
import { Client as ElasticClient } from 'elasticsearch'; import { Client as ElasticClient } from '@elastic/elasticsearch';
import type { ILogContext, ILogPackage, ILogDestination } from '@pushrocks/smartlog-interfaces'; import type { ILogContext, ILogPackage, ILogDestination } from '@pushrocks/smartlog-interfaces';
// other classes // other classes
@ -14,14 +14,14 @@ export interface IStandardLogParams {
export interface IElasticSearchConstructorOptions { export interface IElasticSearchConstructorOptions {
indexPrefix: string; indexPrefix: string;
indexRetention: number; indexRetention: number;
port: number; node: string;
domain: string; auth?: {
ssl: boolean; username: string;
user?: string; password: string;
pass?: string; };
} }
export class ElasticSearch<T> { export class ElsSmartlogDestination<T> {
public client: ElasticClient; public client: ElasticClient;
public elasticScheduler = new ElasticScheduler(this); public elasticScheduler = new ElasticScheduler(this);
public elasticIndex: ElasticIndex = new ElasticIndex(this); public elasticIndex: ElasticIndex = new ElasticIndex(this);
@ -35,49 +35,50 @@ export class ElasticSearch<T> {
*/ */
constructor(optionsArg: IElasticSearchConstructorOptions) { constructor(optionsArg: IElasticSearchConstructorOptions) {
this.client = new ElasticClient({ this.client = new ElasticClient({
host: this.computeHostString(optionsArg), node: optionsArg.node,
// log: 'trace' ...(optionsArg.auth && { auth: optionsArg.auth }),
}); });
this.indexPrefix = optionsArg.indexPrefix; this.indexPrefix = `${optionsArg.indexPrefix}`;
this.indexRetention = optionsArg.indexRetention; this.indexRetention = optionsArg.indexRetention;
} }
/**
* computes the host string from the constructor options
* @param optionsArg
*/
private computeHostString(optionsArg: IElasticSearchConstructorOptions): string {
let hostString = `${optionsArg.domain}:${optionsArg.port}`;
if (optionsArg.user && optionsArg.pass) {
hostString = `${optionsArg.user}:${optionsArg.pass}@${hostString}`;
}
if (optionsArg.ssl) {
hostString = `https://${hostString}`;
} else {
hostString = `http://${hostString}`;
}
console.log(hostString);
return hostString;
}
public async log(logPackageArg: ILogPackage, scheduleOverwrite = false) { public async log(logPackageArg: ILogPackage, scheduleOverwrite = false) {
const now = new Date(); const now = new Date();
const indexToUse = `${this.indexPrefix}-${now.getFullYear()}.${( const indexToUse = `${this.indexPrefix}-${now.toISOString().split('T')[0]}`;
'0' +
(now.getMonth() + 1)
).slice(-2)}.${('0' + now.getDate()).slice(-2)}`;
if (this.elasticScheduler.docsScheduled && !scheduleOverwrite) { if (this.elasticScheduler.docsScheduled && !scheduleOverwrite) {
this.elasticScheduler.scheduleDoc(logPackageArg); this.elasticScheduler.scheduleDoc(logPackageArg);
return; return;
} }
await this.elasticIndex.ensureIndex(indexToUse); // Make sure the index is created with a mapping for dynamic JSON
const indexExists = await this.client.indices.exists({ index: indexToUse });
if (!indexExists.body) {
await this.client.indices.create({
index: indexToUse,
body: {
mappings: {
properties: {
'@timestamp': {
type: 'date',
},
logPackageArg: {
properties: {
payload: {
type: 'object',
dynamic: true
}
}
},
},
},
},
});
}
this.client.index( this.client.index(
{ {
index: indexToUse, index: indexToUse,
type: 'log',
body: { body: {
'@timestamp': new Date(logPackageArg.timestamp).toISOString(), '@timestamp': new Date(logPackageArg.timestamp).toISOString(),
...logPackageArg, ...logPackageArg,

View File

@ -1 +1 @@
export * from './elasticsearch.classes.elasticsearch.js'; export * from './els.classes.smartlogdestination.js';