Compare commits

...

84 Commits

Author SHA1 Message Date
36d7cb69a3 2.0.16 2023-08-30 12:49:39 +02:00
4924e0a151 fix(core): update 2023-08-30 12:49:39 +02:00
cd98529541 2.0.15 2023-08-30 12:45:39 +02:00
e6a9282987 fix(core): update 2023-08-30 12:45:39 +02:00
9dcadcd611 2.0.14 2023-08-29 12:11:16 +02:00
4b045ff988 fix(core): update 2023-08-29 12:11:15 +02:00
023dd1b519 2.0.13 2023-08-29 12:10:39 +02:00
4971385eae fix(core): update 2023-08-29 12:10:38 +02:00
e209839962 2.0.12 2023-08-29 11:48:33 +02:00
e44365b674 fix(core): update 2023-08-29 11:48:32 +02:00
bd154089c3 2.0.11 2023-08-29 11:44:49 +02:00
0be693da60 fix(core): update 2023-08-29 11:44:48 +02:00
040c93dec3 2.0.10 2023-08-29 11:43:07 +02:00
21e55bd341 fix(core): update 2023-08-29 11:43:07 +02:00
f1d04fe63c 2.0.9 2023-08-29 11:33:07 +02:00
49c4660131 fix(core): update 2023-08-29 11:33:06 +02:00
e5fd0361fc 2.0.8 2023-08-29 11:15:23 +02:00
d6a291d8d4 fix(core): update 2023-08-29 11:15:22 +02:00
fc87fd7ab7 2.0.7 2023-08-29 11:11:25 +02:00
203444d1a6 fix(core): update 2023-08-29 11:11:25 +02:00
cdbf1fd316 2.0.6 2023-08-29 10:03:38 +02:00
10108d8338 fix(core): update 2023-08-29 10:03:37 +02:00
36abb2c7c0 2.0.5 2023-08-29 10:00:43 +02:00
8f00d90bb1 fix(core): update 2023-08-29 10:00:42 +02:00
41f1758d46 2.0.4 2023-08-29 09:56:59 +02:00
2b7cd33996 fix(core): update 2023-08-29 09:56:58 +02:00
80a04ca893 2.0.3 2023-08-29 09:31:43 +02:00
93f739c79e fix(core): update 2023-08-29 09:31:42 +02:00
a1b5bf5c0c 2.0.2 2023-08-29 09:18:16 +02:00
084c5d137c fix(core): update 2023-08-29 09:18:15 +02:00
4aa0592bd5 2.0.1 2023-08-25 16:07:53 +02:00
0313e5045a fix(core): update 2023-08-25 16:07:52 +02:00
7442d93f58 2.0.0 2023-08-25 16:07:17 +02:00
35e70aae62 BREAKING CHANGE(core): update 2023-08-25 16:07:16 +02:00
cc30f43a28 1.0.56 2023-08-25 16:06:53 +02:00
100a8fc12e fix(core): update 2023-08-25 16:06:52 +02:00
f32403961d 1.0.55 2023-08-18 07:58:10 +02:00
9734949241 fix(core): update 2023-08-18 07:58:09 +02:00
b70444824b 1.0.54 2023-08-17 20:53:28 +02:00
0eb0903667 fix(core): update 2023-08-17 20:53:28 +02:00
4d11dca22c 1.0.53 2023-08-17 19:24:35 +02:00
3079adbbd9 fix(core): update 2023-08-17 19:24:34 +02:00
bc9de8e4d6 1.0.52 2023-08-17 19:21:27 +02:00
3fa7d66236 fix(core): update 2023-08-17 19:21:26 +02:00
2a0b0b2478 1.0.51 2023-08-14 13:09:21 +02:00
35e99663a4 fix(core): update 2023-08-14 13:09:20 +02:00
2cc5855206 1.0.50 2023-08-14 10:58:29 +02:00
8f9f2fdf05 fix(core): update 2023-08-14 10:58:28 +02:00
7ef36b5c40 1.0.49 2023-08-02 03:17:30 +02:00
67a8f3fe4d fix(core): update 2023-08-02 03:17:30 +02:00
5ae2c37519 1.0.48 2023-08-02 03:11:18 +02:00
fcb67ec878 fix(core): update 2023-08-02 03:11:17 +02:00
9e25494f8f 1.0.47 2023-08-01 15:05:46 +02:00
dd8ba4736a fix(core): update 2023-08-01 15:05:45 +02:00
d395310410 1.0.46 2023-08-01 12:38:54 +02:00
49233ce45f fix(core): update 2023-08-01 12:38:53 +02:00
fb93dce8bc 1.0.45 2023-08-01 12:24:22 +02:00
30cbc05aa2 fix(core): update 2023-08-01 12:24:22 +02:00
2a595a1a9a 1.0.44 2023-07-05 23:45:48 +02:00
d62b18e93c fix(core): update 2023-07-05 23:45:48 +02:00
d6176f820a 1.0.43 2023-07-05 17:27:24 +02:00
0ebc1d5288 fix(core): update 2023-07-05 17:27:24 +02:00
2b0003546a 1.0.42 2023-07-05 15:31:01 +02:00
60617f2fca fix(core): update 2023-07-05 15:31:00 +02:00
9c767d07e4 1.0.41 2023-07-05 10:26:00 +02:00
f3aa94dcb7 fix(core): update 2023-07-05 10:25:59 +02:00
a0be0edd9d 1.0.40 2023-07-05 10:22:53 +02:00
ad24ba2f5d fix(core): update 2023-07-05 10:22:53 +02:00
b0cf4bb27f 1.0.39 2023-07-05 09:38:43 +02:00
fd29ceab80 fix(core): update 2023-07-05 09:38:43 +02:00
bcca434a24 1.0.38 2023-07-04 13:57:55 +02:00
d4a9ad8f67 fix(core): update 2023-07-04 13:57:55 +02:00
d4c7c33668 1.0.37 2023-07-04 10:50:19 +02:00
8340257b00 fix(core): update 2023-07-04 10:50:18 +02:00
32265e83f3 1.0.36 2023-07-04 10:13:15 +02:00
e2df11cea2 fix(core): update 2023-07-04 10:13:15 +02:00
2719ba28f6 1.0.35 2023-07-04 09:59:58 +02:00
6d78a7ba0c fix(core): update 2023-07-04 09:59:58 +02:00
5897c6e7de 1.0.34 2023-07-04 09:46:04 +02:00
20369614a2 fix(core): update 2023-07-04 09:46:04 +02:00
7ceaf694fe 1.0.33 2023-07-04 09:33:19 +02:00
391c6bd45d fix(core): update 2023-07-04 09:33:18 +02:00
1a702071c6 1.0.32 2023-07-04 09:13:45 +02:00
0fe2f6a4ae fix(core): update 2023-07-04 09:13:44 +02:00
21 changed files with 2576 additions and 850 deletions

