Files
cloudly/ts/manager.node/classes.nodemanager.ts
T

260 lines
8.6 KiB
TypeScript
Raw Normal View History

import * as plugins from '../plugins.js';
import { Cloudly } from '../classes.cloudly.js';
import { Cluster } from '../manager.cluster/classes.cluster.js';
import { ClusterNode } from './classes.clusternode.js';
import { CurlFresh } from './classes.curlfresh.js';
2026-05-24 12:47:15 +00:00
interface ISparkHeartbeatRequest {
nodeId?: string;
nodeToken?: string;
metrics?: plugins.servezoneInterfaces.data.IClusterNodeMetrics;
runtimeInfo?: plugins.servezoneInterfaces.data.ISparkNodeRuntimeInfo;
2026-05-24 12:47:15 +00:00
}
interface ISparkHeartbeatResponse {
accepted: boolean;
message?: string;
}
export class CloudlyNodeManager {
public cloudlyRef: Cloudly;
public typedRouter = new plugins.typedrequest.TypedRouter();
public curlfreshInstance = new CurlFresh(this);
2026-05-08 13:56:20 +00:00
public hetznerAccount?: plugins.hetznercloud.HetznerAccount;
public get db() {
return this.cloudlyRef.mongodbConnector.smartdataDb;
}
public CClusterNode = plugins.smartdata.setDefaultManagerForDoc(this, ClusterNode);
constructor(cloudlyRefArg: Cloudly) {
this.cloudlyRef = cloudlyRefArg;
/**
* is used be serverconfig module on the node to get the actual node config
*/
this.typedRouter.addTypedHandler(
new plugins.typedrequest.TypedHandler<plugins.servezoneInterfaces.requests.node.IRequest_Any_Cloudly_GetNodeConfig>(
'getNodeConfig',
async (requestData) => {
const nodeId = requestData.nodeId;
const node = await this.CClusterNode.getInstance({
id: nodeId,
});
return {
configData: await node.createSavableObject(),
};
},
),
);
}
public async start() {
const hetznerToken = await this.cloudlyRef.settingsManager.getSetting('hetznerToken');
if (!hetznerToken) {
console.log('warn', 'No Hetzner token configured in settings. Hetzner features will be disabled.');
return;
}
this.hetznerAccount = new plugins.hetznercloud.HetznerAccount(hetznerToken);
}
public async stop() {}
2026-05-24 12:47:15 +00:00
public async handleSparkHeartbeatHttpRequest(
ctxArg: plugins.typedserver.IRequestContext,
): Promise<Response> {
try {
const requestData = await this.readJsonBody<ISparkHeartbeatRequest>(ctxArg);
const response = await this.acceptSparkHeartbeat(requestData);
return this.createJsonResponse(200, response);
} catch (error) {
return this.createJsonResponse(400, {
accepted: false,
message: `Spark heartbeat failed: ${(error as Error).message}`,
} satisfies ISparkHeartbeatResponse);
}
}
public async acceptSparkHeartbeat(
requestDataArg: ISparkHeartbeatRequest,
): Promise<ISparkHeartbeatResponse> {
if (!requestDataArg.nodeId) {
return {
accepted: false,
message: 'Spark node id is missing',
};
}
if (!requestDataArg.nodeToken) {
return {
accepted: false,
message: 'Spark node token is missing',
};
}
if (!this.isSparkMetrics(requestDataArg.metrics)) {
return {
accepted: false,
message: 'Spark metrics are missing or invalid',
};
}
if (!this.isSparkRuntimeInfo(requestDataArg.runtimeInfo, requestDataArg.nodeId)) {
return {
accepted: false,
message: 'Spark runtime info is missing or invalid',
};
}
const node = await this.CClusterNode.getInstance({ id: requestDataArg.nodeId });
if (!node) {
return {
accepted: false,
message: 'Spark node is unknown',
};
}
if (node.sparkNodeTokenHash !== this.hashSecret(requestDataArg.nodeToken)) {
return {
accepted: false,
message: 'Spark node token is invalid',
};
}
await node.updateSparkHeartbeat(requestDataArg.metrics, requestDataArg.runtimeInfo);
return {
accepted: true,
};
}
/**
* creates the node infrastructure on hetzner
* ensures that there are exactly the resources that are needed
* no more, no less
*/
public async ensureNodeInfrastructure() {
// get all clusters
const allClusters = await this.cloudlyRef.clusterManager.getAllClusters();
for (const cluster of allClusters) {
// Skip clusters that are not set up for Hetzner auto-provisioning
if (cluster.data.setupMode !== 'hetzner') {
console.log(`Skipping node provisioning for cluster ${cluster.id} - setupMode is ${cluster.data.setupMode || 'manual'}`);
continue;
}
2026-05-08 13:56:20 +00:00
const hetznerAccount = this.hetznerAccount;
if (!hetznerAccount) {
throw new Error('Hetzner account is not configured');
}
// get existing nodes
const nodes = await this.getNodesByCluster(cluster);
// if there is no node, create one
if (nodes.length === 0) {
2026-05-08 13:56:20 +00:00
const hetznerServer = await hetznerAccount.createServer({
name: plugins.smartunique.uniSimple('node'),
location: 'nbg1',
type: 'cpx41',
labels: {
clusterId: cluster.id,
priority: '1',
},
2026-05-08 13:56:20 +00:00
userData: await this.curlfreshInstance.getServerUserData(cluster),
});
// First create BareMetal record
const baremetal = await this.cloudlyRef.baremetalManager.createBaremetalFromHetznerServer(hetznerServer);
const newNode = await ClusterNode.createFromHetznerServer(hetznerServer, cluster.id, baremetal.id);
await baremetal.assignNode(newNode.id);
console.log(`cluster created new node for cluster ${cluster.id}`);
} else {
console.log(
`cluster ${cluster.id} already has nodes. Making sure that they actually exist in the real world...`,
);
// if there is a node, make sure that it exists
for (const node of nodes) {
2026-05-08 13:56:20 +00:00
const hetznerServers = await hetznerAccount.getServersByLabel({
clusterId: cluster.id,
});
if (!hetznerServers || hetznerServers.length === 0) {
console.log(`node ${node.id} does not exist in the real world. Creating it now...`);
2026-05-08 13:56:20 +00:00
const hetznerServer = await hetznerAccount.createServer({
name: plugins.smartunique.uniSimple('node'),
location: 'nbg1',
type: 'cpx41',
labels: {
clusterId: cluster.id,
priority: '1',
},
2026-05-08 13:56:20 +00:00
userData: await this.curlfreshInstance.getServerUserData(cluster),
});
// First create BareMetal record
const baremetal = await this.cloudlyRef.baremetalManager.createBaremetalFromHetznerServer(hetznerServer);
const newNode = await ClusterNode.createFromHetznerServer(hetznerServer, cluster.id, baremetal.id);
await baremetal.assignNode(newNode.id);
}
}
}
}
}
public async getNodesByCluster(clusterArg: Cluster) {
const results = await this.CClusterNode.getInstances({
data: {
clusterId: clusterArg.id,
},
});
return results;
}
2026-05-24 12:47:15 +00:00
private isSparkMetrics(valueArg: unknown): valueArg is plugins.servezoneInterfaces.data.IClusterNodeMetrics {
if (!valueArg || typeof valueArg !== 'object') {
return false;
}
const metrics = valueArg as Partial<plugins.servezoneInterfaces.data.IClusterNodeMetrics>;
return typeof metrics.cpuUsagePercent === 'number'
&& typeof metrics.memoryUsedMB === 'number'
&& typeof metrics.memoryAvailableMB === 'number'
&& typeof metrics.diskUsedGB === 'number'
&& typeof metrics.diskAvailableGB === 'number'
&& typeof metrics.containerCount === 'number'
&& typeof metrics.timestamp === 'number';
}
private isSparkRuntimeInfo(
valueArg: unknown,
nodeIdArg: string,
): valueArg is plugins.servezoneInterfaces.data.ISparkNodeRuntimeInfo {
2026-05-24 12:47:15 +00:00
if (!valueArg || typeof valueArg !== 'object') {
return false;
}
const runtimeInfo = valueArg as Record<string, unknown>;
return runtimeInfo.runtime === 'spark'
&& runtimeInfo.nodeId === nodeIdArg
&& typeof runtimeInfo.checkedAt === 'number';
}
private hashSecret(secretArg: string) {
return plugins.crypto.createHash('sha256').update(secretArg).digest('hex');
}
private async readJsonBody<T>(ctxArg: plugins.typedserver.IRequestContext): Promise<T> {
const bodyString = (await ctxArg.text()).trim();
return bodyString ? JSON.parse(bodyString) as T : {} as T;
}
private createJsonResponse(
statusCodeArg: number,
bodyArg: object,
): Response {
return new Response(JSON.stringify(bodyArg), {
status: statusCodeArg,
headers: {
'Content-Type': 'application/json',
},
});
}
}