255 lines
8.9 KiB
TypeScript
255 lines
8.9 KiB
TypeScript
import { CloudlyConnector } from './classes.cloudlyconnector.ts';
|
|
import { SupervisorClient } from './classes.supervisorclient.ts';
|
|
import type {
|
|
IBaseOsRuntimeInfo,
|
|
IBaseOsDesiredState,
|
|
IBaseRunnerConfig,
|
|
IBaseRunnerState,
|
|
TCloudlyConnectionStatus,
|
|
} from './types.ts';
|
|
|
|
// Where the runner state JSON is stored when BASEOS_STATE_PATH is unset or empty.
const defaultStatePath = '/data/baseos/state.json';
|
|
|
|
/**
 * BaseOS node agent.
 *
 * Keeps a small JSON state file (node id, node token, last applied target
 * state / release), reports runtime information, and periodically syncs with
 * Cloudly through {@link CloudlyConnector}. Desired state received from Cloudly
 * (or from an optional preload file) is applied through {@link SupervisorClient}.
 */
export class BaseRunner {
  /** Heartbeat interval used when the configured value is not a positive finite number. */
  private static readonly fallbackHeartbeatIntervalMs = 60000;

  public readonly config: IBaseRunnerConfig;
  public readonly supervisorClient: SupervisorClient;
  public readonly cloudlyConnector: CloudlyConnector;

  /** Persisted runner state; populated by {@link init}. */
  private state?: IBaseRunnerState;
  /** Handle of the running heartbeat interval, if any. */
  private heartbeatTimer: number | undefined;
  private cloudlyConnectionStatus: TCloudlyConnectionStatus = 'not-configured';

  /**
   * Builds a runner from `BASEOS_*` and `BALENA_SUPERVISOR_*` environment variables.
   *
   * Empty variables are treated as unset. `BASEOS_STATE_PATH` falls back to
   * `defaultStatePath`; `BASEOS_HEARTBEAT_INTERVAL_MS` falls back to 60000 when
   * it is missing or does not parse to a positive finite number.
   */
  public static fromEnv() {
    // `||` rather than `??` on purpose: an empty string counts as "not set".
    const readEnv = (nameArg: string) => Deno.env.get(nameArg) || undefined;
    return new BaseRunner({
      cloudlyUrl: readEnv('BASEOS_CLOUDLY_URL'),
      joinToken: readEnv('BASEOS_JOIN_TOKEN'),
      nodeToken: readEnv('BASEOS_NODE_TOKEN'),
      nodeId: readEnv('BASEOS_NODE_ID'),
      statePath: readEnv('BASEOS_STATE_PATH') || defaultStatePath,
      heartbeatIntervalMs: BaseRunner.resolveHeartbeatIntervalMs(
        readEnv('BASEOS_HEARTBEAT_INTERVAL_MS'),
      ),
      supervisorAddress: readEnv('BALENA_SUPERVISOR_ADDRESS'),
      supervisorApiKey: readEnv('BALENA_SUPERVISOR_API_KEY'),
      preloadTargetStatePath: readEnv('BASEOS_PRELOAD_TARGET_STATE_PATH'),
    });
  }

  /**
   * Creates the supervisor client and Cloudly connector from the given config.
   * The connection status starts as `'connecting'` when the connector reports
   * itself configured, otherwise it stays `'not-configured'`.
   */
  constructor(configArg: IBaseRunnerConfig) {
    this.config = configArg;
    this.supervisorClient = new SupervisorClient({
      address: configArg.supervisorAddress,
      apiKey: configArg.supervisorApiKey,
    });
    this.cloudlyConnector = new CloudlyConnector({
      cloudlyUrl: configArg.cloudlyUrl,
      joinToken: configArg.joinToken,
      nodeToken: configArg.nodeToken,
    });
    if (this.cloudlyConnector.isConfigured()) {
      this.cloudlyConnectionStatus = 'connecting';
    }
  }

