import * as plugins from './smartclickhouse.plugins.js';
import type { SmartClickHouseDb } from './smartclickhouse.classes.smartclickhouse.js';
import { ClickhouseQueryBuilder } from './smartclickhouse.classes.querybuilder.js';
import type {
  IClickhouseTableOptions,
  IColumnInfo,
  TClickhouseColumnType,
} from './smartclickhouse.types.js';
import { detectClickhouseType, escapeClickhouseValue } from './smartclickhouse.types.js';

/**
 * A handle on a single ClickHouse table.
 *
 * Offers table creation and schema management, inserts (single, many, batched and
 * streamed), a fluent query builder, mutation-based update/delete, and a polling-based
 * change feed. When `autoSchemaEvolution` is enabled (the default set in the
 * constructor), unseen flattened document keys get a column added via
 * `ALTER TABLE ... ADD COLUMN` before rows are inserted.
 */
export class ClickhouseTable<T extends Record<string, any>> {
  // ---- STATIC FACTORY ----

  /**
   * Construct a table handle and run {@link ClickhouseTable.setup} on it.
   *
   * @param db - Database connection the table belongs to.
   * @param options - Table options; `database` falls back to `db.options.database`.
   * @returns The table handle, after the table has been created (if missing).
   */
  public static async create<T extends Record<string, any>>(
    db: SmartClickHouseDb,
    options: IClickhouseTableOptions<T>,
  ): Promise<ClickhouseTable<T>> {
    const table = new ClickhouseTable<T>(db, options);
    await table.setup();
    return table;
  }

  // ---- INSTANCE ----

  public db: SmartClickHouseDb;
  public options: IClickhouseTableOptions<T>;
  /** Cached rows from `system.columns` for this table; refreshed by `updateColumns()`. */
  public columns: IColumnInfo[] = [];
  /** Non-null while self-healing after an insert error; inserts are skipped meanwhile. */
  private healingDeferred: plugins.smartpromise.Deferred<void> | null = null;

  constructor(db: SmartClickHouseDb, options: IClickhouseTableOptions<T>) {
    this.db = db;
    this.options = {
      autoSchemaEvolution: true,
      ...options,
      database: options.database || db.options.database,
      engine: options.engine || { engine: 'MergeTree' },
    };
  }

  // ---- SCHEMA MANAGEMENT ----

  /**
   * Creates the table if it doesn't exist, applies the configured TTL and refreshes
   * the cached column metadata.
   */
  public async setup(): Promise<void> {
    const { database, tableName, orderBy, partitionBy, primaryKey, ttl, retainDataForDays } =
      this.options;
    const tableRef = `${database}.${tableName}`;

    const orderByStr = Array.isArray(orderBy) ? orderBy.join(', ') : orderBy;
    let createSQL = `
      CREATE TABLE IF NOT EXISTS ${tableRef} (
        ${this.buildColumnDefinitions()}
      ) ENGINE = ${this.buildEngineClause()}`;

    if (partitionBy) {
      createSQL += `\n      PARTITION BY ${partitionBy}`;
    }
    createSQL += `\n      ORDER BY (${orderByStr})`;
    if (primaryKey) {
      const primaryKeyStr = Array.isArray(primaryKey) ? primaryKey.join(', ') : primaryKey;
      createSQL += `\n      PRIMARY KEY (${primaryKeyStr})`;
    }

    await this.db.clickhouseHttpClient.queryPromise(createSQL);

    // Apply TTL if configured; the explicit `ttl` option wins over the legacy shorthand.
    if (ttl) {
      await this.db.clickhouseHttpClient.queryPromise(`
        ALTER TABLE ${tableRef}
        MODIFY TTL toDateTime(${String(ttl.column)}) + INTERVAL ${ttl.interval}
      `);
    } else if (retainDataForDays && retainDataForDays > 0) {
      // Legacy shorthand: expire rows by the `timestamp` column.
      await this.db.clickhouseHttpClient.queryPromise(`
        ALTER TABLE ${tableRef}
        MODIFY TTL toDateTime(timestamp) + INTERVAL ${retainDataForDays} DAY
      `);
    }

    await this.updateColumns();
  }

