import * as plugins from './smartclickhouse.plugins.js';
import { SmartClickHouseDb } from './smartclickhouse.classes.smartclickhouse.js';

/**
 * ClickHouse column types that this table helper creates or reads.
 */
export type TClickhouseColumnDataType =
  | 'String'
  | "DateTime64(3, 'Europe/Berlin')"
  | 'Float64'
  | 'Array(String)'
  | 'Array(Float64)';

/**
 * Shape of a row read from `system.columns` by `TimeDataTable.updateColumns()`.
 */
export interface IColumnInfo {
  database: string;
  table: string;
  name: string;
  type: TClickhouseColumnDataType;
  position: string;
  default_kind: string;
  default_expression: string;
  data_compressed_bytes: string;
  data_uncompressed_bytes: string;
  marks_bytes: string;
  comment: string;
  is_in_partition_key: 0 | 1;
  is_in_sorting_key: 0 | 1;
  is_in_primary_key: 0 | 1;
  is_in_sampling_key: 0 | 1;
  compression_codec: string;
  character_octet_length: null;
  numeric_precision: null;
  numeric_precision_radix: null;
  numeric_scale: null;
  datetime_precision: '3';
}

/**
 * Options for a `TimeDataTable`.
 */
export interface ITimeDataTableOptions {
  /** name of the table inside the database of the SmartClickHouseDb reference */
  tableName: string;
  /** number of days used for the table TTL that is applied in `setup()` */
  retainDataForDays: number;
}

/**
 * Validates a value that is about to be interpolated into a SQL statement as a number.
 * Numbers and non-empty numeric strings are accepted; anything else throws, so that
 * arbitrary text can never end up inside the statement.
 */
const toFiniteNumber = (valueArg: unknown, nameArg: string): number => {
  const candidate =
    typeof valueArg === 'string' && valueArg.trim() !== '' ? Number(valueArg) : valueArg;
  if (typeof candidate !== 'number' || !Number.isFinite(candidate)) {
    throw new Error(`${nameArg} must be a finite number`);
  }
  return candidate;
};

/**
 * Builds a ClickHouse expression of type DateTime64(3) for a unix timestamp in milliseconds.
 * `toDateTime(ms / 1000)` would yield a second-resolution DateTime and drop the milliseconds,
 * which makes comparisons against the DateTime64(3) `timestamp` column imprecise.
 */
const toDateTime64MilliExpression = (unixTimestampArg: number, nameArg: string): string => {
  const millis = Math.round(toFiniteNumber(unixTimestampArg, nameArg));
  return `fromUnixTimestamp64Milli(toInt64(${millis}))`;
};

/**
 * Maps a JS value to the ClickHouse column type used to store it.
 * Strings map to String, numbers to Float64, and arrays to Array(...) of the type of their
 * first element. Returns null for everything else (including empty arrays), which callers
 * treat as "do not store this key".
 */
const getClickhouseTypeForValue = (valueArg: unknown): TClickhouseColumnDataType | null => {
  const getScalarType = (scalarArg: unknown): 'String' | 'Float64' | null => {
    switch (typeof scalarArg) {
      case 'string':
        return 'String';
      case 'number':
        return 'Float64';
      default:
        return null;
    }
  };
  if (Array.isArray(valueArg)) {
    const elementType = getScalarType(valueArg[0]);
    return elementType ? (`Array(${elementType})` as TClickhouseColumnDataType) : null;
  }
  return getScalarType(valueArg);
};

/**
 * A ClickHouse table keyed by a millisecond `timestamp` column whose other columns are
 * added on demand from the keys of the stored data.
 */
export class TimeDataTable {
  /**
   * Creates a TimeDataTable with a retention of 30 days and runs `setup()` on it.
   * @param smartClickHouseDbRefArg the database wrapper to run queries through
   * @param tableNameArg the name of the table
   */
  public static async getTable(
    smartClickHouseDbRefArg: SmartClickHouseDb,
    tableNameArg: string
  ): Promise<TimeDataTable> {
    const newTable = new TimeDataTable(smartClickHouseDbRefArg, {
      tableName: tableNameArg,
      retainDataForDays: 30,
    });
    await newTable.setup();
    return newTable;
  }

