import * as plugins from './coreflow.plugins.js';
import type { Coreflow } from './coreflow.classes.coreflow.js';
import type * as servezoneInterfaces from '@serve.zone/interfaces';

/** Union of every typed request this manager registers a handler for. */
type TCoreflowDeploymentRequest =
  | servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_GetServiceDeployments
  | servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_RestartDeployment
  | servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_KillDeployment
  | servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceReadFile
  | servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceWriteFile
  | servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceReadDir
  | servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceMkdir
  | servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceRm
  | servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceExists
  | servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceExec;

/** Result of running a command inside a deployment's container (see `execInDeployment`). */
type TDeploymentExecResult =
  servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceExec['response'];

/**
 * Registers typed-request handlers on the Cloudly API client's router and serves them
 * from the Docker host: listing a service's deployments, restarting/killing a deployment's
 * container, and file/exec operations that run as commands inside that container.
 */
export class CoreflowDeploymentRuntimeManager {
  constructor(private coreflowRef: Coreflow) {}

  /**
   * Registers one typed handler per request in `TCoreflowDeploymentRequest`.
   *
   * Workspace handlers return `{ found: false }` when the deployment cannot be resolved and
   * throw a `TypedResponseError` when the command inside the container exits non-zero
   * (except `Exists`, which reports the exit code as `exists`, and `Exec`, which returns
   * the raw exec result).
   */
  public async start() {
    const router = this.coreflowRef.cloudlyConnector.cloudlyApiClient.typedrouter as any;
    const addHandler = <TRequest extends TCoreflowDeploymentRequest>(
      methodArg: TRequest['method'],
      handlerArg: (dataArg: TRequest['request']) => Promise<TRequest['response']>,
    ) => {
      router.addTypedHandler(new plugins.typedrequest.TypedHandler(methodArg, handlerArg as any) as any);
    };

    addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_GetServiceDeployments>(
      'coreflowGetServiceDeployments',
      async (dataArg) => {
        return {
          deployments: await this.getServiceDeployments(dataArg.service),
        };
      },
    );

    addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_RestartDeployment>(
      'coreflowRestartDeployment',
      async (dataArg) => {
        return await this.restartDeployment(dataArg.deploymentId);
      },
    );

    addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_KillDeployment>(
      'coreflowKillDeployment',
      async (dataArg) => {
        return await this.killDeployment(dataArg.deploymentId);
      },
    );

    addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceReadFile>(
      'coreflowDeploymentWorkspaceReadFile',
      async (dataArg) => {
        // `--` stops option parsing so a path beginning with `-` is treated as a file name.
        const result = await this.execInDeployment(dataArg.deploymentId, ['cat', '--', dataArg.path]);
        if (!result.found) return { found: false };
        if (result.exitCode !== 0) {
          throw new plugins.typedrequest.TypedResponseError(
            this.execFailureMessage(result, 'Failed to read file'),
          );
        }
        // NOTE(review): the exec runs with `tty: true`; a TTY presumably rewrites `\n` as
        // `\r\n`, which would alter the returned content — confirm against the Docker client.
        return { found: true, content: result.stdout };
      },
    );

    addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceWriteFile>(
      'coreflowDeploymentWorkspaceWriteFile',
      async (dataArg) => {
        // Both values are single-quoted for `sh`, so content and path are passed literally.
        const contentArg = this.shellQuote(String(dataArg.content || ''));
        const pathArg = this.shellQuote(dataArg.path);
        const result = await this.execInDeployment(dataArg.deploymentId, [
          'sh',
          '-c',
          `printf '%s' ${contentArg} > ${pathArg}`,
        ]);
        if (!result.found) return { found: false };
        if (result.exitCode !== 0) {
          throw new plugins.typedrequest.TypedResponseError(
            this.execFailureMessage(result, 'Failed to write file'),
          );
        }
        return { found: true };
      },
    );

    addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceReadDir>(
      'coreflowDeploymentWorkspaceReadDir',
      async (dataArg) => {
        // Normalise once so the listed directory and the entry path prefix always agree;
        // an empty/missing path means the container root.
        const dirPath = String(dataArg.path || '/');
        const result = await this.execInDeployment(dataArg.deploymentId, ['ls', '-1', '-F', '--', dirPath]);
        if (!result.found) return { found: false };
        if (result.exitCode !== 0) {
          throw new plugins.typedrequest.TypedResponseError(
            this.execFailureMessage(result, 'Failed to read directory'),
          );
        }
        const basePath = dirPath.endsWith('/') ? dirPath : `${dirPath}/`;
        const entries: servezoneInterfaces.requests.deployment.IDeploymentWorkspaceFileEntry[] = (
          result.stdout || ''
        )
          // Accept CRLF as well as LF: a trailing `\r` would hide the `/` directory marker
          // and end up inside the entry name.
          .split(/\r?\n/)
          .filter((lineArg) => lineArg.trim())
          .map((lineArg) => {
            // `ls -F` appends `/` to directories and one of `*@=|` to some other entry types.
            const isDirectory = lineArg.endsWith('/');
            const name = isDirectory ? lineArg.slice(0, -1) : lineArg.replace(/[*@=|]$/, '');
            return {
              type: isDirectory ? ('directory' as const) : ('file' as const),
              name,
              path: `${basePath}${name}`,
            };
          });
        return { found: true, entries };
      },
    );

    addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceMkdir>(
      'coreflowDeploymentWorkspaceMkdir',
      async (dataArg) => {
        const result = await this.execInDeployment(dataArg.deploymentId, ['mkdir', '-p', '--', dataArg.path]);
        if (!result.found) return { found: false };
        if (result.exitCode !== 0) {
          throw new plugins.typedrequest.TypedResponseError(
            this.execFailureMessage(result, 'Failed to create directory'),
          );
        }
        return { found: true };
      },
    );

    addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceRm>(
      'coreflowDeploymentWorkspaceRm',
      async (dataArg) => {
        const result = await this.execInDeployment(dataArg.deploymentId, [
          'rm',
          dataArg.recursive ? '-rf' : '-f',
          '--',
          dataArg.path,
        ]);
        if (!result.found) return { found: false };
        if (result.exitCode !== 0) {
          throw new plugins.typedrequest.TypedResponseError(
            this.execFailureMessage(result, 'Failed to remove path'),
          );
        }
        return { found: true };
      },
    );

    addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceExists>(
      'coreflowDeploymentWorkspaceExists',
      async (dataArg) => {
        const result = await this.execInDeployment(dataArg.deploymentId, ['test', '-e', dataArg.path]);
        if (!result.found) return { found: false };
        // `test -e` signals existence purely through its exit code.
        return { found: true, exists: result.exitCode === 0 };
      },
    );

    addHandler<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceExec>(
      'coreflowDeploymentWorkspaceExec',
      async (dataArg) => {
        const command = [dataArg.command, ...(dataArg.args || [])];
        return await this.execInDeployment(dataArg.deploymentId, command);
      },
    );
  }

