fix(core): update
This commit is contained in:
45
ts/elasticsearch.classes.elasticindex.ts
Normal file
45
ts/elasticsearch.classes.elasticindex.ts
Normal file
@ -0,0 +1,45 @@
|
||||
import * as plugins from './elasticsearch.plugins';
|
||||
import { ElasticSearch } from './elasticsearch.classes.elasticsearch';
|
||||
import { ILogPackage } from '@pushrocks/smartlog-interfaces';
|
||||
|
||||
import { Stringmap } from '@pushrocks/lik';
|
||||
|
||||
export class ElasticIndex {
|
||||
private stringmap = new Stringmap();
|
||||
private elasticSearchRef: ElasticSearch<any>;
|
||||
|
||||
constructor(elasticSearchInstanceArg: ElasticSearch<ILogPackage>) {
|
||||
this.elasticSearchRef = elasticSearchInstanceArg;
|
||||
}
|
||||
|
||||
public async ensureIndex(indexArg: string) {
|
||||
const done = plugins.smartpromise.defer();
|
||||
if(this.stringmap.checkString(indexArg)) {
|
||||
return;
|
||||
}
|
||||
this.elasticSearchRef.client.cat.indices({
|
||||
format: 'json',
|
||||
bytes: 'm'
|
||||
}, async (err, response: any[]) => {
|
||||
// console.log(response);
|
||||
const index = response.find(indexObject => {
|
||||
return indexObject.index === indexArg;
|
||||
});
|
||||
|
||||
if(!index) {
|
||||
const done2 = plugins.smartpromise.defer();
|
||||
this.elasticSearchRef.client.indices.create({
|
||||
waitForActiveShards: '2',
|
||||
index: indexArg
|
||||
}, (error, response) => {
|
||||
// console.lof(response)
|
||||
done2.resolve();
|
||||
});
|
||||
await done2.promise;
|
||||
}
|
||||
this.stringmap.addString(indexArg);
|
||||
done.resolve();
|
||||
});
|
||||
await done.promise;
|
||||
}
|
||||
}
|
44
ts/elasticsearch.classes.elasticscheduler.ts
Normal file
44
ts/elasticsearch.classes.elasticscheduler.ts
Normal file
@ -0,0 +1,44 @@
|
||||
import { ElasticSearch, IStandardLogParams } from './elasticsearch.classes.elasticsearch';
|
||||
|
||||
export class ElasticScheduler {
|
||||
elasticSearchRef: ElasticSearch<any>;
|
||||
docsScheduled = false;
|
||||
docsStorage: any[] = [];
|
||||
|
||||
constructor(elasticLogRefArg: ElasticSearch<any>) {
|
||||
this.elasticSearchRef = elasticLogRefArg;
|
||||
}
|
||||
|
||||
public addFailedDoc(objectArg: any | IStandardLogParams) {
|
||||
this.docsStorage.push(objectArg);
|
||||
this.setRetry();
|
||||
}
|
||||
public scheduleDoc(logObject: any) {
|
||||
this.docsStorage.push(logObject);
|
||||
}
|
||||
|
||||
public setRetry() {
|
||||
setTimeout(() => {
|
||||
const oldStorage = this.docsStorage;
|
||||
this.docsStorage = [];
|
||||
for (let logObject of oldStorage) {
|
||||
this.elasticSearchRef.log(logObject, true);
|
||||
}
|
||||
if (this.docsStorage.length === 0) {
|
||||
console.log('ElasticLog retry success!!!');
|
||||
this.docsScheduled = false;
|
||||
} else {
|
||||
console.log('ElasticLog retry failed');
|
||||
this.setRetry();
|
||||
}
|
||||
}, 5000);
|
||||
}
|
||||
|
||||
public deferSend() {
|
||||
if (!this.docsScheduled) {
|
||||
console.log('Retry ElasticLog in 5 seconds!');
|
||||
this.docsScheduled = true;
|
||||
this.setRetry();
|
||||
}
|
||||
}
|
||||
}
|
@ -3,7 +3,8 @@ import { Client as ElasticClient } from 'elasticsearch';
|
||||
import { ILogContext, ILogPackage, ILogDestination } from '@pushrocks/smartlog-interfaces';
|
||||
|
||||
// other classes
|
||||
import { LogScheduler } from './elasticsearch.classes.logscheduler';
|
||||
import { ElasticScheduler } from './elasticsearch.classes.elasticscheduler';
|
||||
import { ElasticIndex } from './elasticsearch.classes.elasticindex';
|
||||
|
||||
export interface IStandardLogParams {
|
||||
message: string;
|
||||
@ -19,8 +20,9 @@ export interface IElasticLogConstructorOptions {
|
||||
}
|
||||
|
||||
export class ElasticSearch<T> {
|
||||
client: ElasticClient;
|
||||
logScheduler = new LogScheduler(this);
|
||||
public client: ElasticClient;
|
||||
public elasticScheduler = new ElasticScheduler(this);
|
||||
public elasticIndex: ElasticIndex = new ElasticIndex(this);
|
||||
|
||||
/**
|
||||
* sets up an instance of Elastic log
|
||||
@ -52,15 +54,21 @@ export class ElasticSearch<T> {
|
||||
|
||||
public async log(logPackageArg: ILogPackage, scheduleOverwrite = false) {
|
||||
const now = new Date();
|
||||
if (this.logScheduler.logsScheduled && !scheduleOverwrite) {
|
||||
this.logScheduler.scheduleLog(logPackageArg);
|
||||
const indexToUse = `smartlog-${now.getFullYear()}.${('0' + (now.getMonth() + 1)).slice(-2)}.${(
|
||||
'0' + now.getDate()
|
||||
).slice(-2)}`;
|
||||
|
||||
|
||||
if (this.elasticScheduler.docsScheduled && !scheduleOverwrite) {
|
||||
this.elasticScheduler.scheduleDoc(logPackageArg);
|
||||
return;
|
||||
}
|
||||
|
||||
await this.elasticIndex.ensureIndex(indexToUse);
|
||||
|
||||
this.client.index(
|
||||
{
|
||||
index: `smartlog-${now.getFullYear()}.${('0' + (now.getMonth() + 1)).slice(-2)}.${(
|
||||
'0' + now.getDate()
|
||||
).slice(-2)}`,
|
||||
index: indexToUse,
|
||||
type: 'log',
|
||||
body: {
|
||||
'@timestamp': new Date(logPackageArg.timestamp).toISOString(),
|
||||
@ -71,7 +79,7 @@ export class ElasticSearch<T> {
|
||||
if (error) {
|
||||
console.log('ElasticLog encountered an error:');
|
||||
console.log(error);
|
||||
this.logScheduler.addFailedLog(logPackageArg);
|
||||
this.elasticScheduler.addFailedDoc(logPackageArg);
|
||||
} else {
|
||||
console.log(`ElasticLog: ${logPackageArg.message}`);
|
||||
}
|
||||
|
@ -1,44 +0,0 @@
|
||||
import { ElasticSearch, IStandardLogParams } from './elasticsearch.classes.elasticsearch';
|
||||
|
||||
export class LogScheduler {
|
||||
elasticLogRef: ElasticSearch<any>;
|
||||
logsScheduled = false;
|
||||
logStorage: any[] = [];
|
||||
|
||||
constructor(elasticLogRefArg: ElasticSearch<any>) {
|
||||
this.elasticLogRef = elasticLogRefArg;
|
||||
}
|
||||
|
||||
addFailedLog(objectArg: any | IStandardLogParams) {
|
||||
this.logStorage.push(objectArg);
|
||||
this.setRetry();
|
||||
}
|
||||
scheduleLog(logObject: any) {
|
||||
this.logStorage.push(logObject);
|
||||
}
|
||||
|
||||
setRetry() {
|
||||
setTimeout(() => {
|
||||
const oldStorage = this.logStorage;
|
||||
this.logStorage = [];
|
||||
for (let logObject of oldStorage) {
|
||||
this.elasticLogRef.log(logObject, true);
|
||||
}
|
||||
if (this.logStorage.length === 0) {
|
||||
console.log('ElasticLog retry success!!!');
|
||||
this.logsScheduled = false;
|
||||
} else {
|
||||
console.log('ElasticLog retry failed');
|
||||
this.setRetry();
|
||||
}
|
||||
}, 5000);
|
||||
}
|
||||
|
||||
deferSend() {
|
||||
if (!this.logsScheduled) {
|
||||
console.log('Retry ElasticLog in 5 seconds!');
|
||||
this.logsScheduled = true;
|
||||
this.setRetry();
|
||||
}
|
||||
}
|
||||
}
|
@ -1,4 +1,6 @@
|
||||
import * as elasticsearch from 'elasticsearch';
|
||||
import * as smartdelay from '@pushrocks/smartdelay';
|
||||
import * as smartlogInterfaces from '@pushrocks/smartlog-interfaces';
|
||||
export { elasticsearch, smartdelay, smartlogInterfaces };
|
||||
import * as smartpromise from '@pushrocks/smartpromise';
|
||||
|
||||
export { elasticsearch, smartdelay, smartlogInterfaces, smartpromise };
|
||||
|
Reference in New Issue
Block a user