diff --git a/readme.md b/readme.md index cd1b924..68f2484 100644 --- a/readme.md +++ b/readme.md @@ -100,6 +100,24 @@ curl -X POST http://corestore:3000/volumes/restore \ -d '{"name":"sz-api-data-abc123","snapshotId":"<snapshot-id>"}' ``` +Snapshot all provisioned DB/S3 resources for a service: + +```bash +curl -X POST http://corestore:3000/resources/snapshot \ + -H 'content-type: application/json' \ + -H 'authorization: Bearer <token>' \ + -d '{"serviceId":"svc-123","snapshotName":"backup-123"}' +``` + +Restore service DB/S3 resources from snapshots: + +```bash +curl -X POST http://corestore:3000/resources/restore \ + -H 'content-type: application/json' \ + -H 'authorization: Bearer <token>' \ + -d '{"serviceId":"svc-123","snapshots":[{"capability":"database","resourceName":"db","snapshotId":"<snapshot-id>","originalSize":1,"storedSize":1,"createdAt":1,"tags":{}}]}' +``` + ## Docker Volume Driver Corestore implements Docker's legacy VolumeDriver API over a Unix socket. The `corestore` service must bind mount `/run/docker/plugins` from the host so Docker can discover `/run/docker/plugins/corestore.sock`. @@ -124,6 +142,7 @@ The intended cluster behavior is: - provision `database` and `objectstorage` bindings through `/resources/provision`; - mount service volumes through Docker `DriverConfig.Name = corestore`; - snapshot and restore service volumes through `/volumes/snapshot` and `/volumes/restore`; +- snapshot and restore managed DB/S3 resources through `/resources/snapshot` and `/resources/restore`; - merge the returned env vars into the workload Docker secret before service creation; - mark Cloudly platform bindings `ready` with endpoint metadata and credential env refs; - deprovision resources when the service binding or workload is deleted. 
diff --git a/ts/corestore.classes.corestore.ts b/ts/corestore.classes.corestore.ts index 37fa507..2d1346e 100644 --- a/ts/corestore.classes.corestore.ts +++ b/ts/corestore.classes.corestore.ts @@ -374,6 +374,18 @@ export class CoreStore { return; } + if (method === 'POST' && url.pathname === '/resources/snapshot') { + const body = await this.readRequestBody(reqArg); + this.sendJson(resArg, 200, await this.snapshotServiceResources(body)); + return; + } + + if (method === 'POST' && url.pathname === '/resources/restore') { + const body = await this.readRequestBody(reqArg); + this.sendJson(resArg, 200, await this.restoreServiceResources(body)); + return; + } + this.sendJson(resArg, 404, { ok: false, error: 'not found' }); } @@ -763,6 +775,18 @@ export class CoreStore { } } + private createJsonReadable(dataArg: unknown) { + return plugins.stream.Readable.from([Buffer.from(`${JSON.stringify(dataArg)}\n`)]); + } + + private async readableToString(inputStreamArg: NodeJS.ReadableStream) { + const chunks: Buffer[] = []; + for await (const chunk of inputStreamArg as any) { + chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); + } + return Buffer.concat(chunks).toString('utf8'); + } + private async snapshotVolume(requestArg: interfaces.ICoreStoreVolumeSnapshotRequest) { const volume = this.getExistingVolume(requestArg.name); await plugins.fs.mkdir(volume.mountpoint, { recursive: true }); @@ -826,6 +850,155 @@ export class CoreStore { } } + private async snapshotServiceResources(requestArg: interfaces.ICoreStoreResourceSnapshotRequest) { + if (!requestArg.serviceId) { + throw new Error('serviceId is required'); + } + const serviceEntry = this.manifest.services[requestArg.serviceId]; + if (!serviceEntry) { + return { serviceId: requestArg.serviceId, snapshots: [] }; + } + + const capabilities = requestArg.capabilities?.length + ? 
requestArg.capabilities + : (Object.keys(serviceEntry.resources) as interfaces.TCoreStoreCapability[]); + const archive = await this.getVolumeArchive(); + const snapshots: interfaces.ICoreStoreResourceSnapshotEntry[] = []; + + for (const capability of capabilities) { + const resource = serviceEntry.resources[capability]; + if (!resource) { + continue; + } + const tags = { + corestore: 'resource', + serviceId: serviceEntry.serviceId, + serviceName: serviceEntry.serviceName || '', + capability, + resourceName: resource.resourceName, + ...(requestArg.snapshotName ? { snapshotName: requestArg.snapshotName } : {}), + ...(requestArg.tags || {}), + }; + + if (capability === 'database') { + if (!this.smartDb) { + throw new Error('smartdb is not running'); + } + const databaseResource = resource as interfaces.ICoreStoreDatabaseResource; + const databaseExport = await this.smartDb.exportDatabase({ + databaseName: databaseResource.databaseName, + }); + const snapshot = await archive.ingest(this.createJsonReadable(databaseExport), { + tags, + items: [{ name: 'database.json', type: 'smartdb-database-export' }], + }); + snapshots.push({ + capability, + resourceName: resource.resourceName, + snapshotId: snapshot.id, + snapshotName: requestArg.snapshotName, + originalSize: snapshot.originalSize, + storedSize: snapshot.storedSize, + createdAt: Date.now(), + tags, + databaseName: databaseResource.databaseName, + }); + continue; + } + + if (capability === 'objectstorage') { + if (!this.smartStorage) { + throw new Error('smartstorage is not running'); + } + const storageResource = resource as interfaces.ICoreStoreObjectStorageResource; + const bucketExport = await this.smartStorage.exportBucket({ + bucketName: storageResource.bucketName, + }); + const snapshot = await archive.ingest(this.createJsonReadable(bucketExport), { + tags, + items: [{ name: 'bucket.json', type: 'smartstorage-bucket-export' }], + }); + snapshots.push({ + capability, + resourceName: resource.resourceName, + 
snapshotId: snapshot.id, + snapshotName: requestArg.snapshotName, + originalSize: snapshot.originalSize, + storedSize: snapshot.storedSize, + createdAt: Date.now(), + tags, + bucketName: storageResource.bucketName, + }); + } + } + + serviceEntry.snapshots = [...(serviceEntry.snapshots || []), ...snapshots]; + serviceEntry.updatedAt = Date.now(); + await this.saveManifest(); + return { + serviceId: serviceEntry.serviceId, + serviceName: serviceEntry.serviceName, + snapshots, + }; + } + + private async restoreServiceResources(requestArg: interfaces.ICoreStoreResourceRestoreRequest) { + if (!requestArg.serviceId) { + throw new Error('serviceId is required'); + } + const serviceEntry = this.manifest.services[requestArg.serviceId]; + if (!serviceEntry) { + throw new Error(`service ${requestArg.serviceId} does not exist`); + } + const archive = await this.getVolumeArchive(); + const restored: interfaces.ICoreStoreResourceSnapshotEntry[] = []; + + for (const snapshotEntry of requestArg.snapshots || []) { + if (snapshotEntry.capability === 'database') { + if (!this.smartDb) { + throw new Error('smartdb is not running'); + } + const databaseResource = serviceEntry.resources.database as interfaces.ICoreStoreDatabaseResource | undefined; + const databaseName = snapshotEntry.databaseName || databaseResource?.databaseName; + if (!databaseName) { + throw new Error(`No database resource for service ${requestArg.serviceId}`); + } + const restoreStream = await archive.restore(snapshotEntry.snapshotId, { item: 'database.json' }); + await this.smartDb.importDatabase({ + databaseName, + source: JSON.parse(await this.readableToString(restoreStream)), + }); + restored.push(snapshotEntry); + continue; + } + + if (snapshotEntry.capability === 'objectstorage') { + if (!this.smartStorage) { + throw new Error('smartstorage is not running'); + } + const storageResource = serviceEntry.resources.objectstorage as interfaces.ICoreStoreObjectStorageResource | undefined; + const bucketName = 
snapshotEntry.bucketName || storageResource?.bucketName; + if (!bucketName) { + throw new Error(`No objectstorage resource for service ${requestArg.serviceId}`); + } + const restoreStream = await archive.restore(snapshotEntry.snapshotId, { item: 'bucket.json' }); + await this.smartStorage.importBucket({ + bucketName, + source: JSON.parse(await this.readableToString(restoreStream)), + }); + restored.push(snapshotEntry); + } + } + + serviceEntry.updatedAt = Date.now(); + await this.saveManifest(); + return { + serviceId: serviceEntry.serviceId, + serviceName: serviceEntry.serviceName, + restored, + }; + } + private async getHealth() { const [dbHealth, storageHealth] = await Promise.all([ this.smartDb?.getHealth(), diff --git a/ts/corestore.interfaces.ts b/ts/corestore.interfaces.ts index 025b12a..8c94dda 100644 --- a/ts/corestore.interfaces.ts +++ b/ts/corestore.interfaces.ts @@ -26,6 +26,31 @@ export interface ICoreStoreVolumeRestoreRequest { clear?: boolean; } +export interface ICoreStoreResourceSnapshotRequest { + serviceId: string; + capabilities?: TCoreStoreCapability[]; + tags?: Record<string, string>; + snapshotName?: string; +} + +export interface ICoreStoreResourceSnapshotEntry { + capability: TCoreStoreCapability; + resourceName: string; + snapshotId: string; + snapshotName?: string; + originalSize: number; + storedSize: number; + createdAt: number; + tags: Record<string, string>; + databaseName?: string; + bucketName?: string; +} + +export interface ICoreStoreResourceRestoreRequest { + serviceId: string; + snapshots: ICoreStoreResourceSnapshotEntry[]; +} + export interface ICoreStoreProvisionRequest { serviceId: string; serviceName?: string; @@ -69,6 +94,7 @@ export interface ICoreStoreServiceManifestEntry { serviceName?: string; resources: Partial>; env: Record<string, string>; + snapshots?: ICoreStoreResourceSnapshotEntry[]; createdAt: number; updatedAt: number; } diff --git a/ts/corestore.plugins.ts b/ts/corestore.plugins.ts index c588410..98f513d 100644 --- a/ts/corestore.plugins.ts +++ 
b/ts/corestore.plugins.ts @@ -4,9 +4,10 @@ import * as crypto from 'node:crypto'; import * as fs from 'node:fs/promises'; import * as http from 'node:http'; import * as path from 'node:path'; +import * as stream from 'node:stream'; import * as streamPromises from 'node:stream/promises'; -export { childProcess, crypto, fs, http, path, streamPromises }; +export { childProcess, crypto, fs, http, path, stream, streamPromises }; // @push.rocks scope import * as projectinfo from '@push.rocks/projectinfo';