feat: snapshot corestore resources
This commit is contained in:
@@ -100,6 +100,24 @@ curl -X POST http://corestore:3000/volumes/restore \
|
||||
-d '{"name":"sz-api-data-abc123","snapshotId":"<snapshot-id>"}'
|
||||
```
|
||||
|
||||
Snapshot all provisioned DB/S3 resources for a service:
|
||||
|
||||
```bash
|
||||
curl -X POST http://corestore:3000/resources/snapshot \
|
||||
-H 'content-type: application/json' \
|
||||
-H 'authorization: Bearer <CORESTORE_API_TOKEN>' \
|
||||
-d '{"serviceId":"svc-123","snapshotName":"backup-123"}'
|
||||
```
|
||||
|
||||
Restore service DB/S3 resources from snapshots:
|
||||
|
||||
```bash
|
||||
curl -X POST http://corestore:3000/resources/restore \
|
||||
-H 'content-type: application/json' \
|
||||
-H 'authorization: Bearer <CORESTORE_API_TOKEN>' \
|
||||
-d '{"serviceId":"svc-123","snapshots":[{"capability":"database","resourceName":"db","snapshotId":"<snapshot-id>","originalSize":1,"storedSize":1,"createdAt":1,"tags":{}}]}'
|
||||
```
|
||||
|
||||
## Docker Volume Driver
|
||||
|
||||
Corestore implements Docker's legacy VolumeDriver API over a Unix socket. The `corestore` service must bind mount `/run/docker/plugins` from the host so Docker can discover `/run/docker/plugins/corestore.sock`.
|
||||
@@ -124,6 +142,7 @@ The intended cluster behavior is:
|
||||
- provision `database` and `objectstorage` bindings through `/resources/provision`;
|
||||
- mount service volumes through Docker `DriverConfig.Name = corestore`;
|
||||
- snapshot and restore service volumes through `/volumes/snapshot` and `/volumes/restore`;
|
||||
- snapshot and restore managed DB/S3 resources through `/resources/snapshot` and `/resources/restore`;
|
||||
- merge the returned env vars into the workload Docker secret before service creation;
|
||||
- mark Cloudly platform bindings `ready` with endpoint metadata and credential env refs;
|
||||
- deprovision resources when the service binding or workload is deleted.
|
||||
|
||||
@@ -374,6 +374,18 @@ export class CoreStore {
|
||||
return;
|
||||
}
|
||||
|
||||
if (method === 'POST' && url.pathname === '/resources/snapshot') {
|
||||
const body = await this.readRequestBody<interfaces.ICoreStoreResourceSnapshotRequest>(reqArg);
|
||||
this.sendJson(resArg, 200, await this.snapshotServiceResources(body));
|
||||
return;
|
||||
}
|
||||
|
||||
if (method === 'POST' && url.pathname === '/resources/restore') {
|
||||
const body = await this.readRequestBody<interfaces.ICoreStoreResourceRestoreRequest>(reqArg);
|
||||
this.sendJson(resArg, 200, await this.restoreServiceResources(body));
|
||||
return;
|
||||
}
|
||||
|
||||
this.sendJson(resArg, 404, { ok: false, error: 'not found' });
|
||||
}
|
||||
|
||||
@@ -763,6 +775,18 @@ export class CoreStore {
|
||||
}
|
||||
}
|
||||
|
||||
private createJsonReadable(dataArg: unknown) {
|
||||
return plugins.stream.Readable.from([Buffer.from(`${JSON.stringify(dataArg)}\n`)]);
|
||||
}
|
||||
|
||||
private async readableToString(inputStreamArg: NodeJS.ReadableStream) {
|
||||
const chunks: Buffer[] = [];
|
||||
for await (const chunk of inputStreamArg as any) {
|
||||
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
|
||||
}
|
||||
return Buffer.concat(chunks).toString('utf8');
|
||||
}
|
||||
|
||||
private async snapshotVolume(requestArg: interfaces.ICoreStoreVolumeSnapshotRequest) {
|
||||
const volume = this.getExistingVolume(requestArg.name);
|
||||
await plugins.fs.mkdir(volume.mountpoint, { recursive: true });
|
||||
@@ -826,6 +850,155 @@ export class CoreStore {
|
||||
}
|
||||
}
|
||||
|
||||
private async snapshotServiceResources(requestArg: interfaces.ICoreStoreResourceSnapshotRequest) {
|
||||
if (!requestArg.serviceId) {
|
||||
throw new Error('serviceId is required');
|
||||
}
|
||||
const serviceEntry = this.manifest.services[requestArg.serviceId];
|
||||
if (!serviceEntry) {
|
||||
return { serviceId: requestArg.serviceId, snapshots: [] };
|
||||
}
|
||||
|
||||
const capabilities = requestArg.capabilities?.length
|
||||
? requestArg.capabilities
|
||||
: (Object.keys(serviceEntry.resources) as interfaces.TCoreStoreCapability[]);
|
||||
const archive = await this.getVolumeArchive();
|
||||
const snapshots: interfaces.ICoreStoreResourceSnapshotEntry[] = [];
|
||||
|
||||
for (const capability of capabilities) {
|
||||
const resource = serviceEntry.resources[capability];
|
||||
if (!resource) {
|
||||
continue;
|
||||
}
|
||||
const tags = {
|
||||
corestore: 'resource',
|
||||
serviceId: serviceEntry.serviceId,
|
||||
serviceName: serviceEntry.serviceName || '',
|
||||
capability,
|
||||
resourceName: resource.resourceName,
|
||||
...(requestArg.snapshotName ? { snapshotName: requestArg.snapshotName } : {}),
|
||||
...(requestArg.tags || {}),
|
||||
};
|
||||
|
||||
if (capability === 'database') {
|
||||
if (!this.smartDb) {
|
||||
throw new Error('smartdb is not running');
|
||||
}
|
||||
const databaseResource = resource as interfaces.ICoreStoreDatabaseResource;
|
||||
const databaseExport = await this.smartDb.exportDatabase({
|
||||
databaseName: databaseResource.databaseName,
|
||||
});
|
||||
const snapshot = await archive.ingest(this.createJsonReadable(databaseExport), {
|
||||
tags,
|
||||
items: [{ name: 'database.json', type: 'smartdb-database-export' }],
|
||||
});
|
||||
snapshots.push({
|
||||
capability,
|
||||
resourceName: resource.resourceName,
|
||||
snapshotId: snapshot.id,
|
||||
snapshotName: requestArg.snapshotName,
|
||||
originalSize: snapshot.originalSize,
|
||||
storedSize: snapshot.storedSize,
|
||||
createdAt: Date.now(),
|
||||
tags,
|
||||
databaseName: databaseResource.databaseName,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
if (capability === 'objectstorage') {
|
||||
if (!this.smartStorage) {
|
||||
throw new Error('smartstorage is not running');
|
||||
}
|
||||
const storageResource = resource as interfaces.ICoreStoreObjectStorageResource;
|
||||
const bucketExport = await this.smartStorage.exportBucket({
|
||||
bucketName: storageResource.bucketName,
|
||||
});
|
||||
const snapshot = await archive.ingest(this.createJsonReadable(bucketExport), {
|
||||
tags,
|
||||
items: [{ name: 'bucket.json', type: 'smartstorage-bucket-export' }],
|
||||
});
|
||||
snapshots.push({
|
||||
capability,
|
||||
resourceName: resource.resourceName,
|
||||
snapshotId: snapshot.id,
|
||||
snapshotName: requestArg.snapshotName,
|
||||
originalSize: snapshot.originalSize,
|
||||
storedSize: snapshot.storedSize,
|
||||
createdAt: Date.now(),
|
||||
tags,
|
||||
bucketName: storageResource.bucketName,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
serviceEntry.snapshots = [...(serviceEntry.snapshots || []), ...snapshots];
|
||||
serviceEntry.updatedAt = Date.now();
|
||||
await this.saveManifest();
|
||||
return {
|
||||
serviceId: serviceEntry.serviceId,
|
||||
serviceName: serviceEntry.serviceName,
|
||||
snapshots,
|
||||
};
|
||||
}
|
||||
|
||||
private async restoreServiceResources(requestArg: interfaces.ICoreStoreResourceRestoreRequest) {
|
||||
if (!requestArg.serviceId) {
|
||||
throw new Error('serviceId is required');
|
||||
}
|
||||
const serviceEntry = this.manifest.services[requestArg.serviceId];
|
||||
if (!serviceEntry) {
|
||||
throw new Error(`service ${requestArg.serviceId} does not exist`);
|
||||
}
|
||||
const archive = await this.getVolumeArchive();
|
||||
const restored: interfaces.ICoreStoreResourceSnapshotEntry[] = [];
|
||||
|
||||
for (const snapshotEntry of requestArg.snapshots || []) {
|
||||
if (snapshotEntry.capability === 'database') {
|
||||
if (!this.smartDb) {
|
||||
throw new Error('smartdb is not running');
|
||||
}
|
||||
const databaseResource = serviceEntry.resources.database as interfaces.ICoreStoreDatabaseResource | undefined;
|
||||
const databaseName = snapshotEntry.databaseName || databaseResource?.databaseName;
|
||||
if (!databaseName) {
|
||||
throw new Error(`No database resource for service ${requestArg.serviceId}`);
|
||||
}
|
||||
const restoreStream = await archive.restore(snapshotEntry.snapshotId, { item: 'database.json' });
|
||||
await this.smartDb.importDatabase({
|
||||
databaseName,
|
||||
source: JSON.parse(await this.readableToString(restoreStream)),
|
||||
});
|
||||
restored.push(snapshotEntry);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (snapshotEntry.capability === 'objectstorage') {
|
||||
if (!this.smartStorage) {
|
||||
throw new Error('smartstorage is not running');
|
||||
}
|
||||
const storageResource = serviceEntry.resources.objectstorage as interfaces.ICoreStoreObjectStorageResource | undefined;
|
||||
const bucketName = snapshotEntry.bucketName || storageResource?.bucketName;
|
||||
if (!bucketName) {
|
||||
throw new Error(`No objectstorage resource for service ${requestArg.serviceId}`);
|
||||
}
|
||||
const restoreStream = await archive.restore(snapshotEntry.snapshotId, { item: 'bucket.json' });
|
||||
await this.smartStorage.importBucket({
|
||||
bucketName,
|
||||
source: JSON.parse(await this.readableToString(restoreStream)),
|
||||
});
|
||||
restored.push(snapshotEntry);
|
||||
}
|
||||
}
|
||||
|
||||
serviceEntry.updatedAt = Date.now();
|
||||
await this.saveManifest();
|
||||
return {
|
||||
serviceId: serviceEntry.serviceId,
|
||||
serviceName: serviceEntry.serviceName,
|
||||
restored,
|
||||
};
|
||||
}
|
||||
|
||||
private async getHealth() {
|
||||
const [dbHealth, storageHealth] = await Promise.all([
|
||||
this.smartDb?.getHealth(),
|
||||
|
||||
@@ -26,6 +26,31 @@ export interface ICoreStoreVolumeRestoreRequest {
|
||||
clear?: boolean;
|
||||
}
|
||||
|
||||
export interface ICoreStoreResourceSnapshotRequest {
|
||||
serviceId: string;
|
||||
capabilities?: TCoreStoreCapability[];
|
||||
tags?: Record<string, string>;
|
||||
snapshotName?: string;
|
||||
}
|
||||
|
||||
export interface ICoreStoreResourceSnapshotEntry {
|
||||
capability: TCoreStoreCapability;
|
||||
resourceName: string;
|
||||
snapshotId: string;
|
||||
snapshotName?: string;
|
||||
originalSize: number;
|
||||
storedSize: number;
|
||||
createdAt: number;
|
||||
tags: Record<string, string>;
|
||||
databaseName?: string;
|
||||
bucketName?: string;
|
||||
}
|
||||
|
||||
export interface ICoreStoreResourceRestoreRequest {
|
||||
serviceId: string;
|
||||
snapshots: ICoreStoreResourceSnapshotEntry[];
|
||||
}
|
||||
|
||||
export interface ICoreStoreProvisionRequest {
|
||||
serviceId: string;
|
||||
serviceName?: string;
|
||||
@@ -69,6 +94,7 @@ export interface ICoreStoreServiceManifestEntry {
|
||||
serviceName?: string;
|
||||
resources: Partial<Record<TCoreStoreCapability, TCoreStoreResource>>;
|
||||
env: Record<string, string>;
|
||||
snapshots?: ICoreStoreResourceSnapshotEntry[];
|
||||
createdAt: number;
|
||||
updatedAt: number;
|
||||
}
|
||||
|
||||
@@ -4,9 +4,10 @@ import * as crypto from 'node:crypto';
|
||||
import * as fs from 'node:fs/promises';
|
||||
import * as http from 'node:http';
|
||||
import * as path from 'node:path';
|
||||
import * as stream from 'node:stream';
|
||||
import * as streamPromises from 'node:stream/promises';
|
||||
|
||||
export { childProcess, crypto, fs, http, path, streamPromises };
|
||||
export { childProcess, crypto, fs, http, path, stream, streamPromises };
|
||||
|
||||
// @push.rocks scope
|
||||
import * as projectinfo from '@push.rocks/projectinfo';
|
||||
|
||||
Reference in New Issue
Block a user