View File

@ -12,29 +12,38 @@ stages:
- release - release
- metadata - metadata
before_script:
- pnpm install -g pnpm
- pnpm install -g @shipzone/npmci
- npmci npm prepare
# ==================== # ====================
# security stage # security stage
# ==================== # ====================
mirror: # ====================
# security stage
# ====================
auditProductionDependencies:
image: registry.gitlab.com/hosttoday/ht-docker-node:npmci
stage: security stage: security
script: script:
- npmci git mirror - npmci command npm config set registry https://registry.npmjs.org
- npmci command pnpm audit --audit-level=high --prod
tags: tags:
- lossless - lossless
- docker - docker
- notpriv allow_failure: true
snyk: auditDevDependencies:
image: registry.gitlab.com/hosttoday/ht-docker-node:npmci
stage: security stage: security
script: script:
- npmci npm prepare - npmci command npm config set registry https://registry.npmjs.org
- npmci command npm install -g snyk - npmci command pnpm audit --audit-level=high --dev
- npmci command npm install --ignore-scripts
- npmci command snyk test
tags: tags:
- lossless - lossless
- docker - docker
- notpriv allow_failure: true
# ==================== # ====================
# test stage # test stage
@ -43,33 +52,24 @@ snyk:
testStable: testStable:
stage: test stage: test
script: script:
- npmci npm prepare
- npmci node install stable
- npmci npm install - npmci npm install
- npmci npm test - npmci npm test
coverage: /\d+.?\d+?\%\s*coverage/ coverage: /\d+.?\d+?\%\s*coverage/
tags: tags:
- lossless
- docker - docker
- priv
testBuild: testBuild:
stage: test stage: test
script: script:
- npmci npm prepare
- npmci node install stable
- npmci npm install - npmci npm install
- npmci command npm run build - npmci npm build
coverage: /\d+.?\d+?\%\s*coverage/ coverage: /\d+.?\d+?\%\s*coverage/
tags: tags:
- lossless
- docker - docker
- notpriv
release: release:
stage: release stage: release
script: script:
- npmci node install stable
- npmci npm publish - npmci npm publish
only: only:
- tags - tags
@ -84,11 +84,12 @@ release:
codequality: codequality:
stage: metadata stage: metadata
allow_failure: true allow_failure: true
only:
- tags
script: script:
- npmci command npm install -g tslint typescript - npmci command npm install -g typescript
- npmci npm prepare - npmci npm prepare
- npmci npm install - npmci npm install
- npmci command "tslint -c tslint.json ./ts/**/*.ts"
tags: tags:
- lossless - lossless
- docker - docker
@ -108,11 +109,9 @@ trigger:
pages: pages:
stage: metadata stage: metadata
script: script:
- npmci node install lts - npmci node install stable
- npmci command npm install -g @gitzone/tsdoc
- npmci npm prepare
- npmci npm install - npmci npm install
- npmci command tsdoc - npmci command npm run buildDocs
tags: tags:
- lossless - lossless
- docker - docker

View File

