184 lines
5.8 KiB
TypeScript
184 lines
5.8 KiB
TypeScript
import * as plugins from './smartclickhouse.plugins.js';
|
|
import { SmartClickHouseDb } from './smartclickhouse.classes.smartclickhouse.js';
|
|
|
|
/**
 * The ClickHouse column types this module knows how to create.
 *
 * `DateTime64(3, 'Europe/Berlin')` is the type of the `timestamp` column;
 * the remaining members are the types derived from JavaScript strings,
 * numbers and arrays thereof.
 */
export type TClickhouseColumnDataType =
  | 'String'
  | "DateTime64(3, 'Europe/Berlin')"
  | 'Float64'
  | 'Array(String)'
  | 'Array(Float64)';
|
|
/**
 * One row of ClickHouse's `system.columns` table, as read back by
 * `TimeDataTable.updateColumns()` via `SELECT * FROM system.columns`.
 *
 * NOTE(review): the size/position fields are typed as strings, which matches a
 * client that returns 64-bit integers as quoted strings — confirm against the
 * client configuration in use.
 */
export interface IColumnInfo {
  /** name of the database the column's table lives in */
  database: string;
  /** name of the table the column belongs to */
  table: string;
  /** column name */
  name: string;
  /** column data type */
  type: TClickhouseColumnDataType;
  /** ordinal position of the column within the table */
  position: string;
  default_kind: string;
  default_expression: string;
  data_compressed_bytes: string;
  data_uncompressed_bytes: string;
  marks_bytes: string;
  comment: string;
  is_in_partition_key: 0 | 1;
  is_in_sorting_key: 0 | 1;
  is_in_primary_key: 0 | 1;
  is_in_sampling_key: 0 | 1;
  compression_codec: string;
  // The following fields are nullable in system.columns and only populated for
  // matching column types, so they cannot be pinned to a single sample value:
  // e.g. `datetime_precision` is set for DateTime64 columns but NULL for String
  // columns, and `numeric_precision` is set for Float64 columns.
  character_octet_length: string | null;
  numeric_precision: string | null;
  numeric_precision_radix: string | null;
  numeric_scale: string | null;
  datetime_precision: string | null;
}
|
|
|
|
/**
 * Settings a `TimeDataTable` is constructed with.
 */
export interface ITimeDataTableOptions {
  /** name of the ClickHouse table; interpolated verbatim into the SQL statements */
  tableName: string;

  /** number of days used for the table TTL (`INTERVAL <n> DAY`) */
  retainDataForDays: number;
}
|
|
|
|
/**
 * A ClickHouse MergeTree table holding timestamped records.
 *
 * The table starts out with a `timestamp` and a `message` column; further
 * columns are added on demand by `addData()` based on the keys of the
 * flattened record that is being stored.
 *
 * NOTE(review): table and column names are interpolated into SQL verbatim.
 * Column names originate from the keys of the data passed to `addData()`, so
 * callers must not feed untrusted keys into this class.
 */
export class TimeDataTable {
  /**
   * Creates the table if it does not exist yet, applies a TTL of 30 days,
   * loads the column list and logs it.
   *
   * @param smartClickHouseDbRefArg database handle used for all queries
   * @param tableNameArg name of the table to create / attach to
   * @returns the ready-to-use table instance
   */
  public static async getTable(
    smartClickHouseDbRefArg: SmartClickHouseDb,
    tableNameArg: string
  ): Promise<TimeDataTable> {
    const newTable = new TimeDataTable(smartClickHouseDbRefArg, {
      tableName: tableNameArg,
      retainDataForDays: 30,
    });

    // create table in clickhouse
    await smartClickHouseDbRefArg.clickhouseClient.queryPromise(`
      CREATE TABLE IF NOT EXISTS ${newTable.options.tableName} (
        timestamp DateTime64(3, 'Europe/Berlin'),
        message String
      ) ENGINE=MergeTree() ORDER BY timestamp`);

    // lets adjust the TTL
    await smartClickHouseDbRefArg.clickhouseClient.queryPromise(`
      ALTER TABLE ${newTable.options.tableName} MODIFY TTL toDateTime(timestamp) + INTERVAL ${newTable.options.retainDataForDays} DAY
    `);

    await newTable.updateColumns();
    console.log(`=======================`);
    console.log(
      `table with name "${newTable.options.tableName}" in database ${newTable.smartClickHouseDbRef.options.database} has the following columns:`
    );
    for (const column of newTable.columns) {
      console.log(`>> ${column.name}: ${column.type}`);
    }
    console.log('^^^^^^^^^^^^^^\n');

    return newTable;
  }

  /**
   * Maps a JavaScript value to the ClickHouse column type used to store it.
   * Strings map to `String`, numbers to `Float64`; arrays are typed by their
   * FIRST element only. Everything else (booleans, objects, null, undefined,
   * empty arrays, ...) yields `null`, meaning "do not store".
   */
  private static getClickhouseTypeForValue(valueArg: unknown): TClickhouseColumnDataType | null {
    const scalarTypeFor = (scalarArg: unknown): 'String' | 'Float64' | null => {
      switch (typeof scalarArg) {
        case 'string':
          return 'String';
        case 'number':
          return 'Float64';
        default:
          return null;
      }
    };

    if (Array.isArray(valueArg)) {
      const elementType = scalarTypeFor(valueArg[0]);
      if (!elementType) {
        return null;
      }
      return elementType === 'String' ? 'Array(String)' : 'Array(Float64)';
    }
    return scalarTypeFor(valueArg);
  }

