// Deno tests for ClusterManager: loading persisted state and persisting after initialization.
import { assertEquals, assertExists } from 'jsr:@std/assert@^1.0.0';
import { CLUSTER, PATHS } from '../ts/constants.ts';
import { ClusterManager } from '../ts/cluster/cluster-manager.ts';
import type { IClusterNodeHeartbeat } from '../ts/interfaces/cluster.ts';

/**
 * Builds a heartbeat fixture describing a healthy node with a single GPU.
 *
 * The role is `'control-plane'` only when `nodeName` is exactly `'control'`;
 * any other name produces a `'worker'`. Every resource figure is fixed
 * (`gpuCount: 1`, `totalVramGb: 24`, one `nvidia-0` topology group, no
 * deployments), so callers vary only the name and the timestamp.
 *
 * @param nodeName - Node identifier; also used as the host part of `endpoint`.
 * @param lastSeenAt - Timestamp stored verbatim in the returned heartbeat.
 * @returns A heartbeat object with `healthy: true` and an empty deployment list.
 */
function createNode(nodeName: string, lastSeenAt: number): IClusterNodeHeartbeat {
  return {
    nodeName,
    role: nodeName === 'control' ? 'control-plane' : 'worker',
    endpoint: `http://${nodeName}:8080`,
    healthy: true,
    resources: {
      gpuCount: 1,
      totalVramGb: 24,
      availableVramGb: 24,
      maxSingleGpuVramGb: 24,
      largestGpuGroupCount: 1,
      largestGpuGroupVramGb: 24,
      deploymentCount: 0,
      topologyGroups: [
        {
          id: 'nvidia-0',
          vendor: 'nvidia',
          gpuIds: ['gpu-0'],
          gpuCount: 1,
          totalVramGb: 24,
          maxSingleGpuVramGb: 24,
          busNumbers: [1],
        },
      ],
    },
    deployments: [],
    lastSeenAt,
  };
}
/**
 * Pauses for 25 ms.
 *
 * The tests call this after mutating the manager and before inspecting the
 * data directory.
 * NOTE(review): presumably ClusterManager writes its state asynchronously and
 * 25 ms is enough for that to finish — confirm against the implementation.
 *
 * @returns A promise that resolves (with no value) once the delay has elapsed.
 */
async function waitForPersistence(): Promise<void> {
  await new Promise<void>((resolve) => setTimeout(resolve, 25));
}
// Seeds a temporary data directory with persisted cluster files, then checks
// that initialize() restores them and drops the node whose heartbeat is stale.
Deno.test('ClusterManager initialize loads persisted state and prunes stale nodes', async () => {
  // Point PATHS.DATA_DIR at a throwaway directory; the cast is what lets the
  // test assign to it. The original value is restored in `finally`.
  const originalDataDir = PATHS.DATA_DIR;
  const tempDir = await Deno.makeTempDir();
  (PATHS as { DATA_DIR: string }).DATA_DIR = tempDir;

  try {
    const now = Date.now();

    // Node state: two fresh heartbeats plus one that is 1000 ms older than the
    // CLUSTER.NODE_STALE_AFTER_MS threshold.
    await Deno.writeTextFile(
      `${tempDir}/cluster-state.json`,
      JSON.stringify({
        nodes: [
          createNode('control', now),
          createNode('worker-fresh', now),
          createNode('worker-stale', now - CLUSTER.NODE_STALE_AFTER_MS - 1000),
        ],
      }),
    );

    // Control state: one desired deployment and a cordoned worker.
    await Deno.writeTextFile(
      `${tempDir}/cluster-control-state.json`,
      JSON.stringify({
        desiredDeployments: [
          { modelId: 'meta-llama/Llama-3.1-8B-Instruct', desiredReplicas: 2, updatedAt: now },
        ],
        nodeSchedulerStates: {
          'worker-fresh': 'cordoned',
        },
      }),
    );

    const clusterManager = new ClusterManager();
    clusterManager.configure({
      enabled: true,
      nodeName: 'control',
      role: 'control-plane',
      bindHost: '0.0.0.0',
      gossipPort: 7946,
      heartbeatIntervalMs: 5000,
      seedNodes: [],
    });

    await clusterManager.initialize();

    // 'worker-stale' must be gone; the two fresh nodes remain in file order.
    assertEquals(clusterManager.getAllNodes().map((node) => node.nodeName), ['control', 'worker-fresh']);
    assertExists(clusterManager.getLocalNode());
    assertEquals(clusterManager.getDesiredDeployments().length, 1);
    assertEquals(clusterManager.getNodeSchedulerState('worker-fresh'), 'cordoned');
  } finally {
    // Restore the shared constant before deleting the directory it pointed at.
    (PATHS as { DATA_DIR: string }).DATA_DIR = originalDataDir;
    await Deno.remove(tempDir, { recursive: true });
  }
});
// Verifies that mutations made before initialize() leave no files on disk,
// while the same mutations made afterwards are written to both state files.
Deno.test('ClusterManager persists state only after initialization completes', async () => {
  // Point PATHS.DATA_DIR at a throwaway directory; the cast is what lets the
  // test assign to it. The original value is restored in `finally`.
  const originalDataDir = PATHS.DATA_DIR;
  const tempDir = await Deno.makeTempDir();
  (PATHS as { DATA_DIR: string }).DATA_DIR = tempDir;

  try {
    const clusterManager = new ClusterManager();
    clusterManager.configure({
      enabled: true,
      nodeName: 'control',
      role: 'control-plane',
      bindHost: '0.0.0.0',
      gossipPort: 7946,
      heartbeatIntervalMs: 5000,
      seedNodes: [],
    });

    // Phase 1: mutate before initialize() and give any write a chance to land.
    clusterManager.updateLocalNode(createNode('control', Date.now()));
    clusterManager.upsertDesiredDeployment('meta-llama/Llama-3.1-8B-Instruct', 1);
    await waitForPersistence();

    // A rejected stat is mapped to null, so null here means the file could not
    // be stat'ed, i.e. nothing was written yet.
    assertEquals(await Deno.stat(`${tempDir}/cluster-state.json`).catch(() => null), null);
    assertEquals(await Deno.stat(`${tempDir}/cluster-control-state.json`).catch(() => null), null);

    // Phase 2: the same kind of mutations after initialize() must be persisted.
    await clusterManager.initialize();
    clusterManager.updateLocalNode(createNode('control', Date.now()));
    clusterManager.setNodeSchedulerState('control', 'active');
    clusterManager.upsertDesiredDeployment('meta-llama/Llama-3.1-8B-Instruct', 3);
    await waitForPersistence();

    const stateFile = JSON.parse(await Deno.readTextFile(`${tempDir}/cluster-state.json`));
    const controlFile = JSON.parse(await Deno.readTextFile(`${tempDir}/cluster-control-state.json`));

    assertEquals(stateFile.nodes.length, 1);
    assertEquals(stateFile.nodes[0].nodeName, 'control');
    // The post-initialize replica count (3) is the one on disk.
    assertEquals(controlFile.desiredDeployments[0].desiredReplicas, 3);
    assertEquals(controlFile.nodeSchedulerStates.control, 'active');
  } finally {
    // Restore the shared constant before deleting the directory it pointed at.
    (PATHS as { DATA_DIR: string }).DATA_DIR = originalDataDir;
    await Deno.remove(tempDir, { recursive: true });
  }
});