Files
smartclickhouse/ts/smartclickhouse.classes.clickhousetable.ts

499 lines
16 KiB
TypeScript

import * as plugins from './smartclickhouse.plugins.js';
import type { SmartClickHouseDb } from './smartclickhouse.classes.smartclickhouse.js';
import { ClickhouseQueryBuilder } from './smartclickhouse.classes.querybuilder.js';
import type {
IClickhouseTableOptions,
IColumnInfo,
TClickhouseColumnType,
} from './smartclickhouse.types.js';
import { detectClickhouseType, escapeClickhouseValue } from './smartclickhouse.types.js';
/**
 * Handle for a single ClickHouse table.
 *
 * Covers table creation, optional automatic schema evolution (columns are
 * added on demand from the shape of inserted documents), inserts, mutations
 * (UPDATE / DELETE) and a polling-based change feed.
 *
 * Type parameter `T` describes the shape of a row.
 */
export class ClickhouseTable<T extends Record<string, any>> {
  // ---- STATIC FACTORY ----

  /**
   * Constructs a table handle and runs {@link setup} on it.
   *
   * @param db - Database wrapper that owns the HTTP client used for all queries.
   * @param options - Table options; see the constructor for applied defaults.
   * @returns The table handle after `setup()` has completed.
   */
  public static async create<T extends Record<string, any>>(
    db: SmartClickHouseDb,
    options: IClickhouseTableOptions<T>,
  ): Promise<ClickhouseTable<T>> {
    const table = new ClickhouseTable<T>(db, options);
    await table.setup();
    return table;
  }

  // ---- INSTANCE ----

  public db: SmartClickHouseDb;
  public options: IClickhouseTableOptions<T>;
  /** Column metadata as last read from `system.columns` by {@link updateColumns}. */
  public columns: IColumnInfo[] = [];
  /**
   * Non-null while a self-healing run (see `handleInsertError`) is in progress.
   * `insert`, `insertMany` and `insertBatch` return without writing while it is set.
   */
  private healingDeferred: plugins.smartpromise.Deferred<any> | null = null;

  /**
   * Stores `db` and `options`, applying defaults: `autoSchemaEvolution` is
   * `true` unless overridden, `database` falls back to `db.options.database`,
   * and `engine` falls back to `{ engine: 'MergeTree' }`.
   */
  constructor(db: SmartClickHouseDb, options: IClickhouseTableOptions<T>) {
    this.db = db;
    this.options = {
      autoSchemaEvolution: true,
      ...options,
      database: options.database || db.options.database,
      engine: options.engine || { engine: 'MergeTree' },
    };
  }

  /** `database.table` as interpolated into SQL statements. */
  private get qualifiedName(): string {
    return `${this.options.database}.${this.options.tableName}`;
  }

  // ---- SCHEMA MANAGEMENT ----

  /**
   * Creates the table if it doesn't exist, applies the TTL (if configured)
   * and refreshes column metadata.
   *
   * When no `columns` are configured a minimal `timestamp` / `message` schema
   * is used. `ttl` takes precedence over the legacy `retainDataForDays`
   * shorthand, which always applies to the `timestamp` column.
   */
  public async setup(): Promise<void> {
    const { engine, orderBy, partitionBy, primaryKey, ttl, retainDataForDays, columns } = this.options;
    const client = this.db.clickhouseHttpClient;

    // Column definitions
    let columnDefs: string;
    if (columns && columns.length > 0) {
      columnDefs = columns
        .map((col) => {
          let def = `${col.name} ${col.type}`;
          if (col.defaultExpression) def += ` DEFAULT ${col.defaultExpression}`;
          if (col.codec) def += ` CODEC(${col.codec})`;
          return def;
        })
        .join(',\n  ');
    } else {
      // Default minimal schema — further columns can be added via auto-schema evolution
      columnDefs = `timestamp DateTime64(3, 'Europe/Berlin'),\n  message String`;
    }

    // Engine clause: parameterised only when the engine-specific columns are configured,
    // otherwise the bare engine name with empty parentheses.
    let engineClause = `${engine.engine}()`;
    if (engine.engine === 'ReplacingMergeTree' && engine.versionColumn) {
      engineClause = `ReplacingMergeTree(${engine.versionColumn})`;
    } else if (engine.engine === 'CollapsingMergeTree' && engine.signColumn) {
      engineClause = `CollapsingMergeTree(${engine.signColumn})`;
    } else if (engine.engine === 'VersionedCollapsingMergeTree' && engine.signColumn && engine.versionColumn) {
      engineClause = `VersionedCollapsingMergeTree(${engine.signColumn}, ${engine.versionColumn})`;
    }

    const orderByStr = Array.isArray(orderBy) ? orderBy.join(', ') : orderBy;

    const clauses: string[] = [
      `CREATE TABLE IF NOT EXISTS ${this.qualifiedName} (`,
      `  ${columnDefs}`,
      `) ENGINE = ${engineClause}`,
    ];
    if (partitionBy) {
      clauses.push(`PARTITION BY ${partitionBy}`);
    }
    clauses.push(`ORDER BY (${orderByStr})`);
    if (primaryKey) {
      const primaryKeyStr = Array.isArray(primaryKey) ? primaryKey.join(', ') : primaryKey;
      clauses.push(`PRIMARY KEY (${primaryKeyStr})`);
    }
    await client.queryPromise(clauses.join('\n'));

    // Apply TTL if configured
    if (ttl) {
      await client.queryPromise(
        `ALTER TABLE ${this.qualifiedName}\n` +
          `MODIFY TTL toDateTime(${String(ttl.column)}) + INTERVAL ${ttl.interval}`,
      );
    } else if (retainDataForDays && retainDataForDays > 0) {
      // Legacy shorthand
      await client.queryPromise(
        `ALTER TABLE ${this.qualifiedName}\n` +
          `MODIFY TTL toDateTime(timestamp) + INTERVAL ${retainDataForDays} DAY`,
      );
    }

    await this.updateColumns();
  }