  /** Currently a no-op. */
  public async stop() {}

  /**
   * Lists the Docker tasks of the Docker service named `serviceArg.data.name` as deployments.
   *
   * @returns an empty array when no Docker service with that name exists.
   */
  private async getServiceDeployments(
    serviceArg: plugins.servezoneInterfaces.data.IService,
  ): Promise<plugins.servezoneInterfaces.data.IDeployment[]> {
    const dockerService = await this.getDockerServiceByNameOrNull(serviceArg.data.name);
    if (!dockerService) {
      return [];
    }
    const [tasks, nodeNames] = await Promise.all([
      this.listTasksForService(dockerService.ID),
      this.getNodeNameMap(),
    ]);
    // Tasks are converted concurrently; Promise.all keeps the result in task order.
    return await Promise.all(
      tasks.map((task: any) => this.taskToDeployment(task, serviceArg, dockerService.ID, nodeNames)),
    );
  }

  /**
   * Restarts the container behind a deployment.
   *
   * @returns `{ found: false }` when the deployment cannot be resolved.
   * @throws TypedResponseError when the Docker API answers with status >= 300.
   */
  private async restartDeployment(
    deploymentIdArg: string,
  ): Promise<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_RestartDeployment['response']> {
    return await this.postContainerAction(deploymentIdArg, 'restart');
  }

