// Source: modelgrid/ts/cluster/cluster-manager.ts (TypeScript, 456 lines, 13 KiB)

import * as fs from 'node:fs/promises';
import type { IModelCatalogEntry } from '../interfaces/catalog.ts';
import type {
IClusterConfig,
IClusterDesiredDeployment,
IClusterGpuTopologyGroup,
IClusterModelLocation,
IClusterNodeHeartbeat,
IClusterNodeStatus,
IClusterStatusResponse,
TClusterNodeSchedulerState,
} from '../interfaces/cluster.ts';
import { CLUSTER, PATHS } from '../constants.ts';
/**
 * In-memory registry of cluster membership and control-plane intent.
 *
 * Tracks four pieces of state:
 *  - the active cluster configuration,
 *  - the most recent heartbeat of every known node (including the local one),
 *  - the desired deployments keyed by model id,
 *  - a per-node scheduler state (nodes without an entry count as `'active'`).
 *
 * Node heartbeats are persisted to `cluster-state.json` and the control state
 * (desired deployments + scheduler states) to `cluster-control-state.json`,
 * both under `PATHS.DATA_DIR`. Persistence is best-effort: failures are
 * swallowed so they never break the caller.
 */
export class ClusterManager {
  private config: IClusterConfig = {
    enabled: false,
    nodeName: 'modelgrid-local',
    role: 'standalone',
    bindHost: CLUSTER.DEFAULT_BIND_HOST,
    gossipPort: CLUSTER.DEFAULT_GOSSIP_PORT,
    heartbeatIntervalMs: CLUSTER.DEFAULT_HEARTBEAT_INTERVAL_MS,
    seedNodes: [],
  };
  private localNode: IClusterNodeHeartbeat | null = null;
  private knownNodes = new Map<string, IClusterNodeHeartbeat>();
  private desiredDeployments = new Map<string, IClusterDesiredDeployment>();
  private nodeSchedulerStates = new Map<string, TClusterNodeSchedulerState>();

  // True while a node-state write is queued but has not started yet.
  private persistQueued = false;
  // True while a control-state write is queued but has not started yet.
  private controlPersistQueued = false;
  // Tail of the node-state write queue; every write is chained onto it so two
  // writes to the same file never overlap.
  private persistChain: Promise<void> = Promise.resolve();
  // Tail of the control-state write queue (same purpose as `persistChain`).
  private controlPersistChain: Promise<void> = Promise.resolve();

  /**
   * Loads previously persisted node and control state from disk.
   *
   * Each file is loaded independently; any failure (missing file, unreadable
   * file, invalid JSON) is swallowed and leaves whatever was loaded so far.
   * A persisted node whose name equals the currently configured node name is
   * adopted as the local node. Stale nodes are pruned after the node file has
   * been read successfully.
   */
  public async initialize(): Promise<void> {
    try {
      const stateContent = await fs.readFile(this.getStateFilePath(), 'utf-8');
      const data = JSON.parse(stateContent) as { nodes?: IClusterNodeHeartbeat[] };
      for (const node of data.nodes || []) {
        this.knownNodes.set(node.nodeName, node);
        if (node.nodeName === this.config.nodeName) {
          this.localNode = node;
        }
      }
      this.pruneStaleNodes();
    } catch {
      // Missing or unreadable cluster state: continue with what we have.
    }

    try {
      const controlStateContent = await fs.readFile(this.getControlStateFilePath(), 'utf-8');
      const data = JSON.parse(controlStateContent) as {
        desiredDeployments?: IClusterDesiredDeployment[];
        nodeSchedulerStates?: Record<string, TClusterNodeSchedulerState>;
      };
      for (const deployment of data.desiredDeployments || []) {
        this.desiredDeployments.set(deployment.modelId, deployment);
      }
      for (const [nodeName, schedulerState] of Object.entries(data.nodeSchedulerStates || {})) {
        this.nodeSchedulerStates.set(nodeName, schedulerState);
      }
    } catch {
      // Missing or unreadable control state: continue with what we have.
    }
  }

