457 lines
13 KiB
TypeScript
457 lines
13 KiB
TypeScript
import os from 'node:os';
|
|
import * as fs from 'node:fs/promises';
|
|
import type { IModelCatalogEntry } from '../interfaces/catalog.ts';
|
|
import type {
|
|
IClusterConfig,
|
|
IClusterDesiredDeployment,
|
|
IClusterGpuTopologyGroup,
|
|
IClusterModelLocation,
|
|
IClusterNodeHeartbeat,
|
|
IClusterNodeStatus,
|
|
IClusterStatusResponse,
|
|
TClusterNodeSchedulerState,
|
|
} from '../interfaces/cluster.ts';
|
|
import { CLUSTER, PATHS } from '../constants.ts';
|
|
|
|
/**
 * In-memory registry of cluster nodes, per-node scheduler states and desired
 * model deployments for this process.
 *
 * Node heartbeats are persisted to `cluster-state.json` and control data
 * (desired deployments, scheduler states) to `cluster-control-state.json`,
 * both under `PATHS.DATA_DIR`. Persistence is best-effort: I/O failures are
 * swallowed so they never break callers.
 */
export class ClusterManager {
  private config: IClusterConfig = {
    enabled: false,
    nodeName: os.hostname(),
    role: 'standalone',
    bindHost: CLUSTER.DEFAULT_BIND_HOST,
    gossipPort: CLUSTER.DEFAULT_GOSSIP_PORT,
    heartbeatIntervalMs: CLUSTER.DEFAULT_HEARTBEAT_INTERVAL_MS,
    seedNodes: [],
  };

  /** Most recent heartbeat recorded for this node, if any. */
  private localNode: IClusterNodeHeartbeat | null = null;

  /** Latest heartbeat per node name (local node included once reported). */
  private knownNodes = new Map<string, IClusterNodeHeartbeat>();

  /** Desired deployments keyed by model id. */
  private desiredDeployments = new Map<string, IClusterDesiredDeployment>();

  /** Explicit scheduler states keyed by node name; absent means 'active'. */
  private nodeSchedulerStates = new Map<string, TClusterNodeSchedulerState>();

  private persistQueued = false;
  private controlPersistQueued = false;

  // Tail of the pending write sequence for each file. Every new write is
  // chained behind the previous one so two writeFile calls never overlap on
  // the same path.
  private persistChain: Promise<void> = Promise.resolve();
  private controlPersistChain: Promise<void> = Promise.resolve();

  /**
   * Loads persisted node and control state from disk, if present.
   *
   * Missing, unreadable or unparsable files are ignored. Malformed individual
   * entries are skipped so one bad record cannot discard the remaining ones
   * or prevent stale-node pruning.
   */
  public async initialize(): Promise<void> {
    try {
      const stateContent = await fs.readFile(this.getStateFilePath(), 'utf-8');
      const data = JSON.parse(stateContent) as { nodes?: IClusterNodeHeartbeat[] } | null;
      const rawNodes = data?.nodes;
      const persistedNodes = Array.isArray(rawNodes) ? rawNodes : [];

      for (const node of persistedNodes) {
        if (!node || typeof node.nodeName !== 'string') {
          continue;
        }

        this.knownNodes.set(node.nodeName, node);
        if (node.nodeName === this.config.nodeName) {
          this.localNode = node;
        }
      }

      this.pruneStaleNodes();
    } catch {
      // No persisted cluster state yet.
    }

    try {
      const controlStateContent = await fs.readFile(this.getControlStateFilePath(), 'utf-8');
      const data = JSON.parse(controlStateContent) as {
        desiredDeployments?: IClusterDesiredDeployment[];
        nodeSchedulerStates?: Record<string, TClusterNodeSchedulerState>;
      } | null;

      const rawDeployments = data?.desiredDeployments;
      for (const deployment of Array.isArray(rawDeployments) ? rawDeployments : []) {
        if (!deployment || typeof deployment.modelId !== 'string') {
          continue;
        }
        this.desiredDeployments.set(deployment.modelId, deployment);
      }

      const rawStates = data?.nodeSchedulerStates;
      if (rawStates && typeof rawStates === 'object' && !Array.isArray(rawStates)) {
        for (const [nodeName, schedulerState] of Object.entries(rawStates)) {
          this.nodeSchedulerStates.set(nodeName, schedulerState);
        }
      }
    } catch {
      // No persisted control state yet.
    }
  }

