Compare commits
84 Commits
Author | SHA1 | Date | |
---|---|---|---|
36d7cb69a3 | |||
4924e0a151 | |||
cd98529541 | |||
e6a9282987 | |||
9dcadcd611 | |||
4b045ff988 | |||
023dd1b519 | |||
4971385eae | |||
e209839962 | |||
e44365b674 | |||
bd154089c3 | |||
0be693da60 | |||
040c93dec3 | |||
21e55bd341 | |||
f1d04fe63c | |||
49c4660131 | |||
e5fd0361fc | |||
d6a291d8d4 | |||
fc87fd7ab7 | |||
203444d1a6 | |||
cdbf1fd316 | |||
10108d8338 | |||
36abb2c7c0 | |||
8f00d90bb1 | |||
41f1758d46 | |||
2b7cd33996 | |||
80a04ca893 | |||
93f739c79e | |||
a1b5bf5c0c | |||
084c5d137c | |||
4aa0592bd5 | |||
0313e5045a | |||
7442d93f58 | |||
35e70aae62 | |||
cc30f43a28 | |||
100a8fc12e | |||
f32403961d | |||
9734949241 | |||
b70444824b | |||
0eb0903667 | |||
4d11dca22c | |||
3079adbbd9 | |||
bc9de8e4d6 | |||
3fa7d66236 | |||
2a0b0b2478 | |||
35e99663a4 | |||
2cc5855206 | |||
8f9f2fdf05 | |||
7ef36b5c40 | |||
67a8f3fe4d | |||
5ae2c37519 | |||
fcb67ec878 | |||
9e25494f8f | |||
dd8ba4736a | |||
d395310410 | |||
49233ce45f | |||
fb93dce8bc | |||
30cbc05aa2 | |||
2a595a1a9a | |||
d62b18e93c | |||
d6176f820a | |||
0ebc1d5288 | |||
2b0003546a | |||
60617f2fca | |||
9c767d07e4 | |||
f3aa94dcb7 | |||
a0be0edd9d | |||
ad24ba2f5d | |||
b0cf4bb27f | |||
fd29ceab80 | |||
bcca434a24 | |||
d4a9ad8f67 | |||
d4c7c33668 | |||
8340257b00 | |||
32265e83f3 | |||
e2df11cea2 | |||
2719ba28f6 | |||
6d78a7ba0c | |||
5897c6e7de | |||
20369614a2 | |||
7ceaf694fe | |||
391c6bd45d | |||
1a702071c6 | |||
0fe2f6a4ae |
@ -12,29 +12,38 @@ stages:
|
|||||||
- release
|
- release
|
||||||
- metadata
|
- metadata
|
||||||
|
|
||||||
|
before_script:
|
||||||
|
- pnpm install -g pnpm
|
||||||
|
- pnpm install -g @shipzone/npmci
|
||||||
|
- npmci npm prepare
|
||||||
|
|
||||||
# ====================
|
# ====================
|
||||||
# security stage
|
# security stage
|
||||||
# ====================
|
# ====================
|
||||||
mirror:
|
# ====================
|
||||||
|
# security stage
|
||||||
|
# ====================
|
||||||
|
auditProductionDependencies:
|
||||||
|
image: registry.gitlab.com/hosttoday/ht-docker-node:npmci
|
||||||
stage: security
|
stage: security
|
||||||
script:
|
script:
|
||||||
- npmci git mirror
|
- npmci command npm config set registry https://registry.npmjs.org
|
||||||
|
- npmci command pnpm audit --audit-level=high --prod
|
||||||
tags:
|
tags:
|
||||||
- lossless
|
- lossless
|
||||||
- docker
|
- docker
|
||||||
- notpriv
|
allow_failure: true
|
||||||
|
|
||||||
snyk:
|
auditDevDependencies:
|
||||||
|
image: registry.gitlab.com/hosttoday/ht-docker-node:npmci
|
||||||
stage: security
|
stage: security
|
||||||
script:
|
script:
|
||||||
- npmci npm prepare
|
- npmci command npm config set registry https://registry.npmjs.org
|
||||||
- npmci command npm install -g snyk
|
- npmci command pnpm audit --audit-level=high --dev
|
||||||
- npmci command npm install --ignore-scripts
|
|
||||||
- npmci command snyk test
|
|
||||||
tags:
|
tags:
|
||||||
- lossless
|
- lossless
|
||||||
- docker
|
- docker
|
||||||
- notpriv
|
allow_failure: true
|
||||||
|
|
||||||
# ====================
|
# ====================
|
||||||
# test stage
|
# test stage
|
||||||
@ -43,33 +52,24 @@ snyk:
|
|||||||
testStable:
|
testStable:
|
||||||
stage: test
|
stage: test
|
||||||
script:
|
script:
|
||||||
- npmci npm prepare
|
|
||||||
- npmci node install stable
|
|
||||||
- npmci npm install
|
- npmci npm install
|
||||||
- npmci npm test
|
- npmci npm test
|
||||||
coverage: /\d+.?\d+?\%\s*coverage/
|
coverage: /\d+.?\d+?\%\s*coverage/
|
||||||
tags:
|
tags:
|
||||||
- lossless
|
|
||||||
- docker
|
- docker
|
||||||
- priv
|
|
||||||
|
|
||||||
testBuild:
|
testBuild:
|
||||||
stage: test
|
stage: test
|
||||||
script:
|
script:
|
||||||
- npmci npm prepare
|
|
||||||
- npmci node install stable
|
|
||||||
- npmci npm install
|
- npmci npm install
|
||||||
- npmci command npm run build
|
- npmci npm build
|
||||||
coverage: /\d+.?\d+?\%\s*coverage/
|
coverage: /\d+.?\d+?\%\s*coverage/
|
||||||
tags:
|
tags:
|
||||||
- lossless
|
|
||||||
- docker
|
- docker
|
||||||
- notpriv
|
|
||||||
|
|
||||||
release:
|
release:
|
||||||
stage: release
|
stage: release
|
||||||
script:
|
script:
|
||||||
- npmci node install stable
|
|
||||||
- npmci npm publish
|
- npmci npm publish
|
||||||
only:
|
only:
|
||||||
- tags
|
- tags
|
||||||
@ -84,11 +84,12 @@ release:
|
|||||||
codequality:
|
codequality:
|
||||||
stage: metadata
|
stage: metadata
|
||||||
allow_failure: true
|
allow_failure: true
|
||||||
|
only:
|
||||||
|
- tags
|
||||||
script:
|
script:
|
||||||
- npmci command npm install -g tslint typescript
|
- npmci command npm install -g typescript
|
||||||
- npmci npm prepare
|
- npmci npm prepare
|
||||||
- npmci npm install
|
- npmci npm install
|
||||||
- npmci command "tslint -c tslint.json ./ts/**/*.ts"
|
|
||||||
tags:
|
tags:
|
||||||
- lossless
|
- lossless
|
||||||
- docker
|
- docker
|
||||||
@ -108,11 +109,9 @@ trigger:
|
|||||||
pages:
|
pages:
|
||||||
stage: metadata
|
stage: metadata
|
||||||
script:
|
script:
|
||||||
- npmci node install lts
|
- npmci node install stable
|
||||||
- npmci command npm install -g @gitzone/tsdoc
|
|
||||||
- npmci npm prepare
|
|
||||||
- npmci npm install
|
- npmci npm install
|
||||||
- npmci command tsdoc
|
- npmci command npm run buildDocs
|
||||||
tags:
|
tags:
|
||||||
- lossless
|
- lossless
|
||||||
- docker
|
- docker
|
||||||
|
31
package.json
31
package.json
@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@mojoio/elasticsearch",
|
"name": "@apiclient.xyz/elasticsearch",
|
||||||
"version": "1.0.31",
|
"version": "2.0.16",
|
||||||
"private": false,
|
"private": false,
|
||||||
"description": "log to elasticsearch in a kibana compatible format",
|
"description": "log to elasticsearch in a kibana compatible format",
|
||||||
"main": "dist_ts/index.js",
|
"main": "dist_ts/index.js",
|
||||||
@ -10,25 +10,24 @@
|
|||||||
"scripts": {
|
"scripts": {
|
||||||
"test": "(tstest test/)",
|
"test": "(tstest test/)",
|
||||||
"format": "(gitzone format)",
|
"format": "(gitzone format)",
|
||||||
"build": "(tsbuild)",
|
"build": "(tsbuild --allowimplicitany)",
|
||||||
"buildDocs": "tsdoc"
|
"buildDocs": "tsdoc"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"@gitzone/tsbuild": "^2.1.66",
|
"@git.zone/tsbuild": "^2.1.70",
|
||||||
"@gitzone/tsrun": "^1.2.42",
|
"@git.zone/tsrun": "^1.2.46",
|
||||||
"@gitzone/tstest": "^1.0.74",
|
"@git.zone/tstest": "^1.0.80",
|
||||||
"@pushrocks/qenv": "^5.0.2",
|
"@push.rocks/qenv": "^6.0.2",
|
||||||
"@pushrocks/tapbundle": "^5.0.8",
|
"@push.rocks/tapbundle": "^5.0.15",
|
||||||
"@types/node": "^20.3.3"
|
"@types/node": "^20.5.7"
|
||||||
},
|
},
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@pushrocks/lik": "^6.0.2",
|
"@elastic/elasticsearch": "8.9.0",
|
||||||
"@pushrocks/smartdelay": "^3.0.1",
|
"@push.rocks/lik": "^6.0.5",
|
||||||
"@pushrocks/smartlog-interfaces": "^3.0.0",
|
"@push.rocks/smartdelay": "^3.0.5",
|
||||||
"@pushrocks/smartpromise": "^4.0.2",
|
"@push.rocks/smartlog-interfaces": "^3.0.0",
|
||||||
"@pushrocks/smarttime": "^4.0.1",
|
"@push.rocks/smartpromise": "^4.0.2",
|
||||||
"@types/elasticsearch": "^5.0.40",
|
"@push.rocks/smarttime": "^4.0.5"
|
||||||
"elasticsearch": "^16.7.3"
|
|
||||||
},
|
},
|
||||||
"files": [
|
"files": [
|
||||||
"ts/**/*",
|
"ts/**/*",
|
||||||
|
2267
pnpm-lock.yaml
generated
2267
pnpm-lock.yaml
generated
File diff suppressed because it is too large
Load Diff
5
qenv.yml
5
qenv.yml
@ -1,5 +1,2 @@
|
|||||||
required:
|
required:
|
||||||
- ELK_DOMAIN
|
|
||||||
- ELK_PORT
|
|
||||||
- ELK_USER
|
|
||||||
- ELK_PASS
|
|
@ -37,7 +37,6 @@ For further information read the linked docs at the top of this README.
|
|||||||
|
|
||||||
We are always happy for code contributions. If you are not the code contributing type that is ok. + Still, maintaining Open Source repositories takes considerable time and thought. If you like the quality of what we do and our modules are useful to you we would appreciate a little monthly contribution: [Contribute monthly :)](https://lossless.link/contribute)
|
We are always happy for code contributions. If you are not the code contributing type that is ok. + Still, maintaining Open Source repositories takes considerable time and thought. If you like the quality of what we do and our modules are useful to you we would appreciate a little monthly contribution: [Contribute monthly :)](https://lossless.link/contribute)
|
||||||
|
|
||||||
|
|
||||||
## Contribution
|
## Contribution
|
||||||
|
|
||||||
We are always happy for code contributions. If you are not the code contributing type that is ok. Still, maintaining Open Source repositories takes considerable time and thought. If you like the quality of what we do and our modules are useful to you we would appreciate a little monthly contribution: You can [contribute one time](https://lossless.link/contribute-onetime) or [contribute monthly](https://lossless.link/contribute). :)
|
We are always happy for code contributions. If you are not the code contributing type that is ok. Still, maintaining Open Source repositories takes considerable time and thought. If you like the quality of what we do and our modules are useful to you we would appreciate a little monthly contribution: You can [contribute one time](https://lossless.link/contribute-onetime) or [contribute monthly](https://lossless.link/contribute). :)
|
||||||
|
0
test/00tapwrap.ts
Normal file
0
test/00tapwrap.ts
Normal file
89
test/test.nonci.ts
Normal file
89
test/test.nonci.ts
Normal file
@ -0,0 +1,89 @@
|
|||||||
|
import { expect, tap } from '@push.rocks/tapbundle';
|
||||||
|
import { Qenv } from '@push.rocks/qenv';
|
||||||
|
import * as elasticsearch from '../ts/index.js';
|
||||||
|
|
||||||
|
let testElasticLog: elasticsearch.ElsSmartlogDestination<any>;
|
||||||
|
let testElasticDoc: elasticsearch.ElasticDoc;
|
||||||
|
|
||||||
|
tap.test('first test', async () => {
|
||||||
|
testElasticLog = new elasticsearch.ElsSmartlogDestination({
|
||||||
|
indexPrefix: 'testprefix',
|
||||||
|
indexRetention: 7,
|
||||||
|
node: 'http://localhost:9200',
|
||||||
|
auth: {
|
||||||
|
username: 'elastic',
|
||||||
|
password: 'YourPassword'
|
||||||
|
}
|
||||||
|
});
|
||||||
|
expect(testElasticLog).toBeInstanceOf(elasticsearch.ElsSmartlogDestination);
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('should send a message to Elasticsearch', async () => {
|
||||||
|
await testElasticLog.log({
|
||||||
|
timestamp: Date.now(),
|
||||||
|
type: 'increment',
|
||||||
|
level: 'info',
|
||||||
|
context: {
|
||||||
|
company: 'Lossless GmbH',
|
||||||
|
companyunit: 'lossless.cloud',
|
||||||
|
containerName: 'testcontainer',
|
||||||
|
environment: 'test',
|
||||||
|
runtime: 'node',
|
||||||
|
zone: 'ship.zone',
|
||||||
|
},
|
||||||
|
message: 'GET https://myroute.to.a.cool.destination/sorare?hello=there',
|
||||||
|
correlation: null,
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('should create an ElasticDoc instance', async () => {
|
||||||
|
testElasticDoc = new elasticsearch.ElasticDoc({
|
||||||
|
index: 'testindex',
|
||||||
|
node: 'http://localhost:9200',
|
||||||
|
auth: {
|
||||||
|
username: 'elastic',
|
||||||
|
password: 'YourPassword'
|
||||||
|
}
|
||||||
|
});
|
||||||
|
expect(testElasticDoc).toBeInstanceOf(elasticsearch.ElasticDoc);
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('should add and update documents in a piping session', async () => {
|
||||||
|
await testElasticDoc.startPipingSession({});
|
||||||
|
await testElasticDoc.pipeDocument({
|
||||||
|
docId: '1',
|
||||||
|
timestamp: new Date().toISOString(),
|
||||||
|
doc: { name: 'doc1' }
|
||||||
|
});
|
||||||
|
await testElasticDoc.pipeDocument({
|
||||||
|
docId: '2',
|
||||||
|
timestamp: new Date().toISOString(),
|
||||||
|
doc: { name: 'doc2' }
|
||||||
|
});
|
||||||
|
await testElasticDoc.pipeDocument({
|
||||||
|
docId: '1',
|
||||||
|
timestamp: new Date().toISOString(),
|
||||||
|
doc: { name: 'updated doc1' }
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('should delete documents not part of the piping session', async () => {
|
||||||
|
await testElasticDoc.endPipingSession();
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('should take and store snapshot', async () => {
|
||||||
|
await testElasticDoc.takeSnapshot(async (iterator, prevSnapshot) => {
|
||||||
|
const aggregationData = [];
|
||||||
|
for await (const doc of iterator) {
|
||||||
|
// Sample aggregation: counting documents
|
||||||
|
aggregationData.push(doc);
|
||||||
|
}
|
||||||
|
const snapshot = {
|
||||||
|
date: new Date().toISOString(),
|
||||||
|
aggregationData,
|
||||||
|
};
|
||||||
|
return snapshot;
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.start();
|
38
test/test.ts
38
test/test.ts
@ -1,38 +0,0 @@
|
|||||||
import { expect, tap } from '@pushrocks/tapbundle';
|
|
||||||
import { Qenv } from '@pushrocks/qenv';
|
|
||||||
import * as elasticsearch from '../ts/index.js';
|
|
||||||
|
|
||||||
const testQenv = new Qenv('./', './.nogit/');
|
|
||||||
|
|
||||||
let testElasticLog: elasticsearch.ElasticSearch<any>;
|
|
||||||
|
|
||||||
tap.test('first test', async () => {
|
|
||||||
testElasticLog = new elasticsearch.ElasticSearch({
|
|
||||||
indexPrefix: 'smartlog',
|
|
||||||
indexRetention: 7,
|
|
||||||
domain: testQenv.getEnvVarOnDemand('ELK_DOMAIN'),
|
|
||||||
port: parseInt(testQenv.getEnvVarOnDemand('ELK_PORT'), 10),
|
|
||||||
ssl: true,
|
|
||||||
});
|
|
||||||
expect(testElasticLog).toBeInstanceOf(elasticsearch.ElasticSearch);
|
|
||||||
});
|
|
||||||
|
|
||||||
tap.test('should send a message to Elasticsearch', async () => {
|
|
||||||
testElasticLog.log({
|
|
||||||
timestamp: Date.now(),
|
|
||||||
type: 'increment',
|
|
||||||
level: 'info',
|
|
||||||
context: {
|
|
||||||
company: 'Lossless GmbH',
|
|
||||||
companyunit: 'lossless.cloud',
|
|
||||||
containerName: 'testcontainer',
|
|
||||||
environment: 'test',
|
|
||||||
runtime: 'node',
|
|
||||||
zone: 'ship.zone',
|
|
||||||
},
|
|
||||||
message: 'GET https://myroute.to.a.cool.destination/sorare?hello=there',
|
|
||||||
correlation: null,
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
tap.start();
|
|
@ -2,7 +2,7 @@
|
|||||||
* autocreated commitinfo by @pushrocks/commitinfo
|
* autocreated commitinfo by @pushrocks/commitinfo
|
||||||
*/
|
*/
|
||||||
export const commitinfo = {
|
export const commitinfo = {
|
||||||
name: '@mojoio/elasticsearch',
|
name: '@apiclient.xyz/elasticsearch',
|
||||||
version: '1.0.31',
|
version: '2.0.16',
|
||||||
description: 'log to elasticsearch in a kibana compatible format'
|
description: 'log to elasticsearch in a kibana compatible format'
|
||||||
}
|
}
|
||||||
|
@ -1,103 +0,0 @@
|
|||||||
import * as plugins from './elasticsearch.plugins';
|
|
||||||
import { ElasticSearch } from './elasticsearch.classes.elasticsearch';
|
|
||||||
import { ILogPackage } from '@pushrocks/smartlog-interfaces';
|
|
||||||
|
|
||||||
import { Stringmap } from '@pushrocks/lik';
|
|
||||||
|
|
||||||
export class ElasticIndex {
|
|
||||||
private stringmap = new Stringmap();
|
|
||||||
private elasticSearchRef: ElasticSearch<any>;
|
|
||||||
|
|
||||||
constructor(elasticSearchInstanceArg: ElasticSearch<ILogPackage>) {
|
|
||||||
this.elasticSearchRef = elasticSearchInstanceArg;
|
|
||||||
}
|
|
||||||
|
|
||||||
public async ensureIndex(indexArg: string) {
|
|
||||||
const done = plugins.smartpromise.defer();
|
|
||||||
if (this.stringmap.checkString(indexArg)) {
|
|
||||||
done.resolve();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
this.elasticSearchRef.client.cat.indices(
|
|
||||||
{
|
|
||||||
format: 'json',
|
|
||||||
bytes: 'm',
|
|
||||||
},
|
|
||||||
async (err, responseArg: any[]) => {
|
|
||||||
if (err) {
|
|
||||||
console.log(err);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// lets delete indexes that violate the retention
|
|
||||||
if (Array.isArray(responseArg)) {
|
|
||||||
const filteredIndices = responseArg.filter((indexObjectArg) => {
|
|
||||||
return indexObjectArg.index.startsWith('smartlog');
|
|
||||||
});
|
|
||||||
const filteredIndexNames = filteredIndices.map((indexObjectArg) => {
|
|
||||||
return indexObjectArg.index;
|
|
||||||
});
|
|
||||||
this.deleteOldIndices(filteredIndexNames);
|
|
||||||
}
|
|
||||||
|
|
||||||
let index = null;
|
|
||||||
|
|
||||||
if (Array.isArray(responseArg)) {
|
|
||||||
index = responseArg.find((indexObject) => {
|
|
||||||
return indexObject.index === indexArg;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!index) {
|
|
||||||
const done2 = plugins.smartpromise.defer();
|
|
||||||
this.elasticSearchRef.client.indices.create(
|
|
||||||
{
|
|
||||||
waitForActiveShards: '1',
|
|
||||||
index: indexArg,
|
|
||||||
},
|
|
||||||
(error, response) => {
|
|
||||||
// console.lof(response)
|
|
||||||
done2.resolve();
|
|
||||||
}
|
|
||||||
);
|
|
||||||
await done2.promise;
|
|
||||||
}
|
|
||||||
this.stringmap.addString(indexArg);
|
|
||||||
done.resolve();
|
|
||||||
}
|
|
||||||
);
|
|
||||||
await done.promise;
|
|
||||||
}
|
|
||||||
|
|
||||||
public createNewIndex(indexNameArg: string) {}
|
|
||||||
|
|
||||||
public async deleteOldIndices(indicesArray: string[]) {
|
|
||||||
const todayAsUnix: number = Date.now();
|
|
||||||
const rententionPeriodAsUnix: number = plugins.smarttime.units.days(
|
|
||||||
this.elasticSearchRef.indexRetention
|
|
||||||
);
|
|
||||||
for (const indexName of indicesArray) {
|
|
||||||
const regexResult = /^smartlog-([0-9]*)\.([0-9]*)\.([0-9]*)$/.exec(indexName);
|
|
||||||
const dateAsUnix: number = new Date(
|
|
||||||
`${regexResult[1]}-${regexResult[2]}-${regexResult[3]}`
|
|
||||||
).getTime();
|
|
||||||
if (todayAsUnix - rententionPeriodAsUnix > dateAsUnix) {
|
|
||||||
console.log(`found old index ${indexName}`);
|
|
||||||
const done2 = plugins.smartpromise.defer();
|
|
||||||
this.elasticSearchRef.client.indices.delete(
|
|
||||||
{
|
|
||||||
index: indexName,
|
|
||||||
},
|
|
||||||
(err2, response2) => {
|
|
||||||
if (err2) {
|
|
||||||
console.log(err2);
|
|
||||||
}
|
|
||||||
console.log(`deleted ${indexName}`);
|
|
||||||
done2.resolve();
|
|
||||||
}
|
|
||||||
);
|
|
||||||
await done2.promise;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,44 +0,0 @@
|
|||||||
import { ElasticSearch, type IStandardLogParams } from './elasticsearch.classes.elasticsearch.js';
|
|
||||||
|
|
||||||
export class ElasticScheduler {
|
|
||||||
elasticSearchRef: ElasticSearch<any>;
|
|
||||||
docsScheduled = false;
|
|
||||||
docsStorage: any[] = [];
|
|
||||||
|
|
||||||
constructor(elasticLogRefArg: ElasticSearch<any>) {
|
|
||||||
this.elasticSearchRef = elasticLogRefArg;
|
|
||||||
}
|
|
||||||
|
|
||||||
public addFailedDoc(objectArg: any | IStandardLogParams) {
|
|
||||||
this.docsStorage.push(objectArg);
|
|
||||||
this.setRetry();
|
|
||||||
}
|
|
||||||
public scheduleDoc(logObject: any) {
|
|
||||||
this.docsStorage.push(logObject);
|
|
||||||
}
|
|
||||||
|
|
||||||
public setRetry() {
|
|
||||||
setTimeout(() => {
|
|
||||||
const oldStorage = this.docsStorage;
|
|
||||||
this.docsStorage = [];
|
|
||||||
for (let logObject of oldStorage) {
|
|
||||||
this.elasticSearchRef.log(logObject, true);
|
|
||||||
}
|
|
||||||
if (this.docsStorage.length === 0) {
|
|
||||||
console.log('ElasticLog retry success!!!');
|
|
||||||
this.docsScheduled = false;
|
|
||||||
} else {
|
|
||||||
console.log('ElasticLog retry failed');
|
|
||||||
this.setRetry();
|
|
||||||
}
|
|
||||||
}, 5000);
|
|
||||||
}
|
|
||||||
|
|
||||||
public deferSend() {
|
|
||||||
if (!this.docsScheduled) {
|
|
||||||
console.log('Retry ElasticLog in 5 seconds!');
|
|
||||||
this.docsScheduled = true;
|
|
||||||
this.setRetry();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,105 +0,0 @@
|
|||||||
// interfaces
|
|
||||||
import { Client as ElasticClient } from 'elasticsearch';
|
|
||||||
import type { ILogContext, ILogPackage, ILogDestination } from '@pushrocks/smartlog-interfaces';
|
|
||||||
|
|
||||||
// other classes
|
|
||||||
import { ElasticScheduler } from './elasticsearch.classes.elasticscheduler.js';
|
|
||||||
import { ElasticIndex } from './elasticsearch.classes.elasticindex.js';
|
|
||||||
|
|
||||||
export interface IStandardLogParams {
|
|
||||||
message: string;
|
|
||||||
severity: string;
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface IElasticSearchConstructorOptions {
|
|
||||||
indexPrefix: string;
|
|
||||||
indexRetention: number;
|
|
||||||
port: number;
|
|
||||||
domain: string;
|
|
||||||
ssl: boolean;
|
|
||||||
user?: string;
|
|
||||||
pass?: string;
|
|
||||||
}
|
|
||||||
|
|
||||||
export class ElasticSearch<T> {
|
|
||||||
public client: ElasticClient;
|
|
||||||
public elasticScheduler = new ElasticScheduler(this);
|
|
||||||
public elasticIndex: ElasticIndex = new ElasticIndex(this);
|
|
||||||
|
|
||||||
public indexPrefix: string;
|
|
||||||
public indexRetention: number;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* sets up an instance of Elastic log
|
|
||||||
* @param optionsArg
|
|
||||||
*/
|
|
||||||
constructor(optionsArg: IElasticSearchConstructorOptions) {
|
|
||||||
this.client = new ElasticClient({
|
|
||||||
host: this.computeHostString(optionsArg),
|
|
||||||
// log: 'trace'
|
|
||||||
});
|
|
||||||
this.indexPrefix = optionsArg.indexPrefix;
|
|
||||||
this.indexRetention = optionsArg.indexRetention;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* computes the host string from the constructor options
|
|
||||||
* @param optionsArg
|
|
||||||
*/
|
|
||||||
private computeHostString(optionsArg: IElasticSearchConstructorOptions): string {
|
|
||||||
let hostString = `${optionsArg.domain}:${optionsArg.port}`;
|
|
||||||
if (optionsArg.user && optionsArg.pass) {
|
|
||||||
hostString = `${optionsArg.user}:${optionsArg.pass}@${hostString}`;
|
|
||||||
}
|
|
||||||
if (optionsArg.ssl) {
|
|
||||||
hostString = `https://${hostString}`;
|
|
||||||
} else {
|
|
||||||
hostString = `http://${hostString}`;
|
|
||||||
}
|
|
||||||
console.log(hostString);
|
|
||||||
return hostString;
|
|
||||||
}
|
|
||||||
|
|
||||||
public async log(logPackageArg: ILogPackage, scheduleOverwrite = false) {
|
|
||||||
const now = new Date();
|
|
||||||
const indexToUse = `${this.indexPrefix}-${now.getFullYear()}.${(
|
|
||||||
'0' +
|
|
||||||
(now.getMonth() + 1)
|
|
||||||
).slice(-2)}.${('0' + now.getDate()).slice(-2)}`;
|
|
||||||
|
|
||||||
if (this.elasticScheduler.docsScheduled && !scheduleOverwrite) {
|
|
||||||
this.elasticScheduler.scheduleDoc(logPackageArg);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
await this.elasticIndex.ensureIndex(indexToUse);
|
|
||||||
|
|
||||||
this.client.index(
|
|
||||||
{
|
|
||||||
index: indexToUse,
|
|
||||||
type: 'log',
|
|
||||||
body: {
|
|
||||||
'@timestamp': new Date(logPackageArg.timestamp).toISOString(),
|
|
||||||
...logPackageArg,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
(error, response) => {
|
|
||||||
if (error) {
|
|
||||||
console.log('ElasticLog encountered an error:');
|
|
||||||
console.log(error);
|
|
||||||
this.elasticScheduler.addFailedDoc(logPackageArg);
|
|
||||||
} else {
|
|
||||||
// console.log(`ElasticLog: ${logPackageArg.message}`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
get logDestination(): ILogDestination {
|
|
||||||
return {
|
|
||||||
handleLog: async (smartlogPackageArg: ILogPackage) => {
|
|
||||||
this.log(smartlogPackageArg);
|
|
||||||
},
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,8 +0,0 @@
|
|||||||
import * as elasticsearch from 'elasticsearch';
|
|
||||||
import * as lik from '@pushrocks/lik';
|
|
||||||
import * as smartdelay from '@pushrocks/smartdelay';
|
|
||||||
import * as smartlogInterfaces from '@pushrocks/smartlog-interfaces';
|
|
||||||
import * as smartpromise from '@pushrocks/smartpromise';
|
|
||||||
import * as smarttime from '@pushrocks/smarttime';
|
|
||||||
|
|
||||||
export { elasticsearch, lik, smartdelay, smartlogInterfaces, smartpromise, smarttime };
|
|
249
ts/els.classes.elasticdoc.ts
Normal file
249
ts/els.classes.elasticdoc.ts
Normal file
@ -0,0 +1,249 @@
|
|||||||
|
import { Client as ElasticClient } from '@elastic/elasticsearch';
|
||||||
|
|
||||||
|
export interface IElasticDocConstructorOptions {
|
||||||
|
index: string;
|
||||||
|
node: string;
|
||||||
|
auth?: {
|
||||||
|
username: string;
|
||||||
|
password: string;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface ISnapshot {
|
||||||
|
date: string;
|
||||||
|
aggregationData: any;
|
||||||
|
}
|
||||||
|
|
||||||
|
export type SnapshotProcessor = (
|
||||||
|
iterator: AsyncIterable<any>,
|
||||||
|
prevSnapshot: ISnapshot | null
|
||||||
|
) => Promise<ISnapshot>;
|
||||||
|
|
||||||
|
export class ElasticDoc {
|
||||||
|
public client: ElasticClient;
|
||||||
|
public index: string;
|
||||||
|
private sessionDocs: Set<string> = new Set();
|
||||||
|
private indexInitialized: boolean = false;
|
||||||
|
private latestTimestamp: string | null = null; // Store the latest timestamp
|
||||||
|
private onlyNew: boolean = false; // Whether to only pipe new docs
|
||||||
|
public fastForward: boolean = false; // Whether to fast forward to the latest timestamp
|
||||||
|
|
||||||
|
private BATCH_SIZE = 1000;
|
||||||
|
|
||||||
|
constructor(options: IElasticDocConstructorOptions) {
|
||||||
|
this.client = new ElasticClient({
|
||||||
|
node: options.node,
|
||||||
|
...(options.auth && { auth: options.auth }),
|
||||||
|
});
|
||||||
|
this.index = options.index;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async ensureIndexExists(doc: any) {
|
||||||
|
if (!this.indexInitialized) {
|
||||||
|
const indexExists = await this.client.indices.exists({ index: this.index });
|
||||||
|
if (!indexExists) {
|
||||||
|
const mappings = this.createMappingsFromDoc(doc);
|
||||||
|
await this.client.indices.create({
|
||||||
|
index: this.index,
|
||||||
|
body: {
|
||||||
|
// mappings,
|
||||||
|
settings: {
|
||||||
|
// You can define the settings according to your requirements here
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
this.indexInitialized = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private createMappingsFromDoc(doc: any): any {
|
||||||
|
const properties: any = {};
|
||||||
|
for (const key in doc) {
|
||||||
|
if (key === '@timestamp') {
|
||||||
|
properties[key] = { type: 'date' };
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
properties[key] = { type: typeof doc[key] === 'number' ? 'float' : 'text' };
|
||||||
|
}
|
||||||
|
return { properties };
|
||||||
|
}
|
||||||
|
|
||||||
|
async startPipingSession(options: { onlyNew?: boolean }) {
|
||||||
|
this.sessionDocs.clear();
|
||||||
|
this.onlyNew = options.onlyNew;
|
||||||
|
const indexExists = await this.client.indices.exists({ index: this.index });
|
||||||
|
if (this.onlyNew && indexExists) {
|
||||||
|
const response = await this.client.search({
|
||||||
|
index: this.index,
|
||||||
|
sort: '@timestamp:desc',
|
||||||
|
size: 1,
|
||||||
|
});
|
||||||
|
|
||||||
|
// If the search query succeeded, the index exists.
|
||||||
|
const hit = response.hits.hits[0];
|
||||||
|
this.latestTimestamp = hit?._source?.['@timestamp'] || null;
|
||||||
|
|
||||||
|
if (this.latestTimestamp) {
|
||||||
|
console.log(`Working in "onlyNew" mode. Hence we are omitting documents prior to ${this.latestTimestamp}`);
|
||||||
|
} else {
|
||||||
|
console.log(`Working in "onlyNew" mode, but no documents found in index ${this.index}. Hence processing all documents now.`);
|
||||||
|
}
|
||||||
|
} else if (this.onlyNew && !indexExists) {
|
||||||
|
console.log(`Working in "onlyNew" mode, but index ${this.index} does not exist. Hence processing all documents now.`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
async pipeDocument(optionsArg: { docId: string; timestamp?: string | number; doc: any }) {
|
||||||
|
await this.ensureIndexExists(optionsArg.doc);
|
||||||
|
|
||||||
|
const documentBody = {
|
||||||
|
...optionsArg.doc,
|
||||||
|
...(optionsArg.timestamp && { '@timestamp': optionsArg.timestamp }),
|
||||||
|
};
|
||||||
|
|
||||||
|
// If 'onlyNew' is true, compare the document timestamp with the latest timestamp
|
||||||
|
if (this.onlyNew) {
|
||||||
|
if (this.latestTimestamp && optionsArg.timestamp <= this.latestTimestamp) {
|
||||||
|
this.fastForward = true;
|
||||||
|
} else {
|
||||||
|
this.fastForward = false;
|
||||||
|
await this.client.index({
|
||||||
|
index: this.index,
|
||||||
|
id: optionsArg.docId,
|
||||||
|
body: documentBody,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
this.fastForward = false;
|
||||||
|
await this.client.index({
|
||||||
|
index: this.index,
|
||||||
|
id: optionsArg.docId,
|
||||||
|
body: documentBody,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
this.sessionDocs.add(optionsArg.docId);
|
||||||
|
}
|
||||||
|
|
||||||
|
async endPipingSession() {
|
||||||
|
const allDocIds: string[] = [];
|
||||||
|
const responseQueue = [];
|
||||||
|
let response = await this.client.search({
|
||||||
|
index: this.index,
|
||||||
|
scroll: '1m',
|
||||||
|
size: this.BATCH_SIZE,
|
||||||
|
});
|
||||||
|
while (true) {
|
||||||
|
response.hits.hits.forEach((hit: any) => allDocIds.push(hit._id));
|
||||||
|
if (!response.hits.hits.length) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
response = await this.client.scroll({ scroll_id: response._scroll_id, scroll: '1m' });
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const docId of allDocIds) {
|
||||||
|
if (!this.sessionDocs.has(docId)) {
|
||||||
|
responseQueue.push({
|
||||||
|
delete: {
|
||||||
|
_index: this.index,
|
||||||
|
_id: docId,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
if (responseQueue.length >= this.BATCH_SIZE) {
|
||||||
|
await this.client.bulk({ refresh: true, body: responseQueue });
|
||||||
|
responseQueue.length = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (responseQueue.length > 0) {
|
||||||
|
await this.client.bulk({ refresh: true, body: responseQueue });
|
||||||
|
}
|
||||||
|
|
||||||
|
this.sessionDocs.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
async takeSnapshot(processIterator: SnapshotProcessor) {
|
||||||
|
const snapshotIndex = `${this.index}_snapshots`;
|
||||||
|
|
||||||
|
const indexExists = await this.client.indices.exists({ index: snapshotIndex });
|
||||||
|
if (!indexExists) {
|
||||||
|
await this.client.indices.create({
|
||||||
|
index: snapshotIndex,
|
||||||
|
body: {
|
||||||
|
mappings: {
|
||||||
|
properties: {
|
||||||
|
date: {
|
||||||
|
type: 'date',
|
||||||
|
},
|
||||||
|
aggregationData: {
|
||||||
|
type: 'object',
|
||||||
|
enabled: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
const documentIterator = this.getDocumentIterator();
|
||||||
|
|
||||||
|
const newSnapshot = await processIterator(documentIterator, await this.getLastSnapshot());
|
||||||
|
|
||||||
|
await this.storeSnapshot(newSnapshot);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async getLastSnapshot(): Promise<ISnapshot | null> {
|
||||||
|
const snapshotIndex = `${this.index}_snapshots`;
|
||||||
|
const indexExists = await this.client.indices.exists({ index: snapshotIndex });
|
||||||
|
|
||||||
|
if (!indexExists) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
const response = await this.client.search({
|
||||||
|
index: snapshotIndex,
|
||||||
|
sort: 'date:desc',
|
||||||
|
size: 1,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (response.hits.hits.length > 0) {
|
||||||
|
const hit = response.hits.hits[0];
|
||||||
|
return {
|
||||||
|
date: hit._source['date'],
|
||||||
|
aggregationData: hit._source['aggregationData'],
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async *getDocumentIterator() {
|
||||||
|
let response = await this.client.search({
|
||||||
|
index: this.index,
|
||||||
|
scroll: '1m',
|
||||||
|
size: this.BATCH_SIZE,
|
||||||
|
});
|
||||||
|
while (true) {
|
||||||
|
for (const hit of response.hits.hits) {
|
||||||
|
yield hit._source;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!response.hits.hits.length) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
response = await this.client.scroll({ scroll_id: response._scroll_id, scroll: '1m' });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async storeSnapshot(snapshot: ISnapshot) {
|
||||||
|
await this.client.index({
|
||||||
|
index: `${this.index}_snapshots`,
|
||||||
|
body: snapshot,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
103
ts/els.classes.elasticindex.ts
Normal file
103
ts/els.classes.elasticindex.ts
Normal file
@ -0,0 +1,103 @@
|
|||||||
|
import * as plugins from './els.plugins.js';
|
||||||
|
import { ElsSmartlogDestination } from './els.classes.smartlogdestination.js';
|
||||||
|
import { type ILogPackage } from '@push.rocks/smartlog-interfaces';
|
||||||
|
import { Stringmap } from '@push.rocks/lik';
|
||||||
|
|
||||||
|
export class ElasticIndex {
|
||||||
|
private stringmap = new Stringmap();
|
||||||
|
private elasticSearchRef: ElsSmartlogDestination<any>;
|
||||||
|
|
||||||
|
constructor(elasticSearchInstanceArg: ElsSmartlogDestination<ILogPackage>) {
|
||||||
|
this.elasticSearchRef = elasticSearchInstanceArg;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async ensureIndex(prefixArg: string, indexNameArg: string) {
|
||||||
|
if (this.stringmap.checkString(indexNameArg)) {
|
||||||
|
return indexNameArg;
|
||||||
|
}
|
||||||
|
|
||||||
|
const responseArg = await this.elasticSearchRef.client.cat.indices({
|
||||||
|
format: 'json',
|
||||||
|
bytes: 'mb',
|
||||||
|
}).catch(err => {
|
||||||
|
console.log(err);
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!responseArg) {
|
||||||
|
throw new Error('Could not get valid response from elastic search');
|
||||||
|
}
|
||||||
|
|
||||||
|
if (Array.isArray(responseArg)) {
|
||||||
|
const filteredIndices = responseArg.filter((indexObjectArg) => {
|
||||||
|
return indexObjectArg.index.startsWith(prefixArg);
|
||||||
|
});
|
||||||
|
const filteredIndexNames = filteredIndices.map((indexObjectArg) => {
|
||||||
|
return indexObjectArg.index;
|
||||||
|
});
|
||||||
|
await this.deleteOldIndices(prefixArg, filteredIndexNames);
|
||||||
|
}
|
||||||
|
|
||||||
|
let index = null;
|
||||||
|
|
||||||
|
if (Array.isArray(responseArg)) {
|
||||||
|
index = responseArg.find((indexItemArg) => {
|
||||||
|
return indexItemArg.index === indexNameArg;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!index) {
|
||||||
|
await this.createNewIndex(indexNameArg);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.stringmap.addString(indexNameArg);
|
||||||
|
return index;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async createNewIndex(indexNameArg: string) {
|
||||||
|
const response = await this.elasticSearchRef.client.indices.create({
|
||||||
|
wait_for_active_shards: 1,
|
||||||
|
index: indexNameArg,
|
||||||
|
body: {
|
||||||
|
mappings: {
|
||||||
|
properties: {
|
||||||
|
'@timestamp': {
|
||||||
|
type: 'date',
|
||||||
|
},
|
||||||
|
logPackageArg: {
|
||||||
|
properties: {
|
||||||
|
payload: {
|
||||||
|
type: 'object',
|
||||||
|
dynamic: true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public async deleteOldIndices(prefixArg: string, indicesArray: string[]) {
|
||||||
|
const todayAsUnix: number = Date.now();
|
||||||
|
const rententionPeriodAsUnix: number = plugins.smarttime.units.days(
|
||||||
|
this.elasticSearchRef.indexRetention
|
||||||
|
);
|
||||||
|
for (const indexName of indicesArray) {
|
||||||
|
if (!indexName.startsWith(prefixArg)) continue;
|
||||||
|
const indexRegex = new RegExp(`^${prefixArg}-([0-9]*)-([0-9]*)-([0-9]*)$`)
|
||||||
|
const regexResult = indexRegex.exec(indexName);
|
||||||
|
const dateAsUnix: number = new Date(
|
||||||
|
`${regexResult[1]}-${regexResult[2]}-${regexResult[3]}`
|
||||||
|
).getTime();
|
||||||
|
if (todayAsUnix - rententionPeriodAsUnix > dateAsUnix) {
|
||||||
|
console.log(`found old index ${indexName}`);
|
||||||
|
const response = await this.elasticSearchRef.client.indices.delete(
|
||||||
|
{
|
||||||
|
index: indexName,
|
||||||
|
}).catch(err => {
|
||||||
|
console.log(err);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
63
ts/els.classes.elasticscheduler.ts
Normal file
63
ts/els.classes.elasticscheduler.ts
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
import { ElsSmartlogDestination, type IStandardLogParams } from './els.classes.smartlogdestination.js';
|
||||||
|
|
||||||
|
export class ElasticScheduler {
|
||||||
|
elasticSearchRef: ElsSmartlogDestination<any>;
|
||||||
|
docsScheduled = false;
|
||||||
|
docsStorage: any[] = [];
|
||||||
|
|
||||||
|
// maximum size of the buffer
|
||||||
|
maxBufferSize = 500;
|
||||||
|
|
||||||
|
constructor(elasticLogRefArg: ElsSmartlogDestination<any>) {
|
||||||
|
this.elasticSearchRef = elasticLogRefArg;
|
||||||
|
}
|
||||||
|
|
||||||
|
public addFailedDoc(objectArg: any | IStandardLogParams) {
|
||||||
|
this.addToStorage(objectArg);
|
||||||
|
this.setRetry();
|
||||||
|
}
|
||||||
|
|
||||||
|
public scheduleDoc(logObject: any) {
|
||||||
|
this.addToStorage(logObject);
|
||||||
|
}
|
||||||
|
|
||||||
|
private addToStorage(logObject: any) {
|
||||||
|
this.docsStorage.push(logObject);
|
||||||
|
|
||||||
|
// if buffer is full, send logs immediately
|
||||||
|
if (this.docsStorage.length >= this.maxBufferSize) {
|
||||||
|
this.flushLogsToElasticSearch();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private flushLogsToElasticSearch() {
|
||||||
|
const oldStorage = this.docsStorage;
|
||||||
|
this.docsStorage = [];
|
||||||
|
|
||||||
|
for (let logObject of oldStorage) {
|
||||||
|
this.elasticSearchRef.log(logObject, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public setRetry() {
|
||||||
|
setTimeout(() => {
|
||||||
|
this.flushLogsToElasticSearch();
|
||||||
|
|
||||||
|
if (this.docsStorage.length === 0) {
|
||||||
|
console.log('ElasticLog retry success!!!');
|
||||||
|
this.docsScheduled = false;
|
||||||
|
} else {
|
||||||
|
console.log('ElasticLog retry failed');
|
||||||
|
this.setRetry();
|
||||||
|
}
|
||||||
|
}, 5000);
|
||||||
|
}
|
||||||
|
|
||||||
|
public deferSend() {
|
||||||
|
if (!this.docsScheduled) {
|
||||||
|
console.log('Retry ElasticLog in 5 seconds!');
|
||||||
|
this.docsScheduled = true;
|
||||||
|
this.setRetry();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
68
ts/els.classes.fastpush.ts
Normal file
68
ts/els.classes.fastpush.ts
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
import { Client as ElasticClient } from '@elastic/elasticsearch';
|
||||||
|
|
||||||
|
interface FastPushOptions {
|
||||||
|
deleteOldData?: boolean; // Clear the index
|
||||||
|
deleteIndex?: boolean; // Delete the entire index
|
||||||
|
}
|
||||||
|
|
||||||
|
export class FastPush {
|
||||||
|
private client: ElasticClient;
|
||||||
|
|
||||||
|
constructor(node: string, auth?: { username: string; password: string }) {
|
||||||
|
this.client = new ElasticClient({
|
||||||
|
node: node,
|
||||||
|
...(auth && { auth: auth }),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async pushToIndex(indexName: string, docArray: any[], options?: FastPushOptions) {
|
||||||
|
if (docArray.length === 0) return;
|
||||||
|
|
||||||
|
const indexExists = await this.client.indices.exists({ index: indexName });
|
||||||
|
|
||||||
|
if (indexExists) {
|
||||||
|
if (options?.deleteIndex) {
|
||||||
|
await this.client.indices.delete({ index: indexName });
|
||||||
|
} else if (options?.deleteOldData) {
|
||||||
|
await this.client.deleteByQuery({
|
||||||
|
index: indexName,
|
||||||
|
body: {
|
||||||
|
query: {
|
||||||
|
match_all: {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!indexExists || options?.deleteIndex) {
|
||||||
|
// Create index with mappings (for simplicity, we use dynamic mapping)
|
||||||
|
await this.client.indices.create({
|
||||||
|
index: indexName,
|
||||||
|
body: {
|
||||||
|
mappings: {
|
||||||
|
dynamic: "true"
|
||||||
|
// ... other specific mappings
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Bulk insert documents
|
||||||
|
const bulkBody = [];
|
||||||
|
for (const doc of docArray) {
|
||||||
|
bulkBody.push({
|
||||||
|
index: {
|
||||||
|
_index: indexName,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
bulkBody.push(doc);
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.client.bulk({ body: bulkBody });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Usage example:
|
||||||
|
// const fastPush = new FastPush('http://localhost:9200', { username: 'elastic', password: 'password' });
|
||||||
|
// fastPush.pushToIndex('my_index', [{ name: 'John', age: 30 }, { name: 'Jane', age: 25 }], { deleteOldData: true });
|
111
ts/els.classes.kvstore.ts
Normal file
111
ts/els.classes.kvstore.ts
Normal file
@ -0,0 +1,111 @@
|
|||||||
|
import * as plugins from './els.plugins.js';
|
||||||
|
import { Client as ElasticClient } from '@elastic/elasticsearch';
|
||||||
|
|
||||||
|
export interface IElasticKVStoreConstructorOptions {
|
||||||
|
index: string;
|
||||||
|
node: string;
|
||||||
|
auth?: {
|
||||||
|
username: string;
|
||||||
|
password: string;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
export class ElasticKVStore {
|
||||||
|
public client: ElasticClient;
|
||||||
|
public index: string;
|
||||||
|
private readyDeferred: any;
|
||||||
|
|
||||||
|
constructor(options: IElasticKVStoreConstructorOptions) {
|
||||||
|
this.client = new ElasticClient({
|
||||||
|
node: options.node,
|
||||||
|
...(options.auth && { auth: options.auth }),
|
||||||
|
});
|
||||||
|
this.index = options.index;
|
||||||
|
this.readyDeferred = plugins.smartpromise.defer();
|
||||||
|
this.setupIndex();
|
||||||
|
}
|
||||||
|
|
||||||
|
private async setupIndex() {
|
||||||
|
try {
|
||||||
|
const indexExists = await this.client.indices.exists({ index: this.index });
|
||||||
|
|
||||||
|
if (!indexExists) {
|
||||||
|
await this.client.indices.create({
|
||||||
|
index: this.index,
|
||||||
|
body: {
|
||||||
|
mappings: {
|
||||||
|
properties: {
|
||||||
|
key: {
|
||||||
|
type: 'keyword'
|
||||||
|
},
|
||||||
|
value: {
|
||||||
|
type: 'text'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
this.readyDeferred.resolve();
|
||||||
|
} catch (err) {
|
||||||
|
this.readyDeferred.reject(err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async set(key: string, value: string) {
|
||||||
|
await this.readyDeferred.promise;
|
||||||
|
await this.client.index({
|
||||||
|
index: this.index,
|
||||||
|
id: key,
|
||||||
|
body: {
|
||||||
|
key,
|
||||||
|
value
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async get(key: string): Promise<string | null> {
|
||||||
|
await this.readyDeferred.promise;
|
||||||
|
|
||||||
|
try {
|
||||||
|
const response = await this.client.get({
|
||||||
|
index: this.index,
|
||||||
|
id: key
|
||||||
|
});
|
||||||
|
return response._source['value'];
|
||||||
|
} catch (error) {
|
||||||
|
if (error.meta && error.meta.statusCode === 404) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async delete(key: string) {
|
||||||
|
await this.readyDeferred.promise;
|
||||||
|
|
||||||
|
try {
|
||||||
|
await this.client.delete({
|
||||||
|
index: this.index,
|
||||||
|
id: key
|
||||||
|
});
|
||||||
|
} catch (error) {
|
||||||
|
if (error.meta && error.meta.statusCode !== 404) {
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async clear() {
|
||||||
|
await this.readyDeferred.promise;
|
||||||
|
|
||||||
|
await this.client.deleteByQuery({
|
||||||
|
index: this.index,
|
||||||
|
body: {
|
||||||
|
query: {
|
||||||
|
match_all: {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
75
ts/els.classes.smartlogdestination.ts
Normal file
75
ts/els.classes.smartlogdestination.ts
Normal file
@ -0,0 +1,75 @@
|
|||||||
|
import { Client as ElasticClient } from '@elastic/elasticsearch';
|
||||||
|
import type { ILogContext, ILogPackage, ILogDestination } from '@push.rocks/smartlog-interfaces';
|
||||||
|
import { ElasticScheduler } from './els.classes.elasticscheduler.js';
|
||||||
|
import { ElasticIndex } from './els.classes.elasticindex.js';
|
||||||
|
|
||||||
|
export interface IStandardLogParams {
|
||||||
|
message: string;
|
||||||
|
severity: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface IElasticSearchConstructorOptions {
|
||||||
|
indexPrefix: string;
|
||||||
|
indexRetention: number;
|
||||||
|
node: string;
|
||||||
|
auth?: {
|
||||||
|
username: string;
|
||||||
|
password: string;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
export class ElsSmartlogDestination<T> {
|
||||||
|
public client: ElasticClient;
|
||||||
|
public elasticScheduler = new ElasticScheduler(this);
|
||||||
|
public elasticIndex: ElasticIndex = new ElasticIndex(this);
|
||||||
|
|
||||||
|
public indexPrefix: string;
|
||||||
|
public indexRetention: number;
|
||||||
|
|
||||||
|
constructor(optionsArg: IElasticSearchConstructorOptions) {
|
||||||
|
this.client = new ElasticClient({
|
||||||
|
node: optionsArg.node,
|
||||||
|
...(optionsArg.auth && { auth: optionsArg.auth }),
|
||||||
|
});
|
||||||
|
this.indexPrefix = `${optionsArg.indexPrefix}`;
|
||||||
|
this.indexRetention = optionsArg.indexRetention;
|
||||||
|
this.setupDataStream();
|
||||||
|
}
|
||||||
|
|
||||||
|
private async setupDataStream() {
|
||||||
|
// Define an index template
|
||||||
|
await this.client.indices.putIndexTemplate({
|
||||||
|
name: `${this.indexPrefix}_template`,
|
||||||
|
index_patterns: [`${this.indexPrefix}-*`],
|
||||||
|
data_stream: {},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public async log(logPackageArg: ILogPackage, scheduleOverwrite = false) {
|
||||||
|
const now = new Date();
|
||||||
|
const indexToUse = `${this.indexPrefix}-data-stream`; // Use data stream name
|
||||||
|
|
||||||
|
if (this.elasticScheduler.docsScheduled && !scheduleOverwrite) {
|
||||||
|
this.elasticScheduler.scheduleDoc(logPackageArg);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.client.index(
|
||||||
|
{
|
||||||
|
index: indexToUse,
|
||||||
|
body: {
|
||||||
|
'@timestamp': new Date(logPackageArg.timestamp).toISOString(),
|
||||||
|
...logPackageArg,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
get logDestination(): ILogDestination {
|
||||||
|
return {
|
||||||
|
handleLog: async (smartlogPackageArg: ILogPackage) => {
|
||||||
|
await this.log(smartlogPackageArg);
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
8
ts/els.plugins.ts
Normal file
8
ts/els.plugins.ts
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
import * as elasticsearch from '@elastic/elasticsearch';
|
||||||
|
import * as lik from '@push.rocks/lik';
|
||||||
|
import * as smartdelay from '@push.rocks/smartdelay';
|
||||||
|
import * as smartlogInterfaces from '@push.rocks/smartlog-interfaces';
|
||||||
|
import * as smartpromise from '@push.rocks/smartpromise';
|
||||||
|
import * as smarttime from '@push.rocks/smarttime';
|
||||||
|
|
||||||
|
export { elasticsearch, lik, smartdelay, smartlogInterfaces, smartpromise, smarttime };
|
@ -1 +1,4 @@
|
|||||||
export * from './elasticsearch.classes.elasticsearch.js';
|
export * from './els.classes.smartlogdestination.js';
|
||||||
|
export * from './els.classes.fastpush.js';
|
||||||
|
export * from './els.classes.elasticdoc.js';
|
||||||
|
export * from './els.classes.kvstore.js';
|
||||||
|
Loading…
x
Reference in New Issue
Block a user