  /**
   * Replaces the active configuration.
   *
   * A falsy `heartbeatIntervalMs` falls back to the cluster default and a
   * falsy `seedNodes` becomes an empty list.
   */
  public configure(config: IClusterConfig): void {
    this.config = {
      ...config,
      heartbeatIntervalMs: config.heartbeatIntervalMs || CLUSTER.DEFAULT_HEARTBEAT_INTERVAL_MS,
      seedNodes: config.seedNodes || [],
    };
  }

  /** Returns the active configuration object (not a copy). */
  public getConfig(): IClusterConfig {
    return this.config;
  }

  /** Whether clustering is enabled in the active configuration. */
  public isEnabled(): boolean {
    return this.config.enabled;
  }

  /** True when clustering is enabled and this node's role is `'control-plane'`. */
  public isControlPlane(): boolean {
    return this.config.enabled && this.config.role === 'control-plane';
  }

  /** True when clustering is enabled and this node's role is `'worker'`. */
  public isWorker(): boolean {
    return this.config.enabled && this.config.role === 'worker';
  }

  /** Returns `'standalone'` when clustering is disabled, otherwise the configured role. */
  public getModeLabel(): string {
    if (!this.config.enabled) {
      return 'standalone';
    }
    return this.config.role;
  }

  /** Configured heartbeat interval, or the cluster default when it is unset or zero. */
  public getHeartbeatIntervalMs(): number {
    return this.config.heartbeatIntervalMs || CLUSTER.DEFAULT_HEARTBEAT_INTERVAL_MS;
  }

  /** Endpoint from the local heartbeat if present, otherwise the configured `advertiseUrl`. */
  public getAdvertisedEndpoint(): string | undefined {
    return this.localNode?.endpoint || this.config.advertiseUrl;
  }

  /** Configured control-plane URL, if any. */
  public getControlPlaneUrl(): string | undefined {
    return this.config.controlPlaneUrl;
  }

  /** Configured shared secret; an empty or missing value yields `undefined`. */
  public getSharedSecret(): string | undefined {
    return this.config.sharedSecret || undefined;
  }

  /** Records `heartbeat` as the local node, stores it among known nodes and queues a persist. */
  public updateLocalNode(heartbeat: IClusterNodeHeartbeat): void {
    this.localNode = heartbeat;
    this.knownNodes.set(heartbeat.nodeName, heartbeat);
    this.schedulePersist();
  }

  /** Inserts or replaces a node's heartbeat (keyed by node name) and queues a persist. */
  public upsertNode(heartbeat: IClusterNodeHeartbeat): void {
    this.knownNodes.set(heartbeat.nodeName, heartbeat);
    this.schedulePersist();
  }

  /**
   * Builds a status summary for the local node from configuration.
   * `healthy` is always reported as `true` here.
   */
  public getLocalNodeStatus(): IClusterNodeStatus {
    return {
      nodeName: this.config.nodeName,
      role: this.config.role,
      endpoint: this.getAdvertisedEndpoint(),
      healthy: true,
      schedulerState: this.getNodeSchedulerState(this.config.nodeName),
    };
  }

  /** Last heartbeat recorded for the local node, or `null` if none was recorded. */
  public getLocalNode(): IClusterNodeHeartbeat | null {
    return this.localNode;
  }

  /**
   * Looks up a node by name.
   *
   * @returns a copy of the heartbeat with the current scheduler state applied,
   *          or `null` when the node is unknown.
   */
  public getNode(nodeName: string): IClusterNodeHeartbeat | null {
    const node = this.knownNodes.get(nodeName);
    if (!node) {
      return null;
    }
    return this.decorateNode(node);
  }

  /**
   * Drops every remote node whose `lastSeenAt` is older than
   * `CLUSTER.NODE_STALE_AFTER_MS`. The node named in the active configuration
   * is never pruned. Queues a persist when at least one node was removed.
   */
  public pruneStaleNodes(): void {
    const now = Date.now();
    let removedAny = false;
    for (const [nodeName, node] of this.knownNodes) {
      if (nodeName === this.config.nodeName) {
        continue;
      }
      if (now - node.lastSeenAt > CLUSTER.NODE_STALE_AFTER_MS) {
        this.knownNodes.delete(nodeName);
        removedAny = true;
      }
    }
    if (removedAny) {
      this.schedulePersist();
    }
  }