  /**
   * Kills the container behind a deployment.
   *
   * @returns `{ found: false }` when the deployment cannot be resolved.
   * @throws TypedResponseError when the Docker API answers with status >= 300.
   */
  private async killDeployment(
    deploymentIdArg: string,
  ): Promise<servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_KillDeployment['response']> {
    return await this.postContainerAction(deploymentIdArg, 'kill');
  }

  /**
   * Shared implementation of restart/kill: POSTs `/containers/<id>/<action>` for the resolved
   * container and then reports the deployment rebuilt from its task.
   */
  private async postContainerAction(deploymentIdArg: string, actionArg: 'restart' | 'kill') {
    const resolved = await this.resolveContainerForDeployment(deploymentIdArg);
    if (!resolved) return { found: false };
    const response = await this.coreflowRef.dockerHost.request(
      'POST',
      `/containers/${resolved.container.Id}/${actionArg}`,
    );
    if (response.statusCode >= 300) {
      throw new plugins.typedrequest.TypedResponseError(
        `Failed to ${actionArg} container: ${response.statusCode}`,
      );
    }
    return {
      found: true,
      deployment: await this.taskToDeployment(
        resolved.task,
        resolved.service,
        resolved.dockerServiceId,
        await this.getNodeNameMap(),
      ),
    };
  }

  /**
   * Runs `commandArg` inside the container behind a deployment and collects its output.
   *
   * `stderr` is always reported as `''`; only the single exec stream is read into `stdout`.
   * A missing exit code is reported as `0`.
   *
   * @returns `{ found: false }` when the deployment cannot be resolved.
   */
  private async execInDeployment(
    deploymentIdArg: string,
    commandArg: string[],
  ): Promise<TDeploymentExecResult> {
    const resolved = await this.resolveContainerForDeployment(deploymentIdArg);
    if (!resolved) return { found: false };
    const exec = await resolved.container.exec(commandArg, {
      tty: true,
      attachStdin: false,
      attachStdout: true,
      attachStderr: true,
    });
    try {
      const stdout = await this.streamToString(exec.stream);
      const inspect = await exec.inspect();
      return {
        found: true,
        stdout,
        stderr: '',
        exitCode: inspect.ExitCode ?? 0,
      };
    } finally {
      // Close the exec even when reading the stream or inspecting it failed.
      await exec.close();
    }
  }

  /**
   * Picks the most specific message for a failed exec: `stderr` if present, otherwise the
   * trimmed `stdout` (where the output of these TTY execs is collected), otherwise `fallbackArg`.
   */
  private execFailureMessage(resultArg: TDeploymentExecResult, fallbackArg: string): string {
    return resultArg.stderr || resultArg.stdout?.trim() || fallbackArg;
  }

