test(cluster): cover persistence initialization and pruning
This commit is contained in:
@@ -0,0 +1,134 @@
|
||||
import { assertEquals, assertExists } from 'jsr:@std/assert@^1.0.0';
|
||||
import { CLUSTER, PATHS } from '../ts/constants.ts';
|
||||
import { ClusterManager } from '../ts/cluster/cluster-manager.ts';
|
||||
import type { IClusterNodeHeartbeat } from '../ts/interfaces/cluster.ts';
|
||||
|
||||
/**
 * Builds a heartbeat fixture describing a healthy node with one 24 GB NVIDIA GPU
 * and no deployments.
 *
 * @param nodeName - Name of the node. `'control'` yields the `'control-plane'` role;
 *   any other name yields `'worker'`. The name is also used as the endpoint host.
 * @param lastSeenAt - Value stored verbatim in `lastSeenAt` (callers in this file pass
 *   `Date.now()`-based millisecond timestamps).
 * @returns A heartbeat whose resource figures are fixed; only the name, role,
 *   endpoint and `lastSeenAt` vary with the arguments.
 */
function createNode(nodeName: string, lastSeenAt: number): IClusterNodeHeartbeat {
  return {
    nodeName,
    role: nodeName === 'control' ? 'control-plane' : 'worker',
    endpoint: `http://${nodeName}:8080`,
    healthy: true,
    resources: {
      gpuCount: 1,
      totalVramGb: 24,
      availableVramGb: 24,
      maxSingleGpuVramGb: 24,
      largestGpuGroupCount: 1,
      largestGpuGroupVramGb: 24,
      deploymentCount: 0,
      // A single topology group holding the one GPU.
      topologyGroups: [
        {
          id: 'nvidia-0',
          vendor: 'nvidia',
          gpuIds: ['gpu-0'],
          gpuCount: 1,
          totalVramGb: 24,
          maxSingleGpuVramGb: 24,
          busNumbers: [1],
        },
      ],
    },
    deployments: [],
    lastSeenAt,
  };
}
|
||||
|
||||
/**
 * Pauses the calling test for a short, fixed amount of time.
 *
 * The tests in this file call it after mutating a ClusterManager and before
 * inspecting the state files on disk.
 * NOTE(review): this presumes persistence finishes within the delay — confirm
 * against ClusterManager, since a time-based wait can be flaky on slow machines.
 *
 * @param delayMs - How long to wait, in milliseconds. Defaults to 25.
 * @returns A promise that resolves (with no value) once the delay has elapsed.
 */