  /**
   * Returns all non-stale nodes, each carrying its current scheduler state.
   * The local node is listed first; the rest are ordered by node name.
   */
  public getAllNodes(): IClusterNodeHeartbeat[] {
    this.pruneStaleNodes();
    return Array.from(this.knownNodes.values()).map((node) => this.decorateNode(node)).sort(
      (left, right) => {
        if (left.nodeName === this.config.nodeName) {
          return -1;
        }
        if (right.nodeName === this.config.nodeName) {
          return 1;
        }
        return left.nodeName.localeCompare(right.nodeName);
      },
    );
  }

  /** Subset of {@link getAllNodes} whose heartbeat reports `healthy`. */
  public getHealthyNodes(): IClusterNodeHeartbeat[] {
    return this.getAllNodes().filter((node) => node.healthy);
  }

  /** Scheduler state recorded for `nodeName`; defaults to `'active'` when none is stored. */
  public getNodeSchedulerState(nodeName: string): TClusterNodeSchedulerState {
    return this.nodeSchedulerStates.get(nodeName) || 'active';
  }

  /**
   * Stores the scheduler state for a node and queues a control-state persist.
   *
   * @returns the state that was stored.
   */
  public setNodeSchedulerState(
    nodeName: string,
    schedulerState: TClusterNodeSchedulerState,
  ): TClusterNodeSchedulerState {
    this.nodeSchedulerStates.set(nodeName, schedulerState);
    this.scheduleControlPersist();
    return schedulerState;
  }

  /** All desired deployments, ordered by model id. */
  public getDesiredDeployments(): IClusterDesiredDeployment[] {
    return Array.from(this.desiredDeployments.values()).sort((left, right) =>
      left.modelId.localeCompare(right.modelId)
    );
  }

  /** Desired deployment for `modelId`, or `null` when none is recorded. */
  public getDesiredDeployment(modelId: string): IClusterDesiredDeployment | null {
    return this.desiredDeployments.get(modelId) || null;
  }

  /**
   * Creates or replaces the desired deployment for a model, stamping
   * `updatedAt` with the current time, and queues a control-state persist.
   *
   * @returns the stored deployment record.
   */
  public upsertDesiredDeployment(
    modelId: string,
    desiredReplicas: number,
  ): IClusterDesiredDeployment {
    const deployment: IClusterDesiredDeployment = {
      modelId,
      desiredReplicas,
      updatedAt: Date.now(),
    };
    this.desiredDeployments.set(modelId, deployment);
    this.scheduleControlPersist();
    return deployment;
  }

  /**
   * Removes the desired deployment for a model.
   *
   * @returns `true` when an entry existed and was removed (a control-state
   *          persist is queued in that case), `false` otherwise.
   */
  public removeDesiredDeployment(modelId: string): boolean {
    const removed = this.desiredDeployments.delete(modelId);
    if (removed) {
      this.scheduleControlPersist();
    }
    return removed;
  }

  /**
   * Lists every healthy deployment of `modelId` on healthy nodes, regardless
   * of the node's scheduler state.
   */
  public getModelLocations(modelId: string): IClusterModelLocation[] {
    const locations: IClusterModelLocation[] = [];
    for (const node of this.getHealthyNodes()) {
      for (const deployment of node.deployments) {
        if (deployment.modelId !== modelId || !deployment.healthy) {
          continue;
        }
        locations.push(this.toModelLocation(node, deployment));
      }
    }
    return locations;
  }

  /** Like {@link getModelLocations}, restricted to nodes whose scheduler state is `'active'`. */
  public getActiveModelLocations(modelId: string): IClusterModelLocation[] {
    return this.getModelLocations(modelId).filter((location) =>
      this.getNodeSchedulerState(location.nodeName) === 'active'
    );
  }