  // INSTANCE
  /** set while the table is being re-created after a failed insert; `addData()` is a no-op meanwhile */
  public healingDeferred: plugins.smartpromise.Deferred<any>;
  public smartClickHouseDbRef: SmartClickHouseDb;
  public options: ITimeDataTableOptions;
  /** cached column metadata, refreshed by `updateColumns()` */
  public columns: IColumnInfo[] = [];

  constructor(smartClickHouseDbRefArg: SmartClickHouseDb, optionsArg: ITimeDataTableOptions) {
    this.smartClickHouseDbRef = smartClickHouseDbRefArg;
    this.options = optionsArg;
  }

  /**
   * the `database.table` identifier used in statements
   */
  private get qualifiedTableName(): string {
    return `${this.smartClickHouseDbRef.options.database}.${this.options.tableName}`;
  }

  /**
   * Creates the table if it does not exist, applies the TTL from the options,
   * refreshes the column cache and logs the resulting columns.
   */
  public async setup(): Promise<void> {
    const retainDays = toFiniteNumber(this.options.retainDataForDays, 'retainDataForDays');

    // create table in clickhouse
    await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
      CREATE TABLE IF NOT EXISTS ${this.qualifiedTableName} (
        timestamp DateTime64(3, 'Europe/Berlin'),
        message String
      ) ENGINE=MergeTree() ORDER BY timestamp`);

    // lets adjust the TTL
    await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
      ALTER TABLE ${this.qualifiedTableName}
      MODIFY TTL toDateTime(timestamp) + INTERVAL ${retainDays} DAY
    `);

