fix(core): update

This commit is contained in:
2024-06-14 16:33:00 +02:00
parent 07895c2767
commit 1aed44f035
12 changed files with 6433 additions and 4208 deletions

View File

@@ -60,14 +60,14 @@ export class TimeDataTable {
public async setup() {
// create table in clickhouse
await this.smartClickHouseDbRef.clickhouseClient.queryPromise(`
await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
CREATE TABLE IF NOT EXISTS ${this.smartClickHouseDbRef.options.database}.${this.options.tableName} (
timestamp DateTime64(3, 'Europe/Berlin'),
message String
) ENGINE=MergeTree() ORDER BY timestamp`);
// lets adjust the TTL
await this.smartClickHouseDbRef.clickhouseClient.queryPromise(`
await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
ALTER TABLE ${this.smartClickHouseDbRef.options.database}.${this.options.tableName} MODIFY TTL toDateTime(timestamp) + INTERVAL ${this.options.retainDataForDays} DAY
`);
@@ -88,7 +88,7 @@ export class TimeDataTable {
* updates the columns
*/
public async updateColumns() {
this.columns = await this.smartClickHouseDbRef.clickhouseClient.queryPromise(`
this.columns = await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
SELECT * FROM system.columns
WHERE database LIKE '${this.smartClickHouseDbRef.options.database}'
AND table LIKE '${this.options.tableName}' FORMAT JSONEachRow
@@ -146,7 +146,7 @@ export class TimeDataTable {
}
const alterString = `ALTER TABLE ${this.smartClickHouseDbRef.options.database}.${this.options.tableName} ADD COLUMN ${pathArg} ${typeArg} FIRST`;
try {
await this.smartClickHouseDbRef.clickhouseClient.queryPromise(`
await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
${alterString}
`);
} catch (err) {
@@ -178,7 +178,7 @@ export class TimeDataTable {
storageJson[key] = value;
}
const result = await this.smartClickHouseDbRef.clickhouseClient
const result = await this.smartClickHouseDbRef.clickhouseHttpClient
.insertPromise(this.smartClickHouseDbRef.options.database, this.options.tableName, [
storageJson,
])
@@ -197,4 +197,90 @@ export class TimeDataTable {
});
return result;
}
/**
* deletes the entire table
*/
public async delete() {
await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
DROP TABLE IF EXISTS ${this.smartClickHouseDbRef.options.database}.${this.options.tableName}
`);
this.columns = [];
}
/**
* deletes entries older than a specified number of days
* @param days number of days
*/
public async deleteOldEntries(days: number) {
await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
ALTER TABLE ${this.smartClickHouseDbRef.options.database}.${this.options.tableName}
DELETE WHERE timestamp < now() - INTERVAL ${days} DAY
`);
}
public async getLastEntries(count: number) {
const result = await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
SELECT * FROM ${this.smartClickHouseDbRef.options.database}.${this.options.tableName}
ORDER BY timestamp DESC
LIMIT ${count} FORMAT JSONEachRow
`);
return result;
}
public async getEntriesNewerThan(unixTimestamp: number) {
const result = await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
SELECT * FROM ${this.smartClickHouseDbRef.options.database}.${this.options.tableName}
WHERE timestamp > toDateTime(${unixTimestamp / 1000}) FORMAT JSONEachRow
`);
return result;
}
public async getEntriesBetween(unixTimestampStart: number, unixTimestampEnd: number) {
const result = await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
SELECT * FROM ${this.smartClickHouseDbRef.options.database}.${this.options.tableName}
WHERE timestamp > toDateTime(${unixTimestampStart / 1000})
AND timestamp < toDateTime(${unixTimestampEnd / 1000}) FORMAT JSONEachRow
`);
return result;
}
/**
* streams all new entries using an observable
*/
public streamNewEntries(): plugins.smartrx.rxjs.Observable<any> {
return new plugins.smartrx.rxjs.Observable((observer) => {
const pollInterval = 1000; // Poll every 1 second
let lastTimestamp: number;
const fetchLastEntryTimestamp = async () => {
const lastEntry = await this.smartClickHouseDbRef.clickhouseHttpClient.queryPromise(`
SELECT max(timestamp) as lastTimestamp FROM ${this.smartClickHouseDbRef.options.database}.${this.options.tableName} FORMAT JSONEachRow
`);
lastTimestamp = lastEntry.length
? new Date(lastEntry[0].lastTimestamp).getTime()
: Date.now();
};
const fetchNewEntries = async () => {
const newEntries = await this.getEntriesNewerThan(lastTimestamp);
if (newEntries.length > 0) {
for (const entry of newEntries) {
observer.next(entry);
}
lastTimestamp = new Date(newEntries[newEntries.length - 1].timestamp).getTime();
}
};
const startPolling = async () => {
await fetchLastEntryTimestamp();
const intervalId = setInterval(fetchNewEntries, pollInterval);
// Cleanup on unsubscribe
return () => clearInterval(intervalId);
};
startPolling().catch((err) => observer.error(err));
});
}
}