  /**
   * Replaces the active configuration.
   *
   * A falsy `heartbeatIntervalMs` falls back to the default interval and a
   * missing `seedNodes` becomes an empty list.
   */
  public configure(config: IClusterConfig): void {
    this.config = {
      ...config,
      heartbeatIntervalMs: config.heartbeatIntervalMs || CLUSTER.DEFAULT_HEARTBEAT_INTERVAL_MS,
      seedNodes: config.seedNodes || [],
    };
  }

  /** Returns the active configuration object (not a copy). */
  public getConfig(): IClusterConfig {
    return this.config;
  }

  /** Whether clustering is enabled in the active configuration. */
  public isEnabled(): boolean {
    return this.config.enabled;
  }

  /** True when clustering is enabled and the configured role is 'control-plane'. */
  public isControlPlane(): boolean {
    return this.config.enabled && this.config.role === 'control-plane';
  }

  /** True when clustering is enabled and the configured role is 'worker'. */
  public isWorker(): boolean {
    return this.config.enabled && this.config.role === 'worker';
  }

  /** Returns 'standalone' when clustering is disabled, otherwise the configured role. */
  public getModeLabel(): string {
    if (!this.config.enabled) {
      return 'standalone';
    }

    return this.config.role;
  }

  /** Configured heartbeat interval, or the default when unset or zero. */
  public getHeartbeatIntervalMs(): number {
    return this.config.heartbeatIntervalMs || CLUSTER.DEFAULT_HEARTBEAT_INTERVAL_MS;
  }

  /** Endpoint from the local heartbeat if set, otherwise the configured `advertiseUrl`. */
  public getAdvertisedEndpoint(): string | undefined {
    return this.localNode?.endpoint || this.config.advertiseUrl;
  }

  /** Configured control-plane URL, if any. */
  public getControlPlaneUrl(): string | undefined {
    return this.config.controlPlaneUrl;
  }

  /** Configured shared secret; an empty value is reported as `undefined`. */
  public getSharedSecret(): string | undefined {
    return this.config.sharedSecret || undefined;
  }

  /** Records `heartbeat` as the local node's latest heartbeat and schedules a persist. */
  public updateLocalNode(heartbeat: IClusterNodeHeartbeat): void {
    this.localNode = heartbeat;
    this.knownNodes.set(heartbeat.nodeName, heartbeat);
    this.schedulePersist();
  }

  /** Inserts or replaces the heartbeat stored for `heartbeat.nodeName` and schedules a persist. */
  public upsertNode(heartbeat: IClusterNodeHeartbeat): void {
    this.knownNodes.set(heartbeat.nodeName, heartbeat);
    this.schedulePersist();
  }

  /**
   * Builds a status summary for the local node from configuration.
   * `healthy` is always reported as `true` here.
   */
  public getLocalNodeStatus(): IClusterNodeStatus {
    return {
      nodeName: this.config.nodeName,
      role: this.config.role,
      endpoint: this.getAdvertisedEndpoint(),
      healthy: true,
      schedulerState: this.getNodeSchedulerState(this.config.nodeName),
    };
  }

  /** Latest local heartbeat as stored (without scheduler-state decoration), or `null`. */
  public getLocalNode(): IClusterNodeHeartbeat | null {
    return this.localNode;
  }

  /** Returns the known node named `nodeName` with its scheduler state applied, or `null`. */
  public getNode(nodeName: string): IClusterNodeHeartbeat | null {
    const node = this.knownNodes.get(nodeName);
    if (!node) {
      return null;
    }

    return this.decorateNode(node);
  }

