From 4c9db6d11599a5db9ae339845304173f9e7489ce Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Sat, 2 May 2026 21:59:42 +0000 Subject: [PATCH] feat: execute corestore backups --- readme.md | 2 + ts/coreflow.classes.backupmanager.ts | 272 +++++++++++++++++++++++++++ ts/coreflow.classes.coreflow.ts | 6 + 3 files changed, 280 insertions(+) create mode 100644 ts/coreflow.classes.backupmanager.ts diff --git a/readme.md b/readme.md index 87202f1..6b6b9c5 100644 --- a/readme.md +++ b/readme.md @@ -157,6 +157,8 @@ volumes: [ If `name` is omitted, Coreflow derives a stable Docker volume name from the service id and mount path. During service creation it sends a Docker volume mount with `DriverConfig.Name = 'corestore'`, plus service metadata as driver options and volume labels. +Coreflow also exposes Cloudly-triggered backup handlers over its TypedSocket connection. `executeServiceBackup` snapshots corestore volumes plus provisioned smartdb/smartstorage resources, and `executeServiceRestore` restores those snapshots back into the service's corestore resources. + ## Coretraffic Integration Coreflow starts an internal SmartServe/TypedSocket server on port `3000`. Coretraffic is expected to connect to that server and tag its connection as `coretraffic`. diff --git a/ts/coreflow.classes.backupmanager.ts b/ts/coreflow.classes.backupmanager.ts new file mode 100644 index 0000000..8f13942 --- /dev/null +++ b/ts/coreflow.classes.backupmanager.ts @@ -0,0 +1,272 @@ +import type { Coreflow } from './coreflow.classes.coreflow.js'; +import * as plugins from './coreflow.plugins.js'; +import * as crypto from 'node:crypto'; + +type TBackupSnapshotType = 'volume' | 'database' | 'objectstorage'; + +type TBackupSnapshot = { + type: TBackupSnapshotType; + snapshotId: string; + snapshotName?: string; + originalSize: number; + storedSize: number; + createdAt: number; + tags?: Record; + volumeName?: string; + mountPath?: string; + resourceName?: string; + databaseName?: string; + bucketName?: string; +}; + +type TServiceVolumeConfig = { + name?: string; + source?: string; + mountPath?: string; + target?: string; + containerFsPath?: string; + driver?: string; + readOnly?: boolean; + backup?: boolean; + options?: Record; +}; + +export class CoreflowBackupManager { + public coreflowRef: Coreflow; + + constructor(coreflowRefArg: Coreflow) { + this.coreflowRef = coreflowRefArg; + this.coreflowRef.typedrouter.addTypedHandler( + new plugins.typedrequest.TypedHandler('executeServiceBackup', async (requestArg) => { + return await this.executeServiceBackup(requestArg); + }), + ); + this.coreflowRef.typedrouter.addTypedHandler( + new plugins.typedrequest.TypedHandler('executeServiceRestore', async (requestArg) => { + return await this.executeServiceRestore(requestArg); + }), + ); + } + + public async start() {} + + public async stop() {} + + public async executeServiceBackup(requestArg: { + backupId: string; + service: plugins.servezoneInterfaces.data.IService; + tags?: Record; + }) { + await this.provisionCorestoreBindingsIfAvailable(requestArg.service); + + const snapshots: TBackupSnapshot[] = []; + snapshots.push(...await this.snapshotServiceVolumes(requestArg)); + snapshots.push(...await this.snapshotServiceResources(requestArg)); + + return { snapshots }; + } + + public async executeServiceRestore(requestArg: { + backupId: string; + service: plugins.servezoneInterfaces.data.IService; + snapshots: TBackupSnapshot[]; + clear?: boolean; + resourceTypes?: TBackupSnapshotType[]; + }) { + await this.provisionCorestoreBindingsIfAvailable(requestArg.service); + + const selectedSnapshots = (requestArg.snapshots || []).filter((snapshotArg) => { + return !requestArg.resourceTypes?.length || requestArg.resourceTypes.includes(snapshotArg.type); + }); + const restored: TBackupSnapshot[] = []; + + for (const snapshot of selectedSnapshots.filter((snapshotArg) => snapshotArg.type === 'volume')) { + await this.createCorestoreVolume(requestArg.service, { + source: snapshot.volumeName, + mountPath: snapshot.mountPath || '/data', + backup: true, + }); + await this.postCorestore('/volumes/restore', { + name: snapshot.volumeName, + snapshotId: snapshot.snapshotId, + clear: requestArg.clear, + }); + restored.push(snapshot); + } + + const resourceSnapshots = selectedSnapshots.filter((snapshotArg) => { + return snapshotArg.type === 'database' || snapshotArg.type === 'objectstorage'; + }); + if (resourceSnapshots.length > 0) { + await this.postCorestore('/resources/restore', { + serviceId: requestArg.service.id, + snapshots: resourceSnapshots.map((snapshotArg) => ({ + capability: snapshotArg.type, + resourceName: snapshotArg.resourceName || '', + snapshotId: snapshotArg.snapshotId, + snapshotName: snapshotArg.snapshotName, + originalSize: snapshotArg.originalSize, + storedSize: snapshotArg.storedSize, + createdAt: snapshotArg.createdAt, + tags: snapshotArg.tags || {}, + databaseName: snapshotArg.databaseName, + bucketName: snapshotArg.bucketName, + })), + }); + restored.push(...resourceSnapshots); + } + + return { restored }; + } + + private async provisionCorestoreBindingsIfAvailable( + serviceArg: plugins.servezoneInterfaces.data.IService, + ) { + if (!(this.coreflowRef.cloudlyConnector as any).cloudlyApiClient) { + return; + } + await this.coreflowRef.platformManager.provisionBindingsForService(serviceArg); + } + + private getCoreStoreControlUrl() { + return (process.env.CORESTORE_CONTROL_URL || 'http://corestore:3000').replace(/\/+$/, ''); + } + + private getCoreStoreApiToken() { + return process.env.CORESTORE_API_TOKEN; + } + + private async postCorestore(pathArg: string, bodyArg: unknown): Promise { + const token = this.getCoreStoreApiToken(); + const response = await fetch(`${this.getCoreStoreControlUrl()}${pathArg}`, { + method: 'POST', + headers: { + 'content-type': 'application/json', + ...(token ? { authorization: `Bearer ${token}` } : {}), + }, + body: JSON.stringify(bodyArg), + }); + const responseText = await response.text(); + if (!response.ok) { + throw new Error(`CoreStore request failed ${response.status}: ${responseText}`); + } + return responseText ? JSON.parse(responseText) as T : ({} as T); + } + + private getDockerSafeName(valueArg: string, maxLengthArg = 64) { + const safeName = valueArg + .replace(/[^a-zA-Z0-9_.-]+/g, '-') + .replace(/^[^a-zA-Z0-9]+|[^a-zA-Z0-9]+$/g, '') + .slice(0, maxLengthArg) + .replace(/[^a-zA-Z0-9]+$/g, ''); + return safeName || 'resource'; + } + + private getServiceVolumeConfigs(serviceArg: plugins.servezoneInterfaces.data.IService) { + const serviceData = serviceArg.data as plugins.servezoneInterfaces.data.IService['data'] & { + volumes?: TServiceVolumeConfig[]; + }; + return (serviceData.volumes || []).filter((volumeArg) => { + return Boolean(volumeArg.mountPath || volumeArg.target || volumeArg.containerFsPath); + }); + } + + private getCoreStoreVolumeName( + serviceArg: plugins.servezoneInterfaces.data.IService, + volumeArg: TServiceVolumeConfig, + ) { + const requestedName = volumeArg.source || volumeArg.name; + if (requestedName) { + return this.getDockerSafeName(requestedName, 120); + } + const mountPath = volumeArg.mountPath || volumeArg.target || volumeArg.containerFsPath || 'data'; + const serviceName = this.getDockerSafeName(serviceArg.data.name, 36); + const mountName = this.getDockerSafeName(mountPath.replace(/^\/+/, '').replace(/\/+$/g, ''), 28); + const hash = crypto.createHash('sha1').update(`${serviceArg.id}:${mountPath}`).digest('hex').slice(0, 12); + return this.getDockerSafeName(`sz-${serviceName}-${mountName}-${hash}`, 120); + } + + private async createCorestoreVolume( + serviceArg: plugins.servezoneInterfaces.data.IService, + volumeArg: TServiceVolumeConfig, + ) { + const mountPath = volumeArg.mountPath || volumeArg.target || volumeArg.containerFsPath || '/data'; + const volumeName = this.getCoreStoreVolumeName(serviceArg, volumeArg); + await this.postCorestore('/volumes/create', { + name: volumeName, + serviceId: serviceArg.id, + serviceName: serviceArg.data.name, + mountPath, + backup: volumeArg.backup !== false, + options: volumeArg.options, + }); + return { volumeName, mountPath }; + } + + private async snapshotServiceVolumes(requestArg: { + backupId: string; + service: plugins.servezoneInterfaces.data.IService; + tags?: Record; + }) { + const snapshots: TBackupSnapshot[] = []; + const volumes = this.getServiceVolumeConfigs(requestArg.service).filter((volumeArg) => { + return volumeArg.driver === undefined || volumeArg.driver === 'corestore'; + }).filter((volumeArg) => volumeArg.backup !== false); + + for (const volume of volumes) { + const { volumeName, mountPath } = await this.createCorestoreVolume(requestArg.service, volume); + const response = await this.postCorestore('/volumes/snapshot', { + name: volumeName, + snapshotName: requestArg.backupId, + tags: { + backupId: requestArg.backupId, + serviceId: requestArg.service.id, + serviceName: requestArg.service.data.name, + mountPath, + ...(requestArg.tags || {}), + }, + }); + snapshots.push({ + type: 'volume', + volumeName, + mountPath, + snapshotId: response.snapshot.id, + snapshotName: requestArg.backupId, + originalSize: response.snapshot.originalSize, + storedSize: response.snapshot.storedSize, + createdAt: Date.now(), + tags: response.snapshot.tags, + }); + } + return snapshots; + } + + private async snapshotServiceResources(requestArg: { + backupId: string; + service: plugins.servezoneInterfaces.data.IService; + tags?: Record; + }) { + const response = await this.postCorestore('/resources/snapshot', { + serviceId: requestArg.service.id, + snapshotName: requestArg.backupId, + tags: { + backupId: requestArg.backupId, + serviceId: requestArg.service.id, + serviceName: requestArg.service.data.name, + ...(requestArg.tags || {}), + }, + }); + return (response.snapshots || []).map((snapshotArg: any) => ({ + type: snapshotArg.capability, + resourceName: snapshotArg.resourceName, + databaseName: snapshotArg.databaseName, + bucketName: snapshotArg.bucketName, + snapshotId: snapshotArg.snapshotId, + snapshotName: snapshotArg.snapshotName, + originalSize: snapshotArg.originalSize, + storedSize: snapshotArg.storedSize, + createdAt: snapshotArg.createdAt, + tags: snapshotArg.tags, + })) as TBackupSnapshot[]; + } +} diff --git a/ts/coreflow.classes.coreflow.ts b/ts/coreflow.classes.coreflow.ts index 88a77b4..e2262ba 100644 --- a/ts/coreflow.classes.coreflow.ts +++ b/ts/coreflow.classes.coreflow.ts @@ -7,6 +7,7 @@ import { CoretrafficConnector } from './coreflow.connector.coretrafficconnector. import { ExternalGatewayConnector } from './coreflow.connector.externalgateway.js'; import { InternalServer } from './coreflow.classes.internalserver.js'; import { PlatformManager } from './coreflow.classes.platformmanager.js'; +import { CoreflowBackupManager } from './coreflow.classes.backupmanager.js'; /** * the main Coreflow class @@ -22,6 +23,7 @@ export class Coreflow { public externalGatewayConnector: ExternalGatewayConnector; public clusterManager: ClusterManager; public platformManager: PlatformManager; + public backupManager: CoreflowBackupManager; public taskManager: CoreflowTaskmanager; constructor() { @@ -33,6 +35,7 @@ export class Coreflow { this.externalGatewayConnector = new ExternalGatewayConnector(this); this.clusterManager = new ClusterManager(this); this.platformManager = new PlatformManager(this); + this.backupManager = new CoreflowBackupManager(this); this.taskManager = new CoreflowTaskmanager(this); } @@ -67,6 +70,8 @@ export class Coreflow { console.log('cluster manager started!'); await this.platformManager.start(); console.log('platform manager started!'); + await this.backupManager.start(); + console.log('backup manager started!'); await this.taskManager.start(); console.log('task manager started!'); } @@ -78,6 +83,7 @@ export class Coreflow { await this.cloudlyConnector.stop(); await this.clusterManager.stop(); await this.platformManager.stop(); + await this.backupManager.stop(); await this.taskManager.stop(); await this.internalServer.stop(); }