import os from 'node:os';
import * as fs from 'node:fs/promises';
import type { IModelCatalogEntry } from '../interfaces/catalog.ts';
import type {
  IClusterConfig,
  IClusterDesiredDeployment,
  IClusterGpuTopologyGroup,
  IClusterModelLocation,
  IClusterNodeHeartbeat,
  IClusterNodeStatus,
  IClusterStatusResponse,
  TClusterNodeSchedulerState,
} from '../interfaces/cluster.ts';
import { CLUSTER, PATHS } from '../constants.ts';

/**
 * In-memory registry of cluster membership and control-plane intent.
 *
 * Two independent pieces of state are kept and persisted to separate JSON
 * files under `PATHS.DATA_DIR`:
 *
 * - node state (`cluster-state.json`): the latest heartbeat per node name;
 * - control state (`cluster-control-state.json`): desired deployments per
 *   model id and the scheduler state assigned to each node.
 *
 * Persistence is best effort: write failures are swallowed so they never
 * break callers.
 */
export class ClusterManager {
  private config: IClusterConfig = {
    enabled: false,
    nodeName: os.hostname(),
    role: 'standalone',
    bindHost: CLUSTER.DEFAULT_BIND_HOST,
    gossipPort: CLUSTER.DEFAULT_GOSSIP_PORT,
    heartbeatIntervalMs: CLUSTER.DEFAULT_HEARTBEAT_INTERVAL_MS,
    seedNodes: [],
  };

  /** Most recent heartbeat recorded for this node, if any. */
  private localNode: IClusterNodeHeartbeat | null = null;
  /** Latest heartbeat per node, keyed by node name. */
  private knownNodes = new Map<string, IClusterNodeHeartbeat>();
  /** Desired deployments, keyed by model id. */
  private desiredDeployments = new Map<string, IClusterDesiredDeployment>();
  /** Explicitly assigned scheduler state per node name. */
  private nodeSchedulerStates = new Map<string, TClusterNodeSchedulerState>();

  private persistQueued = false;
  private controlPersistQueued = false;
  // Tails of the per-file write chains. Each new write is appended to the
  // tail so two `fs.writeFile` calls never target the same file concurrently.
  private persistTail: Promise<void> = Promise.resolve();
  private controlPersistTail: Promise<void> = Promise.resolve();

  /**
   * Loads previously persisted node state and control state from disk.
   *
   * Each file is loaded independently. Any failure while reading or parsing a
   * file (missing file, invalid JSON, unexpected shape) is treated as "nothing
   * persisted" for that file.
   *
   * NOTE(review): the local node is recognised by comparing against
   * `this.config.nodeName`, so this presumably should run after `configure()`
   * when a non-default node name is used — confirm against the caller.
   */
  public async initialize(): Promise<void> {
    try {
      const stateContent = await fs.readFile(this.getStateFilePath(), 'utf-8');
      const data = JSON.parse(stateContent) as { nodes?: IClusterNodeHeartbeat[] };

      for (const node of data.nodes || []) {
        this.knownNodes.set(node.nodeName, node);
        if (node.nodeName === this.config.nodeName) {
          this.localNode = node;
        }
      }

      // Persisted heartbeats may be arbitrarily old; drop the expired ones.
      this.pruneStaleNodes();
    } catch {
      // No usable persisted cluster state (missing, unreadable or malformed).
    }

    try {
      const controlStateContent = await fs.readFile(this.getControlStateFilePath(), 'utf-8');
      const data = JSON.parse(controlStateContent) as {
        desiredDeployments?: IClusterDesiredDeployment[];
        nodeSchedulerStates?: Record<string, TClusterNodeSchedulerState>;
      };

      for (const deployment of data.desiredDeployments || []) {
        this.desiredDeployments.set(deployment.modelId, deployment);
      }

      for (const [nodeName, schedulerState] of Object.entries(data.nodeSchedulerStates || {})) {
        this.nodeSchedulerStates.set(nodeName, schedulerState);
      }
    } catch {
      // No usable persisted control state (missing, unreadable or malformed).
    }
  }