  /**
   * Drops every non-local node whose `lastSeenAt` is older than
   * `CLUSTER.NODE_STALE_AFTER_MS` and schedules a persist when anything was removed.
   */
  public pruneStaleNodes(): void {
    const now = Date.now();
    // Deleting the current entry while iterating a Map is well-defined in JS.
    for (const [nodeName, node] of this.knownNodes) {
      if (nodeName === this.config.nodeName) {
        continue;
      }

      if (now - node.lastSeenAt > CLUSTER.NODE_STALE_AFTER_MS) {
        this.knownNodes.delete(nodeName);
        this.schedulePersist();
      }
    }
  }

  /**
   * Returns all known nodes after pruning stale ones, decorated with their
   * scheduler state. The local node comes first; the rest are ordered by name.
   */
  public getAllNodes(): IClusterNodeHeartbeat[] {
    this.pruneStaleNodes();
    return Array.from(this.knownNodes.values())
      .map((node) => this.decorateNode(node))
      .sort((left, right) => this.compareLocalFirst(left.nodeName, right.nodeName));
  }

  /** Subset of {@link getAllNodes} whose heartbeat reports `healthy`. */
  public getHealthyNodes(): IClusterNodeHeartbeat[] {
    return this.getAllNodes().filter((node) => node.healthy);
  }

  /** Scheduler state recorded for `nodeName`, defaulting to 'active'. */
  public getNodeSchedulerState(nodeName: string): TClusterNodeSchedulerState {
    return this.nodeSchedulerStates.get(nodeName) || 'active';
  }

  /**
   * Stores the scheduler state for `nodeName` and schedules a control-state persist.
   *
   * @returns The state that was stored.
   */
  public setNodeSchedulerState(
    nodeName: string,
    schedulerState: TClusterNodeSchedulerState,
  ): TClusterNodeSchedulerState {
    this.nodeSchedulerStates.set(nodeName, schedulerState);
    this.scheduleControlPersist();
    return schedulerState;
  }

  /** All desired deployments ordered by model id. */
  public getDesiredDeployments(): IClusterDesiredDeployment[] {
    return Array.from(this.desiredDeployments.values()).sort((left, right) =>
      left.modelId.localeCompare(right.modelId)
    );
  }

  /** Desired deployment for `modelId`, or `null` when none is recorded. */
  public getDesiredDeployment(modelId: string): IClusterDesiredDeployment | null {
    return this.desiredDeployments.get(modelId) || null;
  }

  /**
   * Creates or replaces the desired deployment for `modelId`, stamping it with
   * the current time, and schedules a control-state persist.
   */
  public upsertDesiredDeployment(
    modelId: string,
    desiredReplicas: number,
  ): IClusterDesiredDeployment {
    const deployment: IClusterDesiredDeployment = {
      modelId,
      desiredReplicas,
      updatedAt: Date.now(),
    };
    this.desiredDeployments.set(modelId, deployment);
    this.scheduleControlPersist();
    return deployment;
  }

  /**
   * Removes the desired deployment for `modelId`.
   *
   * @returns `true` if an entry existed (a persist is then scheduled), else `false`.
   */
  public removeDesiredDeployment(modelId: string): boolean {
    const removed = this.desiredDeployments.delete(modelId);
    if (removed) {
      this.scheduleControlPersist();
    }
    return removed;
  }

  /** Healthy deployments of `modelId` found on healthy nodes. */
  public getModelLocations(modelId: string): IClusterModelLocation[] {
    const locations: IClusterModelLocation[] = [];

    for (const node of this.getHealthyNodes()) {
      for (const deployment of node.deployments) {
        if (deployment.modelId !== modelId || !deployment.healthy) {
          continue;
        }

        locations.push(this.toModelLocation(node, deployment));
      }
    }

    return locations;
  }

  /** {@link getModelLocations} restricted to nodes whose scheduler state is 'active'. */
  public getActiveModelLocations(modelId: string): IClusterModelLocation[] {
    return this.getModelLocations(modelId).filter((location) =>
      this.getNodeSchedulerState(location.nodeName) === 'active'
    );
  }