  // INSTANCE

  /**
   * Set while a recovery run after a failed insert is in progress,
   * `null`/unset otherwise.
   */
  public healingDeferred: plugins.smartpromise.Deferred<any> | null = null;
  public smartClickHouseDbRef: SmartClickHouseDb;
  public options: ITimeDataTableOptions;

  /** cached result of the last `updateColumns()` call */
  public columns: IColumnInfo[] = [];

  constructor(smartClickHouseDbRefArg: SmartClickHouseDb, optionsArg: ITimeDataTableOptions) {
    this.smartClickHouseDbRef = smartClickHouseDbRefArg;
    this.options = optionsArg;
  }

  /**
   * updates the columns
   *
   * Re-reads the column metadata of this table from `system.columns` and
   * stores it in `this.columns`.
   *
   * @returns the refreshed column list
   */
  public async updateColumns(): Promise<IColumnInfo[]> {
    // Exact match on purpose: with LIKE, a `_` in the database or table name
    // acts as a single-character wildcard and would pull in columns of
    // similarly named tables.
    this.columns = await this.smartClickHouseDbRef.clickhouseClient.queryPromise(`
      SELECT * FROM system.columns
      WHERE database = '${this.smartClickHouseDbRef.options.database}'
      AND table = '${this.options.tableName}'
    `);
    return this.columns;
  }

  /**
   * stores a json and tries to map it to the nested syntax
   *
   * The record is flattened with `plugins.smartobject.toFlatObject`; for every
   * key with a storable value a column is created if missing. Keys whose value
   * has no ClickHouse mapping are skipped.
   *
   * If the insert fails, a recovery run is started (see
   * `healAfterFailedInsert`) and the record is NOT retried.
   *
   * @param dataArg the record to store; a `timestamp` key, if present, must be a number
   * @returns the result of the insert, or `undefined` if the insert failed
   * @throws Error if `timestamp` is present but not a number
   */
  public async addData(dataArg: any) {
    // the storageJson
    const storageJson: { [key: string]: any } = {};

    // key checking
    const flatDataArg = plugins.smartobject.toFlatObject(dataArg);
    for (const key of Object.keys(flatDataArg)) {
      const value = flatDataArg[key];

      if (key === 'timestamp') {
        if (typeof value !== 'number') {
          throw new Error('timestamp must be of type number');
        }
        // the timestamp column always exists, no column check needed
        storageJson.timestamp = value;
        continue;
      }

      // lets deal with the rest
      const clickhouseType = TimeDataTable.getClickhouseTypeForValue(value);
      if (!clickhouseType) {
        continue;
      }
      await this.ensureColumn(key, clickhouseType);
      storageJson[key] = value;
    }

    const result = await this.smartClickHouseDbRef.clickhouseClient
      .insertPromise(this.options.tableName, [storageJson])
      .catch((errorArg: unknown) => this.healAfterFailedInsert(errorArg));
    return result;
  }

  /**
   * Makes sure a column named `pathArg` exists, adding it with type `typeArg`
   * when it is missing. A failing ALTER is logged, not thrown.
   */
  private async ensureColumn(pathArg: string, typeArg: TClickhouseColumnDataType): Promise<void> {
    const isKnownColumn = () => this.columns.some((column) => column.name === pathArg);

    if (isKnownColumn()) {
      return;
    }

    // the cached column list may be stale, so refresh once before altering
    await this.updateColumns();
    if (isKnownColumn()) {
      return;
    }

    // IF NOT EXISTS keeps this from failing when another writer added the
    // column between the refresh above and this statement.
    const alterString = `ALTER TABLE ${this.options.tableName} ADD COLUMN IF NOT EXISTS ${pathArg} ${typeArg} FIRST`;
    try {
      await this.smartClickHouseDbRef.clickhouseClient.queryPromise(`
        ${alterString}
      `);
    } catch (err: unknown) {
      // best effort: report what failed and carry on, the insert will surface
      // any remaining schema problem
      console.log(alterString);
      console.error(err);
      for (const column of this.columns) {
        console.log(column.name);
      }
    }
    await this.updateColumns();
  }

  /**
   * Recovery after a failed insert: waits for the database to become
   * reachable, recreates the database and drops the cached column list.
   * Does nothing when a recovery run is already in progress.
   */
  private async healAfterFailedInsert(errorArg: unknown): Promise<void> {
    if (this.healingDeferred) {
      return;
    }
    const deferred = plugins.smartpromise.defer();
    this.healingDeferred = deferred;
    console.log(`Ran into an error. Trying to set up things properly again.`);
    console.error(errorArg);
    try {
      await this.smartClickHouseDbRef.pingDatabaseUntilAvailable();
      await this.smartClickHouseDbRef.createDatabase();
      this.columns = [];
    } finally {
      // Always release the guard, even when recovery itself failed; otherwise
      // every later insert error would be swallowed without a new attempt.
      deferred.resolve();
      this.healingDeferred = null;
    }
  }
}
|