import type {
  IClusterDesiredDeployment,
  IClusterEnsureResponse,
  IClusterNodeHeartbeat,
  IClusterNodeResources,
  IClusterStatusResponse,
  TClusterNodeSchedulerState,
} from '../interfaces/cluster.ts';
import { ContainerManager } from '../containers/container-manager.ts';
import { GpuDetector } from '../hardware/gpu-detector.ts';
import { logger } from '../logger.ts';
import { ModelRegistry } from '../models/registry.ts';
import { ModelLoader } from '../models/loader.ts';
import { CLUSTER } from '../constants.ts';
import { filterOutUsedGpus, summarizeGpuTopologyGroups } from './placement.ts';
import { ClusterManager } from './cluster-manager.ts';

/** Renders a caught value (which may not be an `Error`) as a log-friendly message. */
function formatError(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/** Largest value in `values`, or 0 when the list is empty (avoids `Math.max()` === -Infinity). */
function maxOrZero(values: readonly number[]): number {
  return values.length > 0 ? Math.max(...values) : 0;
}

/**
 * Coordinates this node's participation in the cluster.
 *
 * It builds and sends heartbeats, and records desired replica counts. It also
 * decides whether a model is deployed on this node or requested from a node
 * chosen by the {@link ClusterManager}.
 */
export class ClusterCoordinator {
  private readonly clusterManager: ClusterManager;
  private readonly containerManager: ContainerManager;
  private readonly modelRegistry: ModelRegistry;
  private readonly modelLoader: ModelLoader;
  private readonly gpuDetector: GpuDetector;

  constructor(
    clusterManager: ClusterManager,
    containerManager: ContainerManager,
    modelRegistry: ModelRegistry,
    modelLoader: ModelLoader,
  ) {
    this.clusterManager = clusterManager;
    this.containerManager = containerManager;
    this.modelRegistry = modelRegistry;
    this.modelLoader = modelLoader;
    this.gpuDetector = new GpuDetector();
  }

  /**
   * Builds a heartbeat describing this node: GPU resources, running container
   * count, and the models its containers currently expose.
   *
   * @param endpoint - Endpoint advertised for the node and for every deployment it reports.
   * @returns A heartbeat stamped with the current time.
   */
  public async buildLocalHeartbeat(endpoint: string): Promise<IClusterNodeHeartbeat> {
    const [gpus, statuses, models] = await Promise.all([
      this.gpuDetector.detectGpus(),
      this.containerManager.getAllStatus(),
      this.containerManager.getAllAvailableModels(),
    ]);

    const deploymentCount = Array.from(statuses.values()).filter((status) => status.running).length;
    const runningContainers = this.containerManager.getAllContainers().filter((container) => {
      const status = statuses.get(container.getConfig().id);
      return status?.running === true;
    });

    const resources = await this.buildResourceSummary(
      gpus,
      deploymentCount,
      models,
      runningContainers,
    );

    return {
      nodeName: this.clusterManager.getConfig().nodeName,
      role: this.clusterManager.getConfig().role,
      endpoint,
      // The node reports itself healthy whenever it was able to build a heartbeat.
      healthy: true,
      resources,
      deployments: Array.from(models.entries()).map(([modelId, endpoints]) => ({
        modelId,
        // NOTE(review): engine is hard-coded to 'vllm' for every reported model — confirm
        // this holds for all container types.
        engine: 'vllm' as const,
        endpoint,
        healthy: endpoints.some((entry) => entry.healthy),
        containerId: endpoints[0]?.containerId,
      })),
      lastSeenAt: Date.now(),
    };
  }

  /**
   * Rebuilds the local heartbeat and records it in the cluster manager.
   *
   * @param endpoint - Endpoint advertised for this node.
   * @returns The heartbeat that was recorded.
   */
  public async syncLocalState(endpoint: string): Promise<IClusterNodeHeartbeat> {
    const heartbeat = await this.buildLocalHeartbeat(endpoint);
    this.clusterManager.updateLocalNode(heartbeat);
    return heartbeat;
  }

  /**
   * Syncs local state and POSTs the resulting heartbeat to the control plane.
   *
   * This is a no-op when clustering is disabled, or when either the advertised
   * endpoint or the control-plane URL is missing. It is also a no-op when this
   * node is itself the control plane (both URLs are equal).
   *
   * Transport failures and non-2xx responses are logged, not thrown. Errors
   * raised while building the heartbeat still propagate.
   */
  public async sendHeartbeat(): Promise<void> {
    if (!this.clusterManager.isEnabled()) {
      return;
    }

    const endpoint = this.clusterManager.getAdvertisedEndpoint();
    const controlPlaneUrl = this.clusterManager.getControlPlaneUrl();
    if (!endpoint || !controlPlaneUrl || controlPlaneUrl === endpoint) {
      return;
    }

    const heartbeat = await this.syncLocalState(endpoint);

    try {
      const response = await fetch(`${controlPlaneUrl}/_cluster/nodes/heartbeat`, {
        method: 'POST',
        headers: this.buildClusterHeaders(),
        body: JSON.stringify(heartbeat),
      });
      // The reply body is never read; release it so the connection is not held open.
      await response.body?.cancel();
      if (!response.ok) {
        logger.warn(`Cluster heartbeat rejected: HTTP ${response.status}`);
      }
    } catch (error) {
      logger.warn(`Cluster heartbeat failed: ${formatError(error)}`);
    }
  }

  /** Records a heartbeat received from another node. */
  public acceptHeartbeat(heartbeat: IClusterNodeHeartbeat): void {
    this.clusterManager.upsertNode(heartbeat);
  }

  /** Returns the cluster status as reported by the cluster manager. */
  public getStatus(): IClusterStatusResponse {
    return this.clusterManager.getStatus();
  }

  /** Returns all recorded desired deployments. */
  public getDesiredDeployments(): IClusterDesiredDeployment[] {
    return this.clusterManager.getDesiredDeployments();
  }

  /** Returns this node's configured name. */
  public getLocalNodeName(): string {
    return this.clusterManager.getConfig().nodeName;
  }

  /** Returns the shared cluster secret, if one is configured. */
  public getSharedSecret(): string | undefined {
    return this.clusterManager.getSharedSecret();
  }

  /** Sets a node's scheduler state and returns the state reported by the cluster manager. */
  public setNodeSchedulerState(
    nodeName: string,
    schedulerState: TClusterNodeSchedulerState,
  ): TClusterNodeSchedulerState {
    return this.clusterManager.setNodeSchedulerState(nodeName, schedulerState);
  }

  /**
   * Sets the desired replica count for a model.
   *
   * A count of zero or less removes the desired deployment and returns a
   * synthetic record with `desiredReplicas: 0`.
   *
   * @returns The stored desired deployment, or `null` if the model is unknown.
   */
  public async setDesiredReplicas(
    modelName: string,
    desiredReplicas: number,
  ): Promise<IClusterDesiredDeployment | null> {
    const model = await this.modelRegistry.getModel(modelName);
    if (!model) {
      return null;
    }

    if (desiredReplicas <= 0) {
      this.clusterManager.removeDesiredDeployment(model.id);
      return {
        modelId: model.id,
        desiredReplicas: 0,
        updatedAt: Date.now(),
      };
    }

    // desiredReplicas is known to be positive here.
    return this.clusterManager.upsertDesiredDeployment(model.id, desiredReplicas);
  }

  /**
   * Removes the desired deployment for a model.
   *
   * @returns `false` if the model is unknown; otherwise the cluster manager's removal result.
   */
  public async clearDesiredDeployment(modelName: string): Promise<boolean> {
    const model = await this.modelRegistry.getModel(modelName);
    if (!model) {
      return false;
    }

    return this.clusterManager.removeDesiredDeployment(model.id);
  }

  /**
   * True when a model request should be served by deploying on this node.
   *
   * That is the case when clustering is disabled, when this node is the
   * control plane, or when no control-plane URL is configured.
   */
  public shouldDeployLocallyFirst(): boolean {
    if (!this.clusterManager.isEnabled()) {
      return true;
    }

    return this.clusterManager.isControlPlane() || !this.clusterManager.getControlPlaneUrl();
  }

  /** True when this node may mutate cluster state (clustering disabled, or this node is the control plane). */
  public canManageClusterState(): boolean {
    return !this.clusterManager.isEnabled() || this.clusterManager.isControlPlane();
  }

  /**
   * Looks up where a model is currently served, without deploying anything.
   *
   * @returns The location with `created: false`, or `null` if the model is
   *   unknown or not located.
   */
  public async resolveModel(modelName: string): Promise<IClusterEnsureResponse | null> {
    const model = await this.modelRegistry.getModel(modelName);
    if (!model) {
      return null;
    }

    const location = this.clusterManager.resolveModel(model.id);
    if (!location) {
      return null;
    }

    return {
      model: model.id,
      location,
      created: false,
    };
  }

  /**
   * Makes sure a model is served somewhere in the cluster.
   *
   * Steps:
   * 1. Record a desired deployment for the model.
   * 2. If an active location already exists, reuse it.
   * 3. On a node that is not the control plane (or with clustering disabled),
   *    deploy on this node.
   * 4. Otherwise, let the cluster manager pick a node.
   *
   * @returns The resulting location, or `null` if the model is unknown or
   *   deployment failed.
   */
  public async ensureModel(modelName: string): Promise<IClusterEnsureResponse | null> {
    const model = await this.modelRegistry.getModel(modelName);
    if (!model) {
      return null;
    }

    this.rememberDesiredDeployment(model.id, model.launchDefaults?.replicas || 1);

    const existing = this.clusterManager.getActiveModelLocations(model.id)[0];
    if (existing) {
      return {
        model: model.id,
        location: existing,
        created: false,
      };
    }

    if (!this.clusterManager.isEnabled() || !this.clusterManager.isControlPlane()) {
      return this.deployModelLocally(model.id);
    }

    const targetNode = this.clusterManager.pickNodeForModel(model);
    if (!targetNode) {
      return null;
    }

    if (targetNode.nodeName === this.clusterManager.getConfig().nodeName) {
      return this.deployModelLocally(model.id);
    }

    return this.requestRemoteDeployment(targetNode.endpoint, model.id);
  }

  /**
   * Asks the control plane to ensure a model.
   *
   * Falls back to {@link ensureModel} on this node when no control plane is
   * configured or when this node is the control plane.
   *
   * @returns The control plane's response, or `null` on any failure (failures are logged).
   */
  public async ensureModelViaControlPlane(
    modelName: string,
  ): Promise<IClusterEnsureResponse | null> {
    const controlPlaneUrl = this.clusterManager.getControlPlaneUrl();
    const localEndpoint = this.clusterManager.getAdvertisedEndpoint();
    if (!controlPlaneUrl || controlPlaneUrl === localEndpoint) {
      return this.ensureModel(modelName);
    }

    return this.postEnsureRequest(
      `${controlPlaneUrl}/_cluster/models/ensure`,
      { model: modelName },
      `Cluster ensure request for ${modelName}`,
    );
  }

  /**
   * Loads a model on this node and returns its resolved location.
   *
   * @returns The location (`created` is true unless the model was already
   *   loaded), or `null` on failure.
   */
  public async deployModelLocally(modelName: string): Promise<IClusterEnsureResponse | null> {
    const model = await this.modelRegistry.getModel(modelName);
    if (model) {
      this.rememberDesiredDeployment(model.id, model.launchDefaults?.replicas || 1);
    }

    const result = await this.modelLoader.loadModel(modelName);
    if (!result.success) {
      return null;
    }

    const resolved = await this.publishAndResolveLocal(result.model);
    return resolved ? { ...resolved, created: !result.alreadyLoaded } : null;
  }

  /**
   * Adds missing replicas so each desired deployment has as many active
   * locations as requested.
   *
   * This runs only when clustering is disabled or this node is the control
   * plane. For a given model it stops at the first replica that cannot be
   * placed or deployed, then moves on to the next model.
   */
  public async reconcileDesiredReplicas(): Promise<void> {
    if (this.clusterManager.isEnabled() && !this.clusterManager.isControlPlane()) {
      return;
    }

    const desiredDeployments = this.clusterManager.getDesiredDeployments();
    for (const desiredDeployment of desiredDeployments) {
      if (desiredDeployment.desiredReplicas <= 0) {
        continue;
      }

      const model = await this.modelRegistry.getModel(desiredDeployment.modelId);
      if (!model) {
        continue;
      }

      const existingLocations = this.clusterManager.getActiveModelLocations(model.id);
      const missingReplicas = desiredDeployment.desiredReplicas - existingLocations.length;
      if (missingReplicas <= 0) {
        continue;
      }

      for (let index = 0; index < missingReplicas; index++) {
        const targetNode = this.clusterManager.pickNodeForModel(model);
        if (!targetNode) {
          break;
        }

        // New replicas are numbered after the ones already active.
        const replicaOrdinal = existingLocations.length + index;
        const result = targetNode.nodeName === this.clusterManager.getConfig().nodeName
          ? await this.deployReplicaLocally(model.id, replicaOrdinal)
          : await this.requestRemoteDeployment(targetNode.endpoint, model.id, replicaOrdinal);

        if (!result) {
          break;
        }
      }
    }
  }

  /**
   * Deploys one replica of a model on this node.
   *
   * The desired count is raised to at least `replicaOrdinal + 1` before
   * deploying.
   *
   * @returns The location (`created` is true unless it was already loaded),
   *   or `null` on failure.
   */
  public async deployReplicaLocally(
    modelName: string,
    replicaOrdinal?: number,
  ): Promise<IClusterEnsureResponse | null> {
    const model = await this.modelRegistry.getModel(modelName);
    if (model) {
      this.rememberDesiredDeployment(
        model.id,
        Math.max((replicaOrdinal ?? 0) + 1, model.launchDefaults?.replicas || 1),
      );
    }

    const result = await this.modelLoader.deployReplica(modelName, replicaOrdinal);
    if (!result.success) {
      return null;
    }

    const resolved = await this.publishAndResolveLocal(result.model);
    return resolved ? { ...resolved, created: !result.alreadyLoaded } : null;
  }

  /**
   * After a local deployment: refresh the local node record (when an endpoint
   * is advertised) so the new deployment is visible, then resolve its location.
   */
  private async publishAndResolveLocal(modelId: string): Promise<IClusterEnsureResponse | null> {
    const endpoint = this.clusterManager.getAdvertisedEndpoint();
    if (endpoint) {
      await this.syncLocalState(endpoint);
    }

    return this.resolveModel(modelId);
  }

  /** Asks a remote node to deploy (a replica of) a model; `null` on any failure. */
  private async requestRemoteDeployment(
    nodeEndpoint: string,
    modelName: string,
    replicaOrdinal?: number,
  ): Promise<IClusterEnsureResponse | null> {
    return this.postEnsureRequest(
      `${nodeEndpoint}/_cluster/deployments`,
      { model: modelName, replicaOrdinal },
      `Remote deployment of ${modelName} on ${nodeEndpoint}`,
    );
  }

  /**
   * POSTs a JSON payload with cluster auth headers and parses the reply as an
   * ensure response.
   *
   * Failures never throw. Non-2xx statuses, network errors and invalid JSON are
   * logged with `description` and yield `null`.
   */
  private async postEnsureRequest(
    url: string,
    payload: unknown,
    description: string,
  ): Promise<IClusterEnsureResponse | null> {
    try {
      const response = await fetch(url, {
        method: 'POST',
        headers: this.buildClusterHeaders(),
        body: JSON.stringify(payload),
      });
      if (!response.ok) {
        // Release the unread error body so the connection is not held open.
        await response.body?.cancel();
        logger.warn(`${description} failed: HTTP ${response.status}`);
        return null;
      }

      // NOTE(review): the peer's JSON is trusted without schema validation.
      return await response.json() as IClusterEnsureResponse;
    } catch (error) {
      logger.warn(`${description} failed: ${formatError(error)}`);
      return null;
    }
  }

  /**
   * Summarises GPU capacity for a heartbeat.
   *
   * GPUs assigned to running containers are excluded from the "available"
   * figures. VRAM values are divided by 1024 and rounded; this presumably
   * converts MB to GB — confirm the unit reported by GpuDetector.
   */
  private async buildResourceSummary(
    gpus: Awaited<ReturnType<GpuDetector['detectGpus']>>,
    deploymentCount: number,
    _models: Awaited<ReturnType<ContainerManager['getAllAvailableModels']>>,
    runningContainers: ReturnType<ContainerManager['getAllContainers']>,
  ): Promise<IClusterNodeResources> {
    const totalVramGb = Math.round(gpus.reduce((sum, gpu) => sum + gpu.vram, 0) / 1024);
    const usedGpuIds = runningContainers.flatMap((container) => container.getConfig().gpuIds);
    const availableGpus = filterOutUsedGpus(gpus, usedGpuIds);
    const topologyGroups = summarizeGpuTopologyGroups(availableGpus);
    const availableVramGb = Math.round(
      availableGpus.reduce((sum, gpu) => sum + gpu.vram, 0) / 1024,
    );

    return {
      gpuCount: gpus.length,
      totalVramGb,
      availableVramGb,
      maxSingleGpuVramGb: maxOrZero(availableGpus.map((gpu) => Math.round(gpu.vram / 1024))),
      largestGpuGroupCount: maxOrZero(topologyGroups.map((group) => group.gpuCount)),
      largestGpuGroupVramGb: maxOrZero(topologyGroups.map((group) => group.totalVramGb)),
      deploymentCount,
      topologyGroups,
    };
  }

  /** JSON content-type header plus the shared-secret auth header when a secret is configured. */
  private buildClusterHeaders(): Record<string, string> {
    const headers: Record<string, string> = {
      'Content-Type': 'application/json',
    };

    const sharedSecret = this.clusterManager.getSharedSecret();
    if (sharedSecret) {
      headers[CLUSTER.AUTH_HEADER_NAME] = sharedSecret;
    }

    return headers;
  }

  /**
   * Raises (never lowers) the desired replica count for a model.
   *
   * The new count is at least `minimumReplicas` and at least 1.
   */
  private rememberDesiredDeployment(modelId: string, minimumReplicas: number): void {
    const existing = this.clusterManager.getDesiredDeployment(modelId);
    const desiredReplicas = Math.max(existing?.desiredReplicas || 0, minimumReplicas, 1);
    this.clusterManager.upsertDesiredDeployment(modelId, desiredReplicas);
  }
}