499 lines
16 KiB
TypeScript
499 lines
16 KiB
TypeScript
import * as plugins from './smartclickhouse.plugins.js';
|
|
import type { SmartClickHouseDb } from './smartclickhouse.classes.smartclickhouse.js';
|
|
import { ClickhouseQueryBuilder } from './smartclickhouse.classes.querybuilder.js';
|
|
import type {
|
|
IClickhouseTableOptions,
|
|
IColumnInfo,
|
|
TClickhouseColumnType,
|
|
} from './smartclickhouse.types.js';
|
|
import { detectClickhouseType, escapeClickhouseValue } from './smartclickhouse.types.js';
|
|
|
|
export class ClickhouseTable<T extends Record<string, any>> {
  // ---- STATIC FACTORY ----

  /**
   * Constructs a table wrapper and awaits {@link ClickhouseTable.setup}, so the returned
   * instance refers to a table that has been created (if missing) and whose column
   * metadata has been loaded.
   */
  public static async create<T extends Record<string, any>>(
    db: SmartClickHouseDb,
    options: IClickhouseTableOptions<T>,
  ): Promise<ClickhouseTable<T>> {
    const table = new ClickhouseTable<T>(db, options);
    await table.setup();
    return table;
  }

  /**
   * Backtick-quotes an identifier, escaping backslashes and backticks so that column
   * names derived from document keys cannot break out of the quoting.
   */
  private static quoteIdentifier(name: string): string {
    return '`' + name.replace(/\\/g, '\\\\').replace(/`/g, '\\`') + '`';
  }

  // ---- INSTANCE ----

  public db: SmartClickHouseDb;
  public options: IClickhouseTableOptions<T>;
  /** Cached rows from system.columns for this table; refreshed by updateColumns(). */
  public columns: IColumnInfo[] = [];
  /** Non-null while self-healing runs; inserts issued meanwhile are dropped. */
  private healingDeferred: plugins.smartpromise.Deferred<any> | null = null;

  /**
   * @param db - connection wrapper used for every query issued by this table
   * @param options - table options. `database` falls back to the db's database,
   *   `engine` to MergeTree and `autoSchemaEvolution` to `true`.
   */
  constructor(db: SmartClickHouseDb, options: IClickhouseTableOptions<T>) {
    this.db = db;
    this.options = {
      ...options,
      // `??` so that an explicit `autoSchemaEvolution: undefined` does not disable evolution
      autoSchemaEvolution: options.autoSchemaEvolution ?? true,
      database: options.database || db.options.database,
      engine: options.engine || { engine: 'MergeTree' },
    };
  }

  /** Fully qualified `database.table` name used in generated SQL. */
  private get qualifiedTableName(): string {
    return `${this.options.database}.${this.options.tableName}`;
  }

  // ---- SCHEMA MANAGEMENT ----

  /**
   * Creates the table if it doesn't exist, applies the configured TTL (if any) and
   * refreshes column metadata.
   */
  public async setup(): Promise<void> {
    const client = this.db.clickhouseHttpClient;
    await client.queryPromise(this.buildCreateTableSql());

    const ttlSql = this.buildTtlSql();
    if (ttlSql) {
      await client.queryPromise(ttlSql);
    }

    await this.updateColumns();
  }

  /**
   * Refresh column metadata from system.columns.
   * @returns the refreshed column list (also stored in `this.columns`)
   */
  public async updateColumns(): Promise<IColumnInfo[]> {
    this.columns = await this.db.clickhouseHttpClient.queryPromise(`
      SELECT * FROM system.columns
      WHERE database = '${this.options.database}'
      AND table = '${this.options.tableName}' FORMAT JSONEachRow
    `);
    return this.columns;
  }

  /**
   * Auto-schema evolution: flattens `data`, detects a ClickHouse type for each key and
   * adds any column that does not exist yet. `timestamp` and keys whose value has no
   * detectable type are skipped.
   */
  public async syncSchema(data: Record<string, any>): Promise<void> {
    await this.syncSchemaForDocuments([data]);
  }

  // ---- INSERT ----

  /**
   * Insert a single row. Dropped silently while self-healing is in progress.
   */
  public async insert(data: Partial<T>): Promise<void> {
    if (this.healingDeferred) return;

    const storageDoc = await this.prepareDocument(data);
    await this.executeInsert([storageDoc]);
  }

  /**
   * Insert multiple rows in one request. With auto-schema evolution enabled, columns
   * for keys across ALL documents are created first. Dropped silently while
   * self-healing is in progress.
   */
  public async insertMany(data: Partial<T>[]): Promise<void> {
    if (this.healingDeferred || data.length === 0) return;

    if (this.options.autoSchemaEvolution) {
      await this.syncSchemaForDocuments(data);
    }

    await this.executeInsert(data.map((doc) => this.flattenDocument(doc)));
  }

  /**
   * Insert rows via the HTTP client's batch insert.
   * @param options.batchSize - rows per batch (default 10000)
   */
  public async insertBatch(data: Partial<T>[], options?: { batchSize?: number }): Promise<void> {
    if (this.healingDeferred || data.length === 0) return;
    const batchSize = options?.batchSize || 10000;

    if (this.options.autoSchemaEvolution) {
      await this.syncSchemaForDocuments(data);
    }

    await this.db.clickhouseHttpClient.insertBatch(
      this.options.database,
      this.options.tableName,
      data.map((doc) => this.flattenDocument(doc)),
      batchSize,
    );
  }

  /**
   * Create a push-based insert stream using ObservableIntake. Pushed documents are
   * buffered. The buffer is flushed via insertMany when it reaches `batchSize`
   * (default 100), after `flushIntervalMs` (default 1000) without new pushes, and on
   * completion of the intake.
   * Flush failures are logged, not rethrown: flushes run from timer/stream callbacks,
   * where a rejection would otherwise be unhandled.
   */
  public createInsertStream(options?: { batchSize?: number; flushIntervalMs?: number }): plugins.smartrx.ObservableIntake<Partial<T>> {
    const batchSize = options?.batchSize || 100;
    const flushIntervalMs = options?.flushIntervalMs || 1000;
    const intake = new plugins.smartrx.ObservableIntake<Partial<T>>();
    let buffer: Partial<T>[] = [];
    let flushTimer: ReturnType<typeof setTimeout> | null = null;

    const cancelTimer = (): void => {
      if (flushTimer) {
        clearTimeout(flushTimer);
        flushTimer = null;
      }
    };

    const flush = async (): Promise<void> => {
      if (buffer.length === 0) return;
      // Swap the buffer before awaiting so pushes during the insert start a new batch.
      const toInsert = buffer;
      buffer = [];
      try {
        await this.insertMany(toInsert);
      } catch (err) {
        console.error('ClickhouseTable: insert stream flush failed:', err);
      }
    };

    const scheduleFlush = (): void => {
      cancelTimer();
      flushTimer = setTimeout(() => {
        flushTimer = null;
        void flush();
      }, flushIntervalMs);
    };

    intake.subscribe(
      async (doc) => {
        buffer.push(doc);
        if (buffer.length >= batchSize) {
          cancelTimer();
          await flush();
        } else {
          scheduleFlush();
        }
      },
      undefined,
      async () => {
        cancelTimer();
        await flush();
      },
    );

    return intake;
  }

  // ---- QUERY ----

  /**
   * Returns a fluent query builder for this table
   */
  public query(): ClickhouseQueryBuilder<T> {
    return new ClickhouseQueryBuilder<T>(
      this.options.tableName,
      this.options.database,
      this.db.clickhouseHttpClient,
    );
  }

  // ---- UPDATE ----

  /**
   * Update rows matching a where condition (ClickHouse mutation - use sparingly).
   * Waits until all pending mutations on the table are done.
   * Keys whose value is `undefined` are treated as "not set" and skipped.
   * @throws if the where condition is empty or no column would be set
   */
  public async update(
    set: Partial<T>,
    whereFn: (q: ClickhouseQueryBuilder<T>) => ClickhouseQueryBuilder<T>,
  ): Promise<void> {
    const whereClause = whereFn(this.query()).buildWhereClause();
    if (!whereClause) {
      throw new Error('UPDATE requires a WHERE clause. Use a where condition to target specific rows.');
    }

    const assignments = Object.entries(set)
      .filter(([, value]) => value !== undefined)
      .map(([key, value]) => `${key} = ${escapeClickhouseValue(value)}`);
    if (assignments.length === 0) {
      // Without this guard the generated SQL would be `UPDATE  WHERE ...`
      throw new Error('UPDATE requires at least one column to set.');
    }

    await this.db.clickhouseHttpClient.mutatePromise(
      `ALTER TABLE ${this.qualifiedTableName} UPDATE ${assignments.join(', ')} WHERE ${whereClause}`
    );
    await this.waitForMutations();
  }

  // ---- DELETE ----

  /**
   * Delete rows matching a where condition (ClickHouse mutation).
   * @throws if the where condition is empty
   */
  public async deleteWhere(
    whereFn: (q: ClickhouseQueryBuilder<T>) => ClickhouseQueryBuilder<T>,
  ): Promise<void> {
    const whereClause = whereFn(this.query()).buildWhereClause();
    if (!whereClause) {
      throw new Error('DELETE requires a WHERE clause.');
    }

    await this.db.clickhouseHttpClient.mutatePromise(
      `ALTER TABLE ${this.qualifiedTableName} DELETE WHERE ${whereClause}`
    );
    await this.waitForMutations();
  }

  /**
   * Delete entries whose `column` is older than `now() - INTERVAL <interval>`.
   * `interval` is inserted verbatim into the SQL (e.g. '7 DAY').
   */
  public async deleteOlderThan(column: keyof T & string, interval: string): Promise<void> {
    await this.db.clickhouseHttpClient.mutatePromise(
      `ALTER TABLE ${this.qualifiedTableName} DELETE WHERE ${String(column)} < now() - INTERVAL ${interval}`
    );
    await this.waitForMutations();
  }

  /**
   * Drop the entire table and clear cached column metadata.
   */
  public async drop(): Promise<void> {
    await this.db.clickhouseHttpClient.queryPromise(
      `DROP TABLE IF EXISTS ${this.qualifiedTableName}`
    );
    this.columns = [];
  }

  // ---- UTILITIES ----

  /**
   * Wait for all pending mutations on this table to complete by polling system.mutations.
   * @param options.pollIntervalMs - delay between polls (default 1000)
   * @param options.timeoutMs - optional upper bound; without it this waits indefinitely
   * @throws if `timeoutMs` elapses while mutations are still pending
   */
  public async waitForMutations(options?: { pollIntervalMs?: number; timeoutMs?: number }): Promise<void> {
    const pollIntervalMs = options?.pollIntervalMs ?? 1000;
    const deadline = options?.timeoutMs === undefined ? Infinity : Date.now() + options.timeoutMs;

    for (;;) {
      const rows = await this.db.clickhouseHttpClient.queryPromise(`
        SELECT count() AS cnt FROM system.mutations
        WHERE is_done = 0 AND database = '${this.options.database}' AND table = '${this.options.tableName}' FORMAT JSONEachRow
      `);
      const pending = rows[0] ? parseInt(rows[0].cnt, 10) : 0;
      if (pending === 0) return;
      if (Date.now() >= deadline) {
        throw new Error(
          `ClickhouseTable: timed out waiting for ${pending} pending mutation(s) on ${this.qualifiedTableName}`,
        );
      }
      await plugins.smartdelay.delayFor(pollIntervalMs);
    }
  }

  /**
   * Get the total row count
   */
  public async getRowCount(): Promise<number> {
    const result = await this.db.clickhouseHttpClient.queryPromise(`
      SELECT count() AS cnt FROM ${this.qualifiedTableName} FORMAT JSONEachRow
    `);
    return result[0] ? parseInt(result[0].cnt, 10) : 0;
  }

  /**
   * Optimize table (useful for ReplacingMergeTree deduplication)
   * @param final - append FINAL to force a full merge
   */
  public async optimize(final: boolean = false): Promise<void> {
    const finalClause = final ? ' FINAL' : '';
    await this.db.clickhouseHttpClient.queryPromise(
      `OPTIMIZE TABLE ${this.qualifiedTableName}${finalClause}`
    );
  }

  // ---- OBSERVATION ----

  /**
   * Watch for new entries via polling. Returns an RxJS Observable.
   *
   * On subscribe, the cursor is set to the current max(`timestamp`), or to the
   * current time if that is NULL. Each poll then emits rows with a strictly greater
   * `timestamp`, in ascending order.
   * The cursor is tracked in epoch milliseconds computed by ClickHouse, so it is
   * independent of the column's timezone and keeps millisecond precision.
   * Polls never overlap: the next poll is scheduled only after the previous one finishes.
   * @param options.pollInterval - delay between polls in ms (default 1000)
   */
  public watch(options?: { pollInterval?: number }): plugins.smartrx.rxjs.Observable<T> {
    const pollInterval = options?.pollInterval || 1000;
    const client = this.db.clickhouseHttpClient;
    const table = this.qualifiedTableName;
    // Extra per-row column carrying the row's timestamp in epoch ms; stripped before emitting.
    const cursorAlias = '__watch_cursor_ms';

    return new plugins.smartrx.rxjs.Observable<T>((observer) => {
      let cursorMs = 0;
      let timer: ReturnType<typeof setTimeout> | null = null;
      let stopped = false;

      const fetchInitialCursor = async (): Promise<void> => {
        const result = await client.queryPromise(`
          SELECT toUnixTimestamp64Milli(toDateTime64(max(timestamp), 3)) AS lastTimestampMs
          FROM ${table} FORMAT JSONEachRow
        `);
        // 64-bit integers may come back as strings in JSONEachRow, hence Number().
        const raw = result.length ? result[0].lastTimestampMs : null;
        const parsed = raw == null ? NaN : Number(raw);
        cursorMs = Number.isFinite(parsed) ? parsed : Date.now();
      };

      const scheduleNextPoll = (): void => {
        // Also prevents a timer leak when unsubscribed before the initial query resolves.
        if (stopped) return;
        timer = setTimeout(() => {
          void poll();
        }, pollInterval);
      };

      const poll = async (): Promise<void> => {
        if (stopped) return;
        try {
          const rows = await client.queryPromise(`
            SELECT *, toUnixTimestamp64Milli(toDateTime64(timestamp, 3)) AS ${cursorAlias}
            FROM ${table}
            WHERE timestamp > fromUnixTimestamp64Milli(toInt64(${cursorMs}))
            ORDER BY timestamp ASC FORMAT JSONEachRow
          `);
          for (const row of rows) {
            if (stopped) return;
            const { [cursorAlias]: rowMs, ...entry } = row;
            observer.next(entry);
            const parsedMs = Number(rowMs);
            if (Number.isFinite(parsedMs)) {
              cursorMs = parsedMs;
            }
          }
        } catch (err) {
          stopped = true;
          observer.error(err);
          return;
        }
        scheduleNextPoll();
      };

      fetchInitialCursor()
        .then(() => scheduleNextPoll())
        .catch((err) => observer.error(err));

      return () => {
        stopped = true;
        if (timer) clearTimeout(timer);
      };
    });
  }

  // ---- PRIVATE HELPERS ----

  /** Column definitions for CREATE TABLE; a minimal default schema when none are configured. */
  private buildColumnDefinitions(): string {
    const { columns } = this.options;
    if (!columns || columns.length === 0) {
      // Default minimal schema — downstream code can add columns via auto-schema evolution
      return `timestamp DateTime64(3, 'Europe/Berlin'),\n message String`;
    }
    return columns
      .map((col) => {
        let def = `${col.name} ${col.type}`;
        if (col.defaultExpression) def += ` DEFAULT ${col.defaultExpression}`;
        if (col.codec) def += ` CODEC(${col.codec})`;
        return def;
      })
      .join(',\n ');
  }

  /** ENGINE clause; parameterized engines get their parameters only when all are configured. */
  private buildEngineClause(): string {
    const { engine } = this.options;
    if (engine.engine === 'ReplacingMergeTree' && engine.versionColumn) {
      return `ReplacingMergeTree(${engine.versionColumn})`;
    }
    if (engine.engine === 'CollapsingMergeTree' && engine.signColumn) {
      return `CollapsingMergeTree(${engine.signColumn})`;
    }
    if (engine.engine === 'VersionedCollapsingMergeTree' && engine.signColumn && engine.versionColumn) {
      return `VersionedCollapsingMergeTree(${engine.signColumn}, ${engine.versionColumn})`;
    }
    return `${engine.engine}()`;
  }

  /** Full `CREATE TABLE IF NOT EXISTS` statement built from the table options. */
  private buildCreateTableSql(): string {
    const { orderBy, partitionBy, primaryKey } = this.options;
    const orderByStr = Array.isArray(orderBy) ? orderBy.join(', ') : orderBy;

    let createSQL = `
      CREATE TABLE IF NOT EXISTS ${this.qualifiedTableName} (
        ${this.buildColumnDefinitions()}
      ) ENGINE = ${this.buildEngineClause()}`;

    if (partitionBy) {
      createSQL += `\n PARTITION BY ${partitionBy}`;
    }

    // MergeTree requires ORDER BY; `tuple()` (no sorting key) avoids emitting
    // `ORDER BY (undefined)` or `ORDER BY ()` when no ordering is configured.
    createSQL += orderByStr ? `\n ORDER BY (${orderByStr})` : `\n ORDER BY tuple()`;

    if (primaryKey) {
      const primaryKeyStr = Array.isArray(primaryKey) ? primaryKey.join(', ') : primaryKey;
      createSQL += `\n PRIMARY KEY (${primaryKeyStr})`;
    }

    return createSQL;
  }

  /** TTL statement from `ttl` or the legacy `retainDataForDays` shorthand; null if neither is set. */
  private buildTtlSql(): string | null {
    const { ttl, retainDataForDays } = this.options;
    if (ttl) {
      return `
        ALTER TABLE ${this.qualifiedTableName}
        MODIFY TTL toDateTime(${String(ttl.column)}) + INTERVAL ${ttl.interval}
      `;
    }
    if (retainDataForDays && retainDataForDays > 0) {
      // Legacy shorthand
      return `
        ALTER TABLE ${this.qualifiedTableName}
        MODIFY TTL toDateTime(timestamp) + INTERVAL ${retainDataForDays} DAY
      `;
    }
    return null;
  }

  /**
   * Ensures a column exists for every key found in `docs`. It uses the first value with
   * a detectable ClickHouse type for each key, so a leading `null` in one document does
   * not prevent a later document from defining the column.
   */
  private async syncSchemaForDocuments(
    docs: ReadonlyArray<Partial<T> | Record<string, any>>,
  ): Promise<void> {
    // Map rather than a plain object: keys like `constructor` or `__proto__` are ordinary names here.
    const wanted = new Map<string, TClickhouseColumnType>();
    for (const doc of docs) {
      const flat = plugins.smartobject.toFlatObject(doc);
      for (const [key, value] of Object.entries(flat)) {
        if (key === 'timestamp' || wanted.has(key)) continue;
        const clickhouseType = detectClickhouseType(value);
        if (clickhouseType) {
          wanted.set(key, clickhouseType);
        }
      }
    }
    await this.ensureColumns(wanted);
  }

  /**
   * Adds every column in `wanted` that does not exist yet. Column metadata is refreshed
   * at most once before adding and once after, instead of twice per column.
   */
  private async ensureColumns(wanted: ReadonlyMap<string, TClickhouseColumnType>): Promise<void> {
    const missingFrom = (names: string[]): string[] => {
      const known = new Set(this.columns.map((col) => col.name));
      return names.filter((name) => !known.has(name));
    };

    // Check cached columns first
    let missing = missingFrom(Array.from(wanted.keys()));
    if (missing.length === 0) return;

    // Refresh and check again
    await this.updateColumns();
    missing = missingFrom(missing);
    if (missing.length === 0) return;

    const missingSet = new Set(missing);
    for (const [name, type] of wanted) {
      if (missingSet.has(name)) {
        await this.addColumn(name, type);
      }
    }
    await this.updateColumns();
  }

  /** ALTER TABLE ... ADD COLUMN, tolerating a concurrent add of the same column. */
  private async addColumn(name: string, type: TClickhouseColumnType): Promise<void> {
    try {
      await this.db.clickhouseHttpClient.queryPromise(
        `ALTER TABLE ${this.qualifiedTableName} ADD COLUMN ${ClickhouseTable.quoteIdentifier(name)} ${type}`
      );
    } catch (err) {
      // Column might have been added concurrently — ignore "already exists" errors
      if (!String(err).includes('already exists')) {
        throw err;
      }
    }
  }

  /** Flattens a document and keeps only values with a detectable type (plus `timestamp`). */
  private flattenDocument(data: Partial<T>): Record<string, any> {
    const flat = plugins.smartobject.toFlatObject(data);
    const storageDoc: Record<string, any> = {};
    for (const [key, value] of Object.entries(flat)) {
      const type = detectClickhouseType(value);
      if (type || key === 'timestamp') {
        storageDoc[key] = value;
      }
    }
    return storageDoc;
  }

  /** Runs schema evolution for a single document (if enabled), then flattens it. */
  private async prepareDocument(data: Partial<T>): Promise<Record<string, any>> {
    if (this.options.autoSchemaEvolution) {
      await this.syncSchema(data as Record<string, any>);
    }
    return this.flattenDocument(data);
  }

  /**
   * Best-effort insert. On failure the rows are NOT retried; the error triggers
   * self-healing so that subsequent inserts can succeed.
   */
  private async executeInsert(docs: Record<string, any>[]): Promise<void> {
    try {
      await this.db.clickhouseHttpClient.insertPromise(
        this.options.database,
        this.options.tableName,
        docs,
      );
    } catch (err) {
      await this.handleInsertError(err);
    }
  }

  /**
   * Waits for the database, re-creates database and table, and refreshes metadata.
   * Only one healing run happens at a time; concurrent calls return immediately.
   * Errors thrown by the healing steps propagate to the caller.
   */
  private async handleInsertError(err: unknown): Promise<void> {
    if (this.healingDeferred) return;

    const deferred: plugins.smartpromise.Deferred<any> = plugins.smartpromise.defer();
    this.healingDeferred = deferred;
    console.log('ClickhouseTable: Insert error. Attempting self-healing...', err);

    try {
      await this.db.pingDatabaseUntilAvailable();
      await this.db.createDatabase();
      await this.setup();
    } finally {
      deferred.resolve();
      this.healingDeferred = null;
    }
  }
}
|