feat: orchestrate service backups
This commit is contained in:
@@ -0,0 +1,320 @@
|
||||
import type { Cloudly } from '../classes.cloudly.js';
|
||||
import * as plugins from '../plugins.js';
|
||||
import { BackupRecord } from './classes.backuprecord.js';
|
||||
|
||||
/**
 * Lifecycle states a backup record can be in.
 *
 * Within this file, records are created as `running`, move to `ready` or
 * `failed` once the backup request settles, and pass through `restoring`
 * to `restored` during a restore. `pending` is declared but not assigned here.
 */
export type TBackupStatus =
  | 'pending'
  | 'running'
  | 'ready'
  | 'failed'
  | 'restoring'
  | 'restored';
|
||||
/** Kinds of service resources a single backup snapshot can belong to. */
export type TBackupResourceType =
  | 'volume'
  | 'database'
  | 'objectstorage';
|
||||
|
||||
/**
 * Description of one snapshot that belongs to a backup record.
 *
 * In this file, snapshot entries are taken from the response of the
 * `executeServiceBackup` request and handed back unchanged in the
 * `executeServiceRestore` request.
 */
export interface IBackupSnapshotData {
  // --- identification ---

  /** Which kind of resource this snapshot was taken from. */
  type: TBackupResourceType;
  /** Identifier of the snapshot. */
  snapshotId: string;
  /** Optional human-readable name of the snapshot. */
  snapshotName?: string;

  // --- size and time ---

  /** Size of the source data (presumably bytes; confirm with the snapshot producer). */
  originalSize: number;
  /** Size of the stored snapshot (presumably bytes; confirm with the snapshot producer). */
  storedSize: number;
  /** Creation timestamp (presumably epoch milliseconds; confirm with the snapshot producer). */
  createdAt: number;

  // --- free-form metadata ---

  /** Optional string key/value tags attached to the snapshot. */
  tags?: Record<string, string>;

  // --- resource-specific details, all optional ---

  /** Name of the volume (presumably set for `volume` snapshots). */
  volumeName?: string;
  /** Mount path of the volume (presumably set for `volume` snapshots). */
  mountPath?: string;
  /** Name of the backed-up resource. */
  resourceName?: string;
  /** Name of the database (presumably set for `database` snapshots). */
  databaseName?: string;
  /** Name of the bucket (presumably set for `objectstorage` snapshots). */
  bucketName?: string;
}
|
||||
|
||||
/**
 * Plain-data shape of a backup record: which service was backed up, the
 * current lifecycle status, the snapshots that were produced and the
 * history of restore attempts.
 */
export interface IBackupRecordData {
  /** Identifier of the backup record. */
  id: string;
  /** Identifier of the service this backup belongs to. */
  serviceId: string;
  /** Optional name of the service. */
  serviceName?: string;
  /** Optional identifier of the cluster the backup is associated with. */
  clusterId?: string;

  /** Current lifecycle state of the backup. */
  status: TBackupStatus;
  /** Whether the backup was requested manually or by a schedule. */
  trigger: 'manual' | 'scheduled';
  /** Snapshots that make up this backup. */
  snapshots: IBackupSnapshotData[];

  /** Creation timestamp (numeric; presumably epoch milliseconds, confirm against the writer). */
  createdAt: number;
  /** Timestamp of the most recent modification. */
  updatedAt: number;
  /** Timestamp at which the backup finished, if it has finished. */
  completedAt?: number;

  /** Optional identifier of whoever requested the backup. */
  requestedBy?: string;
  /** Optional error description for a backup that did not succeed. */
  errorText?: string;

  /** Optional list of restore attempts made from this backup. */
  restoreHistory?: {
    /** Timestamp of the restore attempt. */
    restoredAt: number;
    /** Outcome of the restore attempt. */
    status: 'restored' | 'failed';
    /** Optional error description for a failed restore attempt. */
    errorText?: string;
  }[];

