305 lines
9.9 KiB
TypeScript
305 lines
9.9 KiB
TypeScript
import * as plugins from './smartclickhouse.plugins.js';
|
|
import { SmartClickHouseDb } from './smartclickhouse.classes.smartclickhouse.js';
|
|
|
|
export type TClickhouseColumnDataType =
|
|
| 'String'
|
|
| "DateTime64(3, 'Europe/Berlin')"
|
|
| 'Float64'
|
|
| 'Array(String)'
|
|
| 'Array(Float64)';
|
|
export interface IColumnInfo {
|
|
database: string;
|
|
table: string;
|
|
name: string;
|
|
type: TClickhouseColumnDataType;
|
|
position: string;
|
|
default_kind: string;
|
|
default_expression: string;
|
|
data_compressed_bytes: string;
|
|
data_uncompressed_bytes: string;
|
|
marks_bytes: string;
|
|
comment: string;
|
|
is_in_partition_key: 0 | 1;
|
|
is_in_sorting_key: 0 | 1;
|
|
is_in_primary_key: 0 | 1;
|
|
is_in_sampling_key: 0 | 1;
|
|
compression_codec: string;
|
|
character_octet_length: null;
|
|
numeric_precision: null;
|
|
numeric_precision_radix: null;
|
|
numeric_scale: null;
|
|
datetime_precision: '3';
|
|
}
|
|
|
|
export interface ITimeDataTableOptions {
|
|
tableName: string;
|
|
retainDataForDays: number;
|
|
}
|
|
|
|
export class TimeDataTable {
|
|
public static async getTable(smartClickHouseDbRefArg: SmartClickHouseDb, tableNameArg: string) {
|
|
const newTable = new TimeDataTable(smartClickHouseDbRefArg, {
|
|
tableName: tableNameArg,
|
|
retainDataForDays: 30,
|
|
});
|
|
|
|
await newTable.setup();
|
|
|
|
return newTable;
|
|
}
|
|
|
|
// INSTANCE
|
|
public healingDeferred: plugins.smartpromise.Deferred<any>;
|
|
public smartClickHouseDbRef: SmartClickHouseDb;
|
|
public options: ITimeDataTableOptions;
|
|
|
|
constructor(smartClickHouseDbRefArg: SmartClickHouseDb, optionsArg: ITimeDataTableOptions) {
|
|
this.smartClickHouseDbRef = smartClickHouseDbRefArg;
|
|
this.options = optionsArg;
|
|
}
|
|
|
|
public async setup() {
|
|
// create table in clickhouse
|
|
await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
|
|
CREATE TABLE IF NOT EXISTS ${this.smartClickHouseDbRef.options.database}.${this.options.tableName} (
|
|
timestamp DateTime64(3, 'Europe/Berlin'),
|
|
message String
|
|
) ENGINE=MergeTree() ORDER BY timestamp`);
|
|
|
|
// lets adjust the TTL
|
|
await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
|
|
ALTER TABLE ${this.smartClickHouseDbRef.options.database}.${this.options.tableName} MODIFY TTL toDateTime(timestamp) + INTERVAL ${this.options.retainDataForDays} DAY
|
|
`);
|
|
|
|
await this.updateColumns();
|
|
console.log(`=======================`);
|
|
console.log(
|
|
`table with name "${this.options.tableName}" in database ${this.smartClickHouseDbRef.options.database} has the following columns:`
|
|
);
|
|
for (const column of this.columns) {
|
|
console.log(`>> ${column.name}: ${column.type}`);
|
|
}
|
|
console.log('^^^^^^^^^^^^^^\n');
|
|
}
|
|
|
|
public columns: IColumnInfo[] = [];
|
|
|
|
/**
|
|
* updates the columns
|
|
*/
|
|
public async updateColumns() {
|
|
this.columns = await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
|
|
SELECT * FROM system.columns
|
|
WHERE database LIKE '${this.smartClickHouseDbRef.options.database}'
|
|
AND table LIKE '${this.options.tableName}' FORMAT JSONEachRow
|
|
`);
|
|
return this.columns;
|
|
}
|
|
|
|
/**
|
|
* stores a json and tries to map it to the nested syntax
|
|
*/
|
|
public async addData(dataArg: any) {
|
|
if (this.healingDeferred) {
|
|
return;
|
|
}
|
|
|
|
// the storageJson
|
|
let storageJson: { [key: string]: any } = {};
|
|
|
|
// helper stuff
|
|
|
|
const getClickhouseTypeForValue = (valueArg: any): TClickhouseColumnDataType => {
|
|
const typeConversion: { [key: string]: TClickhouseColumnDataType } = {
|
|
string: 'String',
|
|
number: 'Float64',
|
|
undefined: null,
|
|
null: null,
|
|
};
|
|
if (valueArg instanceof Array) {
|
|
const arrayType = typeConversion[typeof valueArg[0] as string];
|
|
if (!arrayType) {
|
|
return null;
|
|
} else {
|
|
return `Array(${arrayType})` as TClickhouseColumnDataType;
|
|
}
|
|
}
|
|
return typeConversion[typeof valueArg as string];
|
|
};
|
|
const checkPath = async (
|
|
pathArg: string,
|
|
typeArg: TClickhouseColumnDataType,
|
|
prechecked = false
|
|
) => {
|
|
let columnFound = false;
|
|
for (const column of this.columns) {
|
|
if (pathArg === column.name) {
|
|
columnFound = true;
|
|
break;
|
|
}
|
|
}
|
|
if (!columnFound) {
|
|
if (!prechecked) {
|
|
await this.updateColumns();
|
|
await checkPath(pathArg, typeArg, true);
|
|
return;
|
|
}
|
|
const alterString = `ALTER TABLE ${this.smartClickHouseDbRef.options.database}.${this.options.tableName} ADD COLUMN ${pathArg} ${typeArg} FIRST`;
|
|
try {
|
|
await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
|
|
${alterString}
|
|
`);
|
|
} catch (err) {
|
|
console.log(alterString);
|
|
for (const column of this.columns) {
|
|
console.log(column.name);
|
|
}
|
|
}
|
|
await this.updateColumns();
|
|
}
|
|
};
|
|
|
|
// key checking
|
|
const flatDataArg = plugins.smartobject.toFlatObject(dataArg);
|
|
for (const key of Object.keys(flatDataArg)) {
|
|
const value = flatDataArg[key];
|
|
if (key === 'timestamp' && typeof value !== 'number') {
|
|
throw new Error('timestamp must be of type number');
|
|
} else if (key === 'timestamp') {
|
|
storageJson.timestamp = flatDataArg[key];
|
|
continue;
|
|
}
|
|
// lets deal with the rest
|
|
const clickhouseType = getClickhouseTypeForValue(value);
|
|
if (!clickhouseType) {
|
|
continue;
|
|
}
|
|
await checkPath(key, clickhouseType);
|
|
storageJson[key] = value;
|
|
}
|
|
|
|
const result = await this.smartClickHouseDbRef.clickhouseHttpClient
|
|
.insertPromise(this.smartClickHouseDbRef.options.database, this.options.tableName, [
|
|
storageJson,
|
|
])
|
|
.catch(async () => {
|
|
if (this.healingDeferred) {
|
|
return;
|
|
}
|
|
this.healingDeferred = plugins.smartpromise.defer();
|
|
console.log(`Ran into an error. Trying to set up things properly again.`);
|
|
await this.smartClickHouseDbRef.pingDatabaseUntilAvailable();
|
|
await this.smartClickHouseDbRef.createDatabase();
|
|
await this.setup();
|
|
this.columns = [];
|
|
this.healingDeferred.resolve();
|
|
this.healingDeferred = null;
|
|
});
|
|
return result;
|
|
}
|
|
|
|
/**
|
|
* deletes the entire table
|
|
*/
|
|
public async delete() {
|
|
await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
|
|
DROP TABLE IF EXISTS ${this.smartClickHouseDbRef.options.database}.${this.options.tableName}
|
|
`);
|
|
this.columns = [];
|
|
}
|
|
|
|
/**
|
|
* deletes entries older than a specified number of days
|
|
* @param days number of days
|
|
*/
|
|
public async deleteOldEntries(days: number) {
|
|
// Perform the deletion operation
|
|
await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
|
|
ALTER TABLE ${this.smartClickHouseDbRef.options.database}.${this.options.tableName}
|
|
DELETE WHERE timestamp < now() - INTERVAL ${days} DAY
|
|
`);
|
|
await this.waitForMutations();
|
|
}
|
|
|
|
public async waitForMutations() {
|
|
// Wait for the mutation to complete
|
|
let mutations;
|
|
do {
|
|
mutations = await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
|
|
SELECT count() AS mutations_count FROM system.mutations
|
|
WHERE is_done = 0 AND table = '${this.options.tableName}'
|
|
`);
|
|
|
|
if (mutations[0] && mutations[0].mutations_count > 0) {
|
|
console.log('Waiting for mutations to complete...');
|
|
await new Promise((resolve) => setTimeout(resolve, 1000));
|
|
}
|
|
} while (mutations[0] && mutations[0].mutations_count > 0);
|
|
}
|
|
|
|
public async getLastEntries(count: number) {
|
|
const result = await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
|
|
SELECT * FROM ${this.smartClickHouseDbRef.options.database}.${this.options.tableName}
|
|
ORDER BY timestamp DESC
|
|
LIMIT ${count} FORMAT JSONEachRow
|
|
`);
|
|
return result;
|
|
}
|
|
|
|
public async getEntriesNewerThan(unixTimestamp: number) {
|
|
const result = await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
|
|
SELECT * FROM ${this.smartClickHouseDbRef.options.database}.${this.options.tableName}
|
|
WHERE timestamp > toDateTime(${unixTimestamp / 1000}) FORMAT JSONEachRow
|
|
`);
|
|
return result;
|
|
}
|
|
|
|
public async getEntriesBetween(unixTimestampStart: number, unixTimestampEnd: number) {
|
|
const result = await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
|
|
SELECT * FROM ${this.smartClickHouseDbRef.options.database}.${this.options.tableName}
|
|
WHERE timestamp > toDateTime(${unixTimestampStart / 1000})
|
|
AND timestamp < toDateTime(${unixTimestampEnd / 1000}) FORMAT JSONEachRow
|
|
`);
|
|
return result;
|
|
}
|
|
|
|
/**
|
|
* streams all new entries using an observable
|
|
*/
|
|
public watchNewEntries(): plugins.smartrx.rxjs.Observable<any> {
|
|
return new plugins.smartrx.rxjs.Observable((observer) => {
|
|
const pollInterval = 1000; // Poll every 1 second
|
|
let lastTimestamp: number;
|
|
|
|
const fetchLastEntryTimestamp = async () => {
|
|
const lastEntry = await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
|
|
SELECT max(timestamp) as lastTimestamp FROM ${this.smartClickHouseDbRef.options.database}.${this.options.tableName} FORMAT JSONEachRow
|
|
`);
|
|
lastTimestamp = lastEntry.length
|
|
? new Date(lastEntry[0].lastTimestamp).getTime()
|
|
: Date.now();
|
|
};
|
|
|
|
const fetchNewEntries = async () => {
|
|
const newEntries = await this.getEntriesNewerThan(lastTimestamp);
|
|
if (newEntries.length > 0) {
|
|
for (const entry of newEntries) {
|
|
observer.next(entry);
|
|
}
|
|
lastTimestamp = new Date(newEntries[newEntries.length - 1].timestamp).getTime();
|
|
}
|
|
};
|
|
|
|
const startPolling = async () => {
|
|
await fetchLastEntryTimestamp();
|
|
const intervalId = setInterval(fetchNewEntries, pollInterval);
|
|
|
|
// Cleanup on unsubscribe
|
|
return () => clearInterval(intervalId);
|
|
};
|
|
|
|
startPolling().catch((err) => observer.error(err));
|
|
});
|
|
}
|
|
}
|