  /**
   * Picks one location serving `modelId`, or `null` when there is none.
   *
   * Preference order: scheduler state (active, cordoned, draining, then any
   * unrecognised state), then the local node, then node name.
   */
  public resolveModel(modelId: string): IClusterModelLocation | null {
    const locations = this.getModelLocations(modelId);
    if (locations.length === 0) {
      return null;
    }

    locations.sort((left, right) => {
      const schedulerPreference = this.compareSchedulerState(
        this.getNodeSchedulerState(left.nodeName),
        this.getNodeSchedulerState(right.nodeName),
      );
      if (schedulerPreference !== 0) {
        return schedulerPreference;
      }

      // Two deployments on the same node must compare equal, otherwise the
      // comparator is inconsistent and the sort result is unspecified.
      return this.compareLocalFirst(left.nodeName, right.nodeName);
    });

    return locations[0];
  }

  /**
   * Chooses a healthy node able to host `model`, or `null` if none qualifies.
   *
   * A node is eligible when it is not excluded, is not a remote 'standalone'
   * node, has an 'active' scheduler state, has at least `minVramGb` available
   * and owns a topology group satisfying the GPU count and VRAM requirement.
   * Ranking: local node first, then most available VRAM, then largest GPU
   * group closest to the preferred tensor-parallel size, then fewest deployments.
   *
   * @param model Catalog entry whose requirements drive the selection.
   * @param excludedNodeNames Node names that must not be selected.
   */
  public pickNodeForModel(
    model: IModelCatalogEntry,
    excludedNodeNames: string[] = [],
  ): IClusterNodeHeartbeat | null {
    const requiredVram = model.requirements.minVramGb;
    const minGpuCount = model.requirements.minGpuCount || 1;
    const preferredTensorParallel = model.launchDefaults?.tensorParallelSize || minGpuCount;

    const eligible = this.getHealthyNodes().filter((node) => {
      if (excludedNodeNames.includes(node.nodeName)) {
        return false;
      }

      if (node.role === 'standalone' && node.nodeName !== this.config.nodeName) {
        return false;
      }

      if (node.schedulerState && node.schedulerState !== 'active') {
        return false;
      }

      return node.resources.availableVramGb >= requiredVram &&
        this.hasEligibleTopologyGroup(node.resources.topologyGroups, requiredVram, minGpuCount);
    });

    if (eligible.length === 0) {
      return null;
    }

    eligible.sort((left, right) => {
      if (left.nodeName === this.config.nodeName) {
        return -1;
      }
      if (right.nodeName === this.config.nodeName) {
        return 1;
      }

      if (right.resources.availableVramGb !== left.resources.availableVramGb) {
        return right.resources.availableVramGb - left.resources.availableVramGb;
      }

      const leftTopologyDelta = Math.abs(
        left.resources.largestGpuGroupCount - preferredTensorParallel,
      );
      const rightTopologyDelta = Math.abs(
        right.resources.largestGpuGroupCount - preferredTensorParallel,
      );
      if (leftTopologyDelta !== rightTopologyDelta) {
        return leftTopologyDelta - rightTopologyDelta;
      }

      return left.resources.deploymentCount - right.resources.deploymentCount;
    });

    return eligible[0];
  }

  /**
   * Snapshot of the cluster: decorated local node, all nodes, every deployment
   * reported by healthy nodes grouped by model id (unhealthy deployments
   * included, flagged via `healthy`), and the desired deployments.
   */
  public getStatus(): IClusterStatusResponse {
    const models: Record<string, IClusterModelLocation[]> = {};
    for (const node of this.getHealthyNodes()) {
      for (const deployment of node.deployments) {
        if (!models[deployment.modelId]) {
          models[deployment.modelId] = [];
        }

        models[deployment.modelId].push(this.toModelLocation(node, deployment));
      }
    }

    return {
      localNode: this.localNode ? this.decorateNode(this.localNode) : null,
      nodes: this.getAllNodes(),
      models,
      desiredDeployments: this.getDesiredDeployments(),
    };
  }

