import * as plugins from './smartclickhouse.plugins.js';
import { SmartClickHouseDb } from './smartclickhouse.classes.smartclickhouse.js';

/**
 * ClickHouse column types that this module creates columns with.
 */
export type TClickhouseColumnDataType =
  | 'String'
  | "DateTime64(3, 'Europe/Berlin')"
  | 'Float64'
  | 'Array(String)'
  | 'Array(Float64)';

/**
 * One row of `system.columns`, as read by `TimeDataTable.updateColumns()`.
 */
export interface IColumnInfo {
  database: string;
  table: string;
  name: string;
  type: TClickhouseColumnDataType;
  position: string;
  default_kind: string;
  default_expression: string;
  data_compressed_bytes: string;
  data_uncompressed_bytes: string;
  marks_bytes: string;
  comment: string;
  is_in_partition_key: 0 | 1;
  is_in_sorting_key: 0 | 1;
  is_in_primary_key: 0 | 1;
  is_in_sampling_key: 0 | 1;
  compression_codec: string;
  character_octet_length: null;
  numeric_precision: null;
  numeric_precision_radix: null;
  numeric_scale: null;
  datetime_precision: '3';
}

/**
 * Options for a `TimeDataTable`.
 */
export interface ITimeDataTableOptions {
  /** name of the table inside the database of the SmartClickHouseDb reference */
  tableName: string;
  /** number of days used for the table TTL (`timestamp + INTERVAL n DAY`) */
  retainDataForDays: number;
}

/**
 * A MergeTree table ordered by `timestamp` whose columns are added on demand
 * from the keys of the objects passed to `addData()`.
 */
export class TimeDataTable {
  /**
   * Creates a TimeDataTable and runs `setup()` on it.
   *
   * @param smartClickHouseDbRefArg the database wrapper used for all queries
   * @param tableNameArg name of the table to create or reuse
   * @param retainDataForDaysArg TTL in days; defaults to 30
   * @returns the table instance after setup has completed
   */
  public static async getTable(
    smartClickHouseDbRefArg: SmartClickHouseDb,
    tableNameArg: string,
    retainDataForDaysArg = 30
  ) {
    const newTable = new TimeDataTable(smartClickHouseDbRefArg, {
      tableName: tableNameArg,
      retainDataForDays: retainDataForDaysArg,
    });
    await newTable.setup();
    return newTable;
  }

  // INSTANCE
  /** set while a recovery run is in progress; `addData()` returns early while it is set */
  public healingDeferred: plugins.smartpromise.Deferred;
  public smartClickHouseDbRef: SmartClickHouseDb;
  public options: ITimeDataTableOptions;
  /** cached result of the last `updateColumns()` call */
  public columns: IColumnInfo[] = [];

  constructor(smartClickHouseDbRefArg: SmartClickHouseDb, optionsArg: ITimeDataTableOptions) {
    this.smartClickHouseDbRef = smartClickHouseDbRefArg;
    this.options = optionsArg;
  }

  /**
   * Creates the table if it does not exist, applies the TTL from the options,
   * refreshes the column cache and logs the columns to the console.
   */
  public async setup() {
    const databaseName = this.smartClickHouseDbRef.options.database;
    const tableName = this.options.tableName;

    // create table in clickhouse
    await this.smartClickHouseDbRef.clickhouseClient.queryPromise(`
      CREATE TABLE IF NOT EXISTS ${databaseName}.${tableName} (
        timestamp DateTime64(3, 'Europe/Berlin'),
        message String
      ) ENGINE=MergeTree() ORDER BY timestamp`);

    // lets adjust the TTL
    await this.smartClickHouseDbRef.clickhouseClient.queryPromise(`
      ALTER TABLE ${databaseName}.${tableName}
      MODIFY TTL toDateTime(timestamp) + INTERVAL ${this.options.retainDataForDays} DAY
    `);

    await this.updateColumns();
    console.log(`=======================`);
    console.log(
      `table with name "${tableName}" in database ${databaseName} has the following columns:`
    );
    for (const column of this.columns) {
      console.log(`>> ${column.name}: ${column.type}`);
    }
    console.log('^^^^^^^^^^^^^^\n');
  }

  /**
   * Re-reads the column list of this table from `system.columns` into `this.columns`.
   *
   * Uses an exact match on database and table name: with `LIKE`, characters such
   * as `_` and `%` act as wildcards and could pull in columns of other tables.
   *
   * @returns the refreshed column list
   */
  public async updateColumns() {
    this.columns = await this.smartClickHouseDbRef.clickhouseClient.queryPromise(`
      SELECT * FROM system.columns
      WHERE database = '${this.smartClickHouseDbRef.options.database}'
      AND table = '${this.options.tableName}' FORMAT JSONEachRow
    `);
    return this.columns;
  }