  /**
   * Loads the persisted state, creating and saving a fresh one when no state
   * file exists, then hands the effective node token to the Cloudly connector.
   *
   * A persisted token wins over the configured one. When the persisted state
   * has no token but the config does, the configured token is stored in the
   * state as well, so the connector and the register-vs-heartbeat decision in
   * the sync loop agree on whether the node already owns a token.
   */
  public async init() {
    let state = await this.loadState();
    if (!state) {
      const now = Date.now();
      state = {
        nodeId: this.config.nodeId || crypto.randomUUID(),
        nodeToken: this.config.nodeToken,
        createdAt: now,
        updatedAt: now,
      };
      this.state = state;
      await this.saveState();
    } else {
      this.state = state;
      if (!state.nodeToken && this.config.nodeToken) {
        state.nodeToken = this.config.nodeToken;
        state.updatedAt = Date.now();
        await this.saveState();
      }
    }
    this.cloudlyConnector.setNodeToken(state.nodeToken);
  }

  /**
   * Initializes the runner, performs one Cloudly sync and then schedules the
   * periodic heartbeat.
   *
   * A failure of the initial sync rejects this call; failures of later
   * heartbeats are only logged. Calling `start()` again replaces the running
   * heartbeat timer instead of leaking it.
   */
  public async start() {
    await this.init();
    await this.syncCloudlyOnce();
    // Never keep two intervals alive for the same runner.
    this.stop();
    // Guard against NaN / 0 / negative intervals, which would make the timer fire back-to-back.
    const intervalMs = BaseRunner.resolveHeartbeatIntervalMs(this.config.heartbeatIntervalMs);
    this.heartbeatTimer = setInterval(() => {
      this.syncCloudlyOnce().catch((errorArg: unknown) => {
        console.error(`Cloudly heartbeat failed: ${this.describeError(errorArg)}`);
      });
    }, intervalMs);
    console.log(`BaseRunner started for node ${this.state?.nodeId}`);
  }