  /**
   * Picks the preferred location serving `modelId`.
   *
   * Preference order: scheduler state (see `compareSchedulerState`), then the
   * local node, then node name.
   *
   * @returns the best location, or `null` when no healthy deployment exists.
   */
  public resolveModel(modelId: string): IClusterModelLocation | null {
    const locations = this.getModelLocations(modelId);
    if (locations.length === 0) {
      return null;
    }
    locations.sort((left, right) => {
      const schedulerPreference = this.compareSchedulerState(
        this.getNodeSchedulerState(left.nodeName),
        this.getNodeSchedulerState(right.nodeName),
      );
      if (schedulerPreference !== 0) {
        return schedulerPreference;
      }
      if (left.nodeName === this.config.nodeName) {
        return -1;
      }
      if (right.nodeName === this.config.nodeName) {
        return 1;
      }
      return left.nodeName.localeCompare(right.nodeName);
    });
    return locations[0] ?? null;
  }

  /**
   * Chooses a node on which to place a new deployment of `model`.
   *
   * A node is eligible when it is healthy, not excluded, not a foreign
   * standalone node, has an `'active'` scheduler state, has at least
   * `minVramGb` available, and owns a GPU topology group that satisfies both
   * the GPU count and the VRAM requirement.
   *
   * Among eligible nodes the local node wins; otherwise the order is: most
   * available VRAM, largest GPU group closest to the preferred tensor-parallel
   * size, fewest existing deployments.
   *
   * @param model catalog entry whose requirements drive the selection.
   * @param excludedNodeNames node names that must not be chosen.
   * @returns the chosen node, or `null` when no node is eligible.
   */
  public pickNodeForModel(
    model: IModelCatalogEntry,
    excludedNodeNames: string[] = [],
  ): IClusterNodeHeartbeat | null {
    const requiredVram = model.requirements.minVramGb;
    const minGpuCount = model.requirements.minGpuCount || 1;
    const preferredTensorParallel = model.launchDefaults?.tensorParallelSize || minGpuCount;
    const excluded = new Set(excludedNodeNames);

    const eligible = this.getHealthyNodes().filter((node) => {
      if (excluded.has(node.nodeName)) {
        return false;
      }
      if (node.role === 'standalone' && node.nodeName !== this.config.nodeName) {
        return false;
      }
      if (node.schedulerState && node.schedulerState !== 'active') {
        return false;
      }
      return node.resources.availableVramGb >= requiredVram &&
        this.hasEligibleTopologyGroup(node.resources.topologyGroups, requiredVram, minGpuCount);
    });
    if (eligible.length === 0) {
      return null;
    }

    eligible.sort((left, right) => {
      if (left.nodeName === this.config.nodeName) {
        return -1;
      }
      if (right.nodeName === this.config.nodeName) {
        return 1;
      }
      if (right.resources.availableVramGb !== left.resources.availableVramGb) {
        return right.resources.availableVramGb - left.resources.availableVramGb;
      }
      const leftTopologyDelta = Math.abs(
        left.resources.largestGpuGroupCount - preferredTensorParallel,
      );
      const rightTopologyDelta = Math.abs(
        right.resources.largestGpuGroupCount - preferredTensorParallel,
      );
      if (leftTopologyDelta !== rightTopologyDelta) {
        return leftTopologyDelta - rightTopologyDelta;
      }
      return left.resources.deploymentCount - right.resources.deploymentCount;
    });
    return eligible[0] ?? null;
  }

  /**
   * Builds a cluster-wide status snapshot: the local node, all known nodes,
   * every deployment on healthy nodes grouped by model id (unhealthy
   * deployments included), and the desired deployments.
   */
  public getStatus(): IClusterStatusResponse {
    // Take one node snapshot so `nodes` and `models` describe the same view.
    const nodes = this.getAllNodes();
    const models: Record<string, IClusterModelLocation[]> = {};
    for (const node of nodes) {
      if (!node.healthy) {
        continue;
      }
      for (const deployment of node.deployments) {
        let locations = models[deployment.modelId];
        if (!locations) {
          locations = [];
          models[deployment.modelId] = locations;
        }
        locations.push(this.toModelLocation(node, deployment));
      }
    }
    return {
      localNode: this.localNode ? this.decorateNode(this.localNode) : null,
      nodes,
      models,
      desiredDeployments: this.getDesiredDeployments(),
    };
  }