  /** True when some group has at least `minGpuCount` GPUs and `requiredVramGb` total VRAM. */
  private hasEligibleTopologyGroup(
    groups: IClusterGpuTopologyGroup[],
    requiredVramGb: number,
    minGpuCount: number,
  ): boolean {
    return groups.some((group) =>
      group.gpuCount >= minGpuCount && group.totalVramGb >= requiredVramGb
    );
  }

  /** Builds the location record for one deployment hosted on `node`. */
  private toModelLocation(
    node: IClusterNodeHeartbeat,
    deployment: IClusterNodeHeartbeat['deployments'][number],
  ): IClusterModelLocation {
    return {
      modelId: deployment.modelId,
      nodeName: node.nodeName,
      endpoint: deployment.endpoint,
      healthy: deployment.healthy,
      engine: deployment.engine,
      containerId: deployment.containerId,
    };
  }

  /** Orders node names with the local node first, then alphabetically; equal names tie. */
  private compareLocalFirst(leftName: string, rightName: string): number {
    if (leftName === rightName) {
      return 0;
    }
    if (leftName === this.config.nodeName) {
      return -1;
    }
    if (rightName === this.config.nodeName) {
      return 1;
    }
    return leftName.localeCompare(rightName);
  }

  private getStateFilePath(): string {
    return `${PATHS.DATA_DIR}/cluster-state.json`;
  }

  private getControlStateFilePath(): string {
    return `${PATHS.DATA_DIR}/cluster-control-state.json`;
  }

  /**
   * Queues a node-state write. Calls made before the queued microtask runs
   * collapse into a single write; writes themselves run one after another.
   */
  private schedulePersist(): void {
    if (this.persistQueued) {
      return;
    }

    this.persistQueued = true;
    queueMicrotask(() => {
      this.persistQueued = false;
      // persistState() never rejects, so the chain cannot get stuck.
      this.persistChain = this.persistChain.then(() => this.persistState());
    });
  }

  /** Same as {@link schedulePersist}, for the control-state file. */
  private scheduleControlPersist(): void {
    if (this.controlPersistQueued) {
      return;
    }

    this.controlPersistQueued = true;
    queueMicrotask(() => {
      this.controlPersistQueued = false;
      // persistControlState() never rejects, so the chain cannot get stuck.
      this.controlPersistChain = this.controlPersistChain.then(() => this.persistControlState());
    });
  }

  /** Writes all known node heartbeats to the state file; failures are ignored. */
  private async persistState(): Promise<void> {
    try {
      await fs.mkdir(PATHS.DATA_DIR, { recursive: true });
      await fs.writeFile(
        this.getStateFilePath(),
        JSON.stringify({ nodes: Array.from(this.knownNodes.values()) }, null, 2),
      );
    } catch {
      // Persistence failure should not break the control plane.
    }
  }

  /** Writes desired deployments and scheduler states to the control-state file; failures are ignored. */
  private async persistControlState(): Promise<void> {
    try {
      await fs.mkdir(PATHS.DATA_DIR, { recursive: true });
      await fs.writeFile(
        this.getControlStateFilePath(),
        JSON.stringify(
          {
            desiredDeployments: this.getDesiredDeployments(),
            nodeSchedulerStates: Object.fromEntries(this.nodeSchedulerStates.entries()),
          },
          null,
          2,
        ),
      );
    } catch {
      // Persistence failure should not break the control plane.
    }
  }

  /** Returns a copy of `node` with `schedulerState` set from the scheduler-state map. */
  private decorateNode(node: IClusterNodeHeartbeat): IClusterNodeHeartbeat {
    return {
      ...node,
      schedulerState: this.getNodeSchedulerState(node.nodeName),
    };
  }

  /**
   * Comparator ranking scheduler states as active < cordoned < draining.
   * States outside that list rank after all known ones rather than before them.
   */
  private compareSchedulerState(
    left: TClusterNodeSchedulerState,
    right: TClusterNodeSchedulerState,
  ): number {
    const order: TClusterNodeSchedulerState[] = ['active', 'cordoned', 'draining'];
    const rank = (state: TClusterNodeSchedulerState): number => {
      const index = order.indexOf(state);
      return index === -1 ? order.length : index;
    };
    return rank(left) - rank(right);
  }
}
|