  /**
   * Refresh column metadata from `system.columns`.
   *
   * @returns The refreshed column list (also stored on `this.columns`).
   */
  public async updateColumns(): Promise<IColumnInfo[]> {
    this.columns = await this.db.clickhouseHttpClient.queryPromise(`
      SELECT * FROM system.columns
      WHERE database = '${this.options.database}'
        AND table = '${this.options.tableName}'
      FORMAT JSONEachRow
    `);
    return this.columns;
  }

  /**
   * Auto-schema evolution: flatten `data`, and for every key (except `timestamp`) whose
   * value has a detectable ClickHouse type, make sure a matching column exists.
   */
  public async syncSchema(data: Record<string, any>): Promise<void> {
    const flatData = plugins.smartobject.toFlatObject(data);
    for (const key of Object.keys(flatData)) {
      if (key === 'timestamp') continue;
      const value = flatData[key];
      const clickhouseType = detectClickhouseType(value);
      if (!clickhouseType) continue;
      await this.ensureColumn(key, clickhouseType);
    }
  }

  // ---- INSERT ----

  /**
   * Insert a single row. Silently skipped while the table is self-healing.
   */
  public async insert(data: Partial<T>): Promise<void> {
    if (this.healingDeferred) return;
    const storageDoc = await this.prepareDocument(data);
    await this.executeInsert([storageDoc]);
  }

  /**
   * Insert multiple rows in one request. Silently skipped while the table is
   * self-healing; a no-op for an empty array.
   */
  public async insertMany(data: Partial<T>[]): Promise<void> {
    if (this.healingDeferred) return;
    if (data.length === 0) return;

    // Schema sync across all documents
    if (this.options.autoSchemaEvolution) {
      await this.syncSchema(this.collectSchemaSamples(data));
    }

    const storageDocs = data.map((doc) => this.flattenDocument(doc));
    await this.executeInsert(storageDocs);
  }

  /**
   * Insert rows in batches of configurable size (default 10000).
   * Errors from the HTTP client propagate to the caller.
   */
  public async insertBatch(data: Partial<T>[], options?: { batchSize?: number }): Promise<void> {
    const batchSize = options?.batchSize || 10000;
    if (this.healingDeferred) return;
    if (data.length === 0) return;

    // Schema sync across all documents first
    if (this.options.autoSchemaEvolution) {
      await this.syncSchema(this.collectSchemaSamples(data));
    }

    const storageDocs = data.map((doc) => this.flattenDocument(doc));
    await this.db.clickhouseHttpClient.insertBatch(
      this.options.database,
      this.options.tableName,
      storageDocs,
      batchSize,
    );
  }

  /**
   * Create a push-based insert stream using ObservableIntake.
   *
   * Documents are buffered and written via `insertMany` when either `batchSize`
   * documents (default 100) have accumulated, or `flushIntervalMs` (default 1000) has
   * elapsed since the first document entered an empty buffer. Completing the intake
   * flushes whatever is left. Flush failures are logged rather than rethrown, because
   * they occur inside timer/RxJS callbacks where nobody could catch them.
   */
  public createInsertStream(options?: {
    batchSize?: number;
    flushIntervalMs?: number;
  }): plugins.smartrx.ObservableIntake<Partial<T>> {
    const batchSize = options?.batchSize || 100;
    const flushIntervalMs = options?.flushIntervalMs || 1000;
    const intake = new plugins.smartrx.ObservableIntake<Partial<T>>();

    let buffer: Partial<T>[] = [];
    let flushTimer: ReturnType<typeof setTimeout> | null = null;

    const clearFlushTimer = () => {
      if (flushTimer) {
        clearTimeout(flushTimer);
        flushTimer = null;
      }
    };

    // Never rejects: a rejected promise here would be an unhandled rejection.
    const flush = async (): Promise<void> => {
      clearFlushTimer();
      if (buffer.length === 0) return;
      const toInsert = buffer;
      buffer = [];
      try {
        await this.insertMany(toInsert);
      } catch (err) {
        console.log('ClickhouseTable: insert stream flush failed', err);
      }
    };

    intake.subscribe(
      (doc) => {
        buffer.push(doc);
        if (buffer.length >= batchSize) {
          void flush();
        } else if (!flushTimer) {
          // Start the timer once per buffer (do not reset it on every document), so
          // flushIntervalMs bounds how long a document can wait before being written.
          flushTimer = setTimeout(() => {
            flushTimer = null;
            void flush();
          }, flushIntervalMs);
        }
      },
      undefined,
      () => {
        void flush();
      },
    );

    return intake;
  }

