Compare commits

...

14 Commits

Author SHA1 Message Date
2a0b0b2478 1.0.51 2023-08-14 13:09:21 +02:00
35e99663a4 fix(core): update 2023-08-14 13:09:20 +02:00
2cc5855206 1.0.50 2023-08-14 10:58:29 +02:00
8f9f2fdf05 fix(core): update 2023-08-14 10:58:28 +02:00
7ef36b5c40 1.0.49 2023-08-02 03:17:30 +02:00
67a8f3fe4d fix(core): update 2023-08-02 03:17:30 +02:00
5ae2c37519 1.0.48 2023-08-02 03:11:18 +02:00
fcb67ec878 fix(core): update 2023-08-02 03:11:17 +02:00
9e25494f8f 1.0.47 2023-08-01 15:05:46 +02:00
dd8ba4736a fix(core): update 2023-08-01 15:05:45 +02:00
d395310410 1.0.46 2023-08-01 12:38:54 +02:00
49233ce45f fix(core): update 2023-08-01 12:38:53 +02:00
fb93dce8bc 1.0.45 2023-08-01 12:24:22 +02:00
30cbc05aa2 fix(core): update 2023-08-01 12:24:22 +02:00
11 changed files with 395 additions and 24 deletions

View File

@ -1,6 +1,6 @@
{ {
"name": "@apiclient.xyz/elasticsearch", "name": "@apiclient.xyz/elasticsearch",
"version": "1.0.44", "version": "1.0.51",
"private": false, "private": false,
"description": "log to elasticsearch in a kibana compatible format", "description": "log to elasticsearch in a kibana compatible format",
"main": "dist_ts/index.js", "main": "dist_ts/index.js",

0
test/00tapwrap.ts Normal file
View File

View File

@ -2,10 +2,11 @@ import { expect, tap } from '@pushrocks/tapbundle';
import { Qenv } from '@pushrocks/qenv'; import { Qenv } from '@pushrocks/qenv';
import * as elasticsearch from '../ts/index.js'; import * as elasticsearch from '../ts/index.js';
let testElasticLog: elasticsearch.ElasticSearch<any>; let testElasticLog: elasticsearch.ElsSmartlogDestination<any>;
let testElasticDoc: elasticsearch.ElasticDoc;
tap.test('first test', async () => { tap.test('first test', async () => {
testElasticLog = new elasticsearch.ElasticSearch({ testElasticLog = new elasticsearch.ElsSmartlogDestination({
indexPrefix: 'testprefix', indexPrefix: 'testprefix',
indexRetention: 7, indexRetention: 7,
node: 'http://localhost:9200', node: 'http://localhost:9200',
@ -14,7 +15,7 @@ tap.test('first test', async () => {
password: 'YourPassword' password: 'YourPassword'
} }
}); });
expect(testElasticLog).toBeInstanceOf(elasticsearch.ElasticSearch); expect(testElasticLog).toBeInstanceOf(elasticsearch.ElsSmartlogDestination);
}); });
tap.test('should send a message to Elasticsearch', async () => { tap.test('should send a message to Elasticsearch', async () => {
@ -35,4 +36,42 @@ tap.test('should send a message to Elasticsearch', async () => {
}); });
}); });
tap.test('should create an ElasticDoc instance', async () => {
testElasticDoc = new elasticsearch.ElasticDoc({
index: 'testindex',
node: 'http://localhost:9200',
auth: {
username: 'elastic',
password: 'YourPassword'
}
});
expect(testElasticDoc).toBeInstanceOf(elasticsearch.ElasticDoc);
});
tap.test('should add and update documents in a piping session', async () => {
await testElasticDoc.startPipingSession();
await testElasticDoc.pipeDocument('1', { name: 'doc1' });
await testElasticDoc.pipeDocument('2', { name: 'doc2' });
await testElasticDoc.pipeDocument('1', { name: 'updated doc1' });
});
tap.test('should delete documents not part of the piping session', async () => {
await testElasticDoc.endPipingSession();
});
tap.test('should take and store snapshot', async () => {
await testElasticDoc.takeSnapshot(async (iterator, prevSnapshot) => {
const aggregationData = [];
for await (const doc of iterator) {
// Sample aggregation: counting documents
aggregationData.push(doc);
}
const snapshot = {
date: new Date().toISOString(),
aggregationData,
};
return snapshot;
});
});
tap.start(); tap.start();