  /** Optional string key/value tags attached to the backup. */
  tags?: Record<string, string>;
}
|
||||
|
||||
/**
 * Manages backup records for services and forwards the actual backup and
 * restore work to a connected coreflow instance over the typed socket.
 *
 * The manager:
 * - registers typed request handlers (`createServiceBackup`,
 *   `getServiceBackups`, `getBackupById`, `restoreServiceBackup`),
 * - registers a `backup-all-services` task with the task manager,
 * - persists the progress of each backup/restore on a `BackupRecord`,
 * - prunes old records per service after a successful backup.
 */
export class CloudlyBackupManager {
  /** Router carrying this manager's handlers; attached to the Cloudly router in the constructor. */
  public typedrouter = new plugins.typedrequest.TypedRouter();
  public cloudlyRef: Cloudly;

  /** Database handle, resolved lazily from the Cloudly instance. */
  get db() {
    return this.cloudlyRef.mongodbConnector.smartdataDb;
  }

  /** `BackupRecord` document class with this manager registered as its default manager. */
  public CBackupRecord = plugins.smartdata.setDefaultManagerForDoc(this, BackupRecord);

  /**
   * Wires the manager into the Cloudly router and registers all handlers.
   *
   * Mutating handlers (create/restore) go through the admin identity guard,
   * read handlers go through the valid identity guard.
   *
   * @param cloudlyRefArg the owning Cloudly instance
   */
  constructor(cloudlyRefArg: Cloudly) {
    this.cloudlyRef = cloudlyRefArg;
    this.cloudlyRef.typedrouter.addTypedRouter(this.typedrouter);

    this.typedrouter.addTypedHandler(
      new plugins.typedrequest.TypedHandler<any>('createServiceBackup', async (requestArg) => {
        await this.passAdminIdentity(requestArg);
        return {
          backup: await this.createServiceBackup(requestArg),
        };
      }),
    );

    this.typedrouter.addTypedHandler(
      new plugins.typedrequest.TypedHandler<any>('getServiceBackups', async (requestArg) => {
        await this.passValidIdentity(requestArg);
        return {
          // Only filters that were actually supplied end up in the query.
          backups: await this.getBackups({
            ...(requestArg.serviceId ? { serviceId: requestArg.serviceId } : {}),
            ...(requestArg.status ? { status: requestArg.status } : {}),
          }),
        };
      }),
    );

    this.typedrouter.addTypedHandler(
      new plugins.typedrequest.TypedHandler<any>('getBackupById', async (requestArg) => {
        await this.passValidIdentity(requestArg);
        return {
          backup: await this.getBackupById(requestArg.backupId),
        };
      }),
    );

    this.typedrouter.addTypedHandler(
      new plugins.typedrequest.TypedHandler<any>('restoreServiceBackup', async (requestArg) => {
        await this.passAdminIdentity(requestArg);
        return {
          backup: await this.restoreServiceBackup(requestArg),
        };
      }),
    );
  }

  /**
   * Registers the `backup-all-services` task with the task manager.
   *
   * The schedule is read from `CLOUDLY_BACKUP_CRON`; the task is registered
   * either way but is only marked enabled when that variable is non-empty.
   */
  public async start() {
    const schedule = process.env.CLOUDLY_BACKUP_CRON;
    this.cloudlyRef.taskManager.registerTask(
      'backup-all-services',
      new plugins.taskbuffer.Task({
        name: 'backup-all-services',
        taskFunction: async () => await this.backupAllServices(),
      }),
      {
        description: 'Create backups for every workload service with backup-enabled resources.',
        category: 'backup',
        schedule,
        enabled: Boolean(schedule),
      },
    );
  }

  /** Counterpart to `start()`; currently has nothing to tear down. */
  public async stop() {}

  /**
   * Returns the savable objects of all backup records matching the query.
   *
   * @param queryArg filter handed to `getInstances`; defaults to no filter
   */
  public async getBackups(queryArg: Record<string, unknown> = {}) {
    const backups = await this.CBackupRecord.getInstances(queryArg);
    return await Promise.all(backups.map((backupArg) => backupArg.createSavableObject()));
  }