  // ---- QUERY ----

  /**
   * Returns a fluent query builder for this table
   */
  public query(): ClickhouseQueryBuilder<T> {
    return this.newQueryBuilder();
  }

  // ---- UPDATE ----

  /**
   * Update rows matching a where condition (ClickHouse mutation - use sparingly).
   * Waits until all pending mutations on the table have finished.
   *
   * @throws Error if `whereFn` produces no WHERE clause.
   */
  public async update(
    set: Partial<T>,
    whereFn: (q: ClickhouseQueryBuilder<T>) => ClickhouseQueryBuilder<T>,
  ): Promise<void> {
    const whereClause = whereFn(this.newQueryBuilder()).buildWhereClause();
    if (!whereClause) {
      throw new Error('UPDATE requires a WHERE clause. Use a where condition to target specific rows.');
    }

    // Quote identifiers the same way ensureColumn() creates them, so flattened
    // (dotted) column names and reserved words work.
    const setClauses = Object.entries(set)
      .map(([key, value]) => `${this.quoteIdentifier(key)} = ${escapeClickhouseValue(value)}`)
      .join(', ');

    await this.db.clickhouseHttpClient.mutatePromise(
      `ALTER TABLE ${this.options.database}.${this.options.tableName} UPDATE ${setClauses} WHERE ${whereClause}`
    );
    await this.waitForMutations();
  }

  // ---- DELETE ----

  /**
   * Delete rows matching a where condition (ClickHouse mutation).
   * Waits until all pending mutations on the table have finished.
   *
   * @throws Error if `whereFn` produces no WHERE clause.
   */
  public async deleteWhere(
    whereFn: (q: ClickhouseQueryBuilder<T>) => ClickhouseQueryBuilder<T>,
  ): Promise<void> {
    const whereClause = whereFn(this.newQueryBuilder()).buildWhereClause();
    if (!whereClause) {
      throw new Error('DELETE requires a WHERE clause.');
    }
    await this.db.clickhouseHttpClient.mutatePromise(
      `ALTER TABLE ${this.options.database}.${this.options.tableName} DELETE WHERE ${whereClause}`
    );
    await this.waitForMutations();
  }

  /**
   * Delete entries older than a given interval on a column.
   *
   * @param column - Date/time column to compare against `now()`.
   * @param interval - Raw ClickHouse interval body, e.g. `'30 DAY'` (interpolated as-is).
   */
  public async deleteOlderThan(column: keyof T & string, interval: string): Promise<void> {
    await this.db.clickhouseHttpClient.mutatePromise(
      `ALTER TABLE ${this.options.database}.${this.options.tableName} DELETE WHERE ${String(column)} < now() - INTERVAL ${interval}`
    );
    await this.waitForMutations();
  }

  /**
   * Drop the entire table and clear the cached column metadata.
   */
  public async drop(): Promise<void> {
    await this.db.clickhouseHttpClient.queryPromise(
      `DROP TABLE IF EXISTS ${this.options.database}.${this.options.tableName}`
    );
    this.columns = [];
  }

  // ---- UTILITIES ----

  /**
   * Wait for all pending mutations on this table to complete, polling
   * `system.mutations` once per second. There is no timeout.
   */
  public async waitForMutations(): Promise<void> {
    let pending = true;
    while (pending) {
      const mutations = await this.db.clickhouseHttpClient.queryPromise(`
        SELECT count() AS cnt FROM system.mutations
        WHERE is_done = 0
          AND database = '${this.options.database}'
          AND table = '${this.options.tableName}'
        FORMAT JSONEachRow
      `);
      const count = mutations[0] ? parseInt(mutations[0].cnt, 10) : 0;
      if (count === 0) {
        pending = false;
      } else {
        await plugins.smartdelay.delayFor(1000);
      }
    }
  }

  /**
   * Get the total row count
   */
  public async getRowCount(): Promise<number> {
    const result = await this.db.clickhouseHttpClient.queryPromise(`
      SELECT count() AS cnt FROM ${this.options.database}.${this.options.tableName}
      FORMAT JSONEachRow
    `);
    return result[0] ? parseInt(result[0].cnt, 10) : 0;
  }

