// Listing metadata from the original file viewer: 167 lines, 4.8 KiB, TypeScript.
import * as plugins from './smartclickhouse.plugins.js';
/**
 * Connection options accepted by ClickhouseHttpClient.
 */
export interface IClickhouseHttpClientOptions {
  /** Optional user name; when set it is sent as the `X-ClickHouse-User` request header. */
  username?: string;
  /** Optional password; when set it is sent as the `X-ClickHouse-Key` request header. */
  password?: string;
  /** URL of the ClickHouse HTTP endpoint; parsed when the client is started. */
  url: string;
}
/**
 * Thin HTTP client for a ClickHouse server.
 *
 * Statements are sent as the `query` URL parameter of a POST request against
 * the configured URL. `start()` (or `createAndStart()`) must have completed
 * before any request method is used, because it computes the connection URL.
 */
export class ClickhouseHttpClient {
  // STATIC

  /**
   * Creates a client for the given options and starts it.
   *
   * @param optionsArg connection options (url plus optional credentials)
   * @returns the started client instance
   */
  public static async createAndStart(
    optionsArg: IClickhouseHttpClientOptions,
  ): Promise<ClickhouseHttpClient> {
    const clickhouseHttpInstance = new ClickhouseHttpClient(optionsArg);
    await clickhouseHttpInstance.start();
    return clickhouseHttpInstance;
  }

  // INSTANCE

  /** Options this client was constructed with. */
  public options: IClickhouseHttpClientOptions;

  /** Underlying request client; its own logging is switched off. */
  public webrequestInstance = new plugins.webrequest.WebrequestClient({
    logging: false,
  });

  /** Values derived from `options.url`; both stay `null` until `start()` has run. */
  public computedProperties: {
    connectionUrl: string;
    parsedUrl: plugins.smarturl.Smarturl;
  } = {
    connectionUrl: null,
    parsedUrl: null,
  };

  /**
   * @param optionsArg connection options; stored as-is, nothing is contacted here
   */
  constructor(optionsArg: IClickhouseHttpClientOptions) {
    this.options = optionsArg;
  }

  /**
   * Parses `options.url` and stores both the parsed URL and its string form
   * in `computedProperties`. No network request is made.
   */
  public async start() {
    // Deliberately no logging of the parsed URL: it may carry credentials.
    const parsedUrl = plugins.smarturl.Smarturl.createFromUrl(this.options.url);
    this.computedProperties.parsedUrl = parsedUrl;
    this.computedProperties.connectionUrl = parsedUrl.toString();
  }

  /**
   * Sends a GET request (1000 ms timeout) to the connection URL.
   *
   * @returns `true` when the response status is exactly 200, `false` otherwise
   * @throws Error when the client has not been started; errors raised by the
   *         underlying request are propagated unchanged
   */
  public async ping() {
    const response = await this.webrequestInstance.request(this.requireConnectionUrl(), {
      method: 'GET',
      timeout: 1000,
    });
    return response.status === 200;
  }

  /**
   * Execute a query and return parsed JSONEachRow results.
   *
   * - When the response carries `X-ClickHouse-Format: JSONEachRow`, every
   *   non-empty line is parsed as JSON; a malformed line makes the call reject.
   * - Otherwise each line is tentatively parsed as JSON (e.g. when FORMAT is
   *   given in the query). If any line is not JSON, the whole trimmed body is
   *   returned as `[{ raw: <text> }]`.
   * - An empty (whitespace-only) body yields `[]`.
   *
   * @param queryArg the statement to execute
   * @returns the parsed rows
   * @throws Error when the client has not been started or the response is not ok
   */
  public async queryPromise(queryArg: string): Promise<any[]> {
    const response = await this.webrequestInstance.request(this.buildQueryUrl(queryArg), {
      method: 'POST',
      headers: this.getHeaders(),
    });

    const responseText = await response.text();

    // Check for errors (ClickHouse returns non-200 for errors)
    if (!response.ok) {
      throw new Error(`ClickHouse query error: ${responseText.trim()}`);
    }

    if (response.headers.get('X-ClickHouse-Format') === 'JSONEachRow') {
      // The format is declared, so a line that fails to parse is a real error.
      return responseText
        .split('\n')
        .filter((lineArg) => Boolean(lineArg))
        .map((lineArg) => JSON.parse(lineArg));
    }

    const trimmedText = responseText.trim();
    if (!trimmedText) {
      return [];
    }

    // Try to parse as JSONEachRow even without header (e.g. when FORMAT is in query)
    const rows: any[] = [];
    for (const line of trimmedText.split('\n')) {
      if (!line) continue;
      try {
        rows.push(JSON.parse(line));
      } catch {
        // Not JSON — return raw text as single-element array
        return [{ raw: trimmedText }];
      }
    }
    return rows;
  }

