From b6517e31f705cd78e4436c0653eafaa37e907c88 Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Sun, 24 May 2026 12:47:05 +0000 Subject: [PATCH] feat(spark): add node telemetry heartbeat --- bin/spark-wrapper.js | 3 +- changelog.md | 19 ++- readme.md | 15 ++- scripts/install-binary.js | 9 +- test.simple.ts | 6 +- test/metrics.test.ts | 38 ++++++ test/updatemanager.test.ts | 50 +++++++ ts/index.ts | 4 +- ts/spark.classes.heartbeatmanager.ts | 182 +++++++++++++++++++++++++ ts/spark.classes.metricscollector.ts | 190 +++++++++++++++++++++++++++ ts/spark.classes.spark.ts | 10 +- ts/spark.classes.taskmanager.ts | 24 ++-- ts/spark.classes.updatemanager.ts | 120 ++++++++++------- ts/spark.cli.ts | 73 ++++++++-- 14 files changed, 653 insertions(+), 90 deletions(-) create mode 100644 test/metrics.test.ts create mode 100644 test/updatemanager.test.ts create mode 100644 ts/spark.classes.heartbeatmanager.ts create mode 100644 ts/spark.classes.metricscollector.ts diff --git a/bin/spark-wrapper.js b/bin/spark-wrapper.js index 5804005..bac846d 100755 --- a/bin/spark-wrapper.js +++ b/bin/spark-wrapper.js @@ -10,6 +10,7 @@ import { fileURLToPath } from 'url'; import { dirname, join } from 'path'; import { existsSync } from 'fs'; import { platform, arch } from 'os'; +import process from 'node:process'; const __filename = fileURLToPath(import.meta.url); const __dirname = dirname(__filename); @@ -105,4 +106,4 @@ function executeBinary() { } // Execute -executeBinary(); \ No newline at end of file +executeBinary(); diff --git a/changelog.md b/changelog.md index 6f41b6a..423a8ca 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,22 @@ # Changelog +## Pending + +### Features + +- add Spark host telemetry and Cloudly heartbeat + - Adds local CPU, memory, disk, and Docker metrics collection + - Adds `spark metrics` for local telemetry inspection + - Adds Cloudly heartbeat reporting when node credentials are configured + +### Fixes + +- improve daemon service maintenance behavior + - Checks all managed services instead of returning after the first up-to-date service + - Awaits scheduled service update runs and prevents overlapping update loops + - Makes host package updates opt-in + - Corrects the Cloudly profile service name + ## 2026-02-04 - 1.2.5 - fix(deps) update Docker API usage to DockerHost facade, bump dependencies, and adjust tests/docs @@ -60,4 +77,4 @@ Routine updates and fixes to core functionality. Consistent updates made to improve core operations. - Updates focused on core functionality for improved performance -- Series of updates applied from versions 1.0.82 to 1.0.85 \ No newline at end of file +- Series of updates applied from versions 1.0.82 to 1.0.85 diff --git a/readme.md b/readme.md index ac8b804..a4c4650 100644 --- a/readme.md +++ b/readme.md @@ -15,7 +15,10 @@ The current implementation does four main things: - Installs and updates a `smartdaemon_spark` systemd service through `@push.rocks/smartdaemon`. - Runs in an explicit mode: `cloudly` or `coreflow-node`. - Activates Docker Swarm through `@apiclient.xyz/docker` when daemon mode starts. -- Schedules recurring tasks with `@push.rocks/taskbuffer` for Spark updates, host package updates, and managed Docker service updates. +- Collects host metrics for CPU, memory, disk, and Docker container counts. +- Sends Cloudly heartbeats when `cloudlyUrl`, `nodeId`, and `nodeToken` are configured. +- Schedules recurring tasks with `@push.rocks/taskbuffer` for Spark updates and managed Docker service updates. +- Optionally schedules host package updates when `--enableHostUpdates=true` has been configured. ## Installation @@ -82,6 +85,7 @@ spark [options] | `updatedaemon` | Reload the daemon service definition for the current Spark version. | | `asdaemon --mode cloudly` | Run the daemon loop with the Cloudly profile. | | `asdaemon --mode coreflow-node` | Run the daemon loop with the Coreflow node profile. | +| `metrics` | Print a JSON host metrics snapshot. | | `logs` | Follow `journalctl -u smartdaemon_spark -f`. | | `prune` | Stop Spark, remove Docker stacks/services/secrets, remove selected networks, prune Docker, restart Docker, and restart Spark. | @@ -89,10 +93,11 @@ spark [options] ## Daemon Behavior -`Spark.daemonStart()` starts two subsystems: +`Spark.daemonStart()` starts three subsystems: - `SparkServicesManager.start()` activates Docker Swarm. - `SparkTaskManager.start()` schedules recurring maintenance tasks. +- `SparkHeartbeatManager.start()` sends node metrics to Cloudly when node credentials are configured. Scheduled tasks: @@ -100,10 +105,12 @@ Scheduled tasks: | --- | --- | --- | | `updateServices` | Every 2 minutes at second 30 | Checks managed Docker services and recreates them when images change. | | `updateSpark` | Every minute | Checks for a newer Spark release and reloads the daemon after upgrade. | -| `updateHost` | Daily at midnight | Runs apt update/upgrade/autoremove/autoclean. | +| `updateHost` | Daily at midnight when enabled | Runs apt update/upgrade/autoremove/autoclean. | The managed service list is populated by the selected mode before daemon startup. Service updates use Docker images, Docker secrets, and published port mappings. +Cloudly heartbeat requires a Spark node token. The Cloudly jump flow provisions this token and passes it to Spark as `--nodeId` and `--nodeToken` during daemon installation. Existing installations without those values continue to run service maintenance without reporting node telemetry. + ## Programmatic Usage Spark exports the main `Spark` class from `mod.ts`: @@ -123,6 +130,8 @@ The public instance exposes: | `sparkConfig` | persisted mode/config key-value store. | | `sparkTaskManager` | taskbuffer scheduler and built-in maintenance tasks. | | `sparkUpdateManager` | Docker Swarm activation and managed service update logic. | +| `sparkMetricsCollector` | Host metrics collection for local output and Cloudly heartbeat payloads. | +| `sparkHeartbeatManager` | Cloudly heartbeat client for Spark node telemetry. | | `sparkInfo` | package metadata lookup. | ## Development diff --git a/scripts/install-binary.js b/scripts/install-binary.js index 62afca4..b09500f 100755 --- a/scripts/install-binary.js +++ b/scripts/install-binary.js @@ -6,21 +6,20 @@ */ import { platform, arch } from 'os'; -import { existsSync, mkdirSync, writeFileSync, chmodSync, unlinkSync } from 'fs'; +import { existsSync, mkdirSync, chmodSync, unlinkSync } from 'fs'; import { join, dirname } from 'path'; import { fileURLToPath } from 'url'; import https from 'https'; import { pipeline } from 'stream'; -import { promisify } from 'util'; import { createWriteStream } from 'fs'; +import process from 'node:process'; const __filename = fileURLToPath(import.meta.url); const __dirname = dirname(__filename); -const streamPipeline = promisify(pipeline); // Configuration const REPO_BASE = 'https://code.foss.global/serve.zone/spark'; -const VERSION = process.env.npm_package_version || '1.2.2'; +const VERSION = process.env.npm_package_version || '1.2.5'; function getBinaryInfo() { const plat = platform(); @@ -176,7 +175,7 @@ async function main() { try { // Try fallback URL await downloadFile(fallbackUrl, binaryPath); - } catch (fallbackErr) { + } catch { console.error(`❌ Error: Failed to download binary`); console.error(` Primary URL: ${releaseUrl}`); console.error(` Fallback URL: ${fallbackUrl}`); diff --git a/test.simple.ts b/test.simple.ts index c0ce3fc..1b0e16f 100644 --- a/test.simple.ts +++ b/test.simple.ts @@ -14,13 +14,13 @@ console.log(`✅ Path operations work: ${testPath}`); // Test basic imports from plugins import * as smartdelay from '@push.rocks/smartdelay'; -console.log('✅ @push.rocks/smartdelay import works'); +console.log(`✅ @push.rocks/smartdelay import works: ${typeof smartdelay === 'object'}`); import * as smartlog from '@push.rocks/smartlog'; -console.log('✅ @push.rocks/smartlog import works'); +console.log(`✅ @push.rocks/smartlog import works: ${typeof smartlog === 'object'}`); console.log(''); console.log('Basic Deno functionality confirmed!'); console.log(''); console.log('Note: Full application may require additional dependency resolution'); -console.log('for complex packages like @serve.zone/api that have many transitive dependencies.'); \ No newline at end of file +console.log('for complex packages like @serve.zone/api that have many transitive dependencies.'); diff --git a/test/metrics.test.ts b/test/metrics.test.ts new file mode 100644 index 0000000..634c791 --- /dev/null +++ b/test/metrics.test.ts @@ -0,0 +1,38 @@ +import { assertEquals } from '@std/assert'; +import { + calculateCpuUsagePercent, + parseDfOutput, + parseDockerPsOutput, + parseProcStatCpuLine, +} from '../ts/spark.classes.metricscollector.ts'; + +Deno.test('should parse cpu samples from proc stat', () => { + const sample = parseProcStatCpuLine('cpu 100 20 30 400 50 0 0 0 0 0\n'); + assertEquals(sample, { + idle: 450, + total: 600, + }); +}); + +Deno.test('should calculate cpu usage percent', () => { + const usage = calculateCpuUsagePercent( + { idle: 100, total: 200 }, + { idle: 150, total: 400 }, + ); + assertEquals(usage, 75); +}); + +Deno.test('should parse df output', () => { + const disk = parseDfOutput(`Filesystem 1024-blocks Used Available Capacity Mounted on +/dev/root 10485760 2097152 8388608 20% / +`); + assertEquals(disk, { + diskUsedGB: 2, + diskAvailableGB: 8, + }); +}); + +Deno.test('should parse docker container count', () => { + assertEquals(parseDockerPsOutput('container-a\ncontainer-b\n'), 2); + assertEquals(parseDockerPsOutput('\n'), 0); +}); diff --git a/test/updatemanager.test.ts b/test/updatemanager.test.ts new file mode 100644 index 0000000..017ad5d --- /dev/null +++ b/test/updatemanager.test.ts @@ -0,0 +1,50 @@ +import { assertEquals } from '@std/assert'; +import { Spark } from '../ts/index.ts'; + +Deno.test({ + name: 'should continue checking services when one service does not need an update', + async fn() { + const spark = new Spark(); + const checkedServices: string[] = []; + spark.sparkUpdateManager.dockerHost = { + getServiceByName: (serviceNameArg: string) => Promise.resolve({ + needsUpdate: () => { + checkedServices.push(serviceNameArg); + return Promise.resolve(false); + }, + }), + getSecretByName: () => Promise.resolve(undefined), + createImageFromRegistry: () => Promise.reject( + new Error('should not pull images for up-to-date services'), + ), + } as unknown as typeof spark.sparkUpdateManager.dockerHost; + + spark.sparkUpdateManager.services = [ + { + name: 'service-one', + image: 'example/service-one', + url: 'service-one', + port: '3000', + environment: 'test', + secretJson: { + SERVEZONE_PORT: '3000', + }, + }, + { + name: 'service-two', + image: 'example/service-two', + url: 'service-two', + port: '3000', + environment: 'test', + secretJson: { + SERVEZONE_PORT: '3000', + }, + }, + ]; + + await spark.sparkUpdateManager.updateServices(); + assertEquals(checkedServices, ['service-one', 'service-two']); + }, + sanitizeResources: false, + sanitizeOps: false, +}); diff --git a/ts/index.ts b/ts/index.ts index bd0e444..e87928b 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -1,7 +1,9 @@ export * from './spark.classes.spark.ts'; +export * from './spark.classes.metricscollector.ts'; +export * from './spark.classes.heartbeatmanager.ts'; import * as cli from './spark.cli.ts'; export const runCli = async () => { - cli.runCli(); + await cli.runCli(); }; diff --git a/ts/spark.classes.heartbeatmanager.ts b/ts/spark.classes.heartbeatmanager.ts new file mode 100644 index 0000000..b2a0b1e --- /dev/null +++ b/ts/spark.classes.heartbeatmanager.ts @@ -0,0 +1,182 @@ +import type { Spark } from './spark.classes.spark.ts'; +import { logger } from './spark.logging.ts'; + +export class SparkHeartbeatManager { + public sparkRef: Spark; + private heartbeatTimer?: number; + private cloudlyConnectionStatus: TSparkCloudlyConnectionStatus = 'not-configured'; + + constructor(sparkRefArg: Spark) { + this.sparkRef = sparkRefArg; + } + + public async start() { + const config = await this.readConfig(); + if (!config.cloudlyUrl || !config.nodeId || !config.nodeToken) { + this.cloudlyConnectionStatus = 'not-configured'; + logger.log('info', 'Spark heartbeat is not configured. nodeId/nodeToken/cloudlyUrl are required.'); + return; + } + + await this.sendHeartbeatOnce().catch((errorArg) => { + logger.log('warn', `initial Spark heartbeat failed: ${(errorArg as Error).message}`); + }); + + const intervalMs = await this.getHeartbeatIntervalMs(); + this.heartbeatTimer = setInterval(() => { + this.sendHeartbeatOnce().catch((errorArg) => { + logger.log('warn', `Spark heartbeat failed: ${(errorArg as Error).message}`); + }); + }, intervalMs); + } + + public stop() { + if (this.heartbeatTimer !== undefined) { + clearInterval(this.heartbeatTimer); + this.heartbeatTimer = undefined; + } + } + + public async sendHeartbeatOnce(): Promise { + const config = await this.readConfig(); + if (!config.cloudlyUrl || !config.nodeId || !config.nodeToken) { + this.cloudlyConnectionStatus = 'not-configured'; + return { + accepted: false, + message: 'Spark heartbeat is not configured', + }; + } + + this.cloudlyConnectionStatus = 'connecting'; + const [metrics, runtimeInfo] = await Promise.all([ + this.sparkRef.sparkMetricsCollector.collectMetrics(), + this.getRuntimeInfo(config), + ]); + + const response = await this.postJson( + config.cloudlyUrl, + '/spark/v1/nodes/heartbeat', + { + nodeId: config.nodeId, + nodeToken: config.nodeToken, + metrics, + runtimeInfo, + }, + ); + + if (!response.accepted) { + this.cloudlyConnectionStatus = 'failed'; + throw new Error(response.message || 'Cloudly rejected Spark heartbeat'); + } + this.cloudlyConnectionStatus = 'connected'; + return response; + } + + private async getRuntimeInfo(configArg: ISparkHeartbeatConfig): Promise { + const dockerInfo = await this.sparkRef.sparkMetricsCollector.collectDockerInfo(); + const mode = await this.getMode(); + return { + runtime: 'spark', + nodeId: configArg.nodeId!, + mode, + hostname: Deno.hostname(), + platform: Deno.build.os, + arch: Deno.build.arch, + osRelease: this.getOsRelease(), + sparkVersion: this.sparkRef.sparkInfo.projectInfo.version, + cloudlyUrl: configArg.cloudlyUrl, + cloudlyConnectionStatus: this.cloudlyConnectionStatus, + dockerAvailable: dockerInfo.dockerAvailable, + swarmNodeId: dockerInfo.swarmNodeId, + checkedAt: Date.now(), + }; + } + + private async getMode(): Promise { + const mode = await this.sparkRef.sparkConfig.kvStore.readKey('mode'); + if (mode === 'cloudly' || mode === 'coreflow-node') { + return mode; + } + return undefined; + } + + private getOsRelease(): string | undefined { + try { + return Deno.osRelease(); + } catch { + return undefined; + } + } + + private async getHeartbeatIntervalMs() { + const configuredInterval = Number( + await this.sparkRef.sparkConfig.kvStore.readKey('heartbeatIntervalMs'), + ); + if (Number.isFinite(configuredInterval) && configuredInterval >= 5000) { + return configuredInterval; + } + return 60000; + } + + private async readConfig(): Promise { + return { + cloudlyUrl: await this.sparkRef.sparkConfig.kvStore.readKey('cloudlyUrl'), + nodeId: await this.sparkRef.sparkConfig.kvStore.readKey('nodeId'), + nodeToken: await this.sparkRef.sparkConfig.kvStore.readKey('nodeToken'), + }; + } + + private async postJson( + cloudlyUrlArg: string, + pathArg: string, + bodyArg: Record, + ): Promise { + const url = new URL( + pathArg, + cloudlyUrlArg.endsWith('/') ? cloudlyUrlArg : `${cloudlyUrlArg}/`, + ); + const response = await fetch(url, { + method: 'POST', + headers: { + 'content-type': 'application/json', + }, + body: JSON.stringify(bodyArg), + }); + if (!response.ok) { + throw new Error(`Cloudly request failed: ${pathArg} -> HTTP ${response.status}`); + } + return await response.json() as TResponse; + } +} + +interface ISparkHeartbeatConfig { + cloudlyUrl?: string; + nodeId?: string; + nodeToken?: string; +} + +type TSparkNodeMode = 'cloudly' | 'coreflow-node'; + +type TSparkCloudlyConnectionStatus = 'not-configured' | 'connecting' | 'connected' | 'failed'; + +interface ISparkNodeRuntimeInfo { + runtime: 'spark'; + nodeId: string; + mode?: TSparkNodeMode; + hostname?: string; + platform: string; + arch: string; + osRelease?: string; + sparkVersion: string; + cloudlyUrl?: string; + cloudlyConnectionStatus: TSparkCloudlyConnectionStatus; + dockerAvailable: boolean; + swarmNodeId?: string; + checkedAt: number; + lastError?: string; +} + +interface ISparkHeartbeatResponse { + accepted: boolean; + message?: string; +} diff --git a/ts/spark.classes.metricscollector.ts b/ts/spark.classes.metricscollector.ts new file mode 100644 index 0000000..5206c14 --- /dev/null +++ b/ts/spark.classes.metricscollector.ts @@ -0,0 +1,190 @@ +import * as plugins from './spark.plugins.ts'; + +export interface ICpuSample { + idle: number; + total: number; +} + +export interface IDiskSample { + diskUsedGB: number; + diskAvailableGB: number; +} + +export interface IDockerRuntimeInfo { + dockerAvailable: boolean; + containerCount: number; + swarmNodeId?: string; +} + +interface ICommandResult { + success: boolean; + stdout: string; +} + +export class SparkMetricsCollector { + private previousCpuSample?: ICpuSample; + + public async collectMetrics(): Promise { + const [cpuUsagePercent, memoryInfo, diskInfo, dockerInfo] = await Promise.all([ + this.collectCpuUsagePercent(), + this.collectMemoryInfo(), + this.collectDiskInfo(), + this.collectDockerInfo(), + ]); + + return { + cpuUsagePercent, + memoryUsedMB: memoryInfo.memoryUsedMB, + memoryAvailableMB: memoryInfo.memoryAvailableMB, + diskUsedGB: diskInfo.diskUsedGB, + diskAvailableGB: diskInfo.diskAvailableGB, + containerCount: dockerInfo.containerCount, + timestamp: Date.now(), + }; + } + + public async collectDockerInfo(): Promise { + const containerResult = await this.runCommand('docker', ['ps', '-q']); + const swarmNodeResult = await this.runCommand('docker', ['info', '--format', '{{.Swarm.NodeID}}']); + const rawSwarmNodeId = swarmNodeResult.stdout.trim(); + const swarmNodeId = rawSwarmNodeId === '' ? '' : rawSwarmNodeId; + return { + dockerAvailable: containerResult.success, + containerCount: containerResult.success ? parseDockerPsOutput(containerResult.stdout) : 0, + swarmNodeId: swarmNodeId || undefined, + }; + } + + private async collectCpuUsagePercent(): Promise { + const currentSample = await this.readCpuSample(); + if (!currentSample) { + return 0; + } + + if (!this.previousCpuSample) { + this.previousCpuSample = currentSample; + await new Promise((resolve) => setTimeout(resolve, 250)); + const nextSample = await this.readCpuSample(); + if (!nextSample) { + return 0; + } + this.previousCpuSample = nextSample; + return calculateCpuUsagePercent(currentSample, nextSample); + } + + const cpuUsagePercent = calculateCpuUsagePercent(this.previousCpuSample, currentSample); + this.previousCpuSample = currentSample; + return cpuUsagePercent; + } + + private collectMemoryInfo() { + const systemMemoryInfo = Deno.systemMemoryInfo(); + const memoryUsedBytes = Math.max( + systemMemoryInfo.total - systemMemoryInfo.available, + 0, + ); + return { + memoryUsedMB: bytesToMegabytes(memoryUsedBytes), + memoryAvailableMB: bytesToMegabytes(systemMemoryInfo.available), + }; + } + + private async collectDiskInfo(): Promise { + const dfOutput = await this.runCommand('df', ['-Pk', '/']); + return parseDfOutput(dfOutput.stdout); + } + + private async readCpuSample(): Promise { + try { + return parseProcStatCpuLine(await Deno.readTextFile('/proc/stat')); + } catch { + return undefined; + } + } + + private async runCommand(commandArg: string, argsArg: string[]): Promise { + try { + const command = new Deno.Command(commandArg, { + args: argsArg, + stdout: 'piped', + stderr: 'null', + }); + const output = await command.output(); + return { + success: output.success, + stdout: new TextDecoder().decode(output.stdout), + }; + } catch { + return { + success: false, + stdout: '', + }; + } + } +} + +export const parseProcStatCpuLine = (procStatTextArg: string): ICpuSample | undefined => { + const cpuLine = procStatTextArg.split('\n').find((lineArg) => lineArg.startsWith('cpu ')); + if (!cpuLine) { + return undefined; + } + const values = cpuLine.trim().split(/\s+/).slice(1).map((valueArg) => Number(valueArg)); + if (values.some((valueArg) => !Number.isFinite(valueArg))) { + return undefined; + } + + const idle = (values[3] || 0) + (values[4] || 0); + const total = values.reduce((sumArg, valueArg) => sumArg + valueArg, 0); + return { idle, total }; +}; + +export const calculateCpuUsagePercent = (previousArg: ICpuSample, currentArg: ICpuSample): number => { + const idleDelta = currentArg.idle - previousArg.idle; + const totalDelta = currentArg.total - previousArg.total; + if (totalDelta <= 0) { + return 0; + } + return roundMetric(((totalDelta - idleDelta) / totalDelta) * 100); +}; + +export const parseDfOutput = (dfOutputArg: string): IDiskSample => { + const dataLine = dfOutputArg.trim().split('\n')[1]; + if (!dataLine) { + return { + diskUsedGB: 0, + diskAvailableGB: 0, + }; + } + const parts = dataLine.trim().split(/\s+/); + const usedKilobytes = Number(parts[2]); + const availableKilobytes = Number(parts[3]); + return { + diskUsedGB: kilobytesToGigabytes(usedKilobytes), + diskAvailableGB: kilobytesToGigabytes(availableKilobytes), + }; +}; + +export const parseDockerPsOutput = (dockerPsOutputArg: string): number => { + if (!dockerPsOutputArg.trim()) { + return 0; + } + return dockerPsOutputArg.trim().split('\n').filter(Boolean).length; +}; + +const bytesToMegabytes = (bytesArg: number): number => { + return roundMetric(bytesArg / 1024 / 1024); +}; + +const kilobytesToGigabytes = (kilobytesArg: number): number => { + if (!Number.isFinite(kilobytesArg)) { + return 0; + } + return roundMetric(kilobytesArg / 1024 / 1024); +}; + +const roundMetric = (valueArg: number): number => { + if (!Number.isFinite(valueArg)) { + return 0; + } + return Math.round(valueArg * 100) / 100; +}; diff --git a/ts/spark.classes.spark.ts b/ts/spark.classes.spark.ts index e006a31..3222349 100644 --- a/ts/spark.classes.spark.ts +++ b/ts/spark.classes.spark.ts @@ -2,8 +2,9 @@ import * as plugins from './spark.plugins.ts'; import { SparkTaskManager } from './spark.classes.taskmanager.ts'; import { SparkInfo } from './spark.classes.info.ts'; import { SparkServicesManager } from './spark.classes.updatemanager.ts'; -import { logger } from './spark.logging.ts'; import { SparkConfig } from './spark.classes.config.ts'; +import { SparkMetricsCollector } from './spark.classes.metricscollector.ts'; +import { SparkHeartbeatManager } from './spark.classes.heartbeatmanager.ts'; export class Spark { public smartdaemon: plugins.smartdaemon.SmartDaemon; @@ -11,6 +12,8 @@ export class Spark { public sparkTaskManager: SparkTaskManager; public sparkInfo: SparkInfo; public sparkUpdateManager: SparkServicesManager; + public sparkMetricsCollector: SparkMetricsCollector; + public sparkHeartbeatManager: SparkHeartbeatManager; constructor() { this.smartdaemon = new plugins.smartdaemon.SmartDaemon(); @@ -18,10 +21,13 @@ export class Spark { this.sparkInfo = new SparkInfo(this); this.sparkTaskManager = new SparkTaskManager(this); this.sparkUpdateManager = new SparkServicesManager(this); + this.sparkMetricsCollector = new SparkMetricsCollector(); + this.sparkHeartbeatManager = new SparkHeartbeatManager(this); } public async daemonStart() { await this.sparkUpdateManager.start(); - this.sparkTaskManager.start(); + await this.sparkTaskManager.start(); + await this.sparkHeartbeatManager.start(); } } diff --git a/ts/spark.classes.taskmanager.ts b/ts/spark.classes.taskmanager.ts index 0b37882..edcb960 100644 --- a/ts/spark.classes.taskmanager.ts +++ b/ts/spark.classes.taskmanager.ts @@ -1,6 +1,5 @@ import * as plugins from './spark.plugins.ts'; import { Spark } from './index.ts'; -import * as paths from './spark.paths.ts'; import { logger } from './spark.logging.ts'; export class SparkTaskManager { @@ -30,12 +29,14 @@ export class SparkTaskManager { this.sparkRef.sparkInfo.projectInfo.version ); if (shouldUpdate) { - await this.stop(); + this.stop(); const smartshellInstance = new plugins.smartshell.Smartshell({ executor: 'bash', }); - await smartshellInstance.exec(`cd / && npm upgrade -g && spark updatedaemon`); + await smartshellInstance.exec( + `cd / && pnpm add --global @serve.zone/spark@latest && spark updatedaemon`, + ); logger.log('info', 'Cooling off before restart...'); await plugins.smartdelay.delayFor(5000); logger.log('ok', '######## Trying to exit / Restart expected... ########'); @@ -47,7 +48,7 @@ export class SparkTaskManager { this.updateHost = new plugins.taskbuffer.Task({ name: 'updateHost', taskFunction: async () => { - await this.stop(); + this.stop(); const smartshellInstance = new plugins.smartshell.Smartshell({ executor: 'bash', }); @@ -62,10 +63,10 @@ export class SparkTaskManager { * only being run when mode is cloudly */ this.updateServices = new plugins.taskbuffer.Task({ - name: 'updateCloudly', + name: 'updateServices', taskFunction: async () => { - logger.log('info', 'now running updateCloudly task'); - this.sparkRef.sparkUpdateManager.updateServices(); + logger.log('info', 'now running updateServices task'); + await this.sparkRef.sparkUpdateManager.updateServices(); }, }); } @@ -76,14 +77,19 @@ export class SparkTaskManager { public async start() { this.taskmanager.addAndScheduleTask(this.updateServices, '30 */2 * * * *'); this.taskmanager.addAndScheduleTask(this.updateSpark, '0 * * * * *'); - this.taskmanager.addAndScheduleTask(this.updateHost, '0 0 0 * * *'); + const enableHostUpdates = await this.sparkRef.sparkConfig.kvStore.readKey('enableHostUpdates'); + if (enableHostUpdates === 'true') { + this.taskmanager.addAndScheduleTask(this.updateHost, '0 0 0 * * *'); + } else { + logger.log('info', 'host package updates are disabled. Pass --enableHostUpdates=true to enable them.'); + } this.taskmanager.start(); } /** * stops the taskmanager */ - public async stop() { + public stop() { this.taskmanager.descheduleTask(this.updateSpark); this.taskmanager.descheduleTask(this.updateHost); this.taskmanager.descheduleTask(this.updateServices); diff --git a/ts/spark.classes.updatemanager.ts b/ts/spark.classes.updatemanager.ts index 7daa59c..de36af5 100644 --- a/ts/spark.classes.updatemanager.ts +++ b/ts/spark.classes.updatemanager.ts @@ -1,8 +1,16 @@ import * as plugins from './spark.plugins.ts'; -import * as paths from './spark.paths.ts'; import { Spark } from './spark.classes.spark.ts'; import { logger } from './spark.logging.ts'; +export interface ISparkManagedService { + name: string; + image: string; + url: string; + port: string; + environment: string; + secretJson: Record; +} + /** * this class takes care of updating the services that are managed by spark */ @@ -10,18 +18,12 @@ export class SparkServicesManager { public sparkRef: Spark; public dockerHost: plugins.docker.DockerHost; public smartupdate: plugins.smartupdate.SmartUpdate; + private updateServicesRunning = false; /** * the services that are managed by spark */ - services: Array<{ - name: string; - image: string; - url: string; - port: string; - environment: string; - secretJson: any; - }> = []; + services: ISparkManagedService[] = []; constructor(sparkrefArg: Spark) { this.sparkRef = sparkrefArg; @@ -37,49 +39,65 @@ export class SparkServicesManager { } public async updateServices() { - for (const service of this.services) { - const existingService = await this.dockerHost.getServiceByName(service.name); - const existingServiceSecret = await this.dockerHost.getSecretByName(`${service.name}Secret`); - if (existingService) { - const needsUpdate: boolean = await existingService.needsUpdate(); - if (!needsUpdate) { - logger.log('info', `service >>${service.name}<< not needing update.`); - // we simply return here to end the functions - return; - } - // continuing here means we need to update the service - logger.log('ok', `${service.name} needs to be updated!`); - await existingService.remove(); - if (existingServiceSecret) { - await existingServiceSecret.remove(); - } - } - if (!existingService && existingServiceSecret) { - await existingServiceSecret.remove(); - } - - const newServiceImage = await this.dockerHost.createImageFromRegistry({ - imageUrl: service.image, - }); - const newServiceSecret = await this.dockerHost.createSecret({ - name: `${service.name}Secret`, - contentArg: plugins.smartjson.stringify(service.secretJson), - version: await newServiceImage.getVersion(), - labels: {}, - }); - - const newService = await this.dockerHost.createService({ - image: newServiceImage, - labels: {}, - name: service.name, - networkAlias: service.name, - networks: [], - secrets: [newServiceSecret], - ports: [`${service.port}:${service.secretJson.SERVEZONE_PORT}`], - }); - logger.log('ok', `updated service >>${newService.Spec.Name}<>${service.name}<< not needing update.`); + continue; + } + // continuing here means we need to update the service + logger.log('ok', `${service.name} needs to be updated!`); + await existingService.remove(); + if (existingServiceSecret) { + await existingServiceSecret.remove(); + } + } + if (!existingService && existingServiceSecret) { + await existingServiceSecret.remove(); + } - logger.log('success', `updated ${this.services.length} services!`); + const newServiceImage = await this.dockerHost.createImageFromRegistry({ + imageUrl: service.image, + }); + const newServiceSecret = await this.dockerHost.createSecret({ + name: `${service.name}Secret`, + contentArg: plugins.smartjson.stringify(service.secretJson), + version: await newServiceImage.getVersion(), + labels: {}, + }); + + const newService = await this.dockerHost.createService({ + image: newServiceImage, + labels: { + managedBy: 'spark', + }, + name: service.name, + networkAlias: service.name, + networks: [], + secrets: [newServiceSecret], + ports: [`${service.port}:${service.secretJson.SERVEZONE_PORT}`], + }); + updatedServices++; + logger.log('ok', `updated service >>${newService.Spec.Name}<>${service.name}<<: ${(error as Error).message}`); + } + } + + logger.log('success', `updated ${updatedServices} services!`); + } finally { + this.updateServicesRunning = false; + } } } diff --git a/ts/spark.cli.ts b/ts/spark.cli.ts index dfb31aa..2289e5e 100644 --- a/ts/spark.cli.ts +++ b/ts/spark.cli.ts @@ -3,15 +3,18 @@ import * as paths from './spark.paths.ts'; import { Spark } from './spark.classes.spark.ts'; import { logger } from './spark.logging.ts'; -export const runCli = async () => { +type TSparkCliArgv = Record; + +export const runCli = () => { const smartshellInstance = new plugins.smartshell.Smartshell({ executor: 'bash', }); const sparkInstance = new Spark(); const smartcliInstance = new plugins.smartcli.Smartcli(); - smartcliInstance.standardCommand().subscribe(async () => { + smartcliInstance.standardCommand().subscribe(() => { logger.log('info', 'no action specified! you can type:'); logger.log('info', '* installdaemon'); + logger.log('info', '* metrics'); }); smartcliInstance.addCommand('installdaemon').subscribe(async (argvArg) => { @@ -51,7 +54,7 @@ export const runCli = async () => { await persistDaemonOptions(sparkInstance, argvArg); // lets determine the mode if specified - let mode = argvArg.mode; + let mode = readArg(argvArg, 'mode'); if (mode === 'cloudly') { await sparkInstance.sparkConfig.kvStore.writeKey('mode', 'cloudly'); } else if (mode === 'coreflow-node') { @@ -69,7 +72,7 @@ export const runCli = async () => { Deno.exit(1); } else if (mode === 'cloudly') { sparkInstance.sparkUpdateManager.services.push({ - name: `coreflow`, + name: `cloudly`, image: `code.foss.global/serve.zone/cloudly`, url: `cloudly`, environment: `production`, @@ -104,11 +107,16 @@ export const runCli = async () => { await sparkInstance.daemonStart(); }); - smartcliInstance.addCommand('logs').subscribe(async (argvArg) => { - smartshellInstance.exec(`journalctl -u smartdaemon_spark -f`); + smartcliInstance.addCommand('metrics').subscribe(async () => { + const metrics = await sparkInstance.sparkMetricsCollector.collectMetrics(); + console.log(plugins.smartjson.stringify(metrics)); }); - smartcliInstance.addCommand('prune').subscribe(async (argvArg) => { + smartcliInstance.addCommand('logs').subscribe(async () => { + await smartshellInstance.exec(`journalctl -u smartdaemon_spark -f`); + }); + + smartcliInstance.addCommand('prune').subscribe(async () => { // daemon await smartshellInstance.exec(`systemctl stop smartdaemon_spark`); logger.log('ok', 'stopped serverconfig daemon'); @@ -150,33 +158,70 @@ export const runCli = async () => { smartcliInstance.startParse(); }; -const persistDaemonOptions = async (sparkInstance: Spark, argvArg: any) => { - const mode = argvArg.mode; +const persistDaemonOptions = async (sparkInstance: Spark, argvArg: TSparkCliArgv) => { + const mode = readArg(argvArg, 'mode'); if (mode) { await sparkInstance.sparkConfig.kvStore.writeKey('mode', mode); } - const cloudlyUrl = argvArg.cloudlyUrl || argvArg['cloudly-url'] || Deno.env.get('CLOUDLY_URL'); + const cloudlyUrl = readArg(argvArg, 'cloudlyUrl') + || readArg(argvArg, 'cloudly-url') + || Deno.env.get('CLOUDLY_URL'); if (cloudlyUrl) { await sparkInstance.sparkConfig.kvStore.writeKey('cloudlyUrl', cloudlyUrl); } - const jumpcode = argvArg.jumpcode || argvArg.jumpCode || Deno.env.get('JUMPCODE'); + const jumpcode = readArg(argvArg, 'jumpcode') || readArg(argvArg, 'jumpCode') || Deno.env.get('JUMPCODE'); if (jumpcode) { await sparkInstance.sparkConfig.kvStore.writeKey('jumpcode', jumpcode); } + + const nodeId = readArg(argvArg, 'nodeId') || readArg(argvArg, 'node-id') || Deno.env.get('SPARK_NODE_ID'); + if (nodeId) { + await sparkInstance.sparkConfig.kvStore.writeKey('nodeId', nodeId); + } + + const nodeToken = readArg(argvArg, 'nodeToken') + || readArg(argvArg, 'node-token') + || Deno.env.get('SPARK_NODE_TOKEN'); + if (nodeToken) { + await sparkInstance.sparkConfig.kvStore.writeKey('nodeToken', nodeToken); + } + + const enableHostUpdates = readArg(argvArg, 'enableHostUpdates') + || readArg(argvArg, 'enable-host-updates'); + if (typeof enableHostUpdates !== 'undefined') { + await sparkInstance.sparkConfig.kvStore.writeKey('enableHostUpdates', String(enableHostUpdates)); + } + + const heartbeatIntervalMs = readArg(argvArg, 'heartbeatIntervalMs') + || readArg(argvArg, 'heartbeat-interval-ms'); + if (heartbeatIntervalMs) { + await sparkInstance.sparkConfig.kvStore.writeKey('heartbeatIntervalMs', String(heartbeatIntervalMs)); + } }; const readOption = async ( sparkInstance: Spark, - argvArg: any, + argvArg: TSparkCliArgv, primaryKey: string, secondaryKey: string, envKey: string, ) => { - return argvArg[primaryKey] - || argvArg[secondaryKey] + return readArg(argvArg, primaryKey) + || readArg(argvArg, secondaryKey) || Deno.env.get(envKey) || await sparkInstance.sparkConfig.kvStore.readKey(primaryKey) || await sparkInstance.sparkConfig.kvStore.readKey(secondaryKey); }; + +const readArg = (argvArg: TSparkCliArgv, keyArg: string) => { + const value = argvArg[keyArg]; + if (typeof value === 'string') { + return value; + } + if (typeof value === 'boolean') { + return String(value); + } + return undefined; +};