@ -1,6 +1,6 @@
{ {
"name": "@mojoio/elasticsearch", "name": "@apiclient.xyz/elasticsearch",
"version": "1.0.31", "version": "2.0.16",
"private": false, "private": false,
"description": "log to elasticsearch in a kibana compatible format", "description": "log to elasticsearch in a kibana compatible format",
"main": "dist_ts/index.js", "main": "dist_ts/index.js",
@ -10,25 +10,24 @@
"scripts": { "scripts": {
"test": "(tstest test/)", "test": "(tstest test/)",
"format": "(gitzone format)", "format": "(gitzone format)",
"build": "(tsbuild)", "build": "(tsbuild --allowimplicitany)",
"buildDocs": "tsdoc" "buildDocs": "tsdoc"
}, },
"devDependencies": { "devDependencies": {
"@gitzone/tsbuild": "^2.1.66", "@git.zone/tsbuild": "^2.1.70",
"@gitzone/tsrun": "^1.2.42", "@git.zone/tsrun": "^1.2.46",
"@gitzone/tstest": "^1.0.74", "@git.zone/tstest": "^1.0.80",
"@pushrocks/qenv": "^5.0.2", "@push.rocks/qenv": "^6.0.2",
"@pushrocks/tapbundle": "^5.0.8", "@push.rocks/tapbundle": "^5.0.15",
"@types/node": "^20.3.3" "@types/node": "^20.5.7"
}, },
"dependencies": { "dependencies": {
"@pushrocks/lik": "^6.0.2", "@elastic/elasticsearch": "8.9.0",
"@pushrocks/smartdelay": "^3.0.1", "@push.rocks/lik": "^6.0.5",
"@pushrocks/smartlog-interfaces": "^3.0.0", "@push.rocks/smartdelay": "^3.0.5",
"@pushrocks/smartpromise": "^4.0.2", "@push.rocks/smartlog-interfaces": "^3.0.0",
"@pushrocks/smarttime": "^4.0.1", "@push.rocks/smartpromise": "^4.0.2",
"@types/elasticsearch": "^5.0.40", "@push.rocks/smarttime": "^4.0.5"
"elasticsearch": "^16.7.3"
}, },
"files": [ "files": [
"ts/**/*", "ts/**/*",

2267
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

View File

@ -1,5 +1,2 @@
required: required:
- ELK_DOMAIN
- ELK_PORT
- ELK_USER
- ELK_PASS

View File

@ -37,7 +37,6 @@ For further information read the linked docs at the top of this README.
We are always happy for code contributions. If you are not the code contributing type that is ok. + Still, maintaining Open Source repositories takes considerable time and thought. If you like the quality of what we do and our modules are useful to you we would appreciate a little monthly contribution: [Contribute monthly :)](https://lossless.link/contribute) We are always happy for code contributions. If you are not the code contributing type that is ok. + Still, maintaining Open Source repositories takes considerable time and thought. If you like the quality of what we do and our modules are useful to you we would appreciate a little monthly contribution: [Contribute monthly :)](https://lossless.link/contribute)
## Contribution ## Contribution
We are always happy for code contributions. If you are not the code contributing type that is ok. Still, maintaining Open Source repositories takes considerable time and thought. If you like the quality of what we do and our modules are useful to you we would appreciate a little monthly contribution: You can [contribute one time](https://lossless.link/contribute-onetime) or [contribute monthly](https://lossless.link/contribute). :) We are always happy for code contributions. If you are not the code contributing type that is ok. Still, maintaining Open Source repositories takes considerable time and thought. If you like the quality of what we do and our modules are useful to you we would appreciate a little monthly contribution: You can [contribute one time](https://lossless.link/contribute-onetime) or [contribute monthly](https://lossless.link/contribute). :)

0
test/00tapwrap.ts Normal file
View File

89
test/test.nonci.ts Normal file
View File

@ -0,0 +1,89 @@
import { expect, tap } from '@push.rocks/tapbundle';
import { Qenv } from '@push.rocks/qenv';
import * as elasticsearch from '../ts/index.js';
let testElasticLog: elasticsearch.ElsSmartlogDestination<any>;
let testElasticDoc: elasticsearch.ElasticDoc;
tap.test('first test', async () => {
testElasticLog = new elasticsearch.ElsSmartlogDestination({
indexPrefix: 'testprefix',
indexRetention: 7,
node: 'http://localhost:9200',
auth: {
username: 'elastic',
password: 'YourPassword'
}
});
expect(testElasticLog).toBeInstanceOf(elasticsearch.ElsSmartlogDestination);
});
tap.test('should send a message to Elasticsearch', async () => {
await testElasticLog.log({
timestamp: Date.now(),
type: 'increment',
level: 'info',
context: {
company: 'Lossless GmbH',
companyunit: 'lossless.cloud',
containerName: 'testcontainer',
environment: 'test',
runtime: 'node',
zone: 'ship.zone',
},
message: 'GET https://myroute.to.a.cool.destination/sorare?hello=there',
correlation: null,
});
});
tap.test('should create an ElasticDoc instance', async () => {
testElasticDoc = new elasticsearch.ElasticDoc({
index: 'testindex',
node: 'http://localhost:9200',
auth: {
username: 'elastic',
password: 'YourPassword'
}
});
expect(testElasticDoc).toBeInstanceOf(elasticsearch.ElasticDoc);
});
tap.test('should add and update documents in a piping session', async () => {
await testElasticDoc.startPipingSession({});
await testElasticDoc.pipeDocument({
docId: '1',
timestamp: new Date().toISOString(),
doc: { name: 'doc1' }
});
await testElasticDoc.pipeDocument({
docId: '2',
timestamp: new Date().toISOString(),
doc: { name: 'doc2' }
});
await testElasticDoc.pipeDocument({
docId: '1',
timestamp: new Date().toISOString(),
doc: { name: 'updated doc1' }
});
});
tap.test('should delete documents not part of the piping session', async () => {
await testElasticDoc.endPipingSession();
});
tap.test('should take and store snapshot', async () => {
await testElasticDoc.takeSnapshot(async (iterator, prevSnapshot) => {
const aggregationData = [];
for await (const doc of iterator) {
// Sample aggregation: counting documents
aggregationData.push(doc);
}
const snapshot = {
date: new Date().toISOString(),
aggregationData,
};
return snapshot;
});
});
tap.start();

View File

@ -1,38 +0,0 @@
import { expect, tap } from '@pushrocks/tapbundle';
import { Qenv } from '@pushrocks/qenv';
import * as elasticsearch from '../ts/index.js';
const testQenv = new Qenv('./', './.nogit/');
let testElasticLog: elasticsearch.ElasticSearch<any>;
tap.test('first test', async () => {
testElasticLog = new elasticsearch.ElasticSearch({
indexPrefix: 'smartlog',
indexRetention: 7,
domain: testQenv.getEnvVarOnDemand('ELK_DOMAIN'),
port: parseInt(testQenv.getEnvVarOnDemand('ELK_PORT'), 10),
ssl: true,
});
expect(testElasticLog).toBeInstanceOf(elasticsearch.ElasticSearch);
});
tap.test('should send a message to Elasticsearch', async () => {
testElasticLog.log({
timestamp: Date.now(),
type: 'increment',
level: 'info',
context: {
company: 'Lossless GmbH',
companyunit: 'lossless.cloud',
containerName: 'testcontainer',
environment: 'test',
runtime: 'node',
zone: 'ship.zone',
},
message: 'GET https://myroute.to.a.cool.destination/sorare?hello=there',
correlation: null,
});
});
tap.start();

View File

@ -2,7 +2,7 @@
* autocreated commitinfo by @pushrocks/commitinfo * autocreated commitinfo by @pushrocks/commitinfo
*/ */
export const commitinfo = { export const commitinfo = {
name: '@mojoio/elasticsearch', name: '@apiclient.xyz/elasticsearch',
version: '1.0.31', version: '2.0.16',
description: 'log to elasticsearch in a kibana compatible format' description: 'log to elasticsearch in a kibana compatible format'
} }

View File

@ -1,103 +0,0 @@
import * as plugins from './elasticsearch.plugins';
import { ElasticSearch } from './elasticsearch.classes.elasticsearch';
import { ILogPackage } from '@pushrocks/smartlog-interfaces';
import { Stringmap } from '@pushrocks/lik';
export class ElasticIndex {
private stringmap = new Stringmap();
private elasticSearchRef: ElasticSearch<any>;
constructor(elasticSearchInstanceArg: ElasticSearch<ILogPackage>) {
this.elasticSearchRef = elasticSearchInstanceArg;
}
public async ensureIndex(indexArg: string) {
const done = plugins.smartpromise.defer();
if (this.stringmap.checkString(indexArg)) {
done.resolve();
return;
}
this.elasticSearchRef.client.cat.indices(
{
format: 'json',
bytes: 'm',
},
async (err, responseArg: any[]) => {
if (err) {
console.log(err);
return;
}
// lets delete indexes that violate the retention
if (Array.isArray(responseArg)) {
const filteredIndices = responseArg.filter((indexObjectArg) => {
return indexObjectArg.index.startsWith('smartlog');
});
const filteredIndexNames = filteredIndices.map((indexObjectArg) => {
return indexObjectArg.index;
});
this.deleteOldIndices(filteredIndexNames);
}
let index = null;
if (Array.isArray(responseArg)) {
index = responseArg.find((indexObject) => {
return indexObject.index === indexArg;
});
}
if (!index) {
const done2 = plugins.smartpromise.defer();
this.elasticSearchRef.client.indices.create(
{
waitForActiveShards: '1',
index: indexArg,
},
(error, response) => {
// console.lof(response)
done2.resolve();
}
);
await done2.promise;
}
this.stringmap.addString(indexArg);
done.resolve();
}
);
await done.promise;
}
public createNewIndex(indexNameArg: string) {}
public async deleteOldIndices(indicesArray: string[]) {
const todayAsUnix: number = Date.now();
const rententionPeriodAsUnix: number = plugins.smarttime.units.days(
this.elasticSearchRef.indexRetention
);
for (const indexName of indicesArray) {
const regexResult = /^smartlog-([0-9]*)\.([0-9]*)\.([0-9]*)$/.exec(indexName);
const dateAsUnix: number = new Date(
`${regexResult[1]}-${regexResult[2]}-${regexResult[3]}`
).getTime();
if (todayAsUnix - rententionPeriodAsUnix > dateAsUnix) {
console.log(`found old index ${indexName}`);
const done2 = plugins.smartpromise.defer();
this.elasticSearchRef.client.indices.delete(
{
index: indexName,
},
(err2, response2) => {
if (err2) {
console.log(err2);
}
console.log(`deleted ${indexName}`);
done2.resolve();
}
);
await done2.promise;
}
}
}
}

View File

@ -1,44 +0,0 @@
import { ElasticSearch, type IStandardLogParams } from './elasticsearch.classes.elasticsearch.js';
export class ElasticScheduler {
elasticSearchRef: ElasticSearch<any>;
docsScheduled = false;
docsStorage: any[] = [];
constructor(elasticLogRefArg: ElasticSearch<any>) {
this.elasticSearchRef = elasticLogRefArg;
}
public addFailedDoc(objectArg: any | IStandardLogParams) {
this.docsStorage.push(objectArg);
this.setRetry();
}
public scheduleDoc(logObject: any) {
this.docsStorage.push(logObject);
}
public setRetry() {
setTimeout(() => {
const oldStorage = this.docsStorage;
this.docsStorage = [];
for (let logObject of oldStorage) {
this.elasticSearchRef.log(logObject, true);
}
if (this.docsStorage.length === 0) {
console.log('ElasticLog retry success!!!');
this.docsScheduled = false;
} else {
console.log('ElasticLog retry failed');
this.setRetry();
}
}, 5000);
}
public deferSend() {
if (!this.docsScheduled) {
console.log('Retry ElasticLog in 5 seconds!');
this.docsScheduled = true;
this.setRetry();
}
}
}

View File

@ -1,105 +0,0 @@
// interfaces
import { Client as ElasticClient } from 'elasticsearch';
import type { ILogContext, ILogPackage, ILogDestination } from '@pushrocks/smartlog-interfaces';
// other classes
import { ElasticScheduler } from './elasticsearch.classes.elasticscheduler.js';
import { ElasticIndex } from './elasticsearch.classes.elasticindex.js';
export interface IStandardLogParams {
message: string;
severity: string;
}
export interface IElasticSearchConstructorOptions {
indexPrefix: string;
indexRetention: number;
port: number;
domain: string;
ssl: boolean;
user?: string;
pass?: string;
}
export class ElasticSearch<T> {
public client: ElasticClient;
public elasticScheduler = new ElasticScheduler(this);
public elasticIndex: ElasticIndex = new ElasticIndex(this);
public indexPrefix: string;
public indexRetention: number;
/**
* sets up an instance of Elastic log
* @param optionsArg
*/
constructor(optionsArg: IElasticSearchConstructorOptions) {
this.client = new ElasticClient({
host: this.computeHostString(optionsArg),
// log: 'trace'
});
this.indexPrefix = optionsArg.indexPrefix;
this.indexRetention = optionsArg.indexRetention;
}
/**
* computes the host string from the constructor options
* @param optionsArg
*/
private computeHostString(optionsArg: IElasticSearchConstructorOptions): string {
let hostString = `${optionsArg.domain}:${optionsArg.port}`;
if (optionsArg.user && optionsArg.pass) {
hostString = `${optionsArg.user}:${optionsArg.pass}@${hostString}`;
}
if (optionsArg.ssl) {
hostString = `https://${hostString}`;
} else {
hostString = `http://${hostString}`;
}
console.log(hostString);
return hostString;
}
public async log(logPackageArg: ILogPackage, scheduleOverwrite = false) {
const now = new Date();
const indexToUse = `${this.indexPrefix}-${now.getFullYear()}.${(
'0' +
(now.getMonth() + 1)
).slice(-2)}.${('0' + now.getDate()).slice(-2)}`;
if (this.elasticScheduler.docsScheduled && !scheduleOverwrite) {
this.elasticScheduler.scheduleDoc(logPackageArg);
return;
}
await this.elasticIndex.ensureIndex(indexToUse);
this.client.index(
{
index: indexToUse,
type: 'log',
body: {
'@timestamp': new Date(logPackageArg.timestamp).toISOString(),
...logPackageArg,
},
},
(error, response) => {
if (error) {
console.log('ElasticLog encountered an error:');
console.log(error);
this.elasticScheduler.addFailedDoc(logPackageArg);
} else {
// console.log(`ElasticLog: ${logPackageArg.message}`);
}
}
);
}
get logDestination(): ILogDestination {
return {
handleLog: async (smartlogPackageArg: ILogPackage) => {
this.log(smartlogPackageArg);
},
};
}
}

View File

@ -1,8 +0,0 @@
import * as elasticsearch from 'elasticsearch';
import * as lik from '@pushrocks/lik';
import * as smartdelay from '@pushrocks/smartdelay';
import * as smartlogInterfaces from '@pushrocks/smartlog-interfaces';
import * as smartpromise from '@pushrocks/smartpromise';
import * as smarttime from '@pushrocks/smarttime';
export { elasticsearch, lik, smartdelay, smartlogInterfaces, smartpromise, smarttime };

View File

@ -0,0 +1,249 @@
import { Client as ElasticClient } from '@elastic/elasticsearch';
export interface IElasticDocConstructorOptions {
index: string;
node: string;
auth?: {
username: string;
password: string;
};
}
export interface ISnapshot {
date: string;
aggregationData: any;
}
export type SnapshotProcessor = (
iterator: AsyncIterable<any>,
prevSnapshot: ISnapshot | null
) => Promise<ISnapshot>;
export class ElasticDoc {
public client: ElasticClient;
public index: string;
private sessionDocs: Set<string> = new Set();
private indexInitialized: boolean = false;
private latestTimestamp: string | null = null; // Store the latest timestamp
private onlyNew: boolean = false; // Whether to only pipe new docs
public fastForward: boolean = false; // Whether to fast forward to the latest timestamp
private BATCH_SIZE = 1000;
constructor(options: IElasticDocConstructorOptions) {
this.client = new ElasticClient({
node: options.node,
...(options.auth && { auth: options.auth }),
});
this.index = options.index;
}
private async ensureIndexExists(doc: any) {
if (!this.indexInitialized) {
const indexExists = await this.client.indices.exists({ index: this.index });
if (!indexExists) {
const mappings = this.createMappingsFromDoc(doc);
await this.client.indices.create({
index: this.index,
body: {
// mappings,
settings: {
// You can define the settings according to your requirements here
},
},
});
}
this.indexInitialized = true;
}
}
private createMappingsFromDoc(doc: any): any {
const properties: any = {};
for (const key in doc) {
if (key === '@timestamp') {
properties[key] = { type: 'date' };
continue;
}
properties[key] = { type: typeof doc[key] === 'number' ? 'float' : 'text' };
}
return { properties };
}
async startPipingSession(options: { onlyNew?: boolean }) {
this.sessionDocs.clear();
this.onlyNew = options.onlyNew;
const indexExists = await this.client.indices.exists({ index: this.index });
if (this.onlyNew && indexExists) {
const response = await this.client.search({
index: this.index,
sort: '@timestamp:desc',
size: 1,
});
// If the search query succeeded, the index exists.
const hit = response.hits.hits[0];
this.latestTimestamp = hit?._source?.['@timestamp'] || null;
if (this.latestTimestamp) {
console.log(`Working in "onlyNew" mode. Hence we are omitting documents prior to ${this.latestTimestamp}`);
} else {
console.log(`Working in "onlyNew" mode, but no documents found in index ${this.index}. Hence processing all documents now.`);
}
} else if (this.onlyNew && !indexExists) {
console.log(`Working in "onlyNew" mode, but index ${this.index} does not exist. Hence processing all documents now.`);
}
}
async pipeDocument(optionsArg: { docId: string; timestamp?: string | number; doc: any }) {
await this.ensureIndexExists(optionsArg.doc);
const documentBody = {
...optionsArg.doc,
...(optionsArg.timestamp && { '@timestamp': optionsArg.timestamp }),
};
// If 'onlyNew' is true, compare the document timestamp with the latest timestamp
if (this.onlyNew) {
if (this.latestTimestamp && optionsArg.timestamp <= this.latestTimestamp) {
this.fastForward = true;
} else {
this.fastForward = false;
await this.client.index({
index: this.index,
id: optionsArg.docId,
body: documentBody,
});
}
} else {
this.fastForward = false;
await this.client.index({
index: this.index,
id: optionsArg.docId,
body: documentBody,
});
}
this.sessionDocs.add(optionsArg.docId);
}
async endPipingSession() {
const allDocIds: string[] = [];
const responseQueue = [];
let response = await this.client.search({
index: this.index,
scroll: '1m',
size: this.BATCH_SIZE,
});
while (true) {
response.hits.hits.forEach((hit: any) => allDocIds.push(hit._id));
if (!response.hits.hits.length) {
break;
}
response = await this.client.scroll({ scroll_id: response._scroll_id, scroll: '1m' });
}
for (const docId of allDocIds) {
if (!this.sessionDocs.has(docId)) {
responseQueue.push({
delete: {
_index: this.index,
_id: docId,
},
});
if (responseQueue.length >= this.BATCH_SIZE) {
await this.client.bulk({ refresh: true, body: responseQueue });
responseQueue.length = 0;
}
}
}
if (responseQueue.length > 0) {
await this.client.bulk({ refresh: true, body: responseQueue });
}
this.sessionDocs.clear();
}
async takeSnapshot(processIterator: SnapshotProcessor) {
const snapshotIndex = `${this.index}_snapshots`;
const indexExists = await this.client.indices.exists({ index: snapshotIndex });
if (!indexExists) {
await this.client.indices.create({
index: snapshotIndex,
body: {
mappings: {
properties: {
date: {
type: 'date',
},
aggregationData: {
type: 'object',
enabled: true,
},
},
},
},
});
}
const documentIterator = this.getDocumentIterator();
const newSnapshot = await processIterator(documentIterator, await this.getLastSnapshot());
await this.storeSnapshot(newSnapshot);
}
private async getLastSnapshot(): Promise<ISnapshot | null> {
const snapshotIndex = `${this.index}_snapshots`;
const indexExists = await this.client.indices.exists({ index: snapshotIndex });
if (!indexExists) {
return null;
}
const response = await this.client.search({
index: snapshotIndex,
sort: 'date:desc',
size: 1,
});
if (response.hits.hits.length > 0) {
const hit = response.hits.hits[0];
return {
date: hit._source['date'],
aggregationData: hit._source['aggregationData'],
};
} else {
return null;
}
}
private async *getDocumentIterator() {
let response = await this.client.search({
index: this.index,
scroll: '1m',
size: this.BATCH_SIZE,
});
while (true) {
for (const hit of response.hits.hits) {
yield hit._source;
}
if (!response.hits.hits.length) {
break;
}
response = await this.client.scroll({ scroll_id: response._scroll_id, scroll: '1m' });
}
}
private async storeSnapshot(snapshot: ISnapshot) {
await this.client.index({
index: `${this.index}_snapshots`,
body: snapshot,
});
}
}

View File

@ -0,0 +1,103 @@
import * as plugins from './els.plugins.js';
import { ElsSmartlogDestination } from './els.classes.smartlogdestination.js';
import { type ILogPackage } from '@push.rocks/smartlog-interfaces';
import { Stringmap } from '@push.rocks/lik';
export class ElasticIndex {
private stringmap = new Stringmap();
private elasticSearchRef: ElsSmartlogDestination<any>;
constructor(elasticSearchInstanceArg: ElsSmartlogDestination<ILogPackage>) {
this.elasticSearchRef = elasticSearchInstanceArg;
}
public async ensureIndex(prefixArg: string, indexNameArg: string) {
if (this.stringmap.checkString(indexNameArg)) {
return indexNameArg;
}
const responseArg = await this.elasticSearchRef.client.cat.indices({
format: 'json',
bytes: 'mb',
}).catch(err => {
console.log(err);
});
if (!responseArg) {
throw new Error('Could not get valid response from elastic search');
}
if (Array.isArray(responseArg)) {
const filteredIndices = responseArg.filter((indexObjectArg) => {
return indexObjectArg.index.startsWith(prefixArg);
});
const filteredIndexNames = filteredIndices.map((indexObjectArg) => {
return indexObjectArg.index;
});
await this.deleteOldIndices(prefixArg, filteredIndexNames);
}
let index = null;
if (Array.isArray(responseArg)) {
index = responseArg.find((indexItemArg) => {
return indexItemArg.index === indexNameArg;
});
}
if (!index) {
await this.createNewIndex(indexNameArg);
}
this.stringmap.addString(indexNameArg);
return index;
}
public async createNewIndex(indexNameArg: string) {
const response = await this.elasticSearchRef.client.indices.create({
wait_for_active_shards: 1,
index: indexNameArg,
body: {
mappings: {
properties: {
'@timestamp': {
type: 'date',
},
logPackageArg: {
properties: {
payload: {
type: 'object',
dynamic: true
}
}
},
},
},
},
});
}
public async deleteOldIndices(prefixArg: string, indicesArray: string[]) {
const todayAsUnix: number = Date.now();
const rententionPeriodAsUnix: number = plugins.smarttime.units.days(
this.elasticSearchRef.indexRetention
);
for (const indexName of indicesArray) {
if (!indexName.startsWith(prefixArg)) continue;
const indexRegex = new RegExp(`^${prefixArg}-([0-9]*)-([0-9]*)-([0-9]*)$`)
const regexResult = indexRegex.exec(indexName);
const dateAsUnix: number = new Date(
`${regexResult[1]}-${regexResult[2]}-${regexResult[3]}`
).getTime();
if (todayAsUnix - rententionPeriodAsUnix > dateAsUnix) {
console.log(`found old index ${indexName}`);
const response = await this.elasticSearchRef.client.indices.delete(
{
index: indexName,
}).catch(err => {
console.log(err);
});
}
}
}
}

View File

@ -0,0 +1,63 @@
import { ElsSmartlogDestination, type IStandardLogParams } from './els.classes.smartlogdestination.js';
export class ElasticScheduler {
elasticSearchRef: ElsSmartlogDestination<any>;
docsScheduled = false;
docsStorage: any[] = [];
// maximum size of the buffer
maxBufferSize = 500;
constructor(elasticLogRefArg: ElsSmartlogDestination<any>) {
this.elasticSearchRef = elasticLogRefArg;
}
public addFailedDoc(objectArg: any | IStandardLogParams) {
this.addToStorage(objectArg);
this.setRetry();
}
public scheduleDoc(logObject: any) {
this.addToStorage(logObject);
}
private addToStorage(logObject: any) {
this.docsStorage.push(logObject);
// if buffer is full, send logs immediately
if (this.docsStorage.length >= this.maxBufferSize) {
this.flushLogsToElasticSearch();
}
}
private flushLogsToElasticSearch() {
const oldStorage = this.docsStorage;
this.docsStorage = [];
for (let logObject of oldStorage) {
this.elasticSearchRef.log(logObject, true);
}
}
public setRetry() {
setTimeout(() => {
this.flushLogsToElasticSearch();
if (this.docsStorage.length === 0) {
console.log('ElasticLog retry success!!!');
this.docsScheduled = false;
} else {
console.log('ElasticLog retry failed');
this.setRetry();
}
}, 5000);
}
public deferSend() {
if (!this.docsScheduled) {
console.log('Retry ElasticLog in 5 seconds!');
this.docsScheduled = true;
this.setRetry();
}
}
}

View File

@ -0,0 +1,68 @@
import { Client as ElasticClient } from '@elastic/elasticsearch';
interface FastPushOptions {
deleteOldData?: boolean; // Clear the index
deleteIndex?: boolean; // Delete the entire index
}
export class FastPush {
private client: ElasticClient;
constructor(node: string, auth?: { username: string; password: string }) {
this.client = new ElasticClient({
node: node,
...(auth && { auth: auth }),
});
}
async pushToIndex(indexName: string, docArray: any[], options?: FastPushOptions) {
if (docArray.length === 0) return;
const indexExists = await this.client.indices.exists({ index: indexName });
if (indexExists) {
if (options?.deleteIndex) {
await this.client.indices.delete({ index: indexName });
} else if (options?.deleteOldData) {
await this.client.deleteByQuery({
index: indexName,
body: {
query: {
match_all: {}
}
}
});
}
}
if (!indexExists || options?.deleteIndex) {
// Create index with mappings (for simplicity, we use dynamic mapping)
await this.client.indices.create({
index: indexName,
body: {
mappings: {
dynamic: "true"
// ... other specific mappings
},
},
});
}
// Bulk insert documents
const bulkBody = [];
for (const doc of docArray) {
bulkBody.push({
index: {
_index: indexName,
},
});
bulkBody.push(doc);
}
await this.client.bulk({ body: bulkBody });
}
}
// Usage example:
// const fastPush = new FastPush('http://localhost:9200', { username: 'elastic', password: 'password' });
// fastPush.pushToIndex('my_index', [{ name: 'John', age: 30 }, { name: 'Jane', age: 25 }], { deleteOldData: true });

111
ts/els.classes.kvstore.ts Normal file
View File

@ -0,0 +1,111 @@
import * as plugins from './els.plugins.js';
import { Client as ElasticClient } from '@elastic/elasticsearch';
export interface IElasticKVStoreConstructorOptions {
index: string;
node: string;
auth?: {
username: string;
password: string;
};
}
export class ElasticKVStore {
public client: ElasticClient;
public index: string;
private readyDeferred: any;
constructor(options: IElasticKVStoreConstructorOptions) {
this.client = new ElasticClient({
node: options.node,
...(options.auth && { auth: options.auth }),
});
this.index = options.index;
this.readyDeferred = plugins.smartpromise.defer();
this.setupIndex();
}
private async setupIndex() {
try {
const indexExists = await this.client.indices.exists({ index: this.index });
if (!indexExists) {
await this.client.indices.create({
index: this.index,
body: {
mappings: {
properties: {
key: {
type: 'keyword'
},
value: {
type: 'text'
}
}
}
}
});
}
this.readyDeferred.resolve();
} catch (err) {
this.readyDeferred.reject(err);
}
}
async set(key: string, value: string) {
await this.readyDeferred.promise;
await this.client.index({
index: this.index,
id: key,
body: {
key,
value
}
});
}
async get(key: string): Promise<string | null> {
await this.readyDeferred.promise;
try {
const response = await this.client.get({
index: this.index,
id: key
});
return response._source['value'];
} catch (error) {
if (error.meta && error.meta.statusCode === 404) {
return null;
}
throw error;
}
}
async delete(key: string) {
await this.readyDeferred.promise;
try {
await this.client.delete({
index: this.index,
id: key
});
} catch (error) {
if (error.meta && error.meta.statusCode !== 404) {
throw error;
}
}
}
async clear() {
await this.readyDeferred.promise;
await this.client.deleteByQuery({
index: this.index,
body: {
query: {
match_all: {}
}
}
});
}
}

View File

@ -0,0 +1,75 @@
import { Client as ElasticClient } from '@elastic/elasticsearch';
import type { ILogContext, ILogPackage, ILogDestination } from '@push.rocks/smartlog-interfaces';
import { ElasticScheduler } from './els.classes.elasticscheduler.js';
import { ElasticIndex } from './els.classes.elasticindex.js';
export interface IStandardLogParams {
message: string;
severity: string;
}
export interface IElasticSearchConstructorOptions {
indexPrefix: string;
indexRetention: number;
node: string;
auth?: {
username: string;
password: string;
};
}
export class ElsSmartlogDestination<T> {
public client: ElasticClient;
public elasticScheduler = new ElasticScheduler(this);
public elasticIndex: ElasticIndex = new ElasticIndex(this);
public indexPrefix: string;
public indexRetention: number;
constructor(optionsArg: IElasticSearchConstructorOptions) {
this.client = new ElasticClient({
node: optionsArg.node,
...(optionsArg.auth && { auth: optionsArg.auth }),
});
this.indexPrefix = `${optionsArg.indexPrefix}`;
this.indexRetention = optionsArg.indexRetention;
this.setupDataStream();
}
private async setupDataStream() {
// Define an index template
await this.client.indices.putIndexTemplate({
name: `${this.indexPrefix}_template`,
index_patterns: [`${this.indexPrefix}-*`],
data_stream: {},
});
}
public async log(logPackageArg: ILogPackage, scheduleOverwrite = false) {
const now = new Date();
const indexToUse = `${this.indexPrefix}-data-stream`; // Use data stream name
if (this.elasticScheduler.docsScheduled && !scheduleOverwrite) {
this.elasticScheduler.scheduleDoc(logPackageArg);
return;
}
await this.client.index(
{
index: indexToUse,
body: {
'@timestamp': new Date(logPackageArg.timestamp).toISOString(),
...logPackageArg,
},
}
);
}
get logDestination(): ILogDestination {
return {
handleLog: async (smartlogPackageArg: ILogPackage) => {
await this.log(smartlogPackageArg);
},
};
}
}

8
ts/els.plugins.ts Normal file
View File

@ -0,0 +1,8 @@
import * as elasticsearch from '@elastic/elasticsearch';
import * as lik from '@push.rocks/lik';
import * as smartdelay from '@push.rocks/smartdelay';
import * as smartlogInterfaces from '@push.rocks/smartlog-interfaces';
import * as smartpromise from '@push.rocks/smartpromise';
import * as smarttime from '@push.rocks/smarttime';
export { elasticsearch, lik, smartdelay, smartlogInterfaces, smartpromise, smarttime };

View File

@ -1 +1,4 @@
export * from './elasticsearch.classes.elasticsearch.js'; export * from './els.classes.smartlogdestination.js';
export * from './els.classes.fastpush.js';
export * from './els.classes.elasticdoc.js';
export * from './els.classes.kvstore.js';