9f7308498c
schedulePersist and scheduleControlPersist can fire from configure() and the public scheduling paths before initialize() has completed. Without a guard, those queued microtasks call persistState/persistControlState, which create PATHS.DATA_DIR and write state files even from tests and short-lived scripts that never meant to touch the data directory. That produced async-leak warnings in the ClusterManager unit tests and left orphan directories on hosts that only constructed a ClusterManager to inspect it. Add an `initialized` flag that is set at the end of initialize(), and return early from both schedulers while it is false. Real runtime paths always call initialize() during Daemon startup, so this changes no production behavior.
467 lines
13 KiB
TypeScript
467 lines
13 KiB
TypeScript
import * as fs from 'node:fs/promises';
|
|
import type { IModelCatalogEntry } from '../interfaces/catalog.ts';
|
|
import type {
|
|
IClusterConfig,
|
|
IClusterDesiredDeployment,
|
|
IClusterGpuTopologyGroup,
|
|
IClusterModelLocation,
|
|
IClusterNodeHeartbeat,
|
|
IClusterNodeStatus,
|
|
IClusterStatusResponse,
|
|
TClusterNodeSchedulerState,
|
|
} from '../interfaces/cluster.ts';
|
|
import { CLUSTER, PATHS } from '../constants.ts';
|
|
|
|
/**
 * In-memory registry of cluster nodes, per-node scheduler state, and desired
 * model deployments, mirrored to JSON files under `PATHS.DATA_DIR`.
 *
 * Nothing is written to disk until `initialize()` has completed, so instances
 * built only for inspection (unit tests, short-lived scripts) never create the
 * data directory. After initialization, mutations schedule a coalesced write
 * of the affected state file; writes to the same file are serialized.
 */
export class ClusterManager {
  /** Scheduler states in preference order, used when ranking model locations. */
  private static readonly SCHEDULER_STATE_ORDER: readonly TClusterNodeSchedulerState[] = [
    'active',
    'cordoned',
    'draining',
  ];

  /** Set at the end of initialize(); persistence is a no-op until then. */
  private initialized = false;

  private config: IClusterConfig = {
    enabled: false,
    nodeName: 'modelgrid-local',
    role: 'standalone',
    bindHost: CLUSTER.DEFAULT_BIND_HOST,
    gossipPort: CLUSTER.DEFAULT_GOSSIP_PORT,
    heartbeatIntervalMs: CLUSTER.DEFAULT_HEARTBEAT_INTERVAL_MS,
    seedNodes: [],
  };

  /** Heartbeat most recently recorded for this node, if any. */
  private localNode: IClusterNodeHeartbeat | null = null;
  /** Latest heartbeat per node, keyed by nodeName. */
  private readonly knownNodes = new Map<string, IClusterNodeHeartbeat>();
  /** Desired deployments keyed by modelId. */
  private readonly desiredDeployments = new Map<string, IClusterDesiredDeployment>();
  /** Explicit scheduler states keyed by nodeName; nodes without an entry are 'active'. */
  private readonly nodeSchedulerStates = new Map<string, TClusterNodeSchedulerState>();

  /** A cluster-state write is queued and has not yet taken its snapshot. */
  private persistQueued = false;
  /** A control-state write is queued and has not yet taken its snapshot. */
  private controlPersistQueued = false;
  /** Tail of the cluster-state write chain; each write waits for the previous one. */
  private persistChain: Promise<void> = Promise.resolve();
  /** Tail of the control-state write chain; each write waits for the previous one. */
  private controlPersistChain: Promise<void> = Promise.resolve();

  /**
   * Loads persisted cluster and control state from `PATHS.DATA_DIR` (when
   * present) and then enables persistence for subsequent mutations.
   *
   * Missing or unparseable state files are ignored. Malformed entries inside
   * an otherwise valid file are skipped one by one instead of aborting the load.
   */
  public async initialize(): Promise<void> {
    await this.loadClusterState();
    await this.loadControlState();
    this.initialized = true;
  }

  public configure(config: IClusterConfig): void {
    this.config = {
      ...config,
      heartbeatIntervalMs: config.heartbeatIntervalMs || CLUSTER.DEFAULT_HEARTBEAT_INTERVAL_MS,
      seedNodes: config.seedNodes || [],
    };
  }

