feat(cloudly): accept Spark node heartbeats
This commit is contained in:
@@ -4,6 +4,18 @@ import { Cluster } from '../manager.cluster/classes.cluster.js';
|
||||
import { ClusterNode } from './classes.clusternode.js';
|
||||
import { CurlFresh } from './classes.curlfresh.js';
|
||||
|
||||
interface ISparkHeartbeatRequest {
|
||||
nodeId?: string;
|
||||
nodeToken?: string;
|
||||
metrics?: plugins.servezoneInterfaces.data.IClusterNodeMetrics;
|
||||
runtimeInfo?: Record<string, unknown>;
|
||||
}
|
||||
|
||||
interface ISparkHeartbeatResponse {
|
||||
accepted: boolean;
|
||||
message?: string;
|
||||
}
|
||||
|
||||
export class CloudlyNodeManager {
|
||||
public cloudlyRef: Cloudly;
|
||||
public typedRouter = new plugins.typedrequest.TypedRouter();
|
||||
@@ -51,6 +63,69 @@ export class CloudlyNodeManager {
|
||||
|
||||
public async stop() {}
|
||||
|
||||
public async handleSparkHeartbeatHttpRequest(
|
||||
ctxArg: plugins.typedserver.IRequestContext,
|
||||
): Promise<Response> {
|
||||
try {
|
||||
const requestData = await this.readJsonBody<ISparkHeartbeatRequest>(ctxArg);
|
||||
const response = await this.acceptSparkHeartbeat(requestData);
|
||||
return this.createJsonResponse(200, response);
|
||||
} catch (error) {
|
||||
return this.createJsonResponse(400, {
|
||||
accepted: false,
|
||||
message: `Spark heartbeat failed: ${(error as Error).message}`,
|
||||
} satisfies ISparkHeartbeatResponse);
|
||||
}
|
||||
}
|
||||
|
||||
public async acceptSparkHeartbeat(
|
||||
requestDataArg: ISparkHeartbeatRequest,
|
||||
): Promise<ISparkHeartbeatResponse> {
|
||||
if (!requestDataArg.nodeId) {
|
||||
return {
|
||||
accepted: false,
|
||||
message: 'Spark node id is missing',
|
||||
};
|
||||
}
|
||||
if (!requestDataArg.nodeToken) {
|
||||
return {
|
||||
accepted: false,
|
||||
message: 'Spark node token is missing',
|
||||
};
|
||||
}
|
||||
if (!this.isSparkMetrics(requestDataArg.metrics)) {
|
||||
return {
|
||||
accepted: false,
|
||||
message: 'Spark metrics are missing or invalid',
|
||||
};
|
||||
}
|
||||
if (!this.isSparkRuntimeInfo(requestDataArg.runtimeInfo, requestDataArg.nodeId)) {
|
||||
return {
|
||||
accepted: false,
|
||||
message: 'Spark runtime info is missing or invalid',
|
||||
};
|
||||
}
|
||||
|
||||
const node = await this.CClusterNode.getInstance({ id: requestDataArg.nodeId });
|
||||
if (!node) {
|
||||
return {
|
||||
accepted: false,
|
||||
message: 'Spark node is unknown',
|
||||
};
|
||||
}
|
||||
if (node.sparkNodeTokenHash !== this.hashSecret(requestDataArg.nodeToken)) {
|
||||
return {
|
||||
accepted: false,
|
||||
message: 'Spark node token is invalid',
|
||||
};
|
||||
}
|
||||
|
||||
await node.updateSparkHeartbeat(requestDataArg.metrics, requestDataArg.runtimeInfo);
|
||||
return {
|
||||
accepted: true,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* creates the node infrastructure on hetzner
|
||||
* ensures that there are exactly the resources that are needed
|
||||
@@ -133,4 +208,49 @@ export class CloudlyNodeManager {
|
||||
});
|
||||
return results;
|
||||
}
|
||||
|
||||
private isSparkMetrics(valueArg: unknown): valueArg is plugins.servezoneInterfaces.data.IClusterNodeMetrics {
|
||||
if (!valueArg || typeof valueArg !== 'object') {
|
||||
return false;
|
||||
}
|
||||
const metrics = valueArg as Partial<plugins.servezoneInterfaces.data.IClusterNodeMetrics>;
|
||||
return typeof metrics.cpuUsagePercent === 'number'
|
||||
&& typeof metrics.memoryUsedMB === 'number'
|
||||
&& typeof metrics.memoryAvailableMB === 'number'
|
||||
&& typeof metrics.diskUsedGB === 'number'
|
||||
&& typeof metrics.diskAvailableGB === 'number'
|
||||
&& typeof metrics.containerCount === 'number'
|
||||
&& typeof metrics.timestamp === 'number';
|
||||
}
|
||||
|
||||
private isSparkRuntimeInfo(valueArg: unknown, nodeIdArg: string): valueArg is Record<string, unknown> {
|
||||
if (!valueArg || typeof valueArg !== 'object') {
|
||||
return false;
|
||||
}
|
||||
const runtimeInfo = valueArg as Record<string, unknown>;
|
||||
return runtimeInfo.runtime === 'spark'
|
||||
&& runtimeInfo.nodeId === nodeIdArg
|
||||
&& typeof runtimeInfo.checkedAt === 'number';
|
||||
}
|
||||
|
||||
private hashSecret(secretArg: string) {
|
||||
return plugins.crypto.createHash('sha256').update(secretArg).digest('hex');
|
||||
}
|
||||
|
||||
private async readJsonBody<T>(ctxArg: plugins.typedserver.IRequestContext): Promise<T> {
|
||||
const bodyString = (await ctxArg.text()).trim();
|
||||
return bodyString ? JSON.parse(bodyString) as T : {} as T;
|
||||
}
|
||||
|
||||
private createJsonResponse(
|
||||
statusCodeArg: number,
|
||||
bodyArg: object,
|
||||
): Response {
|
||||
return new Response(JSON.stringify(bodyArg), {
|
||||
status: statusCodeArg,
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user