  /**
   * Refreshes {@link columns} from `system.columns`.
   *
   * @returns The refreshed column list.
   */
  public async updateColumns(): Promise<IColumnInfo[]> {
    this.columns = await this.db.clickhouseHttpClient.queryPromise(`
      SELECT * FROM system.columns
      WHERE database = '${this.options.database}'
      AND table = '${this.options.tableName}' FORMAT JSONEachRow
    `);
    return this.columns;
  }

  /**
   * Auto-schema evolution: flattens `data` and makes sure a column exists for
   * every key whose value has a detectable ClickHouse type. The `timestamp`
   * key is never touched; keys with an undetectable type are skipped.
   */
  public async syncSchema(data: Record<string, any>): Promise<void> {
    const flatData = plugins.smartobject.toFlatObject(data);
    for (const key of Object.keys(flatData)) {
      if (key === 'timestamp') continue;
      const clickhouseType = detectClickhouseType(flatData[key]);
      if (!clickhouseType) continue;
      await this.ensureColumn(key, clickhouseType);
    }
  }

  // ---- INSERT ----

  /**
   * Inserts a single row. Returns without writing while self-healing is in
   * progress.
   */
  public async insert(data: Partial<T>): Promise<void> {
    if (this.healingDeferred) return;
    const storageDoc = await this.prepareDocument(data);
    await this.executeInsert([storageDoc]);
  }

  /**
   * Inserts multiple rows with a single insert call. Returns without writing
   * while self-healing is in progress or when `data` is empty.
   */
  public async insertMany(data: Partial<T>[]): Promise<void> {
    if (this.healingDeferred) return;
    if (data.length === 0) return;

    // Schema sync once across all documents instead of per document
    if (this.options.autoSchemaEvolution) {
      await this.syncSchema(this.collectSampleValues(data));
    }

    const storageDocs = data.map((doc) => this.flattenDocument(doc));
    await this.executeInsert(storageDocs);
  }

  /**
   * Inserts rows via the HTTP client's `insertBatch`, passing `batchSize`
   * (default 10000) through. Unlike `insert` / `insertMany` this path does not
   * go through `executeInsert`, so client errors propagate to the caller.
   */
  public async insertBatch(data: Partial<T>[], options?: { batchSize?: number }): Promise<void> {
    const batchSize = options?.batchSize || 10000;
    if (this.healingDeferred) return;
    if (data.length === 0) return;

    // Schema sync across all documents first
    if (this.options.autoSchemaEvolution) {
      await this.syncSchema(this.collectSampleValues(data));
    }

    const storageDocs = data.map((doc) => this.flattenDocument(doc));
    await this.db.clickhouseHttpClient.insertBatch(
      this.options.database,
      this.options.tableName,
      storageDocs,
      batchSize,
    );
  }

