feat(coreflow): add deployment runtime operations
This commit is contained in:
@@ -0,0 +1,390 @@
|
||||
import * as plugins from './coreflow.plugins.js';
|
||||
import type { Coreflow } from './coreflow.classes.coreflow.js';
|
||||
import type * as servezoneInterfaces from '@serve.zone/interfaces';
|
||||
|
||||
type TCoreflowDeploymentRequest =
|
||||
| servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_GetServiceDeployments
|
||||
| servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_RestartDeployment
|
||||
| servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_KillDeployment
|
||||
| servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceReadFile
|
||||
| servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceWriteFile
|
||||
| servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceReadDir
|
||||
| servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceMkdir
|
||||
| servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceRm
|
||||
| servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceExists
|
||||
| servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceExec;
|
||||
|
||||
export class CoreflowDeploymentRuntimeManager {
|
||||
constructor(private coreflowRef: Coreflow) {}
|
||||
|
||||
public async start() {
|
||||
const router = this.coreflowRef.cloudlyConnector.cloudlyApiClient.typedrouter as any;
|
||||
const addHandler = <TRequest extends TCoreflowDeploymentRequest>(
|
||||
methodArg: TRequest['method'],
|
||||
handlerArg: (dataArg: TRequest['request']) => Promise<TRequest['response']>,
|
||||
) => {
|
||||
router.addTypedHandler(new plugins.typedrequest.TypedHandler<any>(methodArg, handlerArg as any) as any);
|
||||
};
|
||||
|
||||
addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_GetServiceDeployments>('coreflowGetServiceDeployments', async (dataArg) => {
|
||||
return {
|
||||
deployments: await this.getServiceDeployments(dataArg.service),
|
||||
};
|
||||
});
|
||||
|
||||
addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_RestartDeployment>('coreflowRestartDeployment', async (dataArg) => {
|
||||
return await this.restartDeployment(dataArg.deploymentId);
|
||||
});
|
||||
|
||||
addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_KillDeployment>('coreflowKillDeployment', async (dataArg) => {
|
||||
return await this.killDeployment(dataArg.deploymentId);
|
||||
});
|
||||
|
||||
addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceReadFile>('coreflowDeploymentWorkspaceReadFile', async (dataArg) => {
|
||||
const result = await this.execInDeployment(dataArg.deploymentId, ['cat', dataArg.path]);
|
||||
if (!result.found) return { found: false };
|
||||
if (result.exitCode !== 0) {
|
||||
throw new plugins.typedrequest.TypedResponseError(result.stderr || 'Failed to read file');
|
||||
}
|
||||
return { found: true, content: result.stdout };
|
||||
});
|
||||
|
||||
addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceWriteFile>('coreflowDeploymentWorkspaceWriteFile', async (dataArg) => {
|
||||
const contentArg = this.shellQuote(String(dataArg.content || ''));
|
||||
const pathArg = this.shellQuote(dataArg.path);
|
||||
const result = await this.execInDeployment(dataArg.deploymentId, [
|
||||
'sh',
|
||||
'-c',
|
||||
`printf '%s' ${contentArg} > ${pathArg}`,
|
||||
]);
|
||||
if (!result.found) return { found: false };
|
||||
if (result.exitCode !== 0) {
|
||||
throw new plugins.typedrequest.TypedResponseError(result.stderr || 'Failed to write file');
|
||||
}
|
||||
return { found: true };
|
||||
});
|
||||
|
||||
addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceReadDir>('coreflowDeploymentWorkspaceReadDir', async (dataArg) => {
|
||||
const result = await this.execInDeployment(dataArg.deploymentId, ['ls', '-1', '-F', dataArg.path]);
|
||||
if (!result.found) return { found: false };
|
||||
if (result.exitCode !== 0) {
|
||||
throw new plugins.typedrequest.TypedResponseError(result.stderr || 'Failed to read directory');
|
||||
}
|
||||
const basePath = String(dataArg.path || '/').endsWith('/') ? dataArg.path : `${dataArg.path}/`;
|
||||
const entries: servezoneInterfaces.requests.deployment.IDeploymentWorkspaceFileEntry[] = (result.stdout || '')
|
||||
.split('\n')
|
||||
.filter((lineArg) => lineArg.trim())
|
||||
.map((lineArg) => {
|
||||
const isDirectory = lineArg.endsWith('/');
|
||||
const name = isDirectory ? lineArg.slice(0, -1) : lineArg.replace(/[*@=|]$/, '');
|
||||
return {
|
||||
type: isDirectory ? 'directory' as const : 'file' as const,
|
||||
name,
|
||||
path: `${basePath}${name}`,
|
||||
};
|
||||
});
|
||||
return { found: true, entries };
|
||||
});
|
||||
|
||||
addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceMkdir>('coreflowDeploymentWorkspaceMkdir', async (dataArg) => {
|
||||
const result = await this.execInDeployment(dataArg.deploymentId, ['mkdir', '-p', dataArg.path]);
|
||||
if (!result.found) return { found: false };
|
||||
if (result.exitCode !== 0) {
|
||||
throw new plugins.typedrequest.TypedResponseError(result.stderr || 'Failed to create directory');
|
||||
}
|
||||
return { found: true };
|
||||
});
|
||||
|
||||
addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceRm>('coreflowDeploymentWorkspaceRm', async (dataArg) => {
|
||||
const result = await this.execInDeployment(dataArg.deploymentId, [
|
||||
'rm',
|
||||
dataArg.recursive ? '-rf' : '-f',
|
||||
dataArg.path,
|
||||
]);
|
||||
if (!result.found) return { found: false };
|
||||
if (result.exitCode !== 0) {
|
||||
throw new plugins.typedrequest.TypedResponseError(result.stderr || 'Failed to remove path');
|
||||
}
|
||||
return { found: true };
|
||||
});
|
||||
|
||||
addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceExists>('coreflowDeploymentWorkspaceExists', async (dataArg) => {
|
||||
const result = await this.execInDeployment(dataArg.deploymentId, ['test', '-e', dataArg.path]);
|
||||
if (!result.found) return { found: false };
|
||||
return { found: true, exists: result.exitCode === 0 };
|
||||
});
|
||||
|
||||
addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceExec>('coreflowDeploymentWorkspaceExec', async (dataArg) => {
|
||||
const command = [dataArg.command, ...(dataArg.args || [])];
|
||||
return await this.execInDeployment(dataArg.deploymentId, command);
|
||||
});
|
||||
}
|
||||
|
||||
public async stop() {}
|
||||
|
||||
private async getServiceDeployments(
|
||||
serviceArg: plugins.servezoneInterfaces.data.IService,
|
||||
): Promise<plugins.servezoneInterfaces.data.IDeployment[]> {
|
||||
const dockerService = await this.getDockerServiceByNameOrNull(serviceArg.data.name);
|
||||
if (!dockerService) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const [tasks, nodeNames] = await Promise.all([
|
||||
this.listTasksForService(dockerService.ID),
|
||||
this.getNodeNameMap(),
|
||||
]);
|
||||
|
||||
const deployments: plugins.servezoneInterfaces.data.IDeployment[] = [];
|
||||
for (const task of tasks) {
|
||||
deployments.push(await this.taskToDeployment(task, serviceArg, dockerService.ID, nodeNames));
|
||||
}
|
||||
return deployments;
|
||||
}
|
||||
|
||||
private async restartDeployment(
|
||||
deploymentIdArg: string,
|
||||
): Promise<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_RestartDeployment['response']> {
|
||||
const resolved = await this.resolveContainerForDeployment(deploymentIdArg);
|
||||
if (!resolved) return { found: false };
|
||||
const response = await this.coreflowRef.dockerHost.request(
|
||||
'POST',
|
||||
`/containers/${resolved.container.Id}/restart`,
|
||||
);
|
||||
if (response.statusCode >= 300) {
|
||||
throw new plugins.typedrequest.TypedResponseError(`Failed to restart container: ${response.statusCode}`);
|
||||
}
|
||||
return {
|
||||
found: true,
|
||||
deployment: await this.taskToDeployment(resolved.task, resolved.service, resolved.dockerServiceId, await this.getNodeNameMap()),
|
||||
};
|
||||
}
|
||||
|
||||
private async killDeployment(
|
||||
deploymentIdArg: string,
|
||||
): Promise<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_KillDeployment['response']> {
|
||||
const resolved = await this.resolveContainerForDeployment(deploymentIdArg);
|
||||
if (!resolved) return { found: false };
|
||||
const response = await this.coreflowRef.dockerHost.request(
|
||||
'POST',
|
||||
`/containers/${resolved.container.Id}/kill`,
|
||||
);
|
||||
if (response.statusCode >= 300) {
|
||||
throw new plugins.typedrequest.TypedResponseError(`Failed to kill container: ${response.statusCode}`);
|
||||
}
|
||||
return {
|
||||
found: true,
|
||||
deployment: await this.taskToDeployment(resolved.task, resolved.service, resolved.dockerServiceId, await this.getNodeNameMap()),
|
||||
};
|
||||
}
|
||||
|
||||
private async execInDeployment(
|
||||
deploymentIdArg: string,
|
||||
commandArg: string[],
|
||||
): Promise<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceExec['response']> {
|
||||
const resolved = await this.resolveContainerForDeployment(deploymentIdArg);
|
||||
if (!resolved) return { found: false };
|
||||
const exec = await resolved.container.exec(commandArg, {
|
||||
tty: true,
|
||||
attachStdin: false,
|
||||
attachStdout: true,
|
||||
attachStderr: true,
|
||||
});
|
||||
const stdout = await this.streamToString(exec.stream);
|
||||
const inspect = await exec.inspect();
|
||||
await exec.close();
|
||||
return {
|
||||
found: true,
|
||||
stdout,
|
||||
stderr: '',
|
||||
exitCode: inspect.ExitCode ?? 0,
|
||||
};
|
||||
}
|
||||
|
||||
private shellQuote(valueArg: string) {
|
||||
return `'${valueArg.replace(/'/g, `'\\''`)}'`;
|
||||
}
|
||||
|
||||
private async resolveContainerForDeployment(deploymentIdArg: string) {
|
||||
const containers = await this.coreflowRef.dockerHost.listContainers();
|
||||
const directContainer = containers.find((containerArg) => {
|
||||
return containerArg.Id === deploymentIdArg || containerArg.Id.startsWith(deploymentIdArg);
|
||||
});
|
||||
|
||||
const services = await this.coreflowRef.cloudlyConnector.cloudlyApiClient.services.getServices() as unknown as plugins.servezoneInterfaces.data.IService[];
|
||||
const tasks = await this.listTasks();
|
||||
|
||||
for (const service of services) {
|
||||
const dockerService = await this.getDockerServiceByNameOrNull(service.data.name);
|
||||
if (!dockerService) continue;
|
||||
const task = tasks.find((taskArg) => {
|
||||
return taskArg.ServiceID === dockerService.ID && (
|
||||
taskArg.ID === deploymentIdArg ||
|
||||
taskArg.ID?.startsWith(deploymentIdArg) ||
|
||||
taskArg.Status?.ContainerStatus?.ContainerID === deploymentIdArg ||
|
||||
taskArg.Status?.ContainerStatus?.ContainerID?.startsWith(deploymentIdArg)
|
||||
);
|
||||
});
|
||||
const containerId = task?.Status?.ContainerStatus?.ContainerID;
|
||||
const container = containerId
|
||||
? containers.find((containerArg) => containerArg.Id === containerId || containerArg.Id.startsWith(containerId))
|
||||
: directContainer;
|
||||
if (task && container) {
|
||||
return { service, task, dockerServiceId: dockerService.ID, container };
|
||||
}
|
||||
}
|
||||
|
||||
if (directContainer) {
|
||||
const service = services.find((serviceArg) => {
|
||||
return directContainer.Labels?.['com.docker.swarm.service.name'] === serviceArg.data.name;
|
||||
});
|
||||
if (service) {
|
||||
const dockerService = await this.getDockerServiceByNameOrNull(service.data.name);
|
||||
if (dockerService) {
|
||||
const task = tasks.find((taskArg) => taskArg.Status?.ContainerStatus?.ContainerID === directContainer.Id) || {
|
||||
ID: directContainer.Id,
|
||||
ServiceID: dockerService.ID,
|
||||
NodeID: '',
|
||||
DesiredState: directContainer.State,
|
||||
Status: {
|
||||
State: directContainer.State,
|
||||
ContainerStatus: { ContainerID: directContainer.Id },
|
||||
},
|
||||
CreatedAt: new Date((directContainer.Created || Date.now() / 1000) * 1000).toISOString(),
|
||||
UpdatedAt: new Date().toISOString(),
|
||||
};
|
||||
return { service, task, dockerServiceId: dockerService.ID, container: directContainer };
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private async taskToDeployment(
|
||||
taskArg: any,
|
||||
serviceArg: plugins.servezoneInterfaces.data.IService,
|
||||
dockerServiceIdArg: string,
|
||||
nodeNamesArg: Map<string, string>,
|
||||
): Promise<plugins.servezoneInterfaces.data.IDeployment> {
|
||||
const containerId = taskArg.Status?.ContainerStatus?.ContainerID;
|
||||
let resourceUsage: plugins.servezoneInterfaces.data.IDeployment['resourceUsage'];
|
||||
if (containerId) {
|
||||
const container = await this.coreflowRef.dockerHost.getContainerById(containerId).catch(() => undefined);
|
||||
if (container) {
|
||||
resourceUsage = await this.getContainerResourceUsage(container).catch(() => undefined);
|
||||
}
|
||||
}
|
||||
|
||||
const deployedAt = Date.parse(taskArg.CreatedAt || '') || Date.now();
|
||||
const updatedAt = Date.parse(taskArg.UpdatedAt || '') || deployedAt;
|
||||
const status = this.mapTaskStatus(taskArg.Status?.State || taskArg.DesiredState);
|
||||
return {
|
||||
id: taskArg.ID,
|
||||
taskId: taskArg.ID,
|
||||
serviceId: serviceArg.id,
|
||||
serviceName: serviceArg.data.name,
|
||||
nodeId: taskArg.NodeID || '',
|
||||
nodeName: nodeNamesArg.get(taskArg.NodeID) || taskArg.NodeID || '',
|
||||
containerId,
|
||||
usedImageId: serviceArg.data.imageId,
|
||||
version: serviceArg.data.imageVersion || taskArg.Spec?.ContainerSpec?.Image || 'latest',
|
||||
deployedAt,
|
||||
updatedAt,
|
||||
deploymentLog: [taskArg.Status?.Message || `Docker task ${taskArg.ID}`],
|
||||
status,
|
||||
healthStatus: status === 'running' ? 'healthy' : status === 'failed' ? 'unhealthy' : 'unknown',
|
||||
resourceUsage,
|
||||
slot: taskArg.Slot,
|
||||
desiredState: taskArg.DesiredState,
|
||||
dockerServiceId: dockerServiceIdArg,
|
||||
};
|
||||
}
|
||||
|
||||
private async getContainerResourceUsage(containerArg: plugins.docker.DockerContainer) {
|
||||
const stats = await containerArg.stats({ stream: false });
|
||||
const cpuDelta = (stats.cpu_stats?.cpu_usage?.total_usage || 0) - (stats.precpu_stats?.cpu_usage?.total_usage || 0);
|
||||
const systemDelta = (stats.cpu_stats?.system_cpu_usage || 0) - (stats.precpu_stats?.system_cpu_usage || 0);
|
||||
const onlineCpus = stats.cpu_stats?.online_cpus || stats.cpu_stats?.cpu_usage?.percpu_usage?.length || 1;
|
||||
const cpuUsagePercent = systemDelta > 0 && cpuDelta > 0
|
||||
? (cpuDelta / systemDelta) * onlineCpus * 100
|
||||
: 0;
|
||||
return {
|
||||
cpuUsagePercent,
|
||||
memoryUsedMB: Math.round((stats.memory_stats?.usage || 0) / 1024 / 1024),
|
||||
lastUpdated: Date.now(),
|
||||
};
|
||||
}
|
||||
|
||||
private mapTaskStatus(statusArg: string): plugins.servezoneInterfaces.data.IDeployment['status'] {
|
||||
switch (statusArg) {
|
||||
case 'running':
|
||||
return 'running';
|
||||
case 'new':
|
||||
case 'pending':
|
||||
case 'assigned':
|
||||
case 'accepted':
|
||||
case 'preparing':
|
||||
case 'ready':
|
||||
case 'starting':
|
||||
return 'starting';
|
||||
case 'shutdown':
|
||||
case 'complete':
|
||||
case 'remove':
|
||||
return 'stopped';
|
||||
case 'failed':
|
||||
case 'rejected':
|
||||
case 'orphaned':
|
||||
return 'failed';
|
||||
default:
|
||||
return 'scheduled';
|
||||
}
|
||||
}
|
||||
|
||||
private async getDockerServiceByNameOrNull(serviceNameArg: string) {
|
||||
try {
|
||||
return await this.coreflowRef.dockerHost.getServiceByName(serviceNameArg);
|
||||
} catch (error) {
|
||||
if ((error as Error).message === `Service not found: ${serviceNameArg}`) {
|
||||
return null;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
private async listTasksForService(dockerServiceIdArg: string) {
|
||||
const filters = encodeURIComponent(JSON.stringify({ service: [dockerServiceIdArg] }));
|
||||
const response = await this.coreflowRef.dockerHost.request('GET', `/tasks?filters=${filters}`);
|
||||
return Array.isArray(response.body) ? response.body : [];
|
||||
}
|
||||
|
||||
private async listTasks() {
|
||||
const response = await this.coreflowRef.dockerHost.request('GET', '/tasks');
|
||||
return Array.isArray(response.body) ? response.body : [];
|
||||
}
|
||||
|
||||
private async getNodeNameMap() {
|
||||
const nodeNames = new Map<string, string>();
|
||||
try {
|
||||
const response = await this.coreflowRef.dockerHost.request('GET', '/nodes');
|
||||
for (const node of response.body || []) {
|
||||
nodeNames.set(node.ID, node.Description?.Hostname || node.Spec?.Name || node.ID);
|
||||
}
|
||||
} catch {
|
||||
// Single-node Docker setups may not expose Swarm nodes before init.
|
||||
}
|
||||
return nodeNames;
|
||||
}
|
||||
|
||||
private async streamToString(streamArg: plugins.smartstream.stream.Duplex) {
|
||||
return await new Promise<string>((resolve, reject) => {
|
||||
let result = '';
|
||||
streamArg.on('data', (chunkArg) => {
|
||||
result += Buffer.isBuffer(chunkArg) ? chunkArg.toString('utf8') : String(chunkArg);
|
||||
});
|
||||
streamArg.on('end', () => resolve(result));
|
||||
streamArg.on('close', () => resolve(result));
|
||||
streamArg.on('error', reject);
|
||||
});
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user