  /**
   * Returns the savable object of a single backup record.
   *
   * @param backupIdArg id of the backup record
   * @throws TypedResponseError when no record with that id exists
   */
  public async getBackupById(backupIdArg: string) {
    const backup = await BackupRecord.getInstance({ id: backupIdArg });
    if (!backup) {
      throw new plugins.typedrequest.TypedResponseError(`Backup ${backupIdArg} not found`);
    }
    return await backup.createSavableObject();
  }

  /**
   * Creates a backup for every service, one after another.
   *
   * Services that declare a `serviceCategory` other than `'workload'` are
   * skipped; services without a category are backed up. A failure for one
   * service is recorded in the result list and does not stop the loop.
   *
   * @returns one entry per attempted service with either `backupId` or `errorText`
   */
  public async backupAllServices() {
    const services = await this.cloudlyRef.serviceManager.CService.getInstances({});
    const results: Array<{ serviceId: string; backupId?: string; errorText?: string }> = [];
    for (const service of services) {
      if (service.data.serviceCategory && service.data.serviceCategory !== 'workload') {
        continue;
      }
      try {
        const backup = await this.createServiceBackup({
          // Synthetic identity describing the scheduler as the requester.
          identity: {
            name: 'cloudly-backup-scheduler',
            role: 'admin',
            type: 'machine',
            userId: 'system',
            expiresAt: Date.now() + 3600 * 1000,
            jwt: '',
          },
          serviceId: service.id,
          // Recorded on the backup so scheduled runs are distinguishable from manual ones.
          trigger: 'scheduled',
          tags: {
            trigger: 'scheduled',
          },
        });
        results.push({ serviceId: service.id, backupId: backup.id });
      } catch (error) {
        results.push({ serviceId: service.id, errorText: this.getErrorText(error) });
      }
    }
    return { results };
  }

  /**
   * Creates a backup record for a service and asks coreflow to execute it.
   *
   * The record is saved as `running` before the request is fired, then
   * updated to `ready` (with the returned snapshots) or `failed` (with the
   * error text). Retention runs only after the record has been saved as
   * `ready`; a retention error propagates to the caller but no longer flips
   * the already successful backup to `failed`.
   *
   * @param requestArg.identity identity of the requester; its `userId` is stored as `requestedBy`
   * @param requestArg.serviceId id of the service to back up
   * @param requestArg.clusterId optional target cluster; falls back to a `clusterId` on the identity
   * @param requestArg.trigger optional origin of the request; anything other than `'scheduled'` is stored as `'manual'`
   * @param requestArg.tags optional tags stored on the record and forwarded to coreflow
   * @returns the savable object of the finished backup record
   * @throws TypedResponseError when the service does not exist
   * @throws whatever the coreflow request, saving the record or retention throws
   */
  public async createServiceBackup(requestArg: {
    identity: plugins.servezoneInterfaces.data.IIdentity;
    serviceId: string;
    clusterId?: string;
    trigger?: 'manual' | 'scheduled';
    tags?: Record<string, string>;
  }) {
    const service = await this.cloudlyRef.serviceManager.CService.getInstance({
      id: requestArg.serviceId,
    });
    if (!service) {
      throw new plugins.typedrequest.TypedResponseError(`Service ${requestArg.serviceId} not found`);
    }

    const now = Date.now();
    const backup = new BackupRecord();
    backup.id = await BackupRecord.getNewId();
    backup.serviceId = service.id;
    backup.serviceName = service.data.name;
    backup.clusterId = requestArg.clusterId || (requestArg.identity as any).clusterId;
    backup.status = 'running';
    // Previously hard-coded to 'manual', which mislabelled scheduler runs.
    backup.trigger = requestArg.trigger === 'scheduled' ? 'scheduled' : 'manual';
    backup.snapshots = [];
    backup.createdAt = now;
    backup.updatedAt = now;
    backup.requestedBy = requestArg.identity.userId;
    backup.tags = requestArg.tags;
    // Persist first so the run is visible even if the coreflow request never returns.
    await backup.save();

    try {
      const result = await this.fireCoreflowRequest('executeServiceBackup', {
        backupId: backup.id,
        service: await service.createSavableObject(),
        tags: requestArg.tags,
      }, backup.clusterId);
      backup.snapshots = result.snapshots || [];
      backup.status = 'ready';
      backup.completedAt = Date.now();
      backup.updatedAt = Date.now();
      await backup.save();
    } catch (error) {
      backup.status = 'failed';
      backup.errorText = this.getErrorText(error);
      backup.completedAt = Date.now();
      backup.updatedAt = Date.now();
      await backup.save();
      throw error;
    }

    // Deliberately outside the try/catch above: a pruning problem must not
    // mark a backup that completed successfully as failed.
    await this.applyRetention(backup.serviceId);

    return await backup.createSavableObject();
  }