  /** Returns the live config object (not a copy). */
  public getConfig(): IClusterConfig {
    return this.config;
  }

  public isEnabled(): boolean {
    return this.config.enabled;
  }

  public isControlPlane(): boolean {
    return this.config.enabled && this.config.role === 'control-plane';
  }

  public isWorker(): boolean {
    return this.config.enabled && this.config.role === 'worker';
  }

  /** 'standalone' when clustering is disabled, otherwise the configured role. */
  public getModeLabel(): string {
    if (!this.config.enabled) {
      return 'standalone';
    }
    return this.config.role;
  }

  public getHeartbeatIntervalMs(): number {
    return this.config.heartbeatIntervalMs || CLUSTER.DEFAULT_HEARTBEAT_INTERVAL_MS;
  }

  /** The local heartbeat's endpoint if known, else the configured advertiseUrl. */
  public getAdvertisedEndpoint(): string | undefined {
    return this.localNode?.endpoint || this.config.advertiseUrl;
  }

  public getControlPlaneUrl(): string | undefined {
    return this.config.controlPlaneUrl;
  }

  /** Returns the shared secret, mapping an empty string to undefined. */
  public getSharedSecret(): string | undefined {
    return this.config.sharedSecret || undefined;
  }

  /** Records this node's own heartbeat and schedules a cluster-state write. */
  public updateLocalNode(heartbeat: IClusterNodeHeartbeat): void {
    this.localNode = heartbeat;
    this.knownNodes.set(heartbeat.nodeName, heartbeat);
    this.schedulePersist();
  }

  /** Inserts or replaces a node's heartbeat and schedules a cluster-state write. */
  public upsertNode(heartbeat: IClusterNodeHeartbeat): void {
    this.knownNodes.set(heartbeat.nodeName, heartbeat);
    this.schedulePersist();
  }

  public getLocalNodeStatus(): IClusterNodeStatus {
    return {
      nodeName: this.config.nodeName,
      role: this.config.role,
      endpoint: this.getAdvertisedEndpoint(),
      healthy: true,
      schedulerState: this.getNodeSchedulerState(this.config.nodeName),
    };
  }

  public getLocalNode(): IClusterNodeHeartbeat | null {
    return this.localNode;
  }

  /** Returns the named node decorated with its scheduler state, or null if unknown. */
  public getNode(nodeName: string): IClusterNodeHeartbeat | null {
    const node = this.knownNodes.get(nodeName);
    return node ? this.decorateNode(node) : null;
  }

  /**
   * Drops every non-local node whose `lastSeenAt` is older than
   * `CLUSTER.NODE_STALE_AFTER_MS`, scheduling one write if anything was removed.
   */
  public pruneStaleNodes(): void {
    const now = Date.now();
    let removedAny = false;

    for (const [nodeName, node] of this.knownNodes) {
      // The local node is never pruned.
      if (nodeName === this.config.nodeName) {
        continue;
      }

      if (now - node.lastSeenAt > CLUSTER.NODE_STALE_AFTER_MS) {
        // Deleting the current entry while iterating a Map is well-defined.
        this.knownNodes.delete(nodeName);
        removedAny = true;
      }
    }

    if (removedAny) {
      this.schedulePersist();
    }
  }

  /** All non-stale nodes, decorated, with the local node first and the rest by name. */
  public getAllNodes(): IClusterNodeHeartbeat[] {
    this.pruneStaleNodes();
    return Array.from(this.knownNodes.values(), (node) => this.decorateNode(node)).sort(
      (left, right) =>
        this.compareLocalFirst(left.nodeName, right.nodeName) ||
        left.nodeName.localeCompare(right.nodeName),
    );
  }

  public getHealthyNodes(): IClusterNodeHeartbeat[] {
    return this.getAllNodes().filter((node) => node.healthy);
  }

  /** The node's explicit scheduler state, defaulting to 'active'. */
  public getNodeSchedulerState(nodeName: string): TClusterNodeSchedulerState {
    return this.nodeSchedulerStates.get(nodeName) || 'active';
  }

  /** Sets a node's scheduler state, schedules a control-state write, and echoes the state. */
  public setNodeSchedulerState(
    nodeName: string,
    schedulerState: TClusterNodeSchedulerState,
  ): TClusterNodeSchedulerState {
    this.nodeSchedulerStates.set(nodeName, schedulerState);
    this.scheduleControlPersist();
    return schedulerState;
  }