  /**
   * Replaces the active configuration.
   *
   * A falsy `heartbeatIntervalMs` (including `0`) falls back to the default
   * interval, and a missing `seedNodes` becomes an empty list.
   */
  public configure(config: IClusterConfig): void {
    this.config = {
      ...config,
      heartbeatIntervalMs: config.heartbeatIntervalMs || CLUSTER.DEFAULT_HEARTBEAT_INTERVAL_MS,
      seedNodes: config.seedNodes || [],
    };
  }

  /** Returns the active configuration object (not a copy). */
  public getConfig(): IClusterConfig {
    return this.config;
  }

  /** Whether clustering is enabled in the active configuration. */
  public isEnabled(): boolean {
    return this.config.enabled;
  }

  /** True when clustering is enabled and this node's role is `control-plane`. */
  public isControlPlane(): boolean {
    return this.config.enabled && this.config.role === 'control-plane';
  }

  /** True when clustering is enabled and this node's role is `worker`. */
  public isWorker(): boolean {
    return this.config.enabled && this.config.role === 'worker';
  }

  /** Returns `'standalone'` when clustering is disabled, otherwise the configured role. */
  public getModeLabel(): string {
    if (!this.config.enabled) {
      return 'standalone';
    }

    return this.config.role;
  }

  /** Returns the configured heartbeat interval, or the default when it is falsy. */
  public getHeartbeatIntervalMs(): number {
    return this.config.heartbeatIntervalMs || CLUSTER.DEFAULT_HEARTBEAT_INTERVAL_MS;
  }

  /** Endpoint from the latest local heartbeat, falling back to `config.advertiseUrl`. */
  public getAdvertisedEndpoint(): string | undefined {
    return this.localNode?.endpoint || this.config.advertiseUrl;
  }

  /** Returns the configured control-plane URL, if any. */
  public getControlPlaneUrl(): string | undefined {
    return this.config.controlPlaneUrl;
  }

  /** Returns the configured shared secret; an empty value is reported as `undefined`. */
  public getSharedSecret(): string | undefined {
    return this.config.sharedSecret || undefined;
  }

  /** Records `heartbeat` as this node's latest heartbeat and schedules a state write. */
  public updateLocalNode(heartbeat: IClusterNodeHeartbeat): void {
    this.localNode = heartbeat;
    this.knownNodes.set(heartbeat.nodeName, heartbeat);
    this.schedulePersist();
  }

  /** Inserts or replaces the heartbeat stored for `heartbeat.nodeName` and schedules a state write. */
  public upsertNode(heartbeat: IClusterNodeHeartbeat): void {
    this.knownNodes.set(heartbeat.nodeName, heartbeat);
    this.schedulePersist();
  }

  /**
   * Builds a status summary for this node from configuration.
   * `healthy` is always reported as `true` here.
   */
  public getLocalNodeStatus(): IClusterNodeStatus {
    return {
      nodeName: this.config.nodeName,
      role: this.config.role,
      endpoint: this.getAdvertisedEndpoint(),
      healthy: true,
      schedulerState: this.getNodeSchedulerState(this.config.nodeName),
    };
  }

  /** Returns the latest local heartbeat as stored (without scheduler-state decoration). */
  public getLocalNode(): IClusterNodeHeartbeat | null {
    return this.localNode;
  }

  /**
   * Looks up a node by name.
   *
   * @returns A copy of the stored heartbeat with `schedulerState` filled in,
   *   or `null` when the node is unknown. Stale nodes are not pruned here.
   */
  public getNode(nodeName: string): IClusterNodeHeartbeat | null {
    const node = this.knownNodes.get(nodeName);
    if (!node) {
      return null;
    }

    return this.decorateNode(node);
  }

  /**
   * Removes every remote node whose `lastSeenAt` is older than
   * `CLUSTER.NODE_STALE_AFTER_MS`. The local node is never pruned.
   */
  public pruneStaleNodes(): void {
    const now = Date.now();

    for (const [nodeName, node] of this.knownNodes) {
      if (nodeName === this.config.nodeName) {
        continue;
      }

      if (now - node.lastSeenAt > CLUSTER.NODE_STALE_AFTER_MS) {
        this.knownNodes.delete(nodeName);
        this.schedulePersist();
      }
    }
  }