    await this.updateColumns();
    console.log(`=======================`);
    console.log(
      `table with name "${this.options.tableName}" in database ${this.smartClickHouseDbRef.options.database} has the following columns:`
    );
    for (const column of this.columns) {
      console.log(`>> ${column.name}: ${column.type}`);
    }
    console.log('^^^^^^^^^^^^^^\n');
  }

  /**
   * Reloads the column cache from `system.columns`.
   * @returns the refreshed columns
   */
  public async updateColumns(): Promise<IColumnInfo[]> {
    // exact match on purpose: LIKE would treat `_` and `%` inside the names as wildcards
    // and could pull in the columns of other tables
    this.columns = await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
      SELECT * FROM system.columns
      WHERE database = '${this.smartClickHouseDbRef.options.database}'
        AND table = '${this.options.tableName}'
      FORMAT JSONEachRow
    `);
    return this.columns;
  }

  /**
   * Makes sure a column exists for the given key, adding it to the table if needed.
   * A failing ALTER is logged and not rethrown.
   */
  private async ensureColumn(pathArg: string, typeArg: TClickhouseColumnDataType): Promise<void> {
    const isKnownColumn = () => this.columns.some((column) => column.name === pathArg);
    if (isKnownColumn()) {
      return;
    }
    // the cache might be stale, so refresh it once before altering the table
    await this.updateColumns();
    if (isKnownColumn()) {
      return;
    }
    // IF NOT EXISTS keeps the statement from failing when the column was added
    // by someone else since the cache refresh above
    const alterString = `ALTER TABLE ${this.qualifiedTableName} ADD COLUMN IF NOT EXISTS ${pathArg} ${typeArg} FIRST`;
    try {
      await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
        ${alterString}
      `);
    } catch (err) {
      console.log(alterString);
      console.log(err);
      for (const column of this.columns) {
        console.log(column.name);
      }
    }
    await this.updateColumns();
  }

  /**
   * Waits for the database, re-creates database and table and clears the column cache.
   * `healingDeferred` is set for the duration and is always cleared again, even when one of
   * the steps throws; otherwise `addData()` would drop data forever.
   */
  private async heal(): Promise<void> {
    if (this.healingDeferred) {
      return;
    }
    this.healingDeferred = plugins.smartpromise.defer();
    console.log('Ran into an error. \nTrying to set up things properly again.');
    try {
      await this.smartClickHouseDbRef.pingDatabaseUntilAvailable();
      await this.smartClickHouseDbRef.createDatabase();
      await this.setup();
      this.columns = [];
    } finally {
      this.healingDeferred.resolve();
      this.healingDeferred = null;
    }
  }

  /**
   * stores a json and tries to map it to the nested syntax
   *
   * The data is flattened; for every key with a storable value a column is ensured to exist.
   * Keys whose value has no matching ClickHouse type are skipped.
   * While the table is healing the call returns immediately without storing anything.
   * When the insert fails, the table is healed; the row that failed is not retried.
   *
   * @param dataArg the data to store; a `timestamp` key, if present, must be a number
   * @returns the result of the insert, or undefined when nothing was inserted
   * @throws Error when `timestamp` is present but not a number
   */
  public async addData(dataArg: any) {
    if (this.healingDeferred) {
      return;
    }

    // the storageJson
    const storageJson: { [key: string]: any } = {};

    // key checking
    const flatDataArg = plugins.smartobject.toFlatObject(dataArg);
    for (const key of Object.keys(flatDataArg)) {
      const value = flatDataArg[key];
      if (key === 'timestamp') {
        if (typeof value !== 'number') {
          throw new Error('timestamp must be of type number');
        }
        storageJson.timestamp = value;
        continue;
      }

      // lets deal with the rest
      const clickhouseType = getClickhouseTypeForValue(value);
      if (!clickhouseType) {
        continue;
      }
      await this.ensureColumn(key, clickhouseType);
      storageJson[key] = value;
    }

    try {
      return await this.smartClickHouseDbRef.clickhouseHttpClient.insertPromise(
        this.smartClickHouseDbRef.options.database,
        this.options.tableName,
        [storageJson]
      );
    } catch (err) {
      await this.heal();
      return undefined;
    }
  }

  /**
   * deletes the entire table
   */
  public async delete(): Promise<void> {
    await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
      DROP TABLE IF EXISTS ${this.qualifiedTableName}
    `);
    this.columns = [];
  }

  /**
   * deletes entries older than a specified number of days
   * and waits for the resulting mutation to finish
   * @param days number of days
   * @throws Error when `days` is not a finite number
   */
  public async deleteOldEntries(days: number): Promise<void> {
    const safeDays = toFiniteNumber(days, 'days');
    // Perform the deletion operation
    await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
      ALTER TABLE ${this.qualifiedTableName}
      DELETE WHERE timestamp < now() - INTERVAL ${safeDays} DAY
    `);
    await this.waitForMutations();
  }

  /**
   * Polls `system.mutations` once per second until no unfinished mutation
   * is left for this table.
   */
  public async waitForMutations(): Promise<void> {
    const pollIntervalMs = 1000;
    while (true) {
      // - filtered by database as well, so a table with the same name elsewhere does not block
      // - FORMAT JSONEachRow like the other row-reading queries of this class
      //   NOTE(review): assumes queryPromise returns row objects for this format — confirm
      const mutations = await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
        SELECT count() AS mutations_count FROM system.mutations
        WHERE is_done = 0
          AND database = '${this.smartClickHouseDbRef.options.database}'
          AND table = '${this.options.tableName}'
        FORMAT JSONEachRow
      `);
      // count() is a 64-bit integer, which may arrive as a quoted string in JSON output
      const pendingMutations = Number(mutations?.[0]?.mutations_count ?? 0);
      if (!(pendingMutations > 0)) {
        return;
      }
      console.log('Waiting for mutations to complete...');
      await new Promise((resolve) => setTimeout(resolve, pollIntervalMs));
    }
  }

  /**
   * gets the newest entries, newest first
   * @param count maximum number of entries
   * @throws Error when `count` is not a finite number
   */
  public async getLastEntries(count: number) {
    const safeCount = toFiniteNumber(count, 'count');
    const result = await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
      SELECT * FROM ${this.qualifiedTableName}
      ORDER BY timestamp DESC
      LIMIT ${safeCount}
      FORMAT JSONEachRow
    `);
    return result;
  }

  /**
   * gets all entries with a timestamp strictly newer than the given one, oldest first
   * @param unixTimestamp unix timestamp in milliseconds
   * @throws Error when `unixTimestamp` is not a finite number
   */
  public async getEntriesNewerThan(unixTimestamp: number) {
    const lowerBound = toDateTime64MilliExpression(unixTimestamp, 'unixTimestamp');
    const result = await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
      SELECT * FROM ${this.qualifiedTableName}
      WHERE timestamp > ${lowerBound}
      ORDER BY timestamp ASC
      FORMAT JSONEachRow
    `);
    return result;
  }

  /**
   * gets all entries with a timestamp strictly between the two given ones
   * @param unixTimestampStart lower bound as unix timestamp in milliseconds (exclusive)
   * @param unixTimestampEnd upper bound as unix timestamp in milliseconds (exclusive)
   * @throws Error when one of the bounds is not a finite number
   */
  public async getEntriesBetween(unixTimestampStart: number, unixTimestampEnd: number) {
    const lowerBound = toDateTime64MilliExpression(unixTimestampStart, 'unixTimestampStart');
    const upperBound = toDateTime64MilliExpression(unixTimestampEnd, 'unixTimestampEnd');
    const result = await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
      SELECT * FROM ${this.qualifiedTableName}
      WHERE timestamp > ${lowerBound}
        AND timestamp < ${upperBound}
      FORMAT JSONEachRow
    `);
    return result;
  }

  /**
   * streams all new entries using an observable
   *
   * On subscribe the newest timestamp of the table is read once; afterwards the table is
   * polled every second for entries newer than the last one seen. Unsubscribing stops the
   * polling. A failing query is forwarded as an error notification and ends the polling.
   *
   * NOTE(review): timestamps of rows are parsed with `new Date(...)`, i.e. in the local
   * timezone of the process, while the column is declared as 'Europe/Berlin' — confirm that
   * both agree where this runs.
   */
  public streamNewEntries(): plugins.smartrx.rxjs.Observable<any> {
    return new plugins.smartrx.rxjs.Observable<any>((observer) => {
      const pollInterval = 1000; // Poll every 1 second
      let lastTimestamp: number;
      let intervalId: ReturnType<typeof setInterval> | undefined;
      let stopped = false;
      let pollInFlight = false;

      const stopPolling = () => {
        stopped = true;
        if (intervalId !== undefined) {
          clearInterval(intervalId);
          intervalId = undefined;
        }
      };

      const fail = (err: unknown) => {
        if (stopped) {
          return;
        }
        stopPolling();
        observer.error(err);
      };

      const fetchLastEntryTimestamp = async () => {
        const lastEntry = await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
          SELECT max(timestamp) as lastTimestamp
          FROM ${this.qualifiedTableName}
          FORMAT JSONEachRow
        `);
        const parsedTimestamp = lastEntry.length
          ? new Date(lastEntry[0].lastTimestamp).getTime()
          : Date.now();
        // fall back to "now" when the value could not be parsed
        lastTimestamp = Number.isFinite(parsedTimestamp) ? parsedTimestamp : Date.now();
      };

      const fetchNewEntries = async () => {
        // skip this tick while the previous poll is still running,
        // otherwise both polls would emit the same entries
        if (stopped || pollInFlight) {
          return;
        }
        pollInFlight = true;
        try {
          const newEntries = await this.getEntriesNewerThan(lastTimestamp);
          if (stopped || newEntries.length === 0) {
            return;
          }
          for (const entry of newEntries) {
            observer.next(entry);
          }
          // entries arrive oldest first, so the last one carries the newest timestamp
          lastTimestamp = new Date(newEntries[newEntries.length - 1].timestamp).getTime();
        } finally {
          pollInFlight = false;
        }
      };

      const startPolling = async () => {
        await fetchLastEntryTimestamp();
        // unsubscribed while the initial query was running
        if (stopped) {
          return;
        }
        intervalId = setInterval(() => {
          fetchNewEntries().catch(fail);
        }, pollInterval);
      };

      startPolling().catch(fail);

      // Cleanup on unsubscribe: has to be returned synchronously from the subscriber,
      // otherwise the interval is never cleared
      return stopPolling;
    });
  }
}