View File

@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@apiclient.xyz/elasticsearch', name: '@apiclient.xyz/elasticsearch',
version: '1.0.44', version: '1.0.51',
description: 'log to elasticsearch in a kibana compatible format' description: 'log to elasticsearch in a kibana compatible format'
} }

View File

@ -1,14 +1,13 @@
import * as plugins from './elasticsearch.plugins.js'; import * as plugins from './els.plugins.js';
import { ElasticSearch } from './elasticsearch.classes.elasticsearch.js'; import { ElsSmartlogDestination } from './els.classes.smartlogdestination.js';
import { type ILogPackage } from '@pushrocks/smartlog-interfaces'; import { type ILogPackage } from '@pushrocks/smartlog-interfaces';
import { Stringmap } from '@pushrocks/lik'; import { Stringmap } from '@pushrocks/lik';
export class ElasticIndex { export class ElasticIndex {
private stringmap = new Stringmap(); private stringmap = new Stringmap();
private elasticSearchRef: ElasticSearch<any>; private elasticSearchRef: ElsSmartlogDestination<any>;
constructor(elasticSearchInstanceArg: ElasticSearch<ILogPackage>) { constructor(elasticSearchInstanceArg: ElsSmartlogDestination<ILogPackage>) {
this.elasticSearchRef = elasticSearchInstanceArg; this.elasticSearchRef = elasticSearchInstanceArg;
} }
@ -16,6 +15,7 @@ export class ElasticIndex {
if (this.stringmap.checkString(indexNameArg)) { if (this.stringmap.checkString(indexNameArg)) {
return indexNameArg; return indexNameArg;
} }
const responseArg = await this.elasticSearchRef.client.cat.indices({ const responseArg = await this.elasticSearchRef.client.cat.indices({
format: 'json', format: 'json',
bytes: 'm', bytes: 'm',
@ -27,7 +27,6 @@ export class ElasticIndex {
throw new Error('Could not get valid response from elastic search'); throw new Error('Could not get valid response from elastic search');
} }
// lets delete indexes that violate the retention
if (Array.isArray(responseArg.body)) { if (Array.isArray(responseArg.body)) {
const filteredIndices = responseArg.body.filter((indexObjectArg) => { const filteredIndices = responseArg.body.filter((indexObjectArg) => {
return indexObjectArg.index.startsWith(prefixArg); return indexObjectArg.index.startsWith(prefixArg);
@ -49,6 +48,7 @@ export class ElasticIndex {
if (!index) { if (!index) {
await this.createNewIndex(indexNameArg); await this.createNewIndex(indexNameArg);
} }
this.stringmap.addString(indexNameArg); this.stringmap.addString(indexNameArg);
return index; return index;
} }
@ -57,6 +57,23 @@ export class ElasticIndex {
const response = await this.elasticSearchRef.client.indices.create({ const response = await this.elasticSearchRef.client.indices.create({
wait_for_active_shards: '1', wait_for_active_shards: '1',
index: indexNameArg, index: indexNameArg,
body: {
mappings: {
properties: {
'@timestamp': {
type: 'date',
},
logPackageArg: {
properties: {
payload: {
type: 'object',
dynamic: true
}
}
},
},
},
},
}); });
} }

View File

@ -1,29 +1,48 @@
import { ElasticSearch, type IStandardLogParams } from './elasticsearch.classes.elasticsearch.js'; import { ElsSmartlogDestination, type IStandardLogParams } from './els.classes.smartlogdestination.js';
export class ElasticScheduler { export class ElasticScheduler {
elasticSearchRef: ElasticSearch<any>; elasticSearchRef: ElsSmartlogDestination<any>;
docsScheduled = false; docsScheduled = false;
docsStorage: any[] = []; docsStorage: any[] = [];
constructor(elasticLogRefArg: ElasticSearch<any>) { // maximum size of the buffer
maxBufferSize = 500;
constructor(elasticLogRefArg: ElsSmartlogDestination<any>) {
this.elasticSearchRef = elasticLogRefArg; this.elasticSearchRef = elasticLogRefArg;
} }
public addFailedDoc(objectArg: any | IStandardLogParams) { public addFailedDoc(objectArg: any | IStandardLogParams) {
this.docsStorage.push(objectArg); this.addToStorage(objectArg);
this.setRetry(); this.setRetry();
} }
public scheduleDoc(logObject: any) { public scheduleDoc(logObject: any) {
this.addToStorage(logObject);
}
private addToStorage(logObject: any) {
this.docsStorage.push(logObject); this.docsStorage.push(logObject);
// if buffer is full, send logs immediately
if (this.docsStorage.length >= this.maxBufferSize) {
this.flushLogsToElasticSearch();
}
}
private flushLogsToElasticSearch() {
const oldStorage = this.docsStorage;
this.docsStorage = [];
for (let logObject of oldStorage) {
this.elasticSearchRef.log(logObject, true);
}
} }
public setRetry() { public setRetry() {
setTimeout(() => { setTimeout(() => {
const oldStorage = this.docsStorage; this.flushLogsToElasticSearch();
this.docsStorage = [];
for (let logObject of oldStorage) {
this.elasticSearchRef.log(logObject, true);
}
if (this.docsStorage.length === 0) { if (this.docsStorage.length === 0) {
console.log('ElasticLog retry success!!!'); console.log('ElasticLog retry success!!!');
this.docsScheduled = false; this.docsScheduled = false;

View File

@ -0,0 +1,159 @@
import { Client as ElasticClient } from '@elastic/elasticsearch';
export interface IElasticDocConstructorOptions {
index: string;
node: string;
auth?: {
username: string;
password: string;
};
}
export interface ISnapshot {
date: string;
aggregationData: any;
}
export type SnapshotProcessor = (iterator: AsyncIterable<any>, prevSnapshot: ISnapshot | null) => Promise<ISnapshot>;
export class ElasticDoc {
public client: ElasticClient;
public index: string;
private sessionDocs: Set<string> = new Set();
private BATCH_SIZE = 1000;
constructor(options: IElasticDocConstructorOptions) {
this.client = new ElasticClient({
node: options.node,
...(options.auth && { auth: options.auth }),
});
this.index = options.index;
}
async startPipingSession() {
this.sessionDocs.clear();
}
async pipeDocument(docId: string, doc: any) {
await this.client.index({
index: this.index,
id: docId,
body: doc,
});
this.sessionDocs.add(docId);
}
async endPipingSession() {
const allDocIds: string[] = [];
const responseQueue = [];
let response = await this.client.search({ index: this.index, scroll: '1m', size: this.BATCH_SIZE });
while (true) {
response.body.hits.hits.forEach((hit: any) => allDocIds.push(hit._id));
if (!response.body.hits.hits.length) {
break;
}
response = await this.client.scroll({ scroll_id: response.body._scroll_id, scroll: '1m' });
}
for (const docId of allDocIds) {
if (!this.sessionDocs.has(docId)) {
responseQueue.push({
delete: {
_index: this.index,
_id: docId,
},
});
if (responseQueue.length >= this.BATCH_SIZE) {
await this.client.bulk({ refresh: true, body: responseQueue });
responseQueue.length = 0;
}
}
}
if (responseQueue.length > 0) {
await this.client.bulk({ refresh: true, body: responseQueue });
}
this.sessionDocs.clear();
}
async takeSnapshot(processIterator: SnapshotProcessor) {
const snapshotIndex = `${this.index}_snapshots`;
const { body: indexExists } = await this.client.indices.exists({ index: snapshotIndex });
if (!indexExists) {
await this.client.indices.create({
index: snapshotIndex,
body: {
mappings: {
properties: {
date: {
type: 'date'
},
aggregationData: {
type: 'object',
enabled: true
}
}
}
}
});
}
const documentIterator = this.getDocumentIterator();
const newSnapshot = await processIterator(documentIterator, await this.getLastSnapshot());
await this.storeSnapshot(newSnapshot);
}
private async getLastSnapshot(): Promise<ISnapshot | null> {
const snapshotIndex = `${this.index}_snapshots`;
const { body: indexExists } = await this.client.indices.exists({ index: snapshotIndex });
if (!indexExists) {
return null;
}
const response = await this.client.search({
index: snapshotIndex,
sort: 'date:desc',
size: 1
});
if (response.body.hits.hits.length > 0) {
const hit = response.body.hits.hits[0];
return {
date: hit._source.date,
aggregationData: hit._source.aggregationData,
};
} else {
return null;
}
}
private async *getDocumentIterator() {
let response = await this.client.search({ index: this.index, scroll: '1m', size: this.BATCH_SIZE });
while (true) {
for (const hit of response.body.hits.hits) {
yield hit._source;
}
if (!response.body.hits.hits.length) {
break;
}
response = await this.client.scroll({ scroll_id: response.body._scroll_id, scroll: '1m' });
}
}
private async storeSnapshot(snapshot: ISnapshot) {
await this.client.index({
index: `${this.index}_snapshots`,
body: snapshot,
});
}
}

111
ts/els.classes.kvstore.ts Normal file
View File

@ -0,0 +1,111 @@
import * as plugins from './els.plugins.js';
import { Client as ElasticClient } from '@elastic/elasticsearch';
export interface IElasticKVStoreConstructorOptions {
index: string;
node: string;
auth?: {
username: string;
password: string;
};
}
export class ElasticKVStore {
public client: ElasticClient;
public index: string;
private readyDeferred: any;
constructor(options: IElasticKVStoreConstructorOptions) {
this.client = new ElasticClient({
node: options.node,
...(options.auth && { auth: options.auth }),
});
this.index = options.index;
this.readyDeferred = plugins.smartpromise.defer();
this.setupIndex();
}
private async setupIndex() {
try {
const { body: indexExists } = await this.client.indices.exists({ index: this.index });
if (!indexExists) {
await this.client.indices.create({
index: this.index,
body: {
mappings: {
properties: {
key: {
type: 'keyword'
},
value: {
type: 'text'
}
}
}
}
});
}
this.readyDeferred.resolve();
} catch (err) {
this.readyDeferred.reject(err);
}
}
async set(key: string, value: string) {
await this.readyDeferred.promise;
await this.client.index({
index: this.index,
id: key,
body: {
key,
value
}
});
}
async get(key: string): Promise<string | null> {
await this.readyDeferred.promise;
try {
const response = await this.client.get({
index: this.index,
id: key
});
return response.body._source.value;
} catch (error) {
if (error.meta && error.meta.statusCode === 404) {
return null;
}
throw error;
}
}
async delete(key: string) {
await this.readyDeferred.promise;
try {
await this.client.delete({
index: this.index,
id: key
});
} catch (error) {
if (error.meta && error.meta.statusCode !== 404) {
throw error;
}
}
}
async clear() {
await this.readyDeferred.promise;
await this.client.deleteByQuery({
index: this.index,
body: {
query: {
match_all: {}
}
}
});
}
}

View File

@ -21,7 +21,7 @@ export interface IElasticSearchConstructorOptions {
}; };
} }
export class ElasticSearch<T> { export class ElsSmartlogDestination<T> {
public client: ElasticClient; public client: ElasticClient;
public elasticScheduler = new ElasticScheduler(this); public elasticScheduler = new ElasticScheduler(this);
public elasticIndex: ElasticIndex = new ElasticIndex(this); public elasticIndex: ElasticIndex = new ElasticIndex(this);
@ -38,7 +38,7 @@ export class ElasticSearch<T> {
node: optionsArg.node, node: optionsArg.node,
...(optionsArg.auth && { auth: optionsArg.auth }), ...(optionsArg.auth && { auth: optionsArg.auth }),
}); });
this.indexPrefix = optionsArg.indexPrefix; this.indexPrefix = `${optionsArg.indexPrefix}`;
this.indexRetention = optionsArg.indexRetention; this.indexRetention = optionsArg.indexRetention;
} }
@ -51,7 +51,31 @@ export class ElasticSearch<T> {
return; return;
} }
const resultIndexName = await this.elasticIndex.ensureIndex(this.indexPrefix, indexToUse); // Make sure the index is created with a mapping for dynamic JSON
const indexExists = await this.client.indices.exists({ index: indexToUse });
if (!indexExists.body) {
await this.client.indices.create({
index: indexToUse,
body: {
mappings: {
properties: {
'@timestamp': {
type: 'date',
},
logPackageArg: {
properties: {
payload: {
type: 'object',
dynamic: true
}
}
},
},
},
},
});
}
this.client.index( this.client.index(
{ {
index: indexToUse, index: indexToUse,

View File

@ -1 +1,3 @@
export * from './elasticsearch.classes.elasticsearch.js'; export * from './els.classes.smartlogdestination.js';
export * from './els.classes.elasticdoc.js';
export * from './els.classes.kvstore.js';