Files
spark/ts/spark.classes.heartbeatmanager.ts
T

183 lines
5.3 KiB
TypeScript

import type { Spark } from './spark.classes.spark.ts';
import { logger } from './spark.logging.ts';
export class SparkHeartbeatManager {
public sparkRef: Spark;
private heartbeatTimer?: number;
private cloudlyConnectionStatus: TSparkCloudlyConnectionStatus = 'not-configured';
constructor(sparkRefArg: Spark) {
this.sparkRef = sparkRefArg;
}
public async start() {
const config = await this.readConfig();
if (!config.cloudlyUrl || !config.nodeId || !config.nodeToken) {
this.cloudlyConnectionStatus = 'not-configured';
logger.log('info', 'Spark heartbeat is not configured. nodeId/nodeToken/cloudlyUrl are required.');
return;
}
await this.sendHeartbeatOnce().catch((errorArg) => {
logger.log('warn', `initial Spark heartbeat failed: ${(errorArg as Error).message}`);
});
const intervalMs = await this.getHeartbeatIntervalMs();
this.heartbeatTimer = setInterval(() => {
this.sendHeartbeatOnce().catch((errorArg) => {
logger.log('warn', `Spark heartbeat failed: ${(errorArg as Error).message}`);
});
}, intervalMs);
}
public stop() {
if (this.heartbeatTimer !== undefined) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = undefined;
}
}
public async sendHeartbeatOnce(): Promise<ISparkHeartbeatResponse> {
const config = await this.readConfig();
if (!config.cloudlyUrl || !config.nodeId || !config.nodeToken) {
this.cloudlyConnectionStatus = 'not-configured';
return {
accepted: false,
message: 'Spark heartbeat is not configured',
};
}
this.cloudlyConnectionStatus = 'connecting';
const [metrics, runtimeInfo] = await Promise.all([
this.sparkRef.sparkMetricsCollector.collectMetrics(),
this.getRuntimeInfo(config),
]);
const response = await this.postJson<ISparkHeartbeatResponse>(
config.cloudlyUrl,
'/spark/v1/nodes/heartbeat',
{
nodeId: config.nodeId,
nodeToken: config.nodeToken,
metrics,
runtimeInfo,
},
);
if (!response.accepted) {
this.cloudlyConnectionStatus = 'failed';
throw new Error(response.message || 'Cloudly rejected Spark heartbeat');
}
this.cloudlyConnectionStatus = 'connected';
return response;
}
private async getRuntimeInfo(configArg: ISparkHeartbeatConfig): Promise<ISparkNodeRuntimeInfo> {
const dockerInfo = await this.sparkRef.sparkMetricsCollector.collectDockerInfo();
const mode = await this.getMode();
return {
runtime: 'spark',
nodeId: configArg.nodeId!,
mode,
hostname: Deno.hostname(),
platform: Deno.build.os,
arch: Deno.build.arch,
osRelease: this.getOsRelease(),
sparkVersion: this.sparkRef.sparkInfo.projectInfo.version,
cloudlyUrl: configArg.cloudlyUrl,
cloudlyConnectionStatus: this.cloudlyConnectionStatus,
dockerAvailable: dockerInfo.dockerAvailable,
swarmNodeId: dockerInfo.swarmNodeId,
checkedAt: Date.now(),
};
}
private async getMode(): Promise<TSparkNodeMode | undefined> {
const mode = await this.sparkRef.sparkConfig.kvStore.readKey('mode');
if (mode === 'cloudly' || mode === 'coreflow-node') {
return mode;
}
return undefined;
}
private getOsRelease(): string | undefined {
try {
return Deno.osRelease();
} catch {
return undefined;
}
}
private async getHeartbeatIntervalMs() {
const configuredInterval = Number(
await this.sparkRef.sparkConfig.kvStore.readKey('heartbeatIntervalMs'),
);
if (Number.isFinite(configuredInterval) && configuredInterval >= 5000) {
return configuredInterval;
}
return 60000;
}
private async readConfig(): Promise<ISparkHeartbeatConfig> {
return {
cloudlyUrl: await this.sparkRef.sparkConfig.kvStore.readKey('cloudlyUrl'),
nodeId: await this.sparkRef.sparkConfig.kvStore.readKey('nodeId'),
nodeToken: await this.sparkRef.sparkConfig.kvStore.readKey('nodeToken'),
};
}
private async postJson<TResponse>(
cloudlyUrlArg: string,
pathArg: string,
bodyArg: Record<string, unknown>,
): Promise<TResponse> {
const url = new URL(
pathArg,
cloudlyUrlArg.endsWith('/') ? cloudlyUrlArg : `${cloudlyUrlArg}/`,
);
const response = await fetch(url, {
method: 'POST',
headers: {
'content-type': 'application/json',
},
body: JSON.stringify(bodyArg),
});
if (!response.ok) {
throw new Error(`Cloudly request failed: ${pathArg} -> HTTP ${response.status}`);
}
return await response.json() as TResponse;
}
}
interface ISparkHeartbeatConfig {
cloudlyUrl?: string;
nodeId?: string;
nodeToken?: string;
}
type TSparkNodeMode = 'cloudly' | 'coreflow-node';
type TSparkCloudlyConnectionStatus = 'not-configured' | 'connecting' | 'connected' | 'failed';
interface ISparkNodeRuntimeInfo {
runtime: 'spark';
nodeId: string;
mode?: TSparkNodeMode;
hostname?: string;
platform: string;
arch: string;
osRelease?: string;
sparkVersion: string;
cloudlyUrl?: string;
cloudlyConnectionStatus: TSparkCloudlyConnectionStatus;
dockerAvailable: boolean;
swarmNodeId?: string;
checkedAt: number;
lastError?: string;
}
interface ISparkHeartbeatResponse {
accepted: boolean;
message?: string;
}