  /**
   * Creates a push-based insert stream using ObservableIntake.
   *
   * Pushed documents are buffered and written with `insertMany` once
   * `batchSize` (default 100) documents have accumulated, once
   * `flushIntervalMs` (default 1000) has passed since the last push, or when
   * the intake completes. Nobody awaits these flushes, so a failing flush is
   * logged rather than left as an unhandled promise rejection.
   */
  public createInsertStream(options?: { batchSize?: number; flushIntervalMs?: number }): plugins.smartrx.ObservableIntake<Partial<T>> {
    const batchSize = options?.batchSize || 100;
    const flushIntervalMs = options?.flushIntervalMs || 1000;
    const intake = new plugins.smartrx.ObservableIntake<Partial<T>>();
    let buffer: Partial<T>[] = [];
    let flushTimer: ReturnType<typeof setTimeout> | null = null;

    const cancelTimer = (): void => {
      if (flushTimer) {
        clearTimeout(flushTimer);
        flushTimer = null;
      }
    };

    const flush = async (): Promise<void> => {
      if (buffer.length === 0) return;
      // Swap the buffer before awaiting so documents pushed meanwhile go to the next batch
      const toInsert = buffer;
      buffer = [];
      try {
        await this.insertMany(toInsert);
      } catch (err) {
        console.error(
          `ClickhouseTable: stream flush of ${toInsert.length} row(s) into ${this.qualifiedName} failed:`,
          err,
        );
      }
    };

    const scheduleFlush = (): void => {
      cancelTimer();
      flushTimer = setTimeout(() => {
        flushTimer = null;
        void flush();
      }, flushIntervalMs);
    };

    intake.subscribe(
      async (doc) => {
        buffer.push(doc);
        if (buffer.length >= batchSize) {
          cancelTimer();
          await flush();
        } else {
          scheduleFlush();
        }
      },
      undefined,
      async () => {
        cancelTimer();
        await flush();
      },
    );

    return intake;
  }

  // ---- QUERY ----

  /**
   * Returns a fluent query builder for this table
   */
  public query(): ClickhouseQueryBuilder<T> {
    return new ClickhouseQueryBuilder<T>(
      this.options.tableName,
      this.options.database,
      this.db.clickhouseHttpClient,
    );
  }

  // ---- UPDATE ----

  /**
   * Updates rows matching a where condition (ClickHouse mutation - use
   * sparingly) and waits until the table has no pending mutations.
   *
   * @param set - Column/value pairs to assign. Column names are backtick-quoted,
   *   matching how auto-created columns are declared.
   * @param whereFn - Receives a fresh query builder and returns it with the
   *   where conditions applied.
   * @throws Error when the builder yields no WHERE clause or `set` is empty.
   */
  public async update(
    set: Partial<T>,
    whereFn: (q: ClickhouseQueryBuilder<T>) => ClickhouseQueryBuilder<T>,
  ): Promise<void> {
    const whereClause = whereFn(this.query()).buildWhereClause();
    if (!whereClause) {
      throw new Error('UPDATE requires a WHERE clause. Use a where condition to target specific rows.');
    }

    const assignments = Object.entries(set);
    if (assignments.length === 0) {
      // An empty assignment list would produce malformed SQL ("UPDATE  WHERE ...")
      throw new Error('UPDATE requires at least one column to set.');
    }
    const setClauses = assignments
      .map(([key, value]) => `\`${key}\` = ${escapeClickhouseValue(value)}`)
      .join(', ');

    await this.db.clickhouseHttpClient.mutatePromise(
      `ALTER TABLE ${this.qualifiedName} UPDATE ${setClauses} WHERE ${whereClause}`,
    );
    await this.waitForMutations();
  }

  // ---- DELETE ----

  /**
   * Deletes rows matching a where condition (ClickHouse mutation) and waits
   * until the table has no pending mutations.
   *
   * @throws Error when the builder yields no WHERE clause.
   */
  public async deleteWhere(
    whereFn: (q: ClickhouseQueryBuilder<T>) => ClickhouseQueryBuilder<T>,
  ): Promise<void> {
    const whereClause = whereFn(this.query()).buildWhereClause();
    if (!whereClause) {
      throw new Error('DELETE requires a WHERE clause.');
    }
    await this.db.clickhouseHttpClient.mutatePromise(
      `ALTER TABLE ${this.qualifiedName} DELETE WHERE ${whereClause}`,
    );
    await this.waitForMutations();
  }

  /**
   * Deletes entries whose `column` value is older than `now() - INTERVAL <interval>`.
   *
   * @param column - Column compared against the cut-off.
   * @param interval - Text placed after `INTERVAL` verbatim, e.g. `7 DAY`.
   */
  public async deleteOlderThan(column: keyof T & string, interval: string): Promise<void> {
    await this.db.clickhouseHttpClient.mutatePromise(
      `ALTER TABLE ${this.qualifiedName} DELETE WHERE ${String(column)} < now() - INTERVAL ${interval}`,
    );
    await this.waitForMutations();
  }