  /**
   * Returns all non-stale nodes, decorated with their scheduler state.
   * The local node sorts first; the rest are ordered by node name.
   */
  public getAllNodes(): IClusterNodeHeartbeat[] {
    this.pruneStaleNodes();

    return Array.from(this.knownNodes.values())
      .map((node) => this.decorateNode(node))
      .sort((left, right) => {
        if (left.nodeName === this.config.nodeName) {
          return -1;
        }
        if (right.nodeName === this.config.nodeName) {
          return 1;
        }
        return left.nodeName.localeCompare(right.nodeName);
      });
  }

  /** Returns the nodes from {@link getAllNodes} whose heartbeat reports `healthy`. */
  public getHealthyNodes(): IClusterNodeHeartbeat[] {
    return this.getAllNodes().filter((node) => node.healthy);
  }

  /** Returns the scheduler state assigned to `nodeName`, defaulting to `'active'`. */
  public getNodeSchedulerState(nodeName: string): TClusterNodeSchedulerState {
    return this.nodeSchedulerStates.get(nodeName) || 'active';
  }

  /**
   * Assigns a scheduler state to a node and schedules a control-state write.
   *
   * @returns The state that was stored.
   */
  public setNodeSchedulerState(
    nodeName: string,
    schedulerState: TClusterNodeSchedulerState,
  ): TClusterNodeSchedulerState {
    this.nodeSchedulerStates.set(nodeName, schedulerState);
    this.scheduleControlPersist();
    return schedulerState;
  }

  /** Returns all desired deployments ordered by model id. */
  public getDesiredDeployments(): IClusterDesiredDeployment[] {
    return Array.from(this.desiredDeployments.values()).sort((left, right) =>
      left.modelId.localeCompare(right.modelId)
    );
  }

  /** Returns the desired deployment for `modelId`, or `null` when none is recorded. */
  public getDesiredDeployment(modelId: string): IClusterDesiredDeployment | null {
    return this.desiredDeployments.get(modelId) || null;
  }

  /**
   * Creates or replaces the desired deployment for a model, stamping
   * `updatedAt` with the current time, and schedules a control-state write.
   *
   * @returns The stored deployment record.
   */
  public upsertDesiredDeployment(
    modelId: string,
    desiredReplicas: number,
  ): IClusterDesiredDeployment {
    const deployment: IClusterDesiredDeployment = {
      modelId,
      desiredReplicas,
      updatedAt: Date.now(),
    };

    this.desiredDeployments.set(modelId, deployment);
    this.scheduleControlPersist();
    return deployment;
  }

  /**
   * Deletes the desired deployment for a model.
   *
   * @returns `true` when an entry existed and was removed; a control-state
   *   write is scheduled only in that case.
   */
  public removeDesiredDeployment(modelId: string): boolean {
    const removed = this.desiredDeployments.delete(modelId);
    if (removed) {
      this.scheduleControlPersist();
    }
    return removed;
  }

  /**
   * Lists every healthy deployment of `modelId` hosted on a healthy node,
   * regardless of the node's scheduler state.
   */
  public getModelLocations(modelId: string): IClusterModelLocation[] {
    const locations: IClusterModelLocation[] = [];

    for (const node of this.getHealthyNodes()) {
      for (const deployment of node.deployments) {
        if (deployment.modelId !== modelId || !deployment.healthy) {
          continue;
        }

        locations.push({
          modelId,
          nodeName: node.nodeName,
          endpoint: deployment.endpoint,
          healthy: deployment.healthy,
          engine: deployment.engine,
          containerId: deployment.containerId,
        });
      }
    }

    return locations;
  }

  /** Like {@link getModelLocations}, restricted to nodes whose scheduler state is `'active'`. */
  public getActiveModelLocations(modelId: string): IClusterModelLocation[] {
    return this.getModelLocations(modelId).filter((location) =>
      this.getNodeSchedulerState(location.nodeName) === 'active'
    );
  }

