import type { Coreflow } from './coreflow.classes.coreflow.js'; import * as plugins from './coreflow.plugins.js'; import * as crypto from 'node:crypto'; type TBackupSnapshotType = 'volume' | 'database' | 'objectstorage'; type TBackupSnapshot = { type: TBackupSnapshotType; snapshotId: string; snapshotName?: string; originalSize: number; storedSize: number; createdAt: number; tags?: Record; volumeName?: string; mountPath?: string; resourceName?: string; databaseName?: string; bucketName?: string; }; type TBackupArchiveObject = { path: string; size: number; sha256: string; }; type TBackupArchiveManifest = { version: 1; backupId: string; createdAt: number; objects: TBackupArchiveObject[]; totalSize: number; }; type TBackupReplicationResult = { targetType: 's3' | 'smb'; targetPath: string; manifestPath: string; manifestSha256: string; objectCount: number; totalSize: number; completedAt: number; }; type TServiceVolumeConfig = { name?: string; source?: string; mountPath?: string; target?: string; containerFsPath?: string; driver?: string; readOnly?: boolean; backup?: boolean; options?: Record; }; export class CoreflowBackupManager { public coreflowRef: Coreflow; constructor(coreflowRefArg: Coreflow) { this.coreflowRef = coreflowRefArg; this.coreflowRef.typedrouter.addTypedHandler( new plugins.typedrequest.TypedHandler('executeServiceBackup', async (requestArg) => { return await this.executeServiceBackup(requestArg); }), ); this.coreflowRef.typedrouter.addTypedHandler( new plugins.typedrequest.TypedHandler('executeServiceRestore', async (requestArg) => { return await this.executeServiceRestore(requestArg); }), ); } public async start() {} public async stop() {} public async executeServiceBackup(requestArg: { backupId: string; service: plugins.servezoneInterfaces.data.IService; tags?: Record; replication?: { enabled: boolean; }; }) { await this.provisionCorestoreBindingsIfAvailable(requestArg.service); const snapshots: TBackupSnapshot[] = []; snapshots.push(...await this.snapshotServiceVolumes(requestArg)); snapshots.push(...await this.snapshotServiceResources(requestArg)); const replication = requestArg.replication?.enabled ? await this.replicateBackupArchive(requestArg.backupId) : undefined; return { snapshots, replication }; } public async executeServiceRestore(requestArg: { backupId: string; service: plugins.servezoneInterfaces.data.IService; snapshots: TBackupSnapshot[]; clear?: boolean; resourceTypes?: TBackupSnapshotType[]; replication?: { enabled: boolean; }; }) { await this.provisionCorestoreBindingsIfAvailable(requestArg.service); if (requestArg.replication?.enabled) { await this.ensureBackupArchiveAvailable(requestArg.backupId); } const selectedSnapshots = (requestArg.snapshots || []).filter((snapshotArg) => { return !requestArg.resourceTypes?.length || requestArg.resourceTypes.includes(snapshotArg.type); }); const restored: TBackupSnapshot[] = []; for (const snapshot of selectedSnapshots.filter((snapshotArg) => snapshotArg.type === 'volume')) { await this.createCorestoreVolume(requestArg.service, { source: snapshot.volumeName, mountPath: snapshot.mountPath || '/data', backup: true, }); await this.postCorestore('/volumes/restore', { name: snapshot.volumeName, snapshotId: snapshot.snapshotId, clear: requestArg.clear, }); restored.push(snapshot); } const resourceSnapshots = selectedSnapshots.filter((snapshotArg) => { return snapshotArg.type === 'database' || snapshotArg.type === 'objectstorage'; }); if (resourceSnapshots.length > 0) { await this.postCorestore('/resources/restore', { serviceId: requestArg.service.id, snapshots: resourceSnapshots.map((snapshotArg) => ({ capability: snapshotArg.type, resourceName: snapshotArg.resourceName || '', snapshotId: snapshotArg.snapshotId, snapshotName: snapshotArg.snapshotName, originalSize: snapshotArg.originalSize, storedSize: snapshotArg.storedSize, createdAt: snapshotArg.createdAt, tags: snapshotArg.tags || {}, databaseName: snapshotArg.databaseName, bucketName: snapshotArg.bucketName, })), }); restored.push(...resourceSnapshots); } return { restored }; } private async provisionCorestoreBindingsIfAvailable( serviceArg: plugins.servezoneInterfaces.data.IService, ) { if (!(this.coreflowRef.cloudlyConnector as any).cloudlyApiClient) { return; } await this.coreflowRef.platformManager.provisionBindingsForService(serviceArg); } private getCoreStoreControlUrl() { return (process.env.CORESTORE_CONTROL_URL || 'http://corestore:3000').replace(/\/+$/, ''); } private getCoreStoreApiToken() { return process.env.CORESTORE_API_TOKEN; } private async postCorestore(pathArg: string, bodyArg: unknown): Promise { const token = this.getCoreStoreApiToken(); const response = await fetch(`${this.getCoreStoreControlUrl()}${pathArg}`, { method: 'POST', headers: { 'content-type': 'application/json', ...(token ? { authorization: `Bearer ${token}` } : {}), }, body: JSON.stringify(bodyArg), }); const responseText = await response.text(); if (!response.ok) { throw new Error(`CoreStore request failed ${response.status}: ${responseText}`); } return responseText ? JSON.parse(responseText) as T : ({} as T); } private async fireCloudlyBackupRequest(methodArg: string, payloadArg: Record) { const connector = this.coreflowRef.cloudlyConnector as any; if (!connector.cloudlyApiClient || !connector.identity) { throw new Error('Cloudly connection is required for backup replication'); } const request = connector.cloudlyApiClient.typedsocketClient.createTypedRequest(methodArg); return await request.fire({ identity: connector.identity, ...payloadArg, }) as T; } private async getCorestoreArchiveManifest(backupIdArg: string) { const response = await this.postCorestore<{ manifest: TBackupArchiveManifest }>('/archive/manifest', { backupId: backupIdArg, }); return response.manifest; } private async replicateBackupArchive(backupIdArg: string): Promise { const manifest = await this.getCorestoreArchiveManifest(backupIdArg); const prepareResult = await this.fireCloudlyBackupRequest<{ missingObjects: TBackupArchiveObject[] }>( 'prepareBackupReplication', { backupId: backupIdArg, manifest, }, ); for (const object of prepareResult.missingObjects || []) { const readResult = await this.postCorestore<{ object: TBackupArchiveObject; contentsBase64: string; }>('/archive/object/read', { path: object.path }); if (readResult.object.sha256 !== object.sha256 || readResult.object.size !== object.size) { throw new Error(`Corestore archive object changed during replication: ${object.path}`); } await this.fireCloudlyBackupRequest('uploadBackupArchiveObject', { backupId: backupIdArg, object, contentsBase64: readResult.contentsBase64, }); } const completeResult = await this.fireCloudlyBackupRequest<{ replication: TBackupReplicationResult }>( 'completeBackupReplication', { backupId: backupIdArg, manifest, }, ); return completeResult.replication; } private async ensureBackupArchiveAvailable(backupIdArg: string) { const manifestResult = await this.fireCloudlyBackupRequest<{ manifest: TBackupArchiveManifest }>( 'getBackupArchiveManifest', { backupId: backupIdArg }, ); const remoteManifest = manifestResult.manifest; const localManifest = await this.getCorestoreArchiveManifest(backupIdArg); const localObjectMap = new Map(localManifest.objects.map((objectArg) => [objectArg.path, objectArg])); for (const remoteObject of remoteManifest.objects || []) { const localObject = localObjectMap.get(remoteObject.path); if (localObject?.sha256 === remoteObject.sha256 && localObject.size === remoteObject.size) { continue; } const downloadResult = await this.fireCloudlyBackupRequest<{ object: TBackupArchiveObject; contentsBase64: string; }>('downloadBackupArchiveObject', { backupId: backupIdArg, object: remoteObject, }); await this.postCorestore('/archive/object/write', { ...downloadResult.object, contentsBase64: downloadResult.contentsBase64, }); } } private getDockerSafeName(valueArg: string, maxLengthArg = 64) { const safeName = valueArg .replace(/[^a-zA-Z0-9_.-]+/g, '-') .replace(/^[^a-zA-Z0-9]+|[^a-zA-Z0-9]+$/g, '') .slice(0, maxLengthArg) .replace(/[^a-zA-Z0-9]+$/g, ''); return safeName || 'resource'; } private getServiceVolumeConfigs(serviceArg: plugins.servezoneInterfaces.data.IService) { const serviceData = serviceArg.data as Omit & { volumes?: TServiceVolumeConfig[]; }; return (serviceData.volumes || []).filter((volumeArg) => { return Boolean(volumeArg.mountPath || volumeArg.target || volumeArg.containerFsPath); }); } private getCoreStoreVolumeName( serviceArg: plugins.servezoneInterfaces.data.IService, volumeArg: TServiceVolumeConfig, ) { const requestedName = volumeArg.source || volumeArg.name; if (requestedName) { return this.getDockerSafeName(requestedName, 120); } const mountPath = volumeArg.mountPath || volumeArg.target || volumeArg.containerFsPath || 'data'; const serviceName = this.getDockerSafeName(serviceArg.data.name, 36); const mountName = this.getDockerSafeName(mountPath.replace(/^\/+/, '').replace(/\/+$/g, ''), 28); const hash = crypto.createHash('sha1').update(`${serviceArg.id}:${mountPath}`).digest('hex').slice(0, 12); return this.getDockerSafeName(`sz-${serviceName}-${mountName}-${hash}`, 120); } private async createCorestoreVolume( serviceArg: plugins.servezoneInterfaces.data.IService, volumeArg: TServiceVolumeConfig, ) { const mountPath = volumeArg.mountPath || volumeArg.target || volumeArg.containerFsPath || '/data'; const volumeName = this.getCoreStoreVolumeName(serviceArg, volumeArg); await this.postCorestore('/volumes/create', { name: volumeName, serviceId: serviceArg.id, serviceName: serviceArg.data.name, mountPath, backup: volumeArg.backup !== false, options: volumeArg.options, }); return { volumeName, mountPath }; } private async snapshotServiceVolumes(requestArg: { backupId: string; service: plugins.servezoneInterfaces.data.IService; tags?: Record; }) { const snapshots: TBackupSnapshot[] = []; const volumes = this.getServiceVolumeConfigs(requestArg.service).filter((volumeArg) => { return volumeArg.driver === undefined || volumeArg.driver === 'corestore'; }).filter((volumeArg) => volumeArg.backup !== false); for (const volume of volumes) { const { volumeName, mountPath } = await this.createCorestoreVolume(requestArg.service, volume); const response = await this.postCorestore('/volumes/snapshot', { name: volumeName, snapshotName: requestArg.backupId, tags: { backupId: requestArg.backupId, serviceId: requestArg.service.id, serviceName: requestArg.service.data.name, mountPath, ...(requestArg.tags || {}), }, }); snapshots.push({ type: 'volume', volumeName, mountPath, snapshotId: response.snapshot.id, snapshotName: requestArg.backupId, originalSize: response.snapshot.originalSize, storedSize: response.snapshot.storedSize, createdAt: Date.now(), tags: response.snapshot.tags, }); } return snapshots; } private async snapshotServiceResources(requestArg: { backupId: string; service: plugins.servezoneInterfaces.data.IService; tags?: Record; }) { const response = await this.postCorestore('/resources/snapshot', { serviceId: requestArg.service.id, snapshotName: requestArg.backupId, tags: { backupId: requestArg.backupId, serviceId: requestArg.service.id, serviceName: requestArg.service.data.name, ...(requestArg.tags || {}), }, }); return (response.snapshots || []).map((snapshotArg: any) => ({ type: snapshotArg.capability, resourceName: snapshotArg.resourceName, databaseName: snapshotArg.databaseName, bucketName: snapshotArg.bucketName, snapshotId: snapshotArg.snapshotId, snapshotName: snapshotArg.snapshotName, originalSize: snapshotArg.originalSize, storedSize: snapshotArg.storedSize, createdAt: snapshotArg.createdAt, tags: snapshotArg.tags, })) as TBackupSnapshot[]; } }