  /**
   * Drops the entire table and clears the cached column metadata.
   */
  public async drop(): Promise<void> {
    await this.db.clickhouseHttpClient.queryPromise(
      `DROP TABLE IF EXISTS ${this.qualifiedName}`,
    );
    this.columns = [];
  }

  // ---- UTILITIES ----

  /**
   * Waits for all pending mutations on this table to complete by polling
   * `system.mutations` once per second.
   *
   * NOTE(review): there is no timeout — a mutation that never finishes keeps
   * this promise pending.
   */
  public async waitForMutations(): Promise<void> {
    for (;;) {
      const mutations = await this.db.clickhouseHttpClient.queryPromise(`
        SELECT count() AS cnt FROM system.mutations
        WHERE is_done = 0 AND database = '${this.options.database}' AND table = '${this.options.tableName}' FORMAT JSONEachRow
      `);
      const count = mutations[0] ? parseInt(mutations[0].cnt, 10) : 0;
      if (count === 0) return;
      await plugins.smartdelay.delayFor(1000);
    }
  }

  /**
   * Gets the total row count (0 when the query yields no row).
   */
  public async getRowCount(): Promise<number> {
    const result = await this.db.clickhouseHttpClient.queryPromise(`
      SELECT count() AS cnt FROM ${this.qualifiedName} FORMAT JSONEachRow
    `);
    return result[0] ? parseInt(result[0].cnt, 10) : 0;
  }

  /**
   * Optimizes the table (useful for ReplacingMergeTree deduplication).
   *
   * @param final - Appends `FINAL` to the statement when true.
   */
  public async optimize(final: boolean = false): Promise<void> {
    const finalClause = final ? ' FINAL' : '';
    await this.db.clickhouseHttpClient.queryPromise(
      `OPTIMIZE TABLE ${this.qualifiedName}${finalClause}`,
    );
  }

  // ---- OBSERVATION ----

  /**
   * Watches for new entries via polling. Returns an RxJS Observable.
   *
   * On subscription the current `max(timestamp)` is read (falling back to the
   * current time); afterwards every `pollInterval` ms (default 1000) rows with
   * a strictly newer `timestamp` are fetched in ascending order and emitted.
   * Query errors are forwarded to the observer's `error` channel.
   *
   * NOTE(review): timestamps from query results are parsed with `new Date(...)`;
   * if the server renders them as zone-less strings this presumably uses the
   * process's local time zone — confirm it matches the column's time zone.
   */
  public watch(options?: { pollInterval?: number }): plugins.smartrx.rxjs.Observable<T> {
    const pollInterval = options?.pollInterval || 1000;
    const tableRef = this.qualifiedName;
    const client = this.db.clickhouseHttpClient;

    return new plugins.smartrx.rxjs.Observable<T>((observer) => {
      let lastTimestamp = Date.now();
      let intervalId: ReturnType<typeof setInterval> | undefined;
      let stopped = false;
      let pollInFlight = false;

      // Parses a timestamp value into epoch milliseconds, keeping `fallback` when unparsable
      const toEpochMs = (raw: unknown, fallback: number): number => {
        const parsed = new Date(raw as string).getTime();
        return Number.isFinite(parsed) ? parsed : fallback;
      };

      const fetchInitialTimestamp = async (): Promise<void> => {
        const result = await client.queryPromise(`
          SELECT max(timestamp) as lastTimestamp
          FROM ${tableRef} FORMAT JSONEachRow
        `);
        lastTimestamp = result.length && result[0].lastTimestamp
          ? toEpochMs(result[0].lastTimestamp, Date.now())
          : Date.now();
      };

      const fetchNewEntries = async (): Promise<void> => {
        // Skip the tick when unsubscribed, or when the previous poll is still running:
        // an overlapping poll would start from the same cursor and emit the same rows twice.
        if (stopped || pollInFlight) return;
        pollInFlight = true;
        try {
          // Pass the cursor as an exact millisecond instant. Converting through
          // toDateTime() would truncate to whole seconds and re-deliver every row
          // from the cursor's second on each poll.
          const entries = await client.queryPromise(`
            SELECT * FROM ${tableRef}
            WHERE timestamp > fromUnixTimestamp64Milli(toInt64(${Math.trunc(lastTimestamp)}))
            ORDER BY timestamp ASC FORMAT JSONEachRow
          `);
          if (stopped) return;
          for (const entry of entries) {
            observer.next(entry);
          }
          if (entries.length > 0) {
            lastTimestamp = toEpochMs(entries[entries.length - 1].timestamp, lastTimestamp);
          }
        } catch (err) {
          observer.error(err);
        } finally {
          pollInFlight = false;
        }
      };

      const start = async (): Promise<void> => {
        await fetchInitialTimestamp();
        // Unsubscribed while the initial query was running: creating the interval
        // now would leak a timer that nothing clears.
        if (stopped) return;
        intervalId = setInterval(() => {
          void fetchNewEntries();
        }, pollInterval);
      };

      start().catch((err) => observer.error(err));

      return () => {
        stopped = true;
        if (intervalId !== undefined) {
          clearInterval(intervalId);
        }
      };
    });
  }