  /**
   * Deletes the oldest finished backup records of a service.
   *
   * `CLOUDLY_BACKUP_KEEP_LAST` (default `24`) is the number of records to
   * keep; a value that is not a positive integer disables pruning. The limit
   * is applied separately to restorable records (`ready`/`restored`) and to
   * `failed` records, so a run of failed backups cannot push the last
   * restorable backups out of the retention window. Records in any other
   * status are never touched.
   *
   * NOTE(review): only the record is deleted here; no request is sent to
   * coreflow to remove the underlying snapshots. Confirm they are cleaned up
   * elsewhere.
   *
   * @param serviceIdArg id of the service whose records are pruned
   */
  private async applyRetention(serviceIdArg: string) {
    const keepLast = Number(process.env.CLOUDLY_BACKUP_KEEP_LAST || '24');
    if (!Number.isInteger(keepLast) || keepLast <= 0) {
      return;
    }
    const backups = await this.CBackupRecord.getInstances({
      serviceId: serviceIdArg,
    });
    const newestFirst = (a: BackupRecord, b: BackupRecord) => (b.createdAt || 0) - (a.createdAt || 0);
    const restorableBackups = backups
      .filter((backupArg) => backupArg.status === 'ready' || backupArg.status === 'restored')
      .sort(newestFirst);
    const failedBackups = backups
      .filter((backupArg) => backupArg.status === 'failed')
      .sort(newestFirst);
    const expiredBackups = [
      ...restorableBackups.slice(keepLast),
      ...failedBackups.slice(keepLast),
    ];
    for (const backup of expiredBackups) {
      await backup.delete();
    }
  }

  /**
   * Restores a service from an existing backup via coreflow.
   *
   * The record is saved as `restoring` while the request is in flight. On
   * success it becomes `restored`; on failure it returns to the status it
   * had before the attempt (`ready` or `restored`). Either way an entry is
   * appended to `restoreHistory`.
   *
   * @param requestArg.identity identity of the requester (not read by this method itself)
   * @param requestArg.backupId id of the backup record to restore from
   * @param requestArg.clear forwarded unchanged to coreflow
   * @param requestArg.resourceTypes forwarded unchanged to coreflow
   * @returns the savable object of the updated backup record
   * @throws TypedResponseError when the backup or its service does not exist,
   *         or when the backup is not in status `ready` or `restored`
   * @throws whatever the coreflow request or saving the record throws
   */
  public async restoreServiceBackup(requestArg: {
    identity: plugins.servezoneInterfaces.data.IIdentity;
    backupId: string;
    clear?: boolean;
    resourceTypes?: TBackupResourceType[];
  }) {
    const backup = await BackupRecord.getInstance({ id: requestArg.backupId });
    if (!backup) {
      throw new plugins.typedrequest.TypedResponseError(`Backup ${requestArg.backupId} not found`);
    }
    if (backup.status !== 'ready' && backup.status !== 'restored') {
      throw new plugins.typedrequest.TypedResponseError(`Backup ${backup.id} is not restorable in status ${backup.status}`);
    }
    // Remembered so a failed attempt puts the record back where it was
    // instead of always downgrading a previously restored backup to 'ready'.
    const previousStatus = backup.status;

    const service = await this.cloudlyRef.serviceManager.CService.getInstance({
      id: backup.serviceId,
    });
    if (!service) {
      throw new plugins.typedrequest.TypedResponseError(`Service ${backup.serviceId} not found`);
    }

    backup.status = 'restoring';
    backup.updatedAt = Date.now();
    await backup.save();

    try {
      await this.fireCoreflowRequest('executeServiceRestore', {
        backupId: backup.id,
        service: await service.createSavableObject(),
        snapshots: backup.snapshots || [],
        clear: requestArg.clear,
        resourceTypes: requestArg.resourceTypes,
      }, backup.clusterId);
      backup.status = 'restored';
      backup.restoreHistory = [
        ...(backup.restoreHistory || []),
        {
          restoredAt: Date.now(),
          status: 'restored',
        },
      ];
      backup.updatedAt = Date.now();
      await backup.save();
    } catch (error) {
      backup.status = previousStatus;
      backup.restoreHistory = [
        ...(backup.restoreHistory || []),
        {
          restoredAt: Date.now(),
          status: 'failed',
          errorText: this.getErrorText(error),
        },
      ];
      backup.updatedAt = Date.now();
      await backup.save();
      throw error;
    }

    return await backup.createSavableObject();
  }