  /**
   * Picks the preferred location for serving `modelId`.
   *
   * Candidates are ordered by scheduler state (active, then cordoned, then
   * draining), then the local node first, then by node name.
   *
   * @returns The best location, or `null` when no healthy deployment exists.
   */
  public resolveModel(modelId: string): IClusterModelLocation | null {
    const locations = this.getModelLocations(modelId);

    locations.sort((left, right) => {
      const schedulerPreference = this.compareSchedulerState(
        this.getNodeSchedulerState(left.nodeName),
        this.getNodeSchedulerState(right.nodeName),
      );
      if (schedulerPreference !== 0) {
        return schedulerPreference;
      }

      if (left.nodeName === this.config.nodeName) {
        return -1;
      }
      if (right.nodeName === this.config.nodeName) {
        return 1;
      }
      return left.nodeName.localeCompare(right.nodeName);
    });

    // An empty candidate list yields `undefined`, which is reported as `null`.
    return locations[0] ?? null;
  }

  /**
   * Chooses a node on which to place a new deployment of `model`.
   *
   * A node is eligible when it is healthy, not excluded, not a remote
   * standalone node, has an `'active'` (or unset) scheduler state, reports at
   * least `minVramGb` available VRAM, and has a topology group that satisfies
   * both the VRAM and the GPU-count requirement.
   *
   * Among eligible nodes the local node wins; otherwise the order is most
   * available VRAM, then `largestGpuGroupCount` closest to the preferred
   * tensor-parallel size, then fewest deployments.
   *
   * @param model Catalog entry describing the model's requirements.
   * @param excludedNodeNames Node names that must not be selected.
   * @returns The chosen node, or `null` when no node is eligible.
   */
  public pickNodeForModel(
    model: IModelCatalogEntry,
    excludedNodeNames: string[] = [],
  ): IClusterNodeHeartbeat | null {
    const requiredVram = model.requirements.minVramGb;
    const minGpuCount = model.requirements.minGpuCount || 1;
    const preferredTensorParallel = model.launchDefaults?.tensorParallelSize || minGpuCount;
    // Built once so the per-node exclusion check is a constant-time lookup.
    const excluded = new Set(excludedNodeNames);

    const eligible = this.getHealthyNodes().filter((node) => {
      if (excluded.has(node.nodeName)) {
        return false;
      }

      // A standalone node only accepts placements when it is this node.
      if (node.role === 'standalone' && node.nodeName !== this.config.nodeName) {
        return false;
      }

      if (node.schedulerState && node.schedulerState !== 'active') {
        return false;
      }

      return node.resources.availableVramGb >= requiredVram &&
        this.hasEligibleTopologyGroup(node.resources.topologyGroups, requiredVram, minGpuCount);
    });

    eligible.sort((left, right) => {
      if (left.nodeName === this.config.nodeName) {
        return -1;
      }
      if (right.nodeName === this.config.nodeName) {
        return 1;
      }

      if (right.resources.availableVramGb !== left.resources.availableVramGb) {
        return right.resources.availableVramGb - left.resources.availableVramGb;
      }

      const leftTopologyDelta = Math.abs(
        left.resources.largestGpuGroupCount - preferredTensorParallel,
      );
      const rightTopologyDelta = Math.abs(
        right.resources.largestGpuGroupCount - preferredTensorParallel,
      );
      if (leftTopologyDelta !== rightTopologyDelta) {
        return leftTopologyDelta - rightTopologyDelta;
      }

      return left.resources.deploymentCount - right.resources.deploymentCount;
    });

    // An empty eligible list yields `undefined`, which is reported as `null`.
    return eligible[0] ?? null;
  }