  /**
   * Flattens the given object, makes sure a column exists for every key that has
   * a supported value type, and inserts the result as one row.
   *
   * Keys whose value is neither a string, a number nor an array of those are
   * skipped. While a recovery run is in progress the call returns without
   * writing anything. If the insert fails, a recovery run is started.
   *
   * @param dataArg the object to store; a `timestamp` key must hold a number
   * @returns the result of the insert, or undefined if the insert failed or was skipped
   * @throws Error if `timestamp` is present but not a number
   */
  public async addData(dataArg: any) {
    if (this.healingDeferred) {
      return;
    }

    // the row that is finally handed to the insert
    const storageJson: { [key: string]: any } = {};

    const flatDataArg = plugins.smartobject.toFlatObject(dataArg);
    for (const key of Object.keys(flatDataArg)) {
      const value = flatDataArg[key];

      if (key === 'timestamp') {
        if (typeof value !== 'number') {
          throw new Error('timestamp must be of type number');
        }
        storageJson.timestamp = value;
        continue;
      }

      // lets deal with the rest
      const clickhouseType = this.getClickhouseTypeForValue(value);
      if (!clickhouseType) {
        continue;
      }
      await this.ensureColumn(key, clickhouseType);
      storageJson[key] = value;
    }

    const result = await this.smartClickHouseDbRef.clickhouseClient
      .insertPromise(this.smartClickHouseDbRef.options.database, this.options.tableName, [
        storageJson,
      ])
      .catch((errArg: unknown) => this.heal(errArg));
    return result;
  }

  /**
   * Maps a JavaScript value to the ClickHouse type used for its column.
   * Arrays are typed by their first element.
   *
   * @returns the column type, or null if the value has no supported type
   */
  private getClickhouseTypeForValue(valueArg: unknown): TClickhouseColumnDataType | null {
    const scalarTypeOf = (scalarArg: unknown): 'String' | 'Float64' | null => {
      if (typeof scalarArg === 'string') {
        return 'String';
      }
      if (typeof scalarArg === 'number') {
        return 'Float64';
      }
      return null;
    };

    if (!Array.isArray(valueArg)) {
      return scalarTypeOf(valueArg);
    }

    // an empty array yields no element type and is therefore skipped by the caller
    const elementType = scalarTypeOf(valueArg[0]);
    if (elementType === 'String') {
      return 'Array(String)';
    }
    if (elementType === 'Float64') {
      return 'Array(Float64)';
    }
    return null;
  }

  /**
   * Makes sure a column named `pathArg` exists, adding it with `typeArg` if needed.
   * The cached column list is consulted first and refreshed once before altering.
   */
  private async ensureColumn(pathArg: string, typeArg: TClickhouseColumnDataType) {
    const isKnownColumn = () => this.columns.some((column) => column.name === pathArg);

    if (isKnownColumn()) {
      return;
    }

    // the cache may be stale, so re-read it before deciding to alter the table
    await this.updateColumns();
    if (isKnownColumn()) {
      return;
    }

    const alterString = `ALTER TABLE ${this.smartClickHouseDbRef.options.database}.${this.options.tableName} ADD COLUMN ${pathArg} ${typeArg} FIRST`;
    try {
      await this.smartClickHouseDbRef.clickhouseClient.queryPromise(`
        ${alterString}
      `);
    } catch (err) {
      // best effort: report the failed statement, the error and the known columns
      console.log(alterString);
      console.log(err);
      for (const column of this.columns) {
        console.log(column.name);
      }
    }
    await this.updateColumns();
  }

  /**
   * Recovery run after a failed insert: waits for the database, recreates the
   * database and the table, and clears the column cache.
   *
   * `healingDeferred` is always released in `finally`; otherwise a failing
   * recovery step would leave it set and every later `addData()` call would
   * return early without writing.
   */
  private async heal(errArg: unknown) {
    if (this.healingDeferred) {
      return;
    }
    this.healingDeferred = plugins.smartpromise.defer();
    console.log(`Ran into an error. Trying to set up things properly again.`);
    console.log(errArg);
    try {
      await this.smartClickHouseDbRef.pingDatabaseUntilAvailable();
      await this.smartClickHouseDbRef.createDatabase();
      await this.setup();
      // empty cache: the next column check re-reads system.columns
      this.columns = [];
    } finally {
      this.healingDeferred.resolve();
      this.healingDeferred = null;
    }
  }
}