  /** Maps one deployment reported by `node` to a model location record. */
  private toModelLocation(
    node: IClusterNodeHeartbeat,
    deployment: IClusterNodeHeartbeat['deployments'][number],
  ): IClusterModelLocation {
    return {
      modelId: deployment.modelId,
      nodeName: node.nodeName,
      endpoint: deployment.endpoint,
      healthy: deployment.healthy,
      engine: deployment.engine,
      containerId: deployment.containerId,
    };
  }

  /** True when any single topology group meets both the GPU-count and VRAM minimums. */
  private hasEligibleTopologyGroup(
    groups: IClusterGpuTopologyGroup[],
    requiredVramGb: number,
    minGpuCount: number,
  ): boolean {
    return groups.some((group) =>
      group.gpuCount >= minGpuCount && group.totalVramGb >= requiredVramGb
    );
  }

  /** Path of the JSON file holding known node heartbeats. */
  private getStateFilePath(): string {
    return `${PATHS.DATA_DIR}/cluster-state.json`;
  }

  /** Path of the JSON file holding desired deployments and scheduler states. */
  private getControlStateFilePath(): string {
    return `${PATHS.DATA_DIR}/cluster-control-state.json`;
  }

  /**
   * Queues a write of the node state.
   *
   * Calls made while a write is already queued are coalesced into it. Writes
   * are chained so that a new write only starts after the previous one has
   * settled; the queued flag is cleared right before the write starts, so any
   * mutation made after that point queues a fresh write.
   */
  private schedulePersist(): void {
    if (this.persistQueued) {
      return;
    }
    this.persistQueued = true;
    // `persistState` never rejects, so the chain cannot get stuck.
    this.persistChain = this.persistChain.then(() => {
      this.persistQueued = false;
      return this.persistState();
    });
  }

  /** Control-state counterpart of {@link schedulePersist}; same coalescing and ordering rules. */
  private scheduleControlPersist(): void {
    if (this.controlPersistQueued) {
      return;
    }
    this.controlPersistQueued = true;
    // `persistControlState` never rejects, so the chain cannot get stuck.
    this.controlPersistChain = this.controlPersistChain.then(() => {
      this.controlPersistQueued = false;
      return this.persistControlState();
    });
  }

  /** Writes all known node heartbeats to the state file; errors are swallowed. */
  private async persistState(): Promise<void> {
    try {
      await fs.mkdir(PATHS.DATA_DIR, { recursive: true });
      await fs.writeFile(
        this.getStateFilePath(),
        JSON.stringify({ nodes: Array.from(this.knownNodes.values()) }, null, 2),
      );
    } catch {
      // Persistence failure should not break the control plane.
    }
  }

  /** Writes desired deployments and scheduler states to the control file; errors are swallowed. */
  private async persistControlState(): Promise<void> {
    try {
      await fs.mkdir(PATHS.DATA_DIR, { recursive: true });
      await fs.writeFile(
        this.getControlStateFilePath(),
        JSON.stringify(
          {
            desiredDeployments: this.getDesiredDeployments(),
            nodeSchedulerStates: Object.fromEntries(this.nodeSchedulerStates.entries()),
          },
          null,
          2,
        ),
      );
    } catch {
      // Persistence failure should not break the control plane.
    }
  }

  /** Returns a shallow copy of `node` with `schedulerState` set from the scheduler-state map. */
  private decorateNode(node: IClusterNodeHeartbeat): IClusterNodeHeartbeat {
    return {
      ...node,
      schedulerState: this.getNodeSchedulerState(node.nodeName),
    };
  }

  /**
   * Orders scheduler states by preference: `'active'`, then `'cordoned'`,
   * then `'draining'`. A state outside that list ranks after all of them, so
   * an unexpected value can never be preferred over a known state.
   *
   * @returns a negative number when `left` is preferred, positive when
   *          `right` is preferred, `0` when they rank equally.
   */
  private compareSchedulerState(
    left: TClusterNodeSchedulerState,
    right: TClusterNodeSchedulerState,
  ): number {
    const order: TClusterNodeSchedulerState[] = ['active', 'cordoned', 'draining'];
    const rank = (state: TClusterNodeSchedulerState): number => {
      const index = order.indexOf(state);
      // `indexOf` yields -1 for unknown values, which would sort them first.
      return index === -1 ? order.length : index;
    };
    return rank(left) - rank(right);
  }
}