Files
coreflow/ts/coreflow.classes.deploymentruntime.ts

391 lines
16 KiB
TypeScript

import * as plugins from './coreflow.plugins.js';
import type { Coreflow } from './coreflow.classes.coreflow.js';
import type * as servezoneInterfaces from '@serve.zone/interfaces';
type TCoreflowDeploymentRequest =
| servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_GetServiceDeployments
| servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_RestartDeployment
| servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_KillDeployment
| servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceReadFile
| servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceWriteFile
| servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceReadDir
| servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceMkdir
| servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceRm
| servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceExists
| servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceExec;
export class CoreflowDeploymentRuntimeManager {
constructor(private coreflowRef: Coreflow) {}
public async start() {
const router = this.coreflowRef.cloudlyConnector.cloudlyApiClient.typedrouter as any;
const addHandler = <TRequest extends TCoreflowDeploymentRequest>(
methodArg: TRequest['method'],
handlerArg: (dataArg: TRequest['request']) => Promise<TRequest['response']>,
) => {
router.addTypedHandler(new plugins.typedrequest.TypedHandler<any>(methodArg, handlerArg as any) as any);
};
addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_GetServiceDeployments>('coreflowGetServiceDeployments', async (dataArg) => {
return {
deployments: await this.getServiceDeployments(dataArg.service),
};
});
addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_RestartDeployment>('coreflowRestartDeployment', async (dataArg) => {
return await this.restartDeployment(dataArg.deploymentId);
});
addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_KillDeployment>('coreflowKillDeployment', async (dataArg) => {
return await this.killDeployment(dataArg.deploymentId);
});
addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceReadFile>('coreflowDeploymentWorkspaceReadFile', async (dataArg) => {
const result = await this.execInDeployment(dataArg.deploymentId, ['cat', dataArg.path]);
if (!result.found) return { found: false };
if (result.exitCode !== 0) {
throw new plugins.typedrequest.TypedResponseError(result.stderr || 'Failed to read file');
}
return { found: true, content: result.stdout };
});
addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceWriteFile>('coreflowDeploymentWorkspaceWriteFile', async (dataArg) => {
const contentArg = this.shellQuote(String(dataArg.content || ''));
const pathArg = this.shellQuote(dataArg.path);
const result = await this.execInDeployment(dataArg.deploymentId, [
'sh',
'-c',
`printf '%s' ${contentArg} > ${pathArg}`,
]);
if (!result.found) return { found: false };
if (result.exitCode !== 0) {
throw new plugins.typedrequest.TypedResponseError(result.stderr || 'Failed to write file');
}
return { found: true };
});
addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceReadDir>('coreflowDeploymentWorkspaceReadDir', async (dataArg) => {
const result = await this.execInDeployment(dataArg.deploymentId, ['ls', '-1', '-F', dataArg.path]);
if (!result.found) return { found: false };
if (result.exitCode !== 0) {
throw new plugins.typedrequest.TypedResponseError(result.stderr || 'Failed to read directory');
}
const basePath = String(dataArg.path || '/').endsWith('/') ? dataArg.path : `${dataArg.path}/`;
const entries: servezoneInterfaces.requests.deployment.IDeploymentWorkspaceFileEntry[] = (result.stdout || '')
.split('\n')
.filter((lineArg) => lineArg.trim())
.map((lineArg) => {
const isDirectory = lineArg.endsWith('/');
const name = isDirectory ? lineArg.slice(0, -1) : lineArg.replace(/[*@=|]$/, '');
return {
type: isDirectory ? 'directory' as const : 'file' as const,
name,
path: `${basePath}${name}`,
};
});
return { found: true, entries };
});
addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceMkdir>('coreflowDeploymentWorkspaceMkdir', async (dataArg) => {
const result = await this.execInDeployment(dataArg.deploymentId, ['mkdir', '-p', dataArg.path]);
if (!result.found) return { found: false };
if (result.exitCode !== 0) {
throw new plugins.typedrequest.TypedResponseError(result.stderr || 'Failed to create directory');
}
return { found: true };
});
addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceRm>('coreflowDeploymentWorkspaceRm', async (dataArg) => {
const result = await this.execInDeployment(dataArg.deploymentId, [
'rm',
dataArg.recursive ? '-rf' : '-f',
dataArg.path,
]);
if (!result.found) return { found: false };
if (result.exitCode !== 0) {
throw new plugins.typedrequest.TypedResponseError(result.stderr || 'Failed to remove path');
}
return { found: true };
});
addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceExists>('coreflowDeploymentWorkspaceExists', async (dataArg) => {
const result = await this.execInDeployment(dataArg.deploymentId, ['test', '-e', dataArg.path]);
if (!result.found) return { found: false };
return { found: true, exists: result.exitCode === 0 };
});
addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceExec>('coreflowDeploymentWorkspaceExec', async (dataArg) => {
const command = [dataArg.command, ...(dataArg.args || [])];
return await this.execInDeployment(dataArg.deploymentId, command);
});
}
public async stop() {}
private async getServiceDeployments(
serviceArg: plugins.servezoneInterfaces.data.IService,
): Promise<plugins.servezoneInterfaces.data.IDeployment[]> {
const dockerService = await this.getDockerServiceByNameOrNull(serviceArg.data.name);
if (!dockerService) {
return [];
}
const [tasks, nodeNames] = await Promise.all([
this.listTasksForService(dockerService.ID),
this.getNodeNameMap(),
]);
const deployments: plugins.servezoneInterfaces.data.IDeployment[] = [];
for (const task of tasks) {
deployments.push(await this.taskToDeployment(task, serviceArg, dockerService.ID, nodeNames));
}
return deployments;
}
private async restartDeployment(
deploymentIdArg: string,
): Promise<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_RestartDeployment['response']> {
const resolved = await this.resolveContainerForDeployment(deploymentIdArg);
if (!resolved) return { found: false };
const response = await this.coreflowRef.dockerHost.request(
'POST',
`/containers/${resolved.container.Id}/restart`,
);
if (response.statusCode >= 300) {
throw new plugins.typedrequest.TypedResponseError(`Failed to restart container: ${response.statusCode}`);
}
return {
found: true,
deployment: await this.taskToDeployment(resolved.task, resolved.service, resolved.dockerServiceId, await this.getNodeNameMap()),
};
}
private async killDeployment(
deploymentIdArg: string,
): Promise<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_KillDeployment['response']> {
const resolved = await this.resolveContainerForDeployment(deploymentIdArg);
if (!resolved) return { found: false };
const response = await this.coreflowRef.dockerHost.request(
'POST',
`/containers/${resolved.container.Id}/kill`,
);
if (response.statusCode >= 300) {
throw new plugins.typedrequest.TypedResponseError(`Failed to kill container: ${response.statusCode}`);
}
return {
found: true,
deployment: await this.taskToDeployment(resolved.task, resolved.service, resolved.dockerServiceId, await this.getNodeNameMap()),
};
}
private async execInDeployment(
deploymentIdArg: string,
commandArg: string[],
): Promise<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceExec['response']> {
const resolved = await this.resolveContainerForDeployment(deploymentIdArg);
if (!resolved) return { found: false };
const exec = await resolved.container.exec(commandArg, {
tty: true,
attachStdin: false,
attachStdout: true,
attachStderr: true,
});
const stdout = await this.streamToString(exec.stream);
const inspect = await exec.inspect();
await exec.close();
return {
found: true,
stdout,
stderr: '',
exitCode: inspect.ExitCode ?? 0,
};
}
private shellQuote(valueArg: string) {
return `'${valueArg.replace(/'/g, `'\\''`)}'`;
}
private async resolveContainerForDeployment(deploymentIdArg: string) {
const containers = await this.coreflowRef.dockerHost.listContainers();
const directContainer = containers.find((containerArg) => {
return containerArg.Id === deploymentIdArg || containerArg.Id.startsWith(deploymentIdArg);
});
const services = await this.coreflowRef.cloudlyConnector.cloudlyApiClient.services.getServices() as unknown as plugins.servezoneInterfaces.data.IService[];
const tasks = await this.listTasks();
for (const service of services) {
const dockerService = await this.getDockerServiceByNameOrNull(service.data.name);
if (!dockerService) continue;
const task = tasks.find((taskArg) => {
return taskArg.ServiceID === dockerService.ID && (
taskArg.ID === deploymentIdArg ||
taskArg.ID?.startsWith(deploymentIdArg) ||
taskArg.Status?.ContainerStatus?.ContainerID === deploymentIdArg ||
taskArg.Status?.ContainerStatus?.ContainerID?.startsWith(deploymentIdArg)
);
});
const containerId = task?.Status?.ContainerStatus?.ContainerID;
const container = containerId
? containers.find((containerArg) => containerArg.Id === containerId || containerArg.Id.startsWith(containerId))
: directContainer;
if (task && container) {
return { service, task, dockerServiceId: dockerService.ID, container };
}
}
if (directContainer) {
const service = services.find((serviceArg) => {
return directContainer.Labels?.['com.docker.swarm.service.name'] === serviceArg.data.name;
});
if (service) {
const dockerService = await this.getDockerServiceByNameOrNull(service.data.name);
if (dockerService) {
const task = tasks.find((taskArg) => taskArg.Status?.ContainerStatus?.ContainerID === directContainer.Id) || {
ID: directContainer.Id,
ServiceID: dockerService.ID,
NodeID: '',
DesiredState: directContainer.State,
Status: {
State: directContainer.State,
ContainerStatus: { ContainerID: directContainer.Id },
},
CreatedAt: new Date((directContainer.Created || Date.now() / 1000) * 1000).toISOString(),
UpdatedAt: new Date().toISOString(),
};
return { service, task, dockerServiceId: dockerService.ID, container: directContainer };
}
}
}
return null;
}
private async taskToDeployment(
taskArg: any,
serviceArg: plugins.servezoneInterfaces.data.IService,
dockerServiceIdArg: string,
nodeNamesArg: Map<string, string>,
): Promise<plugins.servezoneInterfaces.data.IDeployment> {
const containerId = taskArg.Status?.ContainerStatus?.ContainerID;
let resourceUsage: plugins.servezoneInterfaces.data.IDeployment['resourceUsage'];
if (containerId) {
const container = await this.coreflowRef.dockerHost.getContainerById(containerId).catch(() => undefined);
if (container) {
resourceUsage = await this.getContainerResourceUsage(container).catch(() => undefined);
}
}
const deployedAt = Date.parse(taskArg.CreatedAt || '') || Date.now();
const updatedAt = Date.parse(taskArg.UpdatedAt || '') || deployedAt;
const status = this.mapTaskStatus(taskArg.Status?.State || taskArg.DesiredState);
return {
id: taskArg.ID,
taskId: taskArg.ID,
serviceId: serviceArg.id,
serviceName: serviceArg.data.name,
nodeId: taskArg.NodeID || '',
nodeName: nodeNamesArg.get(taskArg.NodeID) || taskArg.NodeID || '',
containerId,
usedImageId: serviceArg.data.imageId,
version: serviceArg.data.imageVersion || taskArg.Spec?.ContainerSpec?.Image || 'latest',
deployedAt,
updatedAt,
deploymentLog: [taskArg.Status?.Message || `Docker task ${taskArg.ID}`],
status,
healthStatus: status === 'running' ? 'healthy' : status === 'failed' ? 'unhealthy' : 'unknown',
resourceUsage,
slot: taskArg.Slot,
desiredState: taskArg.DesiredState,
dockerServiceId: dockerServiceIdArg,
};
}
private async getContainerResourceUsage(containerArg: plugins.docker.DockerContainer) {
const stats = await containerArg.stats({ stream: false });
const cpuDelta = (stats.cpu_stats?.cpu_usage?.total_usage || 0) - (stats.precpu_stats?.cpu_usage?.total_usage || 0);
const systemDelta = (stats.cpu_stats?.system_cpu_usage || 0) - (stats.precpu_stats?.system_cpu_usage || 0);
const onlineCpus = stats.cpu_stats?.online_cpus || stats.cpu_stats?.cpu_usage?.percpu_usage?.length || 1;
const cpuUsagePercent = systemDelta > 0 && cpuDelta > 0
? (cpuDelta / systemDelta) * onlineCpus * 100
: 0;
return {
cpuUsagePercent,
memoryUsedMB: Math.round((stats.memory_stats?.usage || 0) / 1024 / 1024),
lastUpdated: Date.now(),
};
}
private mapTaskStatus(statusArg: string): plugins.servezoneInterfaces.data.IDeployment['status'] {
switch (statusArg) {
case 'running':
return 'running';
case 'new':
case 'pending':
case 'assigned':
case 'accepted':
case 'preparing':
case 'ready':
case 'starting':
return 'starting';
case 'shutdown':
case 'complete':
case 'remove':
return 'stopped';
case 'failed':
case 'rejected':
case 'orphaned':
return 'failed';
default:
return 'scheduled';
}
}
private async getDockerServiceByNameOrNull(serviceNameArg: string) {
try {
return await this.coreflowRef.dockerHost.getServiceByName(serviceNameArg);
} catch (error) {
if ((error as Error).message === `Service not found: ${serviceNameArg}`) {
return null;
}
throw error;
}
}
private async listTasksForService(dockerServiceIdArg: string) {
const filters = encodeURIComponent(JSON.stringify({ service: [dockerServiceIdArg] }));
const response = await this.coreflowRef.dockerHost.request('GET', `/tasks?filters=${filters}`);
return Array.isArray(response.body) ? response.body : [];
}
private async listTasks() {
const response = await this.coreflowRef.dockerHost.request('GET', '/tasks');
return Array.isArray(response.body) ? response.body : [];
}
private async getNodeNameMap() {
const nodeNames = new Map<string, string>();
try {
const response = await this.coreflowRef.dockerHost.request('GET', '/nodes');
for (const node of response.body || []) {
nodeNames.set(node.ID, node.Description?.Hostname || node.Spec?.Name || node.ID);
}
} catch {
// Single-node Docker setups may not expose Swarm nodes before init.
}
return nodeNames;
}
private async streamToString(streamArg: plugins.smartstream.stream.Duplex) {
return await new Promise<string>((resolve, reject) => {
let result = '';
streamArg.on('data', (chunkArg) => {
result += Buffer.isBuffer(chunkArg) ? chunkArg.toString('utf8') : String(chunkArg);
});
streamArg.on('end', () => resolve(result));
streamArg.on('close', () => resolve(result));
streamArg.on('error', reject);
});
}
}