  /** A new array of desired deployments sorted by modelId. */
  public getDesiredDeployments(): IClusterDesiredDeployment[] {
    return Array.from(this.desiredDeployments.values()).sort((left, right) =>
      left.modelId.localeCompare(right.modelId)
    );
  }

  public getDesiredDeployment(modelId: string): IClusterDesiredDeployment | null {
    return this.desiredDeployments.get(modelId) || null;
  }

  /** Replaces the desired deployment for `modelId` (stamped with Date.now()) and schedules a write. */
  public upsertDesiredDeployment(
    modelId: string,
    desiredReplicas: number,
  ): IClusterDesiredDeployment {
    const deployment: IClusterDesiredDeployment = {
      modelId,
      desiredReplicas,
      updatedAt: Date.now(),
    };
    this.desiredDeployments.set(modelId, deployment);
    this.scheduleControlPersist();
    return deployment;
  }

  /** Removes the desired deployment; schedules a write only if one existed. */
  public removeDesiredDeployment(modelId: string): boolean {
    const removed = this.desiredDeployments.delete(modelId);
    if (removed) {
      this.scheduleControlPersist();
    }
    return removed;
  }

  /** Healthy deployments of `modelId` across all healthy nodes. */
  public getModelLocations(modelId: string): IClusterModelLocation[] {
    const locations: IClusterModelLocation[] = [];

    for (const node of this.getHealthyNodes()) {
      for (const deployment of node.deployments) {
        if (deployment.modelId !== modelId || !deployment.healthy) {
          continue;
        }
        locations.push(this.toModelLocation(node.nodeName, deployment));
      }
    }

    return locations;
  }

  /** Like getModelLocations, restricted to nodes whose scheduler state is 'active'. */
  public getActiveModelLocations(modelId: string): IClusterModelLocation[] {
    return this.getModelLocations(modelId).filter((location) =>
      this.getNodeSchedulerState(location.nodeName) === 'active'
    );
  }

  /**
   * Picks the preferred location for `modelId`: best scheduler state first
   * (active, cordoned, draining), then the local node, then by node name.
   * Returns null when no healthy location exists.
   */
  public resolveModel(modelId: string): IClusterModelLocation | null {
    const locations = this.getModelLocations(modelId);
    if (locations.length === 0) {
      return null;
    }

    locations.sort((left, right) =>
      this.compareSchedulerState(
        this.getNodeSchedulerState(left.nodeName),
        this.getNodeSchedulerState(right.nodeName),
      ) ||
      // Returns 0 when both locations are on the local node, keeping the
      // comparator consistent (several deployments can share a node).
      this.compareLocalFirst(left.nodeName, right.nodeName) ||
      left.nodeName.localeCompare(right.nodeName)
    );

    return locations[0] ?? null;
  }

  /**
   * Chooses a healthy, 'active' node with enough free VRAM and a GPU
   * topology group that satisfies the model's requirements.
   *
   * Standalone nodes other than this one are never picked. Ranking: local node
   * first, then most available VRAM, then largest GPU group closest to the
   * preferred tensor-parallel size, then fewest deployments.
   *
   * @param excludedNodeNames nodes to skip (e.g. ones that already failed).
   * @returns the chosen node, or null if none is eligible.
   */
  public pickNodeForModel(
    model: IModelCatalogEntry,
    excludedNodeNames: string[] = [],
  ): IClusterNodeHeartbeat | null {
    const requiredVram = model.requirements.minVramGb;
    const minGpuCount = model.requirements.minGpuCount || 1;
    const preferredTensorParallel = model.launchDefaults?.tensorParallelSize || minGpuCount;

    const eligible = this.getHealthyNodes().filter((node) => {
      if (excludedNodeNames.includes(node.nodeName)) {
        return false;
      }
      if (node.role === 'standalone' && node.nodeName !== this.config.nodeName) {
        return false;
      }
      if (node.schedulerState && node.schedulerState !== 'active') {
        return false;
      }
      return node.resources.availableVramGb >= requiredVram &&
        this.hasEligibleTopologyGroup(node.resources.topologyGroups, requiredVram, minGpuCount);
    });

    if (eligible.length === 0) {
      return null;
    }

    eligible.sort((left, right) => {
      const localPreference = this.compareLocalFirst(left.nodeName, right.nodeName);
      if (localPreference !== 0) {
        return localPreference;
      }

      if (right.resources.availableVramGb !== left.resources.availableVramGb) {
        return right.resources.availableVramGb - left.resources.availableVramGb;
      }

      const leftTopologyDelta = Math.abs(
        left.resources.largestGpuGroupCount - preferredTensorParallel,
      );
      const rightTopologyDelta = Math.abs(
        right.resources.largestGpuGroupCount - preferredTensorParallel,
      );
      if (leftTopologyDelta !== rightTopologyDelta) {
        return leftTopologyDelta - rightTopologyDelta;
      }

      return left.resources.deploymentCount - right.resources.deploymentCount;
    });

    return eligible[0] ?? null;
  }