  /**
   * Builds a full cluster snapshot: the decorated local node, all non-stale
   * nodes, every deployment on healthy nodes grouped by model id (including
   * deployments that are themselves unhealthy), and the desired deployments.
   */
  public getStatus(): IClusterStatusResponse {
    const models: Record<string, IClusterModelLocation[]> = {};

    for (const node of this.getHealthyNodes()) {
      for (const deployment of node.deployments) {
        let locations = models[deployment.modelId];
        if (!locations) {
          locations = [];
          models[deployment.modelId] = locations;
        }

        locations.push({
          modelId: deployment.modelId,
          nodeName: node.nodeName,
          endpoint: deployment.endpoint,
          healthy: deployment.healthy,
          engine: deployment.engine,
          containerId: deployment.containerId,
        });
      }
    }

    return {
      localNode: this.localNode ? this.decorateNode(this.localNode) : null,
      nodes: this.getAllNodes(),
      models,
      desiredDeployments: this.getDesiredDeployments(),
    };
  }

  /** True when a single topology group meets both the GPU-count and the VRAM requirement. */
  private hasEligibleTopologyGroup(
    groups: IClusterGpuTopologyGroup[],
    requiredVramGb: number,
    minGpuCount: number,
  ): boolean {
    return groups.some((group) =>
      group.gpuCount >= minGpuCount && group.totalVramGb >= requiredVramGb
    );
  }

  /** Path of the JSON file holding persisted node heartbeats. */
  private getStateFilePath(): string {
    return `${PATHS.DATA_DIR}/cluster-state.json`;
  }

  /** Path of the JSON file holding desired deployments and scheduler states. */
  private getControlStateFilePath(): string {
    return `${PATHS.DATA_DIR}/cluster-control-state.json`;
  }

  /**
   * Schedules a write of the node state. Calls made before the queued
   * microtask runs are coalesced into one write, and each write is chained
   * after the previous one so writes to the state file never overlap.
   */
  private schedulePersist(): void {
    if (this.persistQueued) {
      return;
    }

    this.persistQueued = true;
    queueMicrotask(() => {
      this.persistQueued = false;
      // `persistState` swallows its own errors, so the chain never rejects.
      this.persistTail = this.persistTail.then(() => this.persistState());
    });
  }

  /**
   * Schedules a write of the control state, coalescing and serialising writes
   * the same way as {@link schedulePersist}.
   */
  private scheduleControlPersist(): void {
    if (this.controlPersistQueued) {
      return;
    }

    this.controlPersistQueued = true;
    queueMicrotask(() => {
      this.controlPersistQueued = false;
      // `persistControlState` swallows its own errors, so the chain never rejects.
      this.controlPersistTail = this.controlPersistTail.then(() => this.persistControlState());
    });
  }

  /** Writes all known node heartbeats to the state file, creating the data directory if needed. */
  private async persistState(): Promise<void> {
    try {
      await fs.mkdir(PATHS.DATA_DIR, { recursive: true });
      await fs.writeFile(
        this.getStateFilePath(),
        JSON.stringify({ nodes: Array.from(this.knownNodes.values()) }, null, 2),
      );
    } catch {
      // Persistence failure should not break the control plane.
    }
  }

  /** Writes desired deployments and node scheduler states to the control-state file. */
  private async persistControlState(): Promise<void> {
    try {
      await fs.mkdir(PATHS.DATA_DIR, { recursive: true });
      await fs.writeFile(
        this.getControlStateFilePath(),
        JSON.stringify(
          {
            desiredDeployments: this.getDesiredDeployments(),
            nodeSchedulerStates: Object.fromEntries(this.nodeSchedulerStates.entries()),
          },
          null,
          2,
        ),
      );
    } catch {
      // Persistence failure should not break the control plane.
    }
  }

  /** Returns a shallow copy of `node` with `schedulerState` set from the scheduler-state registry. */
  private decorateNode(node: IClusterNodeHeartbeat): IClusterNodeHeartbeat {
    return {
      ...node,
      schedulerState: this.getNodeSchedulerState(node.nodeName),
    };
  }

  /**
   * Sort comparator that ranks scheduler states as active < cordoned < draining.
   *
   * NOTE(review): a state that is not in the list has index -1 and therefore
   * sorts ahead of `'active'`.
   */
  private compareSchedulerState(
    left: TClusterNodeSchedulerState,
    right: TClusterNodeSchedulerState,
  ): number {
    const order: TClusterNodeSchedulerState[] = ['active', 'cordoned', 'draining'];
    return order.indexOf(left) - order.indexOf(right);
  }
}