  // ---- PRIVATE HELPERS ----

  /**
   * Builds one representative value per flattened key across `data` for
   * schema detection. The first value with a detectable ClickHouse type wins;
   * a key whose early values are undetectable keeps being re-sampled so a
   * later typed value still creates the column.
   */
  private collectSampleValues(data: Partial<T>[]): Record<string, any> {
    const samples: Record<string, any> = {};
    const typedKeys = new Set<string>();
    for (const doc of data) {
      const flat = plugins.smartobject.toFlatObject(doc);
      for (const [key, value] of Object.entries(flat)) {
        if (typedKeys.has(key)) continue;
        samples[key] = value;
        if (detectClickhouseType(value)) {
          typedKeys.add(key);
        }
      }
    }
    return samples;
  }

  /**
   * Adds column `name` with `type` unless it already exists. The cached
   * column list is consulted first, then refreshed once before issuing the
   * ALTER; the cache is refreshed again afterwards.
   */
  private async ensureColumn(name: string, type: TClickhouseColumnType): Promise<void> {
    const isKnown = (): boolean => this.columns.some((col) => col.name === name);

    // Check cached columns first
    if (isKnown()) return;

    // Refresh and check again
    await this.updateColumns();
    if (isKnown()) return;

    // Add column
    try {
      await this.db.clickhouseHttpClient.queryPromise(
        `ALTER TABLE ${this.qualifiedName} ADD COLUMN \`${name}\` ${type}`,
      );
    } catch (err) {
      // Column might have been added concurrently — ignore "already exists" errors
      if (!String(err).includes('already exists')) {
        throw err;
      }
    }
    await this.updateColumns();
  }

  /**
   * Flattens `data` and keeps only keys whose value has a detectable
   * ClickHouse type; the `timestamp` key is always kept.
   */
  private flattenDocument(data: Partial<T>): Record<string, any> {
    const flat = plugins.smartobject.toFlatObject(data);
    const storageDoc: Record<string, any> = {};
    for (const [key, value] of Object.entries(flat)) {
      if (key === 'timestamp' || detectClickhouseType(value)) {
        storageDoc[key] = value;
      }
    }
    return storageDoc;
  }

  /**
   * Runs schema sync for `data` (when `autoSchemaEvolution` is enabled) and
   * returns the flattened storage document.
   */
  private async prepareDocument(data: Partial<T>): Promise<Record<string, any>> {
    if (this.options.autoSchemaEvolution) {
      await this.syncSchema(data as Record<string, any>);
    }
    return this.flattenDocument(data);
  }

  /**
   * Sends `docs` to the HTTP client. A failing insert is best-effort: the
   * error is logged, self-healing is started and the error is not rethrown,
   * so the rows of the failed call are not written.
   */
  private async executeInsert(docs: Record<string, any>[]): Promise<void> {
    try {
      await this.db.clickhouseHttpClient.insertPromise(
        this.options.database,
        this.options.tableName,
        docs,
      );
    } catch (err) {
      console.error(
        `ClickhouseTable: insert of ${docs.length} row(s) into ${this.qualifiedName} failed:`,
        err,
      );
      await this.handleInsertError();
    }
  }

  /**
   * Self-healing after a failed insert: waits until the database answers
   * pings, re-creates the database and re-runs {@link setup}. Only one run is
   * active at a time; `healingDeferred` is set for its duration and cleared
   * afterwards even when a step throws.
   */
  private async handleInsertError(): Promise<void> {
    if (this.healingDeferred) return;
    const deferred = plugins.smartpromise.defer();
    this.healingDeferred = deferred;
    console.log('ClickhouseTable: Insert error. Attempting self-healing...');
    try {
      await this.db.pingDatabaseUntilAvailable();
      await this.db.createDatabase();
      await this.setup();
    } finally {
      deferred.resolve();
      this.healingDeferred = null;
    }
  }
}