  /**
   * Snapshot of cluster state. `models` groups every deployment reported by a
   * healthy node by modelId, including deployments whose own `healthy` is false.
   */
  public getStatus(): IClusterStatusResponse {
    const models: Record<string, IClusterModelLocation[]> = {};
    for (const node of this.getHealthyNodes()) {
      for (const deployment of node.deployments) {
        const bucket = models[deployment.modelId] ?? (models[deployment.modelId] = []);
        bucket.push(this.toModelLocation(node.nodeName, deployment));
      }
    }

    return {
      localNode: this.localNode ? this.decorateNode(this.localNode) : null,
      nodes: this.getAllNodes(),
      models,
      desiredDeployments: this.getDesiredDeployments(),
    };
  }

  /** Restores known nodes (and the local node, if present) from cluster-state.json. */
  private async loadClusterState(): Promise<void> {
    try {
      const stateContent = await fs.readFile(this.getStateFilePath(), 'utf-8');
      const nodes = this.jsonField(JSON.parse(stateContent), 'nodes');

      for (const entry of Array.isArray(nodes) ? nodes : []) {
        const node = entry as IClusterNodeHeartbeat;
        // Skip malformed entries rather than abandoning the rest of the file.
        if (typeof node?.nodeName !== 'string') {
          continue;
        }
        this.knownNodes.set(node.nodeName, node);
        if (node.nodeName === this.config.nodeName) {
          this.localNode = node;
        }
      }

      // Runs before `initialized` is set, so pruning here never triggers a write.
      this.pruneStaleNodes();
    } catch {
      // No persisted cluster state yet.
    }
  }

  /** Restores desired deployments and scheduler states from cluster-control-state.json. */
  private async loadControlState(): Promise<void> {
    try {
      const controlStateContent = await fs.readFile(this.getControlStateFilePath(), 'utf-8');
      const data: unknown = JSON.parse(controlStateContent);

      const deployments = this.jsonField(data, 'desiredDeployments');
      for (const entry of Array.isArray(deployments) ? deployments : []) {
        const deployment = entry as IClusterDesiredDeployment;
        if (typeof deployment?.modelId !== 'string') {
          continue;
        }
        this.desiredDeployments.set(deployment.modelId, deployment);
      }

      const schedulerStates = this.jsonField(data, 'nodeSchedulerStates');
      if (
        typeof schedulerStates === 'object' && schedulerStates !== null &&
        !Array.isArray(schedulerStates)
      ) {
        const entries = Object.entries(
          schedulerStates as Record<string, TClusterNodeSchedulerState>,
        );
        for (const [nodeName, schedulerState] of entries) {
          this.nodeSchedulerStates.set(nodeName, schedulerState);
        }
      }
    } catch {
      // No persisted control state yet.
    }
  }

  /** Reads `source[key]` from a parsed JSON value; undefined if `source` is not an object. */
  private jsonField(source: unknown, key: string): unknown {
    return typeof source === 'object' && source !== null
      ? (source as Record<string, unknown>)[key]
      : undefined;
  }

  /** Builds the public location record for one deployment on `nodeName`. */
  private toModelLocation(
    nodeName: string,
    deployment: IClusterNodeHeartbeat['deployments'][number],
  ): IClusterModelLocation {
    return {
      modelId: deployment.modelId,
      nodeName,
      endpoint: deployment.endpoint,
      healthy: deployment.healthy,
      engine: deployment.engine,
      containerId: deployment.containerId,
    };
  }