  /** Stops the periodic heartbeat; safe to call when it is not running. */
  public stop() {
    if (this.heartbeatTimer !== undefined) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = undefined;
    }
  }

  /**
   * Collects a snapshot of the runtime: node id, Cloudly URL and connection
   * status, and supervisor availability.
   *
   * When the supervisor is available, its device state and state status are
   * added on a best-effort basis: a failure to read either one is logged as a
   * warning and the field is left unset.
   */
  public async getRuntimeInfo(): Promise<IBaseOsRuntimeInfo> {
    await this.ensureInitialized();
    const supervisorAvailable = await this.supervisorClient.isAvailable();
    const info: IBaseOsRuntimeInfo = {
      runtime: 'baseos',
      runtimeLevel: 'app-layer',
      nodeId: this.requireState().nodeId,
      cloudlyUrl: this.config.cloudlyUrl,
      cloudlyConnectionStatus: this.cloudlyConnector.isConfigured()
        ? this.cloudlyConnectionStatus
        : 'not-configured',
      supervisorAvailable,
      supervisorAddress: this.supervisorClient.address,
      checkedAt: Date.now(),
    };
    if (supervisorAvailable) {
      try {
        info.deviceState = await this.supervisorClient.getDeviceState();
      } catch (errorArg) {
        console.warn(`Could not read Balena device state: ${this.describeError(errorArg)}`);
      }
      try {
        info.stateStatus = await this.supervisorClient.getStateStatus();
      } catch (errorArg) {
        console.warn(`Could not read Balena state status: ${this.describeError(errorArg)}`);
      }
    }
    return info;
  }

  /**
   * Forwards an update request to the supervisor.
   *
   * @param optionsArg passed through unchanged to `SupervisorClient.triggerUpdate`.
   * @throws Error when the supervisor API is not available.
   */
  public async requestSelfUpdate(optionsArg: { force?: boolean; cancel?: boolean } = {}) {
    if (!(await this.supervisorClient.isAvailable())) {
      throw new Error('Balena Supervisor API is not available');
    }
    await this.supervisorClient.triggerUpdate(optionsArg);
  }

  /**
   * Runs one sync cycle: applies the preload target state (if any), then
   * registers the node with Cloudly when it has no token yet, or sends a
   * heartbeat otherwise, and applies the desired state from the response.
   *
   * The connection status is set to `'connecting'` before contacting Cloudly
   * and to `'connected'` only after the whole cycle succeeded; when the cycle
   * throws, the status therefore remains `'connecting'`.
   *
   * @throws Error when Cloudly does not accept the registration or heartbeat.
   */
  private async syncCloudlyOnce() {
    await this.ensureInitialized();
    await this.applyPreloadTargetStateIfPresent();
    if (!this.cloudlyConnector.isConfigured()) {
      this.cloudlyConnectionStatus = 'not-configured';
      return;
    }
    // The snapshot is taken before the status flips, so it carries the previous status.
    const status = await this.getRuntimeInfo();
    this.cloudlyConnectionStatus = 'connecting';
    if (!this.requireState().nodeToken) {
      const result = await this.cloudlyConnector.registerNode(status);
      if (!result.accepted) {
        throw new Error(result.message || 'Cloudly did not accept node registration');
      }
      // Accepted without a token: nothing to persist, registration is retried next cycle.
      if (result.nodeToken) {
        const state = this.requireState();
        state.nodeToken = result.nodeToken;
        state.updatedAt = Date.now();
        this.cloudlyConnector.setNodeToken(result.nodeToken);
        await this.saveState();
      }
      await this.applyDesiredState(result.desiredState);
    } else {
      const result = await this.cloudlyConnector.sendHeartbeat(status);
      if (!result.accepted) {
        throw new Error(result.message || 'Cloudly did not accept node heartbeat');
      }
      await this.applyDesiredState(result.desiredState);
    }
    this.cloudlyConnectionStatus = 'connected';
  }

  /**
   * Applies the target state stored in `config.preloadTargetStatePath`.
   * Does nothing when no path is configured or the file does not exist; any
   * other read error, invalid JSON, or an unusable document is thrown.
   */
  private async applyPreloadTargetStateIfPresent() {
    const preloadPath = this.config.preloadTargetStatePath;
    if (!preloadPath) {
      return;
    }
    let preloadText: string;
    try {
      preloadText = await Deno.readTextFile(preloadPath);
    } catch (errorArg) {
      if (errorArg instanceof Deno.errors.NotFound) {
        return;
      }
      throw errorArg;
    }
    const parsed: unknown = JSON.parse(preloadText);
    const targetState = this.normalizeTargetState(parsed);
    await this.applyTargetState(targetState, 'preload target state');
  }

  /**
   * Applies a desired state received from Cloudly: first its target state (if
   * present), then a forced supervisor update when the desired release differs
   * from the last release recorded in the runner state. The release step is
   * skipped with a warning while the supervisor is unavailable.
   */
  private async applyDesiredState(desiredStateArg?: IBaseOsDesiredState) {
    if (!desiredStateArg) {
      return;
    }
    if (desiredStateArg.targetState) {
      await this.applyTargetState(desiredStateArg.targetState, 'Cloudly desired target state');
    }
    const desiredRelease = desiredStateArg.release;
    if (!desiredRelease || desiredRelease === this.requireState().lastRelease) {
      return;
    }
    if (!(await this.supervisorClient.isAvailable())) {
      console.warn('Skipping desired release update because Balena Supervisor API is unavailable');
      return;
    }
    await this.supervisorClient.triggerUpdate({ force: true });
    const state = this.requireState();
    const now = Date.now();
    state.lastRelease = desiredRelease;
    state.lastReleaseUpdateTriggeredAt = now;
    state.updatedAt = now;
    await this.saveState();
  }

  /**
   * Sends a target state to the supervisor unless the same state (compared by
   * SHA-256 of its JSON serialization) was the last one applied. The hash and
   * timestamps are persisted only after the supervisor call succeeded.
   *
   * @param targetStateArg target state document to apply.
   * @param sourceArg human-readable origin, used in log messages only.
   */
  private async applyTargetState(targetStateArg: Record<string, unknown>, sourceArg: string) {
    const targetStateHash = await this.hashJson(targetStateArg);
    if (this.requireState().lastTargetStateHash === targetStateHash) {
      return;
    }
    if (!(await this.supervisorClient.isAvailable())) {
      console.warn(`Skipping ${sourceArg} because Balena Supervisor API is unavailable`);
      return;
    }
    await this.supervisorClient.setLocalTargetState(targetStateArg);
    const state = this.requireState();
    const now = Date.now();
    state.lastTargetStateHash = targetStateHash;
    state.lastTargetStateAppliedAt = now;
    state.updatedAt = now;
    await this.saveState();
    console.log(`Applied ${sourceArg}`);
  }

  /**
   * Accepts either a document that has a `local` object (returned as is) or a
   * wrapper whose `targetState` property is an object (the inner object is
   * returned).
   *
   * @throws Error for anything else, including non-object input such as `null`.
   */
  private normalizeTargetState(targetStateArg: unknown): Record<string, unknown> {
    if (this.isRecord(targetStateArg)) {
      if (this.isRecord(targetStateArg.local)) {
        return targetStateArg;
      }
      if (this.isRecord(targetStateArg.targetState)) {
        return targetStateArg.targetState;
      }
    }
    throw new Error('BaseOS target state must contain a local target state object');
  }

  /**
   * Returns the lowercase hex SHA-256 digest of `JSON.stringify(valueArg)`.
   * The serialization is not canonicalized, so key order affects the hash.
   */
  private async hashJson(valueArg: unknown) {
    const encoded = new TextEncoder().encode(JSON.stringify(valueArg));
    const digest = await crypto.subtle.digest('SHA-256', encoded);
    return Array.from(new Uint8Array(digest))
      .map((byteArg) => byteArg.toString(16).padStart(2, '0'))
      .join('');
  }

  /** Type guard for non-null, non-array objects. */
  private isRecord(valueArg: unknown): valueArg is Record<string, unknown> {
    return typeof valueArg === 'object' && valueArg !== null && !Array.isArray(valueArg);
  }

  /** Runs {@link init} when no state has been loaded yet. */
  private async ensureInitialized() {
    if (!this.state) {
      await this.init();
    }
  }

  /**
   * Returns the current state.
   *
   * @throws Error when the runner has not been initialized.
   */
  private requireState(): IBaseRunnerState {
    if (!this.state) {
      throw new Error('BaseRunner state is not initialized');
    }
    return this.state;
  }

  /**
   * Reads and parses the state file.
   *
   * @returns the parsed state, or `undefined` when the file does not exist.
   * @throws any other read error, and the `SyntaxError` from invalid JSON.
   */
  private async loadState(): Promise<IBaseRunnerState | undefined> {
    try {
      const text = await Deno.readTextFile(this.config.statePath);
      // NOTE(review): the parsed document is trusted as-is; its shape is not validated here.
      return JSON.parse(text) as IBaseRunnerState;
    } catch (errorArg) {
      if (errorArg instanceof Deno.errors.NotFound) {
        return undefined;
      }
      throw errorArg;
    }
  }

  /**
   * Writes the current state as pretty-printed JSON to `config.statePath`,
   * creating the parent directory first. Does nothing when there is no state.
   */
  private async saveState() {
    if (!this.state) {
      return;
    }
    const statePath = this.config.statePath;
    const directory = this.parentDirectoryOf(statePath);
    if (directory !== undefined) {
      await Deno.mkdir(directory, { recursive: true });
    }
    await Deno.writeTextFile(statePath, `${JSON.stringify(this.state, null, 2)}\n`);
  }

  /**
   * Returns the directory part of a `/`-separated path, `'/'` for a file that
   * sits directly in the root, or `undefined` for a bare file name.
   *
   * Done with plain string handling rather than `new URL('file://…').pathname`:
   * the URL parser percent-encodes characters such as spaces, cuts the path at
   * `#` / `?`, and treats the first segment of a relative path as the host.
   */
  private parentDirectoryOf(pathArg: string): string | undefined {
    const lastSlashIndex = pathArg.lastIndexOf('/');
    if (lastSlashIndex < 0) {
      return undefined;
    }
    return lastSlashIndex === 0 ? '/' : pathArg.slice(0, lastSlashIndex);
  }

  /** Extracts a printable message from an arbitrary thrown value. */
  private describeError(errorArg: unknown): string {
    return errorArg instanceof Error ? errorArg.message : String(errorArg);
  }

  /** Returns the value as a number when it is positive and finite, otherwise the fallback interval. */
  private static resolveHeartbeatIntervalMs(valueArg: unknown): number {
    const parsed = typeof valueArg === 'number' ? valueArg : Number(valueArg);
    return Number.isFinite(parsed) && parsed > 0 ? parsed : BaseRunner.fallbackHeartbeatIntervalMs;
  }
}
|