  /**
   * Sends a typed request to a connected coreflow instance.
   *
   * A connection qualifies when its `identity` tag has role `'cluster'` and,
   * if `clusterIdArg` is given, the same `clusterId`. The first qualifying
   * connection is used.
   *
   * NOTE(review): without `clusterIdArg` any cluster connection qualifies, so
   * with several clusters connected the request goes to whichever is listed
   * first. Confirm that this is intended.
   *
   * @param methodArg name of the typed request method
   * @param payloadArg request payload
   * @param clusterIdArg optional cluster the request must be sent to
   * @returns the response of the fired request
   * @throws Error when the typed socket server is not running or no connection qualifies
   */
  private async fireCoreflowRequest(methodArg: string, payloadArg: Record<string, unknown>, clusterIdArg?: string) {
    const typedsocket = this.cloudlyRef.server.typedServer?.typedsocket;
    if (!typedsocket) {
      throw new Error('Cloudly TypedSocket server is not running');
    }

    const connections = await typedsocket.findAllTargetConnections(async (connectionArg) => {
      const identityTag = await connectionArg.getTagById('identity');
      const identity = identityTag?.payload as plugins.servezoneInterfaces.data.IIdentity | undefined;
      return identity?.role === 'cluster' && (!clusterIdArg || (identity as any).clusterId === clusterIdArg);
    });
    if (connections.length === 0) {
      throw new Error(clusterIdArg
        ? `No connected coreflow for cluster ${clusterIdArg}`
        : 'No connected coreflow');
    }

    const request = typedsocket.createTypedRequest<any>(methodArg, connections[0]);
    return await request.fire(payloadArg as any);
  }

  /**
   * Turns an arbitrary thrown value into a text suitable for `errorText`.
   *
   * `Error` instances yield their `message`. Other objects are checked for a
   * string `errorText` (presumably how `TypedResponseError` carries its text;
   * confirm against the typedrequest package) and then a string `message`.
   * Everything else is stringified.
   *
   * @param errorArg the caught value
   */
  private getErrorText(errorArg: unknown): string {
    if (errorArg instanceof Error) {
      return errorArg.message;
    }
    if (typeof errorArg === 'object' && errorArg !== null) {
      const candidate = errorArg as { errorText?: unknown; message?: unknown };
      if (typeof candidate.errorText === 'string') {
        return candidate.errorText;
      }
      if (typeof candidate.message === 'string') {
        return candidate.message;
      }
    }
    return String(errorArg);
  }

  /** Rejects unless the request passes the auth manager's valid identity guard. */
  private async passValidIdentity(requestData: { identity: plugins.servezoneInterfaces.data.IIdentity }) {
    await plugins.smartguard.passGuardsOrReject(requestData, [
      this.cloudlyRef.authManager.validIdentityGuard,
    ]);
  }

  /** Rejects unless the request passes the auth manager's admin identity guard. */
  private async passAdminIdentity(requestData: { identity: plugins.servezoneInterfaces.data.IIdentity }) {
    await plugins.smartguard.passGuardsOrReject(requestData, [
      this.cloudlyRef.authManager.adminIdentityGuard,
    ]);
  }
}
|
||||
Reference in New Issue
Block a user