feat(core): introduce typed ClickHouse table API, query builder, and result handling; enhance HTTP client and add schema evolution, batch inserts and mutations; update docs/tests and bump deps
This commit is contained in:
498
ts/smartclickhouse.classes.clickhousetable.ts
Normal file
498
ts/smartclickhouse.classes.clickhousetable.ts
Normal file
@@ -0,0 +1,498 @@
|
||||
import * as plugins from './smartclickhouse.plugins.js';
|
||||
import type { SmartClickHouseDb } from './smartclickhouse.classes.smartclickhouse.js';
|
||||
import { ClickhouseQueryBuilder } from './smartclickhouse.classes.querybuilder.js';
|
||||
import type {
|
||||
IClickhouseTableOptions,
|
||||
IColumnInfo,
|
||||
TClickhouseColumnType,
|
||||
} from './smartclickhouse.types.js';
|
||||
import { detectClickhouseType, escapeClickhouseValue } from './smartclickhouse.types.js';
|
||||
|
||||
export class ClickhouseTable<T extends Record<string, any>> {
|
||||
// ---- STATIC FACTORY ----
|
||||
|
||||
public static async create<T extends Record<string, any>>(
|
||||
db: SmartClickHouseDb,
|
||||
options: IClickhouseTableOptions<T>,
|
||||
): Promise<ClickhouseTable<T>> {
|
||||
const table = new ClickhouseTable<T>(db, options);
|
||||
await table.setup();
|
||||
return table;
|
||||
}
|
||||
|
||||
// ---- INSTANCE ----
|
||||
|
||||
public db: SmartClickHouseDb;
|
||||
public options: IClickhouseTableOptions<T>;
|
||||
public columns: IColumnInfo[] = [];
|
||||
private healingDeferred: plugins.smartpromise.Deferred<any> | null = null;
|
||||
|
||||
constructor(db: SmartClickHouseDb, options: IClickhouseTableOptions<T>) {
|
||||
this.db = db;
|
||||
this.options = {
|
||||
autoSchemaEvolution: true,
|
||||
...options,
|
||||
database: options.database || db.options.database,
|
||||
engine: options.engine || { engine: 'MergeTree' },
|
||||
};
|
||||
}
|
||||
|
||||
// ---- SCHEMA MANAGEMENT ----
|
||||
|
||||
/**
|
||||
* Creates the table if it doesn't exist and refreshes column metadata
|
||||
*/
|
||||
public async setup(): Promise<void> {
|
||||
const { database, tableName, engine, orderBy, partitionBy, primaryKey, ttl, retainDataForDays, columns } = this.options;
|
||||
|
||||
// Build column definitions
|
||||
let columnDefs: string;
|
||||
if (columns && columns.length > 0) {
|
||||
columnDefs = columns.map((col) => {
|
||||
let def = `${col.name} ${col.type}`;
|
||||
if (col.defaultExpression) def += ` DEFAULT ${col.defaultExpression}`;
|
||||
if (col.codec) def += ` CODEC(${col.codec})`;
|
||||
return def;
|
||||
}).join(',\n ');
|
||||
} else {
|
||||
// Default minimal schema — downstream code can add columns via auto-schema evolution
|
||||
columnDefs = `timestamp DateTime64(3, 'Europe/Berlin'),\n message String`;
|
||||
}
|
||||
|
||||
// Build engine clause
|
||||
let engineClause: string = engine.engine;
|
||||
if (engine.engine === 'ReplacingMergeTree' && engine.versionColumn) {
|
||||
engineClause = `ReplacingMergeTree(${engine.versionColumn})`;
|
||||
} else if (engine.engine === 'CollapsingMergeTree' && engine.signColumn) {
|
||||
engineClause = `CollapsingMergeTree(${engine.signColumn})`;
|
||||
} else if (engine.engine === 'VersionedCollapsingMergeTree' && engine.signColumn && engine.versionColumn) {
|
||||
engineClause = `VersionedCollapsingMergeTree(${engine.signColumn}, ${engine.versionColumn})`;
|
||||
} else {
|
||||
engineClause = `${engine.engine}()`;
|
||||
}
|
||||
|
||||
// Build ORDER BY
|
||||
const orderByStr = Array.isArray(orderBy) ? orderBy.join(', ') : orderBy;
|
||||
|
||||
let createSQL = `
|
||||
CREATE TABLE IF NOT EXISTS ${database}.${tableName} (
|
||||
${columnDefs}
|
||||
) ENGINE = ${engineClause}`;
|
||||
|
||||
if (partitionBy) {
|
||||
createSQL += `\n PARTITION BY ${partitionBy}`;
|
||||
}
|
||||
|
||||
createSQL += `\n ORDER BY (${orderByStr})`;
|
||||
|
||||
if (primaryKey) {
|
||||
const primaryKeyStr = Array.isArray(primaryKey) ? primaryKey.join(', ') : primaryKey;
|
||||
createSQL += `\n PRIMARY KEY (${primaryKeyStr})`;
|
||||
}
|
||||
|
||||
await this.db.clickhouseHttpClient.queryPromise(createSQL);
|
||||
|
||||
// Apply TTL if configured
|
||||
if (ttl) {
|
||||
await this.db.clickhouseHttpClient.queryPromise(`
|
||||
ALTER TABLE ${database}.${tableName}
|
||||
MODIFY TTL toDateTime(${String(ttl.column)}) + INTERVAL ${ttl.interval}
|
||||
`);
|
||||
} else if (retainDataForDays && retainDataForDays > 0) {
|
||||
// Legacy shorthand
|
||||
await this.db.clickhouseHttpClient.queryPromise(`
|
||||
ALTER TABLE ${database}.${tableName}
|
||||
MODIFY TTL toDateTime(timestamp) + INTERVAL ${retainDataForDays} DAY
|
||||
`);
|
||||
}
|
||||
|
||||
await this.updateColumns();
|
||||
}
|
||||
|
||||
/**
|
||||
* Refresh column metadata from system.columns
|
||||
*/
|
||||
public async updateColumns(): Promise<IColumnInfo[]> {
|
||||
this.columns = await this.db.clickhouseHttpClient.queryPromise(`
|
||||
SELECT * FROM system.columns
|
||||
WHERE database = '${this.options.database}'
|
||||
AND table = '${this.options.tableName}' FORMAT JSONEachRow
|
||||
`);
|
||||
return this.columns;
|
||||
}
|
||||
|
||||
/**
|
||||
* Auto-schema evolution: detect new columns from data and add them
|
||||
*/
|
||||
public async syncSchema(data: Record<string, any>): Promise<void> {
|
||||
const flatData = plugins.smartobject.toFlatObject(data);
|
||||
for (const key of Object.keys(flatData)) {
|
||||
if (key === 'timestamp') continue;
|
||||
const value = flatData[key];
|
||||
const clickhouseType = detectClickhouseType(value);
|
||||
if (!clickhouseType) continue;
|
||||
await this.ensureColumn(key, clickhouseType);
|
||||
}
|
||||
}
|
||||
|
||||
// ---- INSERT ----
|
||||
|
||||
/**
|
||||
* Insert a single row
|
||||
*/
|
||||
public async insert(data: Partial<T>): Promise<void> {
|
||||
if (this.healingDeferred) return;
|
||||
|
||||
const storageDoc = await this.prepareDocument(data);
|
||||
await this.executeInsert([storageDoc]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert multiple rows
|
||||
*/
|
||||
public async insertMany(data: Partial<T>[]): Promise<void> {
|
||||
if (this.healingDeferred) return;
|
||||
if (data.length === 0) return;
|
||||
|
||||
// Schema sync across all documents
|
||||
if (this.options.autoSchemaEvolution) {
|
||||
const allKeys = new Set<string>();
|
||||
const sampleValues: Record<string, any> = {};
|
||||
for (const doc of data) {
|
||||
const flat = plugins.smartobject.toFlatObject(doc);
|
||||
for (const [key, value] of Object.entries(flat)) {
|
||||
if (!allKeys.has(key)) {
|
||||
allKeys.add(key);
|
||||
sampleValues[key] = value;
|
||||
}
|
||||
}
|
||||
}
|
||||
await this.syncSchema(sampleValues);
|
||||
}
|
||||
|
||||
const storageDocs = data.map((doc) => this.flattenDocument(doc));
|
||||
await this.executeInsert(storageDocs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert in batches of configurable size
|
||||
*/
|
||||
public async insertBatch(data: Partial<T>[], options?: { batchSize?: number }): Promise<void> {
|
||||
const batchSize = options?.batchSize || 10000;
|
||||
if (this.healingDeferred) return;
|
||||
if (data.length === 0) return;
|
||||
|
||||
// Schema sync across all documents first
|
||||
if (this.options.autoSchemaEvolution) {
|
||||
const sampleValues: Record<string, any> = {};
|
||||
for (const doc of data) {
|
||||
const flat = plugins.smartobject.toFlatObject(doc);
|
||||
for (const [key, value] of Object.entries(flat)) {
|
||||
if (!(key in sampleValues)) {
|
||||
sampleValues[key] = value;
|
||||
}
|
||||
}
|
||||
}
|
||||
await this.syncSchema(sampleValues);
|
||||
}
|
||||
|
||||
const storageDocs = data.map((doc) => this.flattenDocument(doc));
|
||||
await this.db.clickhouseHttpClient.insertBatch(
|
||||
this.options.database,
|
||||
this.options.tableName,
|
||||
storageDocs,
|
||||
batchSize,
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a push-based insert stream using ObservableIntake
|
||||
*/
|
||||
public createInsertStream(options?: { batchSize?: number; flushIntervalMs?: number }): plugins.smartrx.ObservableIntake<Partial<T>> {
|
||||
const batchSize = options?.batchSize || 100;
|
||||
const flushIntervalMs = options?.flushIntervalMs || 1000;
|
||||
const intake = new plugins.smartrx.ObservableIntake<Partial<T>>();
|
||||
let buffer: Partial<T>[] = [];
|
||||
let flushTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
|
||||
const flush = async () => {
|
||||
if (buffer.length === 0) return;
|
||||
const toInsert = buffer;
|
||||
buffer = [];
|
||||
await this.insertMany(toInsert);
|
||||
};
|
||||
|
||||
const scheduleFlush = () => {
|
||||
if (flushTimer) clearTimeout(flushTimer);
|
||||
flushTimer = setTimeout(async () => {
|
||||
await flush();
|
||||
}, flushIntervalMs);
|
||||
};
|
||||
|
||||
intake.subscribe(
|
||||
async (doc) => {
|
||||
buffer.push(doc);
|
||||
if (buffer.length >= batchSize) {
|
||||
if (flushTimer) clearTimeout(flushTimer);
|
||||
await flush();
|
||||
} else {
|
||||
scheduleFlush();
|
||||
}
|
||||
},
|
||||
undefined,
|
||||
async () => {
|
||||
if (flushTimer) clearTimeout(flushTimer);
|
||||
await flush();
|
||||
},
|
||||
);
|
||||
|
||||
return intake;
|
||||
}
|
||||
|
||||
// ---- QUERY ----
|
||||
|
||||
/**
|
||||
* Returns a fluent query builder for this table
|
||||
*/
|
||||
public query(): ClickhouseQueryBuilder<T> {
|
||||
return new ClickhouseQueryBuilder<T>(
|
||||
this.options.tableName,
|
||||
this.options.database,
|
||||
this.db.clickhouseHttpClient,
|
||||
);
|
||||
}
|
||||
|
||||
// ---- UPDATE ----
|
||||
|
||||
/**
|
||||
* Update rows matching a where condition (ClickHouse mutation - use sparingly)
|
||||
*/
|
||||
public async update(
|
||||
set: Partial<T>,
|
||||
whereFn: (q: ClickhouseQueryBuilder<T>) => ClickhouseQueryBuilder<T>,
|
||||
): Promise<void> {
|
||||
const qb = whereFn(new ClickhouseQueryBuilder<T>(this.options.tableName, this.options.database, this.db.clickhouseHttpClient));
|
||||
const whereClause = qb.buildWhereClause();
|
||||
if (!whereClause) {
|
||||
throw new Error('UPDATE requires a WHERE clause. Use a where condition to target specific rows.');
|
||||
}
|
||||
|
||||
const setClauses = Object.entries(set)
|
||||
.map(([key, value]) => `${key} = ${escapeClickhouseValue(value)}`)
|
||||
.join(', ');
|
||||
|
||||
await this.db.clickhouseHttpClient.mutatePromise(
|
||||
`ALTER TABLE ${this.options.database}.${this.options.tableName} UPDATE ${setClauses} WHERE ${whereClause}`
|
||||
);
|
||||
await this.waitForMutations();
|
||||
}
|
||||
|
||||
// ---- DELETE ----
|
||||
|
||||
/**
|
||||
* Delete rows matching a where condition (ClickHouse mutation)
|
||||
*/
|
||||
public async deleteWhere(
|
||||
whereFn: (q: ClickhouseQueryBuilder<T>) => ClickhouseQueryBuilder<T>,
|
||||
): Promise<void> {
|
||||
const qb = whereFn(new ClickhouseQueryBuilder<T>(this.options.tableName, this.options.database, this.db.clickhouseHttpClient));
|
||||
const whereClause = qb.buildWhereClause();
|
||||
if (!whereClause) {
|
||||
throw new Error('DELETE requires a WHERE clause.');
|
||||
}
|
||||
|
||||
await this.db.clickhouseHttpClient.mutatePromise(
|
||||
`ALTER TABLE ${this.options.database}.${this.options.tableName} DELETE WHERE ${whereClause}`
|
||||
);
|
||||
await this.waitForMutations();
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete entries older than a given interval on a column
|
||||
*/
|
||||
public async deleteOlderThan(column: keyof T & string, interval: string): Promise<void> {
|
||||
await this.db.clickhouseHttpClient.mutatePromise(
|
||||
`ALTER TABLE ${this.options.database}.${this.options.tableName} DELETE WHERE ${String(column)} < now() - INTERVAL ${interval}`
|
||||
);
|
||||
await this.waitForMutations();
|
||||
}
|
||||
|
||||
/**
|
||||
* Drop the entire table
|
||||
*/
|
||||
public async drop(): Promise<void> {
|
||||
await this.db.clickhouseHttpClient.queryPromise(
|
||||
`DROP TABLE IF EXISTS ${this.options.database}.${this.options.tableName}`
|
||||
);
|
||||
this.columns = [];
|
||||
}
|
||||
|
||||
// ---- UTILITIES ----
|
||||
|
||||
/**
|
||||
* Wait for all pending mutations on this table to complete
|
||||
*/
|
||||
public async waitForMutations(): Promise<void> {
|
||||
let pending = true;
|
||||
while (pending) {
|
||||
const mutations = await this.db.clickhouseHttpClient.queryPromise(`
|
||||
SELECT count() AS cnt FROM system.mutations
|
||||
WHERE is_done = 0 AND database = '${this.options.database}' AND table = '${this.options.tableName}' FORMAT JSONEachRow
|
||||
`);
|
||||
const count = mutations[0] ? parseInt(mutations[0].cnt, 10) : 0;
|
||||
if (count === 0) {
|
||||
pending = false;
|
||||
} else {
|
||||
await plugins.smartdelay.delayFor(1000);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the total row count
|
||||
*/
|
||||
public async getRowCount(): Promise<number> {
|
||||
const result = await this.db.clickhouseHttpClient.queryPromise(`
|
||||
SELECT count() AS cnt FROM ${this.options.database}.${this.options.tableName} FORMAT JSONEachRow
|
||||
`);
|
||||
return result[0] ? parseInt(result[0].cnt, 10) : 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Optimize table (useful for ReplacingMergeTree deduplication)
|
||||
*/
|
||||
public async optimize(final: boolean = false): Promise<void> {
|
||||
const finalClause = final ? ' FINAL' : '';
|
||||
await this.db.clickhouseHttpClient.queryPromise(
|
||||
`OPTIMIZE TABLE ${this.options.database}.${this.options.tableName}${finalClause}`
|
||||
);
|
||||
}
|
||||
|
||||
// ---- OBSERVATION ----
|
||||
|
||||
/**
|
||||
* Watch for new entries via polling. Returns an RxJS Observable.
|
||||
*/
|
||||
public watch(options?: { pollInterval?: number }): plugins.smartrx.rxjs.Observable<T> {
|
||||
const pollInterval = options?.pollInterval || 1000;
|
||||
|
||||
return new plugins.smartrx.rxjs.Observable<T>((observer) => {
|
||||
let lastTimestamp: number;
|
||||
let intervalId: ReturnType<typeof setInterval>;
|
||||
let stopped = false;
|
||||
|
||||
const fetchInitialTimestamp = async () => {
|
||||
const result = await this.db.clickhouseHttpClient.queryPromise(`
|
||||
SELECT max(timestamp) as lastTimestamp
|
||||
FROM ${this.options.database}.${this.options.tableName} FORMAT JSONEachRow
|
||||
`);
|
||||
lastTimestamp = result.length && result[0].lastTimestamp
|
||||
? new Date(result[0].lastTimestamp).getTime()
|
||||
: Date.now();
|
||||
};
|
||||
|
||||
const fetchNewEntries = async () => {
|
||||
if (stopped) return;
|
||||
try {
|
||||
const entries = await this.db.clickhouseHttpClient.queryPromise(`
|
||||
SELECT * FROM ${this.options.database}.${this.options.tableName}
|
||||
WHERE timestamp > toDateTime(${lastTimestamp / 1000})
|
||||
ORDER BY timestamp ASC FORMAT JSONEachRow
|
||||
`);
|
||||
for (const entry of entries) {
|
||||
observer.next(entry);
|
||||
}
|
||||
if (entries.length > 0) {
|
||||
lastTimestamp = new Date(entries[entries.length - 1].timestamp).getTime();
|
||||
}
|
||||
} catch (err) {
|
||||
observer.error(err);
|
||||
}
|
||||
};
|
||||
|
||||
const start = async () => {
|
||||
await fetchInitialTimestamp();
|
||||
intervalId = setInterval(fetchNewEntries, pollInterval);
|
||||
};
|
||||
|
||||
start().catch((err) => observer.error(err));
|
||||
|
||||
return () => {
|
||||
stopped = true;
|
||||
clearInterval(intervalId);
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
// ---- PRIVATE HELPERS ----
|
||||
|
||||
private async ensureColumn(name: string, type: TClickhouseColumnType): Promise<void> {
|
||||
// Check cached columns first
|
||||
const exists = this.columns.some((col) => col.name === name);
|
||||
if (exists) return;
|
||||
|
||||
// Refresh and check again
|
||||
await this.updateColumns();
|
||||
if (this.columns.some((col) => col.name === name)) return;
|
||||
|
||||
// Add column
|
||||
try {
|
||||
await this.db.clickhouseHttpClient.queryPromise(
|
||||
`ALTER TABLE ${this.options.database}.${this.options.tableName} ADD COLUMN \`${name}\` ${type}`
|
||||
);
|
||||
} catch (err) {
|
||||
// Column might have been added concurrently — ignore "already exists" errors
|
||||
if (!String(err).includes('already exists')) {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
await this.updateColumns();
|
||||
}
|
||||
|
||||
private flattenDocument(data: Partial<T>): Record<string, any> {
|
||||
const flat = plugins.smartobject.toFlatObject(data);
|
||||
const storageDoc: Record<string, any> = {};
|
||||
for (const [key, value] of Object.entries(flat)) {
|
||||
const type = detectClickhouseType(value);
|
||||
if (type || key === 'timestamp') {
|
||||
storageDoc[key] = value;
|
||||
}
|
||||
}
|
||||
return storageDoc;
|
||||
}
|
||||
|
||||
private async prepareDocument(data: Partial<T>): Promise<Record<string, any>> {
|
||||
if (this.options.autoSchemaEvolution) {
|
||||
await this.syncSchema(data as Record<string, any>);
|
||||
}
|
||||
return this.flattenDocument(data);
|
||||
}
|
||||
|
||||
private async executeInsert(docs: Record<string, any>[]): Promise<void> {
|
||||
try {
|
||||
await this.db.clickhouseHttpClient.insertPromise(
|
||||
this.options.database,
|
||||
this.options.tableName,
|
||||
docs,
|
||||
);
|
||||
} catch (err) {
|
||||
await this.handleInsertError();
|
||||
}
|
||||
}
|
||||
|
||||
private async handleInsertError(): Promise<void> {
|
||||
if (this.healingDeferred) return;
|
||||
|
||||
this.healingDeferred = plugins.smartpromise.defer();
|
||||
console.log('ClickhouseTable: Insert error. Attempting self-healing...');
|
||||
|
||||
try {
|
||||
await this.db.pingDatabaseUntilAvailable();
|
||||
await this.db.createDatabase();
|
||||
await this.setup();
|
||||
} finally {
|
||||
this.healingDeferred.resolve();
|
||||
this.healingDeferred = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user