async function waitForPersistence(delayMs = 25): Promise<void> {
  await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
}
|
||||
|
||||
// Verifies that initialize() picks up both state files from PATHS.DATA_DIR and
// that a node whose lastSeenAt is older than CLUSTER.NODE_STALE_AFTER_MS is not
// returned by getAllNodes() afterwards.
Deno.test('ClusterManager initialize loads persisted state and prunes stale nodes', async () => {
  // Redirect the data directory to a throwaway temp dir; restored in `finally`.
  // The cast is only there to allow the assignment to PATHS.DATA_DIR.
  const originalDataDir = PATHS.DATA_DIR;
  const tempDir = await Deno.makeTempDir();
  (PATHS as { DATA_DIR: string }).DATA_DIR = tempDir;

  try {
    const now = Date.now();

    // Seed the node state: two nodes seen "now" and one last seen one second
    // past the staleness threshold.
    await Deno.writeTextFile(
      `${tempDir}/cluster-state.json`,
      JSON.stringify({
        nodes: [
          createNode('control', now),
          createNode('worker-fresh', now),
          createNode('worker-stale', now - CLUSTER.NODE_STALE_AFTER_MS - 1000),
        ],
      }),
    );

    // Seed the control state: one desired deployment and one cordoned node.
    await Deno.writeTextFile(
      `${tempDir}/cluster-control-state.json`,
      JSON.stringify({
        desiredDeployments: [
          { modelId: 'meta-llama/Llama-3.1-8B-Instruct', desiredReplicas: 2, updatedAt: now },
        ],
        nodeSchedulerStates: {
          'worker-fresh': 'cordoned',
        },
      }),
    );

    const clusterManager = new ClusterManager();
    clusterManager.configure({
      enabled: true,
      nodeName: 'control',
      role: 'control-plane',
      bindHost: '0.0.0.0',
      gossipPort: 7946,
      heartbeatIntervalMs: 5000,
      seedNodes: [],
    });

    await clusterManager.initialize();

    // 'worker-stale' must be gone; the two fresh nodes remain, in this order.
    assertEquals(clusterManager.getAllNodes().map((node) => node.nodeName), ['control', 'worker-fresh']);
    assertExists(clusterManager.getLocalNode());
    // The control-state file contents must have been loaded as well.
    assertEquals(clusterManager.getDesiredDeployments().length, 1);
    assertEquals(clusterManager.getNodeSchedulerState('worker-fresh'), 'cordoned');
  } finally {
    // Always undo the global PATHS mutation and delete the temp directory.
    (PATHS as { DATA_DIR: string }).DATA_DIR = originalDataDir;
    await Deno.remove(tempDir, { recursive: true });
  }
});
|
||||
|
||||
// Verifies that mutations made before initialize() leave no state files in
// PATHS.DATA_DIR, and that mutations made after initialize() end up in
// cluster-state.json and cluster-control-state.json.
Deno.test('ClusterManager persists state only after initialization completes', async () => {
  // Redirect the data directory to a throwaway temp dir; restored in `finally`.
  // The cast is only there to allow the assignment to PATHS.DATA_DIR.
  const originalDataDir = PATHS.DATA_DIR;
  const tempDir = await Deno.makeTempDir();
  (PATHS as { DATA_DIR: string }).DATA_DIR = tempDir;

  // Returns the file info, or null when the path does not exist. Any other
  // failure (permissions, I/O) is rethrown so it cannot pass for "file absent".
  const statOrNull = async (path: string): Promise<Deno.FileInfo | null> => {
    try {
      return await Deno.stat(path);
    } catch (error) {
      if (error instanceof Deno.errors.NotFound) {
        return null;
      }
      throw error;
    }
  };

  try {
    const clusterManager = new ClusterManager();
    clusterManager.configure({
      enabled: true,
      nodeName: 'control',
      role: 'control-plane',
      bindHost: '0.0.0.0',
      gossipPort: 7946,
      heartbeatIntervalMs: 5000,
      seedNodes: [],
    });

    // Phase 1: mutate before initialize(); nothing may be written to disk.
    clusterManager.updateLocalNode(createNode('control', Date.now()));
    clusterManager.upsertDesiredDeployment('meta-llama/Llama-3.1-8B-Instruct', 1);
    await waitForPersistence();

    assertEquals(await statOrNull(`${tempDir}/cluster-state.json`), null);
    assertEquals(await statOrNull(`${tempDir}/cluster-control-state.json`), null);

    // Phase 2: mutate after initialize(); both files must reflect the changes.
    await clusterManager.initialize();
    clusterManager.updateLocalNode(createNode('control', Date.now()));
    clusterManager.setNodeSchedulerState('control', 'active');
    clusterManager.upsertDesiredDeployment('meta-llama/Llama-3.1-8B-Instruct', 3);
    await waitForPersistence();

    const stateFile = JSON.parse(await Deno.readTextFile(`${tempDir}/cluster-state.json`));
    const controlFile = JSON.parse(await Deno.readTextFile(`${tempDir}/cluster-control-state.json`));

    assertEquals(stateFile.nodes.length, 1);
    assertEquals(stateFile.nodes[0].nodeName, 'control');
    // The post-initialize replica count (3) must be the one on disk.
    assertEquals(controlFile.desiredDeployments[0].desiredReplicas, 3);
    assertEquals(controlFile.nodeSchedulerStates.control, 'active');
  } finally {
    // Always undo the global PATHS mutation and delete the temp directory.
    (PATHS as { DATA_DIR: string }).DATA_DIR = originalDataDir;
    await Deno.remove(tempDir, { recursive: true });
  }
});
|
||||
Reference in New Issue
Block a user