  /**
   * Execute a typed query returning T[].
   *
   * The rows are not validated at runtime; `T` is only a compile-time view on
   * the result of `queryPromise`.
   */
  public async queryTyped<T>(queryArg: string): Promise<T[]> {
    return this.queryPromise(queryArg) as Promise<T[]>;
  }

  /**
   * Insert documents as JSONEachRow (one JSON-serialized document per line).
   *
   * NOTE(review): `databaseArg` and `tableArg` are interpolated into the
   * statement without quoting or escaping — pass trusted identifiers only.
   *
   * @param databaseArg target database name
   * @param tableArg target table name
   * @param documents documents to insert
   * @returns the response of the insert request
   * @throws Error when the client has not been started or the response is not ok
   */
  public async insertPromise(databaseArg: string, tableArg: string, documents: any[]) {
    const queryArg = `INSERT INTO ${databaseArg}.${tableArg} FORMAT JSONEachRow`;
    const response = await this.webrequestInstance.request(this.buildQueryUrl(queryArg), {
      method: 'POST',
      body: documents.map((docArg) => JSON.stringify(docArg)).join('\n'),
      headers: this.getHeaders(),
    });

    if (!response.ok) {
      const errorText = await response.text();
      throw new Error(`ClickHouse insert error: ${errorText.trim()}`);
    }

    return response;
  }

  /**
   * Insert documents in batches of configurable size. Batches are sent one
   * after another; the first failing batch rejects the call and later
   * batches are not sent.
   *
   * @param databaseArg target database name
   * @param tableArg target table name
   * @param documents documents to insert
   * @param batchSize maximum documents per insert; must be >= 1 (`Infinity`
   *        sends everything in one insert, fractional values are rounded down)
   * @throws RangeError when `batchSize` is NaN or smaller than 1
   */
  public async insertBatch(
    databaseArg: string,
    tableArg: string,
    documents: any[],
    batchSize: number = 10000,
  ) {
    // A step of 0, a negative step or NaN would never advance the cursor
    // correctly (endless loop or empty inserts), so reject it up front.
    if (!(batchSize >= 1)) {
      throw new RangeError(`batchSize must be a number >= 1, got ${batchSize}`);
    }
    const step = Math.floor(batchSize);

    for (let offset = 0; offset < documents.length; offset += step) {
      await this.insertPromise(databaseArg, tableArg, documents.slice(offset, offset + step));
    }
  }

  /**
   * Execute a mutation (ALTER TABLE UPDATE/DELETE).
   *
   * Resolves once the server has answered the request with an ok status; this
   * method itself does no polling for mutation completion.
   *
   * @param queryArg the mutation statement
   * @throws Error when the client has not been started or the response is not ok
   */
  public async mutatePromise(queryArg: string): Promise<void> {
    const response = await this.webrequestInstance.request(this.buildQueryUrl(queryArg), {
      method: 'POST',
      headers: this.getHeaders(),
    });

    if (!response.ok) {
      const errorText = await response.text();
      throw new Error(`ClickHouse mutation error: ${errorText.trim()}`);
    }
  }

  /** Returns the computed connection URL, or throws if `start()` has not run yet. */
  private requireConnectionUrl(): string {
    const connectionUrl = this.computedProperties.connectionUrl;
    if (connectionUrl == null) {
      throw new Error(
        'ClickhouseHttpClient is not started: call start() before issuing requests',
      );
    }
    return connectionUrl;
  }

  /** Builds the request URL that carries the given statement as the `query` parameter. */
  private buildQueryUrl(queryArg: string): string {
    return `${this.requireConnectionUrl()}?query=${encodeURIComponent(queryArg)}`;
  }

  /** Builds the credential headers; a header is only added when its option is set. */
  private getHeaders() {
    const headers: { [key: string]: string } = {};
    if (this.options.username) {
      headers['X-ClickHouse-User'] = this.options.username;
    }
    if (this.options.password) {
      headers['X-ClickHouse-Key'] = this.options.password;
    }
    return headers;
  }
}