feat(spark): add node telemetry heartbeat

This commit is contained in:
2026-05-24 12:47:05 +00:00
parent 24fc803b09
commit b6517e31f7
14 changed files with 653 additions and 90 deletions
+1
View File
@@ -10,6 +10,7 @@ import { fileURLToPath } from 'url';
import { dirname, join } from 'path';
import { existsSync } from 'fs';
import { platform, arch } from 'os';
import process from 'node:process';
const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
+17
View File
@@ -1,5 +1,22 @@
# Changelog
## Pending
### Features
- add Spark host telemetry and Cloudly heartbeat
- Adds local CPU, memory, disk, and Docker metrics collection
- Adds `spark metrics` for local telemetry inspection
- Adds Cloudly heartbeat reporting when node credentials are configured
### Fixes
- improve daemon service maintenance behavior
- Checks all managed services instead of returning after the first up-to-date service
- Awaits scheduled service update runs and prevents overlapping update loops
- Makes host package updates opt-in
- Corrects the Cloudly profile service name
## 2026-02-04 - 1.2.5 - fix(deps)
update Docker API usage to DockerHost facade, bump dependencies, and adjust tests/docs
+12 -3
View File
@@ -15,7 +15,10 @@ The current implementation does four main things:
- Installs and updates a `smartdaemon_spark` systemd service through `@push.rocks/smartdaemon`.
- Runs in an explicit mode: `cloudly` or `coreflow-node`.
- Activates Docker Swarm through `@apiclient.xyz/docker` when daemon mode starts.
- Schedules recurring tasks with `@push.rocks/taskbuffer` for Spark updates, host package updates, and managed Docker service updates.
- Collects host metrics for CPU, memory, disk, and Docker container counts.
- Sends Cloudly heartbeats when `cloudlyUrl`, `nodeId`, and `nodeToken` are configured.
- Schedules recurring tasks with `@push.rocks/taskbuffer` for Spark updates and managed Docker service updates.
- Optionally schedules host package updates when `--enableHostUpdates=true` has been configured.
## Installation
@@ -82,6 +85,7 @@ spark <command> [options]
| `updatedaemon` | Reload the daemon service definition for the current Spark version. |
| `asdaemon --mode cloudly` | Run the daemon loop with the Cloudly profile. |
| `asdaemon --mode coreflow-node` | Run the daemon loop with the Coreflow node profile. |
| `metrics` | Print a JSON host metrics snapshot. |
| `logs` | Follow `journalctl -u smartdaemon_spark -f`. |
| `prune` | Stop Spark, remove Docker stacks/services/secrets, remove selected networks, prune Docker, restart Docker, and restart Spark. |
@@ -89,10 +93,11 @@ spark <command> [options]
## Daemon Behavior
`Spark.daemonStart()` starts two subsystems:
`Spark.daemonStart()` starts three subsystems:
- `SparkServicesManager.start()` activates Docker Swarm.
- `SparkTaskManager.start()` schedules recurring maintenance tasks.
- `SparkHeartbeatManager.start()` sends node metrics to Cloudly when node credentials are configured.
Scheduled tasks:
@@ -100,10 +105,12 @@ Scheduled tasks:
| --- | --- | --- |
| `updateServices` | Every 2 minutes at second 30 | Checks managed Docker services and recreates them when images change. |
| `updateSpark` | Every minute | Checks for a newer Spark release and reloads the daemon after upgrade. |
| `updateHost` | Daily at midnight | Runs apt update/upgrade/autoremove/autoclean. |
| `updateHost` | Daily at midnight when enabled | Runs apt update/upgrade/autoremove/autoclean. |
The managed service list is populated by the selected mode before daemon startup. Service updates use Docker images, Docker secrets, and published port mappings.
Cloudly heartbeat requires a Spark node token. The Cloudly jump flow provisions this token and passes it to Spark as `--nodeId` and `--nodeToken` during daemon installation. Existing installations without those values continue to run service maintenance without reporting node telemetry.
## Programmatic Usage
Spark exports the main `Spark` class from `mod.ts`:
@@ -123,6 +130,8 @@ The public instance exposes:
| `sparkConfig` | persisted mode/config key-value store. |
| `sparkTaskManager` | taskbuffer scheduler and built-in maintenance tasks. |
| `sparkUpdateManager` | Docker Swarm activation and managed service update logic. |
| `sparkMetricsCollector` | Host metrics collection for local output and Cloudly heartbeat payloads. |
| `sparkHeartbeatManager` | Cloudly heartbeat client for Spark node telemetry. |
| `sparkInfo` | package metadata lookup. |
## Development
+4 -5
View File
@@ -6,21 +6,20 @@
*/
import { platform, arch } from 'os';
import { existsSync, mkdirSync, writeFileSync, chmodSync, unlinkSync } from 'fs';
import { existsSync, mkdirSync, chmodSync, unlinkSync } from 'fs';
import { join, dirname } from 'path';
import { fileURLToPath } from 'url';
import https from 'https';
import { pipeline } from 'stream';
import { promisify } from 'util';
import { createWriteStream } from 'fs';
import process from 'node:process';
const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
const streamPipeline = promisify(pipeline);
// Configuration
const REPO_BASE = 'https://code.foss.global/serve.zone/spark';
const VERSION = process.env.npm_package_version || '1.2.2';
const VERSION = process.env.npm_package_version || '1.2.5';
function getBinaryInfo() {
const plat = platform();
@@ -176,7 +175,7 @@ async function main() {
try {
// Try fallback URL
await downloadFile(fallbackUrl, binaryPath);
} catch (fallbackErr) {
} catch {
console.error(`❌ Error: Failed to download binary`);
console.error(` Primary URL: ${releaseUrl}`);
console.error(` Fallback URL: ${fallbackUrl}`);
+2 -2
View File
@@ -14,10 +14,10 @@ console.log(`✅ Path operations work: ${testPath}`);
// Test basic imports from plugins
import * as smartdelay from '@push.rocks/smartdelay';
console.log('✅ @push.rocks/smartdelay import works');
console.log(`✅ @push.rocks/smartdelay import works: ${typeof smartdelay === 'object'}`);
import * as smartlog from '@push.rocks/smartlog';
console.log('✅ @push.rocks/smartlog import works');
console.log(`✅ @push.rocks/smartlog import works: ${typeof smartlog === 'object'}`);
console.log('');
console.log('Basic Deno functionality confirmed!');
+38
View File
@@ -0,0 +1,38 @@
import { assertEquals } from '@std/assert';
import {
calculateCpuUsagePercent,
parseDfOutput,
parseDockerPsOutput,
parseProcStatCpuLine,
} from '../ts/spark.classes.metricscollector.ts';
Deno.test('should parse cpu samples from proc stat', () => {
const sample = parseProcStatCpuLine('cpu 100 20 30 400 50 0 0 0 0 0\n');
assertEquals(sample, {
idle: 450,
total: 600,
});
});
Deno.test('should calculate cpu usage percent', () => {
const usage = calculateCpuUsagePercent(
{ idle: 100, total: 200 },
{ idle: 150, total: 400 },
);
assertEquals(usage, 75);
});
Deno.test('should parse df output', () => {
const disk = parseDfOutput(`Filesystem 1024-blocks Used Available Capacity Mounted on
/dev/root 10485760 2097152 8388608 20% /
`);
assertEquals(disk, {
diskUsedGB: 2,
diskAvailableGB: 8,
});
});
Deno.test('should parse docker container count', () => {
assertEquals(parseDockerPsOutput('container-a\ncontainer-b\n'), 2);
assertEquals(parseDockerPsOutput('\n'), 0);
});
+50
View File
@@ -0,0 +1,50 @@
import { assertEquals } from '@std/assert';
import { Spark } from '../ts/index.ts';
Deno.test({
name: 'should continue checking services when one service does not need an update',
async fn() {
const spark = new Spark();
const checkedServices: string[] = [];
spark.sparkUpdateManager.dockerHost = {
getServiceByName: (serviceNameArg: string) => Promise.resolve({
needsUpdate: () => {
checkedServices.push(serviceNameArg);
return Promise.resolve(false);
},
}),
getSecretByName: () => Promise.resolve(undefined),
createImageFromRegistry: () => Promise.reject(
new Error('should not pull images for up-to-date services'),
),
} as unknown as typeof spark.sparkUpdateManager.dockerHost;
spark.sparkUpdateManager.services = [
{
name: 'service-one',
image: 'example/service-one',
url: 'service-one',
port: '3000',
environment: 'test',
secretJson: {
SERVEZONE_PORT: '3000',
},
},
{
name: 'service-two',
image: 'example/service-two',
url: 'service-two',
port: '3000',
environment: 'test',
secretJson: {
SERVEZONE_PORT: '3000',
},
},
];
await spark.sparkUpdateManager.updateServices();
assertEquals(checkedServices, ['service-one', 'service-two']);
},
sanitizeResources: false,
sanitizeOps: false,
});
+3 -1
View File
@@ -1,7 +1,9 @@
export * from './spark.classes.spark.ts';
export * from './spark.classes.metricscollector.ts';
export * from './spark.classes.heartbeatmanager.ts';
import * as cli from './spark.cli.ts';
export const runCli = async () => {
cli.runCli();
await cli.runCli();
};
+182
View File
@@ -0,0 +1,182 @@
import type { Spark } from './spark.classes.spark.ts';
import { logger } from './spark.logging.ts';
export class SparkHeartbeatManager {
public sparkRef: Spark;
private heartbeatTimer?: number;
private cloudlyConnectionStatus: TSparkCloudlyConnectionStatus = 'not-configured';
constructor(sparkRefArg: Spark) {
this.sparkRef = sparkRefArg;
}
public async start() {
const config = await this.readConfig();
if (!config.cloudlyUrl || !config.nodeId || !config.nodeToken) {
this.cloudlyConnectionStatus = 'not-configured';
logger.log('info', 'Spark heartbeat is not configured. nodeId/nodeToken/cloudlyUrl are required.');
return;
}
await this.sendHeartbeatOnce().catch((errorArg) => {
logger.log('warn', `initial Spark heartbeat failed: ${(errorArg as Error).message}`);
});
const intervalMs = await this.getHeartbeatIntervalMs();
this.heartbeatTimer = setInterval(() => {
this.sendHeartbeatOnce().catch((errorArg) => {
logger.log('warn', `Spark heartbeat failed: ${(errorArg as Error).message}`);
});
}, intervalMs);
}
public stop() {
if (this.heartbeatTimer !== undefined) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = undefined;
}
}
public async sendHeartbeatOnce(): Promise<ISparkHeartbeatResponse> {
const config = await this.readConfig();
if (!config.cloudlyUrl || !config.nodeId || !config.nodeToken) {
this.cloudlyConnectionStatus = 'not-configured';
return {
accepted: false,
message: 'Spark heartbeat is not configured',
};
}
this.cloudlyConnectionStatus = 'connecting';
const [metrics, runtimeInfo] = await Promise.all([
this.sparkRef.sparkMetricsCollector.collectMetrics(),
this.getRuntimeInfo(config),
]);
const response = await this.postJson<ISparkHeartbeatResponse>(
config.cloudlyUrl,
'/spark/v1/nodes/heartbeat',
{
nodeId: config.nodeId,
nodeToken: config.nodeToken,
metrics,
runtimeInfo,
},
);
if (!response.accepted) {
this.cloudlyConnectionStatus = 'failed';
throw new Error(response.message || 'Cloudly rejected Spark heartbeat');
}
this.cloudlyConnectionStatus = 'connected';
return response;
}
private async getRuntimeInfo(configArg: ISparkHeartbeatConfig): Promise<ISparkNodeRuntimeInfo> {
const dockerInfo = await this.sparkRef.sparkMetricsCollector.collectDockerInfo();
const mode = await this.getMode();
return {
runtime: 'spark',
nodeId: configArg.nodeId!,
mode,
hostname: Deno.hostname(),
platform: Deno.build.os,
arch: Deno.build.arch,
osRelease: this.getOsRelease(),
sparkVersion: this.sparkRef.sparkInfo.projectInfo.version,
cloudlyUrl: configArg.cloudlyUrl,
cloudlyConnectionStatus: this.cloudlyConnectionStatus,
dockerAvailable: dockerInfo.dockerAvailable,
swarmNodeId: dockerInfo.swarmNodeId,
checkedAt: Date.now(),
};
}
private async getMode(): Promise<TSparkNodeMode | undefined> {
const mode = await this.sparkRef.sparkConfig.kvStore.readKey('mode');
if (mode === 'cloudly' || mode === 'coreflow-node') {
return mode;
}
return undefined;
}
private getOsRelease(): string | undefined {
try {
return Deno.osRelease();
} catch {
return undefined;
}
}
private async getHeartbeatIntervalMs() {
const configuredInterval = Number(
await this.sparkRef.sparkConfig.kvStore.readKey('heartbeatIntervalMs'),
);
if (Number.isFinite(configuredInterval) && configuredInterval >= 5000) {
return configuredInterval;
}
return 60000;
}
private async readConfig(): Promise<ISparkHeartbeatConfig> {
return {
cloudlyUrl: await this.sparkRef.sparkConfig.kvStore.readKey('cloudlyUrl'),
nodeId: await this.sparkRef.sparkConfig.kvStore.readKey('nodeId'),
nodeToken: await this.sparkRef.sparkConfig.kvStore.readKey('nodeToken'),
};
}
private async postJson<TResponse>(
cloudlyUrlArg: string,
pathArg: string,
bodyArg: Record<string, unknown>,
): Promise<TResponse> {
const url = new URL(
pathArg,
cloudlyUrlArg.endsWith('/') ? cloudlyUrlArg : `${cloudlyUrlArg}/`,
);
const response = await fetch(url, {
method: 'POST',
headers: {
'content-type': 'application/json',
},
body: JSON.stringify(bodyArg),
});
if (!response.ok) {
throw new Error(`Cloudly request failed: ${pathArg} -> HTTP ${response.status}`);
}
return await response.json() as TResponse;
}
}
interface ISparkHeartbeatConfig {
cloudlyUrl?: string;
nodeId?: string;
nodeToken?: string;
}
type TSparkNodeMode = 'cloudly' | 'coreflow-node';
type TSparkCloudlyConnectionStatus = 'not-configured' | 'connecting' | 'connected' | 'failed';
interface ISparkNodeRuntimeInfo {
runtime: 'spark';
nodeId: string;
mode?: TSparkNodeMode;
hostname?: string;
platform: string;
arch: string;
osRelease?: string;
sparkVersion: string;
cloudlyUrl?: string;
cloudlyConnectionStatus: TSparkCloudlyConnectionStatus;
dockerAvailable: boolean;
swarmNodeId?: string;
checkedAt: number;
lastError?: string;
}
interface ISparkHeartbeatResponse {
accepted: boolean;
message?: string;
}
+190
View File
@@ -0,0 +1,190 @@
import * as plugins from './spark.plugins.ts';
export interface ICpuSample {
idle: number;
total: number;
}
export interface IDiskSample {
diskUsedGB: number;
diskAvailableGB: number;
}
export interface IDockerRuntimeInfo {
dockerAvailable: boolean;
containerCount: number;
swarmNodeId?: string;
}
interface ICommandResult {
success: boolean;
stdout: string;
}
export class SparkMetricsCollector {
private previousCpuSample?: ICpuSample;
public async collectMetrics(): Promise<plugins.servezoneInterfaces.data.IClusterNodeMetrics> {
const [cpuUsagePercent, memoryInfo, diskInfo, dockerInfo] = await Promise.all([
this.collectCpuUsagePercent(),
this.collectMemoryInfo(),
this.collectDiskInfo(),
this.collectDockerInfo(),
]);
return {
cpuUsagePercent,
memoryUsedMB: memoryInfo.memoryUsedMB,
memoryAvailableMB: memoryInfo.memoryAvailableMB,
diskUsedGB: diskInfo.diskUsedGB,
diskAvailableGB: diskInfo.diskAvailableGB,
containerCount: dockerInfo.containerCount,
timestamp: Date.now(),
};
}
public async collectDockerInfo(): Promise<IDockerRuntimeInfo> {
const containerResult = await this.runCommand('docker', ['ps', '-q']);
const swarmNodeResult = await this.runCommand('docker', ['info', '--format', '{{.Swarm.NodeID}}']);
const rawSwarmNodeId = swarmNodeResult.stdout.trim();
const swarmNodeId = rawSwarmNodeId === '<no value>' ? '' : rawSwarmNodeId;
return {
dockerAvailable: containerResult.success,
containerCount: containerResult.success ? parseDockerPsOutput(containerResult.stdout) : 0,
swarmNodeId: swarmNodeId || undefined,
};
}
private async collectCpuUsagePercent(): Promise<number> {
const currentSample = await this.readCpuSample();
if (!currentSample) {
return 0;
}
if (!this.previousCpuSample) {
this.previousCpuSample = currentSample;
await new Promise((resolve) => setTimeout(resolve, 250));
const nextSample = await this.readCpuSample();
if (!nextSample) {
return 0;
}
this.previousCpuSample = nextSample;
return calculateCpuUsagePercent(currentSample, nextSample);
}
const cpuUsagePercent = calculateCpuUsagePercent(this.previousCpuSample, currentSample);
this.previousCpuSample = currentSample;
return cpuUsagePercent;
}
private collectMemoryInfo() {
const systemMemoryInfo = Deno.systemMemoryInfo();
const memoryUsedBytes = Math.max(
systemMemoryInfo.total - systemMemoryInfo.available,
0,
);
return {
memoryUsedMB: bytesToMegabytes(memoryUsedBytes),
memoryAvailableMB: bytesToMegabytes(systemMemoryInfo.available),
};
}
private async collectDiskInfo(): Promise<IDiskSample> {
const dfOutput = await this.runCommand('df', ['-Pk', '/']);
return parseDfOutput(dfOutput.stdout);
}
private async readCpuSample(): Promise<ICpuSample | undefined> {
try {
return parseProcStatCpuLine(await Deno.readTextFile('/proc/stat'));
} catch {
return undefined;
}
}
private async runCommand(commandArg: string, argsArg: string[]): Promise<ICommandResult> {
try {
const command = new Deno.Command(commandArg, {
args: argsArg,
stdout: 'piped',
stderr: 'null',
});
const output = await command.output();
return {
success: output.success,
stdout: new TextDecoder().decode(output.stdout),
};
} catch {
return {
success: false,
stdout: '',
};
}
}
}
export const parseProcStatCpuLine = (procStatTextArg: string): ICpuSample | undefined => {
const cpuLine = procStatTextArg.split('\n').find((lineArg) => lineArg.startsWith('cpu '));
if (!cpuLine) {
return undefined;
}
const values = cpuLine.trim().split(/\s+/).slice(1).map((valueArg) => Number(valueArg));
if (values.some((valueArg) => !Number.isFinite(valueArg))) {
return undefined;
}
const idle = (values[3] || 0) + (values[4] || 0);
const total = values.reduce((sumArg, valueArg) => sumArg + valueArg, 0);
return { idle, total };
};
export const calculateCpuUsagePercent = (previousArg: ICpuSample, currentArg: ICpuSample): number => {
const idleDelta = currentArg.idle - previousArg.idle;
const totalDelta = currentArg.total - previousArg.total;
if (totalDelta <= 0) {
return 0;
}
return roundMetric(((totalDelta - idleDelta) / totalDelta) * 100);
};
export const parseDfOutput = (dfOutputArg: string): IDiskSample => {
const dataLine = dfOutputArg.trim().split('\n')[1];
if (!dataLine) {
return {
diskUsedGB: 0,
diskAvailableGB: 0,
};
}
const parts = dataLine.trim().split(/\s+/);
const usedKilobytes = Number(parts[2]);
const availableKilobytes = Number(parts[3]);
return {
diskUsedGB: kilobytesToGigabytes(usedKilobytes),
diskAvailableGB: kilobytesToGigabytes(availableKilobytes),
};
};
export const parseDockerPsOutput = (dockerPsOutputArg: string): number => {
if (!dockerPsOutputArg.trim()) {
return 0;
}
return dockerPsOutputArg.trim().split('\n').filter(Boolean).length;
};
const bytesToMegabytes = (bytesArg: number): number => {
return roundMetric(bytesArg / 1024 / 1024);
};
const kilobytesToGigabytes = (kilobytesArg: number): number => {
if (!Number.isFinite(kilobytesArg)) {
return 0;
}
return roundMetric(kilobytesArg / 1024 / 1024);
};
const roundMetric = (valueArg: number): number => {
if (!Number.isFinite(valueArg)) {
return 0;
}
return Math.round(valueArg * 100) / 100;
};
+8 -2
View File
@@ -2,8 +2,9 @@ import * as plugins from './spark.plugins.ts';
import { SparkTaskManager } from './spark.classes.taskmanager.ts';
import { SparkInfo } from './spark.classes.info.ts';
import { SparkServicesManager } from './spark.classes.updatemanager.ts';
import { logger } from './spark.logging.ts';
import { SparkConfig } from './spark.classes.config.ts';
import { SparkMetricsCollector } from './spark.classes.metricscollector.ts';
import { SparkHeartbeatManager } from './spark.classes.heartbeatmanager.ts';
export class Spark {
public smartdaemon: plugins.smartdaemon.SmartDaemon;
@@ -11,6 +12,8 @@ export class Spark {
public sparkTaskManager: SparkTaskManager;
public sparkInfo: SparkInfo;
public sparkUpdateManager: SparkServicesManager;
public sparkMetricsCollector: SparkMetricsCollector;
public sparkHeartbeatManager: SparkHeartbeatManager;
constructor() {
this.smartdaemon = new plugins.smartdaemon.SmartDaemon();
@@ -18,10 +21,13 @@ export class Spark {
this.sparkInfo = new SparkInfo(this);
this.sparkTaskManager = new SparkTaskManager(this);
this.sparkUpdateManager = new SparkServicesManager(this);
this.sparkMetricsCollector = new SparkMetricsCollector();
this.sparkHeartbeatManager = new SparkHeartbeatManager(this);
}
public async daemonStart() {
await this.sparkUpdateManager.start();
this.sparkTaskManager.start();
await this.sparkTaskManager.start();
await this.sparkHeartbeatManager.start();
}
}
+15 -9
View File
@@ -1,6 +1,5 @@
import * as plugins from './spark.plugins.ts';
import { Spark } from './index.ts';
import * as paths from './spark.paths.ts';
import { logger } from './spark.logging.ts';
export class SparkTaskManager {
@@ -30,12 +29,14 @@ export class SparkTaskManager {
this.sparkRef.sparkInfo.projectInfo.version
);
if (shouldUpdate) {
await this.stop();
this.stop();
const smartshellInstance = new plugins.smartshell.Smartshell({
executor: 'bash',
});
await smartshellInstance.exec(`cd / && npm upgrade -g && spark updatedaemon`);
await smartshellInstance.exec(
`cd / && pnpm add --global @serve.zone/spark@latest && spark updatedaemon`,
);
logger.log('info', 'Cooling off before restart...');
await plugins.smartdelay.delayFor(5000);
logger.log('ok', '######## Trying to exit / Restart expected... ########');
@@ -47,7 +48,7 @@ export class SparkTaskManager {
this.updateHost = new plugins.taskbuffer.Task({
name: 'updateHost',
taskFunction: async () => {
await this.stop();
this.stop();
const smartshellInstance = new plugins.smartshell.Smartshell({
executor: 'bash',
});
@@ -62,10 +63,10 @@ export class SparkTaskManager {
* only being run when mode is cloudly
*/
this.updateServices = new plugins.taskbuffer.Task({
name: 'updateCloudly',
name: 'updateServices',
taskFunction: async () => {
logger.log('info', 'now running updateCloudly task');
this.sparkRef.sparkUpdateManager.updateServices();
logger.log('info', 'now running updateServices task');
await this.sparkRef.sparkUpdateManager.updateServices();
},
});
}
@@ -76,14 +77,19 @@ export class SparkTaskManager {
public async start() {
this.taskmanager.addAndScheduleTask(this.updateServices, '30 */2 * * * *');
this.taskmanager.addAndScheduleTask(this.updateSpark, '0 * * * * *');
this.taskmanager.addAndScheduleTask(this.updateHost, '0 0 0 * * *');
const enableHostUpdates = await this.sparkRef.sparkConfig.kvStore.readKey('enableHostUpdates');
if (enableHostUpdates === 'true') {
this.taskmanager.addAndScheduleTask(this.updateHost, '0 0 0 * * *');
} else {
logger.log('info', 'host package updates are disabled. Pass --enableHostUpdates=true to enable them.');
}
this.taskmanager.start();
}
/**
* stops the taskmanager
*/
public async stop() {
public stop() {
this.taskmanager.descheduleTask(this.updateSpark);
this.taskmanager.descheduleTask(this.updateHost);
this.taskmanager.descheduleTask(this.updateServices);
+69 -51
View File
@@ -1,8 +1,16 @@
import * as plugins from './spark.plugins.ts';
import * as paths from './spark.paths.ts';
import { Spark } from './spark.classes.spark.ts';
import { logger } from './spark.logging.ts';
export interface ISparkManagedService {
name: string;
image: string;
url: string;
port: string;
environment: string;
secretJson: Record<string, unknown>;
}
/**
* this class takes care of updating the services that are managed by spark
*/
@@ -10,18 +18,12 @@ export class SparkServicesManager {
public sparkRef: Spark;
public dockerHost: plugins.docker.DockerHost;
public smartupdate: plugins.smartupdate.SmartUpdate;
private updateServicesRunning = false;
/**
* the services that are managed by spark
*/
services: Array<{
name: string;
image: string;
url: string;
port: string;
environment: string;
secretJson: any;
}> = [];
services: ISparkManagedService[] = [];
constructor(sparkrefArg: Spark) {
this.sparkRef = sparkrefArg;
@@ -37,49 +39,65 @@ export class SparkServicesManager {
}
public async updateServices() {
for (const service of this.services) {
const existingService = await this.dockerHost.getServiceByName(service.name);
const existingServiceSecret = await this.dockerHost.getSecretByName(`${service.name}Secret`);
if (existingService) {
const needsUpdate: boolean = await existingService.needsUpdate();
if (!needsUpdate) {
logger.log('info', `service >>${service.name}<< not needing update.`);
// we simply return here to end the functions
return;
}
// continuing here means we need to update the service
logger.log('ok', `${service.name} needs to be updated!`);
await existingService.remove();
if (existingServiceSecret) {
await existingServiceSecret.remove();
}
}
if (!existingService && existingServiceSecret) {
await existingServiceSecret.remove();
}
const newServiceImage = await this.dockerHost.createImageFromRegistry({
imageUrl: service.image,
});
const newServiceSecret = await this.dockerHost.createSecret({
name: `${service.name}Secret`,
contentArg: plugins.smartjson.stringify(service.secretJson),
version: await newServiceImage.getVersion(),
labels: {},
});
const newService = await this.dockerHost.createService({
image: newServiceImage,
labels: {},
name: service.name,
networkAlias: service.name,
networks: [],
secrets: [newServiceSecret],
ports: [`${service.port}:${service.secretJson.SERVEZONE_PORT}`],
});
logger.log('ok', `updated service >>${newService.Spec.Name}<<!`);
if (this.updateServicesRunning) {
logger.log('info', 'service update already running, skipping overlapping run.');
return;
}
this.updateServicesRunning = true;
let updatedServices = 0;
try {
for (const service of this.services) {
try {
const existingService = await this.dockerHost.getServiceByName(service.name);
const existingServiceSecret = await this.dockerHost.getSecretByName(`${service.name}Secret`);
if (existingService) {
const needsUpdate: boolean = await existingService.needsUpdate();
if (!needsUpdate) {
logger.log('info', `service >>${service.name}<< not needing update.`);
continue;
}
// continuing here means we need to update the service
logger.log('ok', `${service.name} needs to be updated!`);
await existingService.remove();
if (existingServiceSecret) {
await existingServiceSecret.remove();
}
}
if (!existingService && existingServiceSecret) {
await existingServiceSecret.remove();
}
logger.log('success', `updated ${this.services.length} services!`);
const newServiceImage = await this.dockerHost.createImageFromRegistry({
imageUrl: service.image,
});
const newServiceSecret = await this.dockerHost.createSecret({
name: `${service.name}Secret`,
contentArg: plugins.smartjson.stringify(service.secretJson),
version: await newServiceImage.getVersion(),
labels: {},
});
const newService = await this.dockerHost.createService({
image: newServiceImage,
labels: {
managedBy: 'spark',
},
name: service.name,
networkAlias: service.name,
networks: [],
secrets: [newServiceSecret],
ports: [`${service.port}:${service.secretJson.SERVEZONE_PORT}`],
});
updatedServices++;
logger.log('ok', `updated service >>${newService.Spec.Name}<<!`);
} catch (error) {
logger.log('error', `failed updating service >>${service.name}<<: ${(error as Error).message}`);
}
}
logger.log('success', `updated ${updatedServices} services!`);
} finally {
this.updateServicesRunning = false;
}
}
}
+59 -14
View File
@@ -3,15 +3,18 @@ import * as paths from './spark.paths.ts';
import { Spark } from './spark.classes.spark.ts';
import { logger } from './spark.logging.ts';
export const runCli = async () => {
type TSparkCliArgv = Record<string, string | boolean | undefined>;
export const runCli = () => {
const smartshellInstance = new plugins.smartshell.Smartshell({
executor: 'bash',
});
const sparkInstance = new Spark();
const smartcliInstance = new plugins.smartcli.Smartcli();
smartcliInstance.standardCommand().subscribe(async () => {
smartcliInstance.standardCommand().subscribe(() => {
logger.log('info', 'no action specified! you can type:');
logger.log('info', '* installdaemon');
logger.log('info', '* metrics');
});
smartcliInstance.addCommand('installdaemon').subscribe(async (argvArg) => {
@@ -51,7 +54,7 @@ export const runCli = async () => {
await persistDaemonOptions(sparkInstance, argvArg);
// lets determine the mode if specified
let mode = argvArg.mode;
let mode = readArg(argvArg, 'mode');
if (mode === 'cloudly') {
await sparkInstance.sparkConfig.kvStore.writeKey('mode', 'cloudly');
} else if (mode === 'coreflow-node') {
@@ -69,7 +72,7 @@ export const runCli = async () => {
Deno.exit(1);
} else if (mode === 'cloudly') {
sparkInstance.sparkUpdateManager.services.push({
name: `coreflow`,
name: `cloudly`,
image: `code.foss.global/serve.zone/cloudly`,
url: `cloudly`,
environment: `production`,
@@ -104,11 +107,16 @@ export const runCli = async () => {
await sparkInstance.daemonStart();
});
smartcliInstance.addCommand('logs').subscribe(async (argvArg) => {
smartshellInstance.exec(`journalctl -u smartdaemon_spark -f`);
smartcliInstance.addCommand('metrics').subscribe(async () => {
const metrics = await sparkInstance.sparkMetricsCollector.collectMetrics();
console.log(plugins.smartjson.stringify(metrics));
});
smartcliInstance.addCommand('prune').subscribe(async (argvArg) => {
smartcliInstance.addCommand('logs').subscribe(async () => {
await smartshellInstance.exec(`journalctl -u smartdaemon_spark -f`);
});
smartcliInstance.addCommand('prune').subscribe(async () => {
// daemon
await smartshellInstance.exec(`systemctl stop smartdaemon_spark`);
logger.log('ok', 'stopped serverconfig daemon');
@@ -150,33 +158,70 @@ export const runCli = async () => {
smartcliInstance.startParse();
};
const persistDaemonOptions = async (sparkInstance: Spark, argvArg: any) => {
const mode = argvArg.mode;
const persistDaemonOptions = async (sparkInstance: Spark, argvArg: TSparkCliArgv) => {
const mode = readArg(argvArg, 'mode');
if (mode) {
await sparkInstance.sparkConfig.kvStore.writeKey('mode', mode);
}
const cloudlyUrl = argvArg.cloudlyUrl || argvArg['cloudly-url'] || Deno.env.get('CLOUDLY_URL');
const cloudlyUrl = readArg(argvArg, 'cloudlyUrl')
|| readArg(argvArg, 'cloudly-url')
|| Deno.env.get('CLOUDLY_URL');
if (cloudlyUrl) {
await sparkInstance.sparkConfig.kvStore.writeKey('cloudlyUrl', cloudlyUrl);
}
const jumpcode = argvArg.jumpcode || argvArg.jumpCode || Deno.env.get('JUMPCODE');
const jumpcode = readArg(argvArg, 'jumpcode') || readArg(argvArg, 'jumpCode') || Deno.env.get('JUMPCODE');
if (jumpcode) {
await sparkInstance.sparkConfig.kvStore.writeKey('jumpcode', jumpcode);
}
const nodeId = readArg(argvArg, 'nodeId') || readArg(argvArg, 'node-id') || Deno.env.get('SPARK_NODE_ID');
if (nodeId) {
await sparkInstance.sparkConfig.kvStore.writeKey('nodeId', nodeId);
}
const nodeToken = readArg(argvArg, 'nodeToken')
|| readArg(argvArg, 'node-token')
|| Deno.env.get('SPARK_NODE_TOKEN');
if (nodeToken) {
await sparkInstance.sparkConfig.kvStore.writeKey('nodeToken', nodeToken);
}
const enableHostUpdates = readArg(argvArg, 'enableHostUpdates')
|| readArg(argvArg, 'enable-host-updates');
if (typeof enableHostUpdates !== 'undefined') {
await sparkInstance.sparkConfig.kvStore.writeKey('enableHostUpdates', String(enableHostUpdates));
}
const heartbeatIntervalMs = readArg(argvArg, 'heartbeatIntervalMs')
|| readArg(argvArg, 'heartbeat-interval-ms');
if (heartbeatIntervalMs) {
await sparkInstance.sparkConfig.kvStore.writeKey('heartbeatIntervalMs', String(heartbeatIntervalMs));
}
};
const readOption = async (
sparkInstance: Spark,
argvArg: any,
argvArg: TSparkCliArgv,
primaryKey: string,
secondaryKey: string,
envKey: string,
) => {
return argvArg[primaryKey]
|| argvArg[secondaryKey]
return readArg(argvArg, primaryKey)
|| readArg(argvArg, secondaryKey)
|| Deno.env.get(envKey)
|| await sparkInstance.sparkConfig.kvStore.readKey(primaryKey)
|| await sparkInstance.sparkConfig.kvStore.readKey(secondaryKey);
};
const readArg = (argvArg: TSparkCliArgv, keyArg: string) => {
const value = argvArg[keyArg];
if (typeof value === 'string') {
return value;
}
if (typeof value === 'boolean') {
return String(value);
}
return undefined;
};