  /**
   * Comparator fragment that orders the local node first. Returns 0 when both
   * or neither name is the local node, so callers can fall through with `||`.
   */
  private compareLocalFirst(leftName: string, rightName: string): number {
    const localName = this.config.nodeName;
    return Number(rightName === localName) - Number(leftName === localName);
  }

  /** True if some topology group has at least `minGpuCount` GPUs and `requiredVramGb` total VRAM. */
  private hasEligibleTopologyGroup(
    groups: IClusterGpuTopologyGroup[],
    requiredVramGb: number,
    minGpuCount: number,
  ): boolean {
    return groups.some((group) =>
      group.gpuCount >= minGpuCount && group.totalVramGb >= requiredVramGb
    );
  }

  private getStateFilePath(): string {
    return `${PATHS.DATA_DIR}/cluster-state.json`;
  }

  private getControlStateFilePath(): string {
    return `${PATHS.DATA_DIR}/cluster-control-state.json`;
  }

  /**
   * Queues a cluster-state write. No-op before initialize() completes or while
   * a write is already queued. The write is chained behind any in-flight
   * write, because concurrent fs.writeFile calls on one path are unsafe.
   */
  private schedulePersist(): void {
    if (!this.initialized || this.persistQueued) {
      return;
    }

    this.persistQueued = true;
    this.persistChain = this.persistChain
      .then(() => {
        // Clear before snapshotting so mutations made during this write queue
        // exactly one follow-up write.
        this.persistQueued = false;
        return this.persistState();
      })
      .catch(() => {
        // persistState() swallows I/O errors; this only keeps the chain alive.
      });
  }

  /** Control-state counterpart of schedulePersist(), with the same guarantees. */
  private scheduleControlPersist(): void {
    if (!this.initialized || this.controlPersistQueued) {
      return;
    }

    this.controlPersistQueued = true;
    this.controlPersistChain = this.controlPersistChain
      .then(() => {
        this.controlPersistQueued = false;
        return this.persistControlState();
      })
      .catch(() => {
        // persistControlState() swallows I/O errors; this only keeps the chain alive.
      });
  }

  private async persistState(): Promise<void> {
    try {
      await this.writeJsonAtomically(this.getStateFilePath(), {
        nodes: Array.from(this.knownNodes.values()),
      });
    } catch {
      // Persistence failure should not break the control plane.
    }
  }

  private async persistControlState(): Promise<void> {
    try {
      await this.writeJsonAtomically(this.getControlStateFilePath(), {
        desiredDeployments: this.getDesiredDeployments(),
        nodeSchedulerStates: Object.fromEntries(this.nodeSchedulerStates.entries()),
      });
    } catch {
      // Persistence failure should not break the control plane.
    }
  }

  /**
   * Serializes `payload` (pretty-printed, 2-space indent) to a sibling `.tmp`
   * file and renames it over `filePath`. An interrupted write therefore leaves
   * the previous file intact rather than truncated. This relies on rename being
   * atomic within one filesystem.
   * Callers must not run two writes to the same path concurrently; the
   * schedule*Persist chains guarantee that.
   */
  private async writeJsonAtomically(filePath: string, payload: unknown): Promise<void> {
    // Snapshot state before any await so the written content is self-consistent.
    const content = JSON.stringify(payload, null, 2);
    const tempPath = `${filePath}.tmp`;
    await fs.mkdir(PATHS.DATA_DIR, { recursive: true });
    await fs.writeFile(tempPath, content);
    await fs.rename(tempPath, filePath);
  }

  /** Returns the node with `schedulerState` filled in; does not mutate the stored heartbeat. */
  private decorateNode(node: IClusterNodeHeartbeat): IClusterNodeHeartbeat {
    return {
      ...node,
      schedulerState: this.getNodeSchedulerState(node.nodeName),
    };
  }

  /** Negative if `left` is preferred over `right` per SCHEDULER_STATE_ORDER. */
  private compareSchedulerState(
    left: TClusterNodeSchedulerState,
    right: TClusterNodeSchedulerState,
  ): number {
    const order = ClusterManager.SCHEDULER_STATE_ORDER;
    return order.indexOf(left) - order.indexOf(right);
  }
}
|