  /**
   * Optimize table (useful for ReplacingMergeTree deduplication)
   *
   * @param final - Append `FINAL` to the OPTIMIZE statement.
   */
  public async optimize(final: boolean = false): Promise<void> {
    const finalClause = final ? ' FINAL' : '';
    await this.db.clickhouseHttpClient.queryPromise(
      `OPTIMIZE TABLE ${this.options.database}.${this.options.tableName}${finalClause}`
    );
  }

  // ---- OBSERVATION ----

  /**
   * Watch for new entries via polling. Returns an RxJS Observable.
   *
   * Emits, in `timestamp` order, rows whose `timestamp` is strictly greater than the
   * newest `timestamp` seen so far (initially `max(timestamp)` at subscription time).
   * The cursor is kept as epoch milliseconds computed by ClickHouse, so it neither
   * loses sub-second precision nor depends on the client's local timezone. Polls are
   * chained (the next one starts `pollInterval` ms after the previous one finished),
   * so slow queries never overlap.
   */
  public watch(options?: { pollInterval?: number }): plugins.smartrx.rxjs.Observable<T> {
    const pollInterval = options?.pollInterval || 1000;
    const tableRef = `${this.options.database}.${this.options.tableName}`;
    // Helper column carrying each row's timestamp as epoch ms; stripped before emitting.
    const cursorColumn = '__watch_cursor_ms';

    // Int64 values are quoted in JSONEachRow by default, so accept strings and numbers.
    const toMillis = (raw: unknown): number | null => {
      if (raw === null || raw === undefined || raw === '') return null;
      const ms = Number(raw);
      return Number.isFinite(ms) ? ms : null;
    };

    return new plugins.smartrx.rxjs.Observable<T>((observer) => {
      let lastTimestampMs = Date.now();
      let timer: ReturnType<typeof setTimeout> | undefined;
      let stopped = false;

      const fetchInitialTimestamp = async () => {
        const result = await this.db.clickhouseHttpClient.queryPromise(`
          SELECT toUnixTimestamp64Milli(toDateTime64(max(timestamp), 3)) AS lastTimestampMs
          FROM ${tableRef}
          FORMAT JSONEachRow
        `);
        lastTimestampMs = toMillis(result[0]?.lastTimestampMs) ?? Date.now();
      };

      const fetchNewEntries = async () => {
        const entries = await this.db.clickhouseHttpClient.queryPromise(`
          SELECT *, toUnixTimestamp64Milli(toDateTime64(timestamp, 3)) AS ${cursorColumn}
          FROM ${tableRef}
          WHERE timestamp > fromUnixTimestamp64Milli(toInt64(${Math.trunc(lastTimestampMs)}))
          ORDER BY timestamp ASC
          FORMAT JSONEachRow
        `);
        for (const entry of entries) {
          if (stopped) return;
          const { [cursorColumn]: rawCursor, ...row } = entry;
          const cursor = toMillis(rawCursor);
          if (cursor !== null) lastTimestampMs = cursor;
          observer.next(row);
        }
      };

      const poll = async () => {
        if (stopped) return;
        try {
          await fetchNewEntries();
        } catch (err) {
          stopped = true;
          observer.error(err);
          return;
        }
        if (!stopped) timer = setTimeout(poll, pollInterval);
      };

      const start = async () => {
        await fetchInitialTimestamp();
        // The subscriber may have unsubscribed while the initial query was in flight;
        // scheduling a poll then would leak a timer that is never cleared.
        if (!stopped) timer = setTimeout(poll, pollInterval);
      };

      start().catch((err) => observer.error(err));

      return () => {
        stopped = true;
        if (timer) clearTimeout(timer);
      };
    });
  }

  // ---- PRIVATE HELPERS ----

  /** Column definition list for CREATE TABLE (configured columns or the default schema). */
  private buildColumnDefinitions(): string {
    const { columns } = this.options;
    if (columns && columns.length > 0) {
      return columns
        .map((col) => {
          let def = `${col.name} ${col.type}`;
          if (col.defaultExpression) def += ` DEFAULT ${col.defaultExpression}`;
          if (col.codec) def += ` CODEC(${col.codec})`;
          return def;
        })
        .join(',\n        ');
    }
    // Default minimal schema — downstream code can add columns via auto-schema evolution
    return `timestamp DateTime64(3, 'Europe/Berlin'),\n        message String`;
  }

  /** ENGINE clause, passing sign/version columns to engines that take them when configured. */
  private buildEngineClause(): string {
    const { engine } = this.options;
    if (engine.engine === 'ReplacingMergeTree' && engine.versionColumn) {
      return `ReplacingMergeTree(${engine.versionColumn})`;
    }
    if (engine.engine === 'CollapsingMergeTree' && engine.signColumn) {
      return `CollapsingMergeTree(${engine.signColumn})`;
    }
    if (engine.engine === 'VersionedCollapsingMergeTree' && engine.signColumn && engine.versionColumn) {
      return `VersionedCollapsingMergeTree(${engine.signColumn}, ${engine.versionColumn})`;
    }
    return `${engine.engine}()`;
  }

  /** New query builder bound to this table's name, database and HTTP client. */
  private newQueryBuilder(): ClickhouseQueryBuilder<T> {
    return new ClickhouseQueryBuilder<T>(
      this.options.tableName,
      this.options.database,
      this.db.clickhouseHttpClient,
    );
  }

  /** Backtick-quote a ClickHouse identifier, escaping backslashes and backticks. */
  private quoteIdentifier(name: string): string {
    return `\`${name.replace(/\\/g, '\\\\').replace(/`/g, '\\`')}\``;
  }

  /**
   * One sample value per flattened key across `docs`, used for schema sync.
   * The first value with a detectable ClickHouse type wins: a leading null/undefined
   * must not hide a key that later documents populate, or no column would be created
   * and the subsequent insert would fail.
   */
  private collectSchemaSamples(docs: Partial<T>[]): Record<string, any> {
    const samples: Record<string, any> = {};
    for (const doc of docs) {
      const flat = plugins.smartobject.toFlatObject(doc);
      for (const [key, value] of Object.entries(flat)) {
        if (!(key in samples) && detectClickhouseType(value)) {
          samples[key] = value;
        }
      }
    }
    return samples;
  }

  /** Add column `name` of `type` unless it already exists (cache first, then a refresh). */
  private async ensureColumn(name: string, type: TClickhouseColumnType): Promise<void> {
    // Check cached columns first
    if (this.columns.some((col) => col.name === name)) return;

    // Refresh and check again
    await this.updateColumns();
    if (this.columns.some((col) => col.name === name)) return;

    // Add column
    try {
      await this.db.clickhouseHttpClient.queryPromise(
        `ALTER TABLE ${this.options.database}.${this.options.tableName} ADD COLUMN ${this.quoteIdentifier(name)} ${type}`
      );
    } catch (err) {
      // Column might have been added concurrently — ignore "already exists" errors
      if (!String(err).includes('already exists')) {
        throw err;
      }
    }
    await this.updateColumns();
  }

  /** Flatten a document, keeping only values with a detectable type (plus `timestamp`). */
  private flattenDocument(data: Partial<T>): Record<string, any> {
    const flat = plugins.smartobject.toFlatObject(data);
    const storageDoc: Record<string, any> = {};
    for (const [key, value] of Object.entries(flat)) {
      const type = detectClickhouseType(value);
      if (type || key === 'timestamp') {
        storageDoc[key] = value;
      }
    }
    return storageDoc;
  }

  /** Run schema sync (if enabled) for one document, then flatten it for storage. */
  private async prepareDocument(data: Partial<T>): Promise<Record<string, any>> {
    if (this.options.autoSchemaEvolution) {
      await this.syncSchema(data as Record<string, any>);
    }
    return this.flattenDocument(data);
  }

  /**
   * Insert rows; on failure, trigger self-healing instead of rethrowing.
   * The failed rows are not retried.
   */
  private async executeInsert(docs: Record<string, any>[]): Promise<void> {
    try {
      await this.db.clickhouseHttpClient.insertPromise(
        this.options.database,
        this.options.tableName,
        docs,
      );
    } catch (err) {
      await this.handleInsertError(err);
    }
  }

  /**
   * Self-heal after an insert error: wait for the database, (re)create it and the table.
   * Concurrent calls while healing is in progress return immediately.
   */
  private async handleInsertError(err?: unknown): Promise<void> {
    if (this.healingDeferred) return;
    this.healingDeferred = plugins.smartpromise.defer<void>();
    console.log('ClickhouseTable: Insert error. Attempting self-healing...', err);
    try {
      await this.db.pingDatabaseUntilAvailable();
      await this.db.createDatabase();
      await this.setup();
    } finally {
      this.healingDeferred.resolve();
      this.healingDeferred = null;
    }
  }
}