  /** Wraps a value in single quotes for `sh`, rewriting each embedded `'` as `'\''`. */
  private shellQuote(valueArg: string) {
    return `'${valueArg.replace(/'/g, `'\\''`)}'`;
  }

  /**
   * Resolves a deployment id to its Cloudly service, Docker task and container.
   *
   * The id may be a full or prefix task id, or a full or prefix container id. Tasks belonging
   * to a known service are tried first; otherwise a directly matching container is mapped to a
   * service through its `com.docker.swarm.service.name` label, with a synthesised task object
   * when no task references that container.
   *
   * @returns `null` when nothing matches or when the id is empty.
   */
  private async resolveContainerForDeployment(deploymentIdArg: string) {
    // An empty id would satisfy every `startsWith` check below and select an arbitrary container.
    if (!deploymentIdArg) return null;

    const [containers, services, tasks] = await Promise.all([
      this.coreflowRef.dockerHost.listContainers(),
      this.coreflowRef.cloudlyConnector.cloudlyApiClient.services.getServices() as unknown as Promise<
        plugins.servezoneInterfaces.data.IService[]
      >,
      this.listTasks(),
    ]);

    const directContainer = containers.find((containerArg) => {
      return containerArg.Id === deploymentIdArg || containerArg.Id.startsWith(deploymentIdArg);
    });

    for (const service of services) {
      const dockerService = await this.getDockerServiceByNameOrNull(service.data.name);
      if (!dockerService) continue;
      const task = tasks.find((taskArg: any) => {
        return (
          taskArg.ServiceID === dockerService.ID &&
          (taskArg.ID === deploymentIdArg ||
            taskArg.ID?.startsWith(deploymentIdArg) ||
            taskArg.Status?.ContainerStatus?.ContainerID === deploymentIdArg ||
            taskArg.Status?.ContainerStatus?.ContainerID?.startsWith(deploymentIdArg))
        );
      });
      const containerId = task?.Status?.ContainerStatus?.ContainerID;
      // A task without a container id falls back to the container matched directly by id.
      const container = containerId
        ? containers.find(
            (containerArg) => containerArg.Id === containerId || containerArg.Id.startsWith(containerId),
          )
        : directContainer;
      if (task && container) {
        return { service, task, dockerServiceId: dockerService.ID, container };
      }
    }

    if (directContainer) {
      const service = services.find((serviceArg) => {
        return directContainer.Labels?.['com.docker.swarm.service.name'] === serviceArg.data.name;
      });
      if (service) {
        const dockerService = await this.getDockerServiceByNameOrNull(service.data.name);
        if (dockerService) {
          const task = tasks.find(
            (taskArg: any) => taskArg.Status?.ContainerStatus?.ContainerID === directContainer.Id,
          ) || {
            // No task references this container: build a stand-in from the container itself.
            ID: directContainer.Id,
            ServiceID: dockerService.ID,
            NodeID: '',
            DesiredState: directContainer.State,
            Status: {
              State: directContainer.State,
              ContainerStatus: { ContainerID: directContainer.Id },
            },
            // `Created` is multiplied by 1000 to get milliseconds; falls back to the current time.
            CreatedAt: new Date((directContainer.Created || Date.now() / 1000) * 1000).toISOString(),
            UpdatedAt: new Date().toISOString(),
          };
          return { service, task, dockerServiceId: dockerService.ID, container: directContainer };
        }
      }
    }

    return null;
  }

  /**
   * Converts a Docker task into an `IDeployment`.
   *
   * Resource usage is best-effort: failures to look up the container or read its stats leave
   * `resourceUsage` undefined instead of failing the conversion. Unparseable timestamps fall
   * back to "now" (`deployedAt`) and to `deployedAt` (`updatedAt`).
   */
  private async taskToDeployment(
    taskArg: any,
    serviceArg: plugins.servezoneInterfaces.data.IService,
    dockerServiceIdArg: string,
    nodeNamesArg: Map<string, string>,
  ): Promise<plugins.servezoneInterfaces.data.IDeployment> {
    const containerId = taskArg.Status?.ContainerStatus?.ContainerID;
    let resourceUsage: plugins.servezoneInterfaces.data.IDeployment['resourceUsage'];
    if (containerId) {
      const container = await this.coreflowRef.dockerHost.getContainerById(containerId).catch(() => undefined);
      if (container) {
        resourceUsage = await this.getContainerResourceUsage(container).catch(() => undefined);
      }
    }
    const deployedAt = Date.parse(taskArg.CreatedAt || '') || Date.now();
    const updatedAt = Date.parse(taskArg.UpdatedAt || '') || deployedAt;
    const status = this.mapTaskStatus(taskArg.Status?.State || taskArg.DesiredState);
    return {
      id: taskArg.ID,
      taskId: taskArg.ID,
      serviceId: serviceArg.id,
      serviceName: serviceArg.data.name,
      nodeId: taskArg.NodeID || '',
      nodeName: nodeNamesArg.get(taskArg.NodeID) || taskArg.NodeID || '',
      containerId,
      usedImageId: serviceArg.data.imageId,
      version: serviceArg.data.imageVersion || taskArg.Spec?.ContainerSpec?.Image || 'latest',
      deployedAt,
      updatedAt,
      deploymentLog: [taskArg.Status?.Message || `Docker task ${taskArg.ID}`],
      status,
      healthStatus: status === 'running' ? 'healthy' : status === 'failed' ? 'unhealthy' : 'unknown',
      resourceUsage,
      slot: taskArg.Slot,
      desiredState: taskArg.DesiredState,
      dockerServiceId: dockerServiceIdArg,
    };
  }

  /**
   * Reads one stats sample from a container and derives CPU percent and memory in MB.
   *
   * CPU percent is (container CPU delta / system CPU delta) * online CPUs * 100, and `0`
   * whenever either delta is not positive.
   */
  private async getContainerResourceUsage(containerArg: plugins.docker.DockerContainer) {
    const stats = await containerArg.stats({ stream: false });
    const cpuDelta =
      (stats.cpu_stats?.cpu_usage?.total_usage || 0) - (stats.precpu_stats?.cpu_usage?.total_usage || 0);
    const systemDelta = (stats.cpu_stats?.system_cpu_usage || 0) - (stats.precpu_stats?.system_cpu_usage || 0);
    const onlineCpus = stats.cpu_stats?.online_cpus || stats.cpu_stats?.cpu_usage?.percpu_usage?.length || 1;
    const cpuUsagePercent = systemDelta > 0 && cpuDelta > 0 ? (cpuDelta / systemDelta) * onlineCpus * 100 : 0;
    return {
      cpuUsagePercent,
      memoryUsedMB: Math.round((stats.memory_stats?.usage || 0) / 1024 / 1024),
      lastUpdated: Date.now(),
    };
  }

  /** Maps a Docker task state string to a deployment status; unrecognised states become `'scheduled'`. */
  private mapTaskStatus(statusArg: string): plugins.servezoneInterfaces.data.IDeployment['status'] {
    switch (statusArg) {
      case 'running':
        return 'running';
      case 'new':
      case 'pending':
      case 'assigned':
      case 'accepted':
      case 'preparing':
      case 'ready':
      case 'starting':
        return 'starting';
      case 'shutdown':
      case 'complete':
      case 'remove':
        return 'stopped';
      case 'failed':
      case 'rejected':
      case 'orphaned':
        return 'failed';
      default:
        return 'scheduled';
    }
  }

  /**
   * Looks up a Docker service by name, translating the Docker host's
   * `Service not found: <name>` error into `null`. Any other error is rethrown.
   */
  private async getDockerServiceByNameOrNull(serviceNameArg: string) {
    try {
      return await this.coreflowRef.dockerHost.getServiceByName(serviceNameArg);
    } catch (error) {
      if ((error as Error).message === `Service not found: ${serviceNameArg}`) {
        return null;
      }
      throw error;
    }
  }

  /** Fetches `/tasks` filtered to one Docker service id; a non-array body yields `[]`. */
  private async listTasksForService(dockerServiceIdArg: string) {
    const filters = encodeURIComponent(JSON.stringify({ service: [dockerServiceIdArg] }));
    const response = await this.coreflowRef.dockerHost.request('GET', `/tasks?filters=${filters}`);
    return Array.isArray(response.body) ? response.body : [];
  }

  /** Fetches all of `/tasks`; a non-array body yields `[]`. */
  private async listTasks() {
    const response = await this.coreflowRef.dockerHost.request('GET', '/tasks');
    return Array.isArray(response.body) ? response.body : [];
  }

  /**
   * Builds a node id -> display name map from `/nodes`, preferring the node's hostname, then
   * its spec name, then its id. Any failure yields whatever was collected so far (best-effort).
   */
  private async getNodeNameMap() {
    const nodeNames = new Map<string, string>();
    try {
      const response = await this.coreflowRef.dockerHost.request('GET', '/nodes');
      for (const node of response.body || []) {
        nodeNames.set(node.ID, node.Description?.Hostname || node.Spec?.Name || node.ID);
      }
    } catch {
      // Single-node Docker setups may not expose Swarm nodes before init.
    }
    return nodeNames;
  }

  /**
   * Collects a stream into a UTF-8 string, resolving on `end` or `close` and rejecting on `error`.
   *
   * Chunks are kept as bytes and decoded once at the end, so a multi-byte character that is
   * split across two chunks is decoded correctly.
   */
  private async streamToString(streamArg: plugins.smartstream.stream.Duplex) {
    return await new Promise<string>((resolve, reject) => {
      const chunks: Buffer[] = [];
      const finish = () => resolve(Buffer.concat(chunks).toString('utf8'));
      streamArg.on('data', (chunkArg: unknown) => {
        if (Buffer.isBuffer(chunkArg)) {
          chunks.push(chunkArg);
        } else if (chunkArg instanceof Uint8Array) {
          chunks.push(Buffer.from(chunkArg));
        } else {
          chunks.push(Buffer.from(String(chunkArg), 'utf8'));
        }
      });
      // Both events may fire; resolving an already-settled promise is a no-op.
      streamArg.on('end', finish);
      streamArg.on('close', finish);
